PHP使用Kafka
2022/2/10 12:12:56
本文主要是介绍PHP使用Kafka,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
PHP使用Kafka |
---|
1.前言 |
中文文档地址:https://kafka.apachecn.org/ |
1.1定义 |
Apache Kafka® 是 一个分布式流处理平台 |
1.2基本原理 |
1.3名词解释 |
名词 |
Broker |
Topic |
Partition |
Producer |
Consumer |
ConsumerGroup |
2.安装Java环境 |
# 下载jdk1.8 |
https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html#license-lightbox |
# 解压 |
tar -zxvf jdk-8u281-linux-x64.tar.gz |
# 配置环境变量 |
vim /etc/profile |
# 增加以下配置 |
JAVA_HOME=/usr/local/java/jdk1.8.0_281 |
CLASSPATH=$JAVA_HOME/lib/ |
PATH=$PATH:$JAVA_HOME/bin |
export PATH JAVA_HOME CLASSPATH |
# 重载 |
source /etc/profile |
3.安装Kafka |
# 下载源码 |
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka_2.12-2.7.0.tgz |
# 解压 |
tar -zxvf kafka_2.12-2.7.0.tgz |
#启动 |
# 需先启动zookeeper |
# -daemon 可启动后台守护模式 |
# 如果你已经启动了zookeeper 就不用启动下面这 (kafka默认提供有zookeeper) |
bin/zookeeper-server-start.sh config/zookeeper.properties |
# 启动Kafka服务端 |
bin/kafka-server-start.sh config/server.properties |
# 启动kafka客户端测试 |
# 创建一个话题,test话题2个分区 |
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test |
Created topic "test". |
# 显示所有话题 |
bin/kafka-topics.sh --list --zookeeper localhost:2181 |
test |
# 显示话题信息 |
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test |
Topic:test PartitionCount:2 ReplicationFactor:1 Configs: |
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 |
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0 |
# 启动一个生产者(输入消息) |
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test |
[等待输入自己的内容 出现>输入即可] |
>i am a new msg ! |
>i am a good msg ? |
# 启动一个消费者(等待消息) |
# 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果 |
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning |
[等待消息] |
i am a new msg ! |
i am a good msg ? |
4.安装RdKafka扩展 |
# 下载librdkafka |
git clone https://codechina.csdn.net/mirrors/edenhill/librdkafka.git |
# 编译安装 |
cd librdkafka/ |
./configure |
make && make install |
# 下载RdKafka扩展 |
https://pecl.php.net/get/rdkafka-5.0.0.tgz |
tar -zxvf rdkafka-5.0.0.tgz |
cd rdkafka-5.0.0.tgz |
# 编译安装 |
/usr/local/php/bin/phpize |
./configure --with-config=/usr/local/php/bin/php-config |
make && make install |
# php.ini 追加扩展,重启php-fpm |
extension=rdkafka.so |
systemctl restart php-fpm |
# 验证 |
php -m |
5.使用Kafka |
5.1生产(Producer) |
$config = new \RdKafka\Conf(); |
# 设置broker |
$config->set('metadata.broker.list', $this->brokerList); |
$producer = new \RdKafka\Producer($config); |
# 设置topic |
$topic = $producer->newTopic($topic); |
$topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message)); |
$producer->poll(0); |
$result = $producer->flush(10000); |
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { |
return false; |
} |
return true; |
5.2消费(Consumer) |
$conf = new \RdKafka\Conf(); |
$conf->set('group.id', $this->groupName); |
$conf->set('metadata.broker.list', $this->brokerList); |
$conf->set('auto.offset.reset', 'earliest'); |
$consumer = new \RdKafka\KafkaConsumer($conf); |
$consumer->subscribe([$this->topicName]); |
while (true) { |
$message = $topic->consume(0, 120*10000); |
if ($message->err != RD_KAFKA_RESP_ERR_NO_ERROR) { |
print("err: " . $message->err); |
print("errstr: " . $message->errstr()); |
} else { |
var_dump($message->payload); |
} |
} |
5.3项目中使用 |
# rocket-customer |
application/command/KafkaConsumerByCall.php |
application/command/KafkaConsumerByCustomer.php |
application/command/KafkaConsumerByMemberCall.php |
application/command/KafkaConsumerByProfile.php |
这篇关于PHP使用Kafka的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-19php8的协程和hyperf的协程有什么区别?-icode9专业技术文章分享
- 2024-12-19php8 的fiber是什么?-icode9专业技术文章分享
- 2024-12-05怎么在php8,1 里面开启 debug?-icode9专业技术文章分享
- 2024-12-05怎么在php8,1 里面开启 debug?-icode9专业技术文章分享
- 2024-11-29使用PHP 将ETH账户的资产汇集到一个账户
- 2024-11-23怎么实现安卓+php 热更新方案?-icode9专业技术文章分享
- 2024-11-22PHP 中怎么实现判断多个值是否为空、null 或者为 false?-icode9专业技术文章分享
- 2024-11-11开源 PHP 商城项目 CRMEB 二次开发和部署教程
- 2024-11-09怎么使用php在kaufland平台刊登商品?-icode9专业技术文章分享
- 2024-11-05PHP的抽象类和接口是什么,有什么区别-icode9专业技术文章分享