用扩展的方式在 PHP 中使用 Kafka
2023/5/13 23:22:11
本文主要是介绍用扩展的方式在 PHP 中使用 Kafka,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
前言:
由于之前在 PHP 中使用 Kafka 是通过 composer 包的方式,由于 nmred/kafka-php 很久没有维护,并且网上相关问题的文章也比较少。所以我这次换成 PHP 扩展 RdKafka 继续使用,主要介绍扩展安装和这种方式的基本操作。
安装:
- 下载
- 目录
由于 php-rdkafka 依赖 librdkafka,linux 就需要先安装 librdkafka 后安装 php-rdkafka,而 windows 版本是如下几个文件,安装方法如下:
(1). 将 librdkafka.dll 和 librdkafka.pdb 放入 PHP 安装的根目录下,而 php_rdkafka.dll 和 php_rdkafka.pdb 放入 PHP 安装目录的 ext 下。
(2). php.ini 配置文件添加 extension=php_rdkafka.dll,最后重启 PHP。
(3). php-m 或这 phpinfo (); 就可以查看到扩展了。
通过 get_declared_classes() 也可以查看到扩展里预设的函数了。
使用:
- 生产
public function kafkaTest() { $rk = new \RdKafka\Producer(); $rk->addBrokers("127.0.0.1:9092"); $topic = $rk->newTopic("shop"); $ret = []; for ($i = 0; $i < 5; $i++) { $content = "第" . $i . "次发送失败"; $message = ["mobile" => "15623652142", "content" => $content]; $payload = json_encode($message); // 指定向0号partition生产数据 $ret[]['produce_res'] = $topic->produce(0, 0, $payload, "sms_$i"); // 随机选择partition //$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload); if ($rk->getOutQLen() > 0) { $ret[]['produce_poll'] = $rk->poll(500); } else { $ret[]['produce_poll'] = $rk->poll(0); } } dump($ret); }
- 消费(从指定的 partition 消费)
protected function execute(Input $input, Output $output) { $output->writeln("!!!hello kafka!!!"); $conf = new \RdKafka\Conf(); $conf->set('group.id', 'sms-consumer-group'); $rk = new \RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1:9092"); $topicConf = new \RdKafka\TopicConf(); $topicConf->set('auto.commit.interval.ms', 100); $topicConf->set('offset.store.method', 'file'); $topicConf->set('offset.store.path', sys_get_temp_dir()); $topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic("shop", $topicConf); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while(true) { // 设置消费时的时间间隔,单位毫秒,以下表示5秒消费一个 $message = $topic->consume(0, 5000); if ($message) { echo "读取到消息\n\r"; // 消息对象,包括消息主题,消息创建时间戳,消息分区编号,消息主体,消息键名,消息长度等 var_dump($message); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: echo "读取消息成功:\n\r"; var_dump($message->payload); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "读取消息失败\n\r"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "请求超时\n\r"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } else { echo "未读取到消息\n\r"; } } $output->writeln("!!!the end!!!"); }
附加:
在执行消费过程中,发现 kafka 停止服务,抛出的异常:ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed。
解决方法:
删除 kafka-logs 下的所有日志,再重新启动 Kafaka, kafka-server-start.bat …\config\server.properties &
这篇关于用扩展的方式在 PHP 中使用 Kafka的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-19php8的协程和hyperf的协程有什么区别?-icode9专业技术文章分享
- 2024-12-19php8 的fiber是什么?-icode9专业技术文章分享
- 2024-12-05怎么在php8,1 里面开启 debug?-icode9专业技术文章分享
- 2024-12-05怎么在php8,1 里面开启 debug?-icode9专业技术文章分享
- 2024-11-29使用PHP 将ETH账户的资产汇集到一个账户
- 2024-11-23怎么实现安卓+php 热更新方案?-icode9专业技术文章分享
- 2024-11-22PHP 中怎么实现判断多个值是否为空、null 或者为 false?-icode9专业技术文章分享
- 2024-11-11开源 PHP 商城项目 CRMEB 二次开发和部署教程
- 2024-11-09怎么使用php在kaufland平台刊登商品?-icode9专业技术文章分享
- 2024-11-05PHP的抽象类和接口是什么,有什么区别-icode9专业技术文章分享