Python简单使用kafka
2021/8/10 1:06:04
本文主要是介绍Python简单使用kafka,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
在Windows上安装kafka可以参考: https://blog.csdn.net/weixin_38004638/article/details/91893910
简介
Kafka是分布式流处理系统(RabbitMQ仅仅只是消息队列),是一个分布式、分区的、多副本的、多订阅者,基于Zookeeper协调的分布式日志系统。kafka的吞吐量是很高的,至于为什么这么高可以参考这篇文章 https://zhuanlan.zhihu.com/p/183808742
这里是我使用kafka后产生的消息文件
test-0分区
话不多说,上代码
生产者
from kafka import KafkaProducer, TopicPartition import json producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092']) for i in range(3): msg = {'asd':'azxc'} msg1 = "zxc" print(json.dumps(msg).encode()) producer.send(topic='test', value=msg1.encode(),partition=0,key='456'.encode()) producer.close()
消费者(简单使用)
from kafka import KafkaConsumer consumer = KafkaConsumer('test', bootstrap_servers=['127.0.0.1:9092']) for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print(recv)
消费者(指定分区)
from kafka import KafkaConsumer, TopicPartition consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092']) # 指定消费的主体与分区 partition = TopicPartition('test', 0) consumer.assign([partition]) # 指定从分区的offset开始消费 consumer.seek(partition, 0) for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print(recv)
这篇关于Python简单使用kafka的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-25Python编程基础:变量与类型
- 2024-11-25Python编程基础与实践
- 2024-11-24Python编程基础详解
- 2024-11-21Python编程基础教程
- 2024-11-20Python编程基础与实践
- 2024-11-20Python编程基础与高级应用
- 2024-11-19Python 基础编程教程
- 2024-11-19Python基础入门教程
- 2024-11-17在FastAPI项目中添加一个生产级别的数据库——本地环境搭建指南
- 2024-11-16`PyMuPDF4LLM`:提取PDF数据的神器