confluent-kafka demo
2021/8/4 6:06:48
本文主要是介绍confluent-kafka demo,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
安装:confluent-kafka
pip install confluent-kafka 我直接在PyCharm里面安装
启动zk, 启动kafka server
查看已有topic
./kafka-topics.sh --zookeeper localhost:2181 --list
创建topic test
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test
控制台发送topic
sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
消费者, 自动commit消息
from time import sleep from confluent_kafka import Consumer, KafkaError mybroker = "127.0.0.1:9092" c = Consumer({ 'bootstrap.servers': mybroker, 'group.id': 'mygroup', 'client.id': 'gxf', 'enable.auto.commit': True, 'default.topic.config': { 'auto.offset.reset': 'earliest' } }) c.subscribe(['test']) while True: msg = c.poll(1.0) # print("msg:", msg) if msg is None: continue if msg.error(): print("msg error") if msg.error().code() == KafkaError._PARTITION_EOF: continue else: print(msg.error()) break print('Received message: {}'.format(msg.value().decode('utf-8'))) sleep(1) c.close()
这篇关于confluent-kafka demo的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-06-30uniAPP 实现全屏左右滚动滚动的效果-icode9专业技术文章分享
- 2024-06-30如何在本地使用授权或插件-icode9专业技术文章分享
- 2024-06-30伪静态规则配置方法汇总-icode9专业技术文章分享
- 2024-06-29易优CMS安装常见问题汇总-icode9专业技术文章分享
- 2024-06-28易优新手必读安装教程-icode9专业技术文章分享
- 2024-06-28忘记eyoucms后台密码怎么办?-icode9专业技术文章分享
- 2024-06-26终极指南:Scrum中如何设置需求优先级
- 2024-06-26AI大模型企业应用实战(25)-为Langchain Agent添加记忆功能
- 2024-06-26小白家庭 nas 搭建方案-icode9专业技术文章分享
- 2024-06-23AI大模型企业应用实战(14)-langchain的Embedding