PHP使用Kafka

2022/2/10 12:12:56

本文主要是介绍PHP使用Kafka,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

PHP使用Kafka
1.前言
中文文档地址:https://kafka.apachecn.org/
1.1定义
Apache Kafka® 是 一个分布式流处理平台
1.2基本原理
1.3名词解释
名词
Broker
Topic
Partition
Producer
Consumer
ConsumerGroup
2.安装Java环境
# 下载jdk1.8
https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html#license-lightbox
# 解压
tar -zxvf jdk-8u281-linux-x64.tar.gz
# 配置环境变量
vim /etc/profile
# 增加以下配置
JAVA_HOME=/usr/local/java/jdk1.8.0_281
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
# 重载
source /etc/profile
3.安装Kafka
# 下载源码
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka_2.12-2.7.0.tgz
# 解压
tar -zxvf kafka_2.12-2.7.0.tgz
#启动
# 需先启动zookeeper
# -daemon 可启动后台守护模式
# 如果你已经启动了zookeeper 就不用启动下面这 (kafka默认提供有zookeeper)
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动Kafka服务端
bin/kafka-server-start.sh config/server.properties
# 启动kafka客户端测试
# 创建一个话题,test话题2个分区
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
Created topic "test".
# 显示所有话题
bin/kafka-topics.sh --list --zookeeper localhost:2181
test
# 显示话题信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:2 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
# 启动一个生产者(输入消息)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[等待输入自己的内容 出现>输入即可]
>i am a new msg !
>i am a good msg ?
# 启动一个消费者(等待消息)
# 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
[等待消息]
i am a new msg !
i am a good msg ?
4.安装RdKafka扩展
# 下载librdkafka
git clone https://codechina.csdn.net/mirrors/edenhill/librdkafka.git
# 编译安装
cd librdkafka/
./configure
make && make install
# 下载RdKafka扩展
https://pecl.php.net/get/rdkafka-5.0.0.tgz
tar -zxvf rdkafka-5.0.0.tgz
cd rdkafka-5.0.0.tgz
# 编译安装
/usr/local/php/bin/phpize
./configure --with-config=/usr/local/php/bin/php-config
make && make install
# php.ini 追加扩展,重启php-fpm
extension=rdkafka.so
systemctl restart php-fpm
# 验证
php -m
5.使用Kafka
5.1生产(Producer)
$config = new \RdKafka\Conf();
# 设置broker
$config->set('metadata.broker.list', $this->brokerList);
$producer = new \RdKafka\Producer($config);
# 设置topic
$topic = $producer->newTopic($topic);
$topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));
$producer->poll(0);
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
return false;
}
return true;
5.2消费(Consumer)
$conf = new \RdKafka\Conf();
$conf->set('group.id', $this->groupName);
$conf->set('metadata.broker.list', $this->brokerList);
$conf->set('auto.offset.reset', 'earliest');
$consumer = new \RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$this->topicName]);
while (true) {
$message = $topic->consume(0, 120*10000);
if ($message->err != RD_KAFKA_RESP_ERR_NO_ERROR) {
print("err: " . $message->err);
print("errstr: " . $message->errstr());
} else {
var_dump($message->payload);
}
}
5.3项目中使用
# rocket-customer
application/command/KafkaConsumerByCall.php
application/command/KafkaConsumerByCustomer.php
application/command/KafkaConsumerByMemberCall.php
application/command/KafkaConsumerByProfile.php


这篇关于PHP使用Kafka的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程