kafka在python中的使用及结束kafka消费者
2021/12/6 17:17:14
本文主要是介绍kafka在python中的使用及结束kafka消费者,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
先说下问题:
正常使用kafka消费者,接收消息时,会出现消息循环无法结束问题,增加参数 consumer_timeout_ms:超时时间(毫秒),超过指定时间没有获取到消息关闭kafka。(例子如下)
consumer.py文件:
from kafka import KafkaProducer, KafkaConsumer import time class KafkaClient(object): topic = "topic" # 使用的kafka的topic client = "0.0.0.0:19823" # kafka所在的服务地址 group_id = "test_consumer_group" # kafka组信息 @staticmethod def log(log_str): t = time.strftime(r"%Y-%m-%d_%H:%M:%S", time.localtime()) print("[%s]%s" % (t, log_str)) def info_send(self, key, info_str): """key: 发送信息的key;info_str:要发送的信息内容""" producer = KafkaProducer(bootstrap_servers=[self.client]) producer.send(self.topic, key=key.encode("utf-8"), value=info_str.encode("utf-8")) # 批量提交可以使用 producer.flush() producer.close() def message_consumer(): # consumer_timeout_ms:超时时间(毫秒),超过指定时间没有获取到消息关闭kafka consumer = KafkaConsumer(self.topic, group_id=self.group_id, bootstrap_servers=[self.client], consumer_timeout_ms=3000) for msg in consumer: # partition:消息所在的分区,offset:消息所在分区的位置,key:消息的key,value:消息的内容 print(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.key, msg.value)
这篇关于kafka在python中的使用及结束kafka消费者的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-03用FastAPI掌握Python异步IO:轻松实现高并发网络请求处理
- 2025-01-02封装学习:Python面向对象编程基础教程
- 2024-12-28Python编程基础教程
- 2024-12-27Python编程入门指南
- 2024-12-27Python编程基础
- 2024-12-27Python编程基础教程
- 2024-12-27Python编程基础指南
- 2024-12-24Python编程入门指南
- 2024-12-24Python编程基础入门
- 2024-12-24Python编程基础:变量与数据类型