Node.js 操作kafka
2021/8/10 20:06:21
本文主要是介绍Node.js 操作kafka,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
Node.js 操作kafka
基础知识可参考:douzixiansheng/MQgithub.com/douzixiansheng/MQ/blob/master/kafka_basic.md
1.准备好kafka环境,没有安装的伙伴可以参考 (讲解了linux如何安装kafka):
douzixiansheng/MQgithub.com/douzixiansheng/MQ/blob/master/kafka_linux_install.md2. 安装依赖 kafka-node 模块,编写package.json
{ "name": "kafka", "private": false, "dependencies": { "kafka-node": "4.1.3" }, "devDependencies": { "mocha": ">=0.0.1" } }
3. 编写生产者与消费者 具体api可以参考官网:
kafka-nodewww.npmjs.com/package/kafka-node生产者:
/** * 生产者 */ const kafka = require('kafka-node'); let conn = {'kafkaHost':'127.0.0.1:9092'}; var MQ = function (){ this.mq_producers = {}; } MQ.prototype.AddProducer = function (conn, handler){ console.log('增加生产者',conn, this); let client = new kafka.KafkaClient(conn); let producer = new kafka.Producer(client); producer.on('ready', function(){ if(!!handler){ handler(producer); } }); producer.on('error', function(err){ console.error('producer error ',err.stack); }); this.mq_producers['common'] = producer; return producer; } console.log(MQ); var mq = new MQ(); mq.AddProducer(conn, function (producer){ producer.createTopics(['broadcast'], function (){ setInterval(function(){ mq.mq_producers['common'].send([{topic:['broadcast'], messages:[JSON.stringify({"cmd":"testRpc","value":"Hello World"})]}], function (){ console.log("..... "); }) }, 2000); }) });
消费者:
/** * 消费者 */ const kafka = require('kafka-node'); let conn = {'kafkaHost':'127.0.0.1:9092'}; let consumers = [ { 'type': 'consumer', 'options': {'autoCommit': true}, 'name':'common', 'topic':[ {'topic': 'broadcast', 'partition': 0} ] } ]; let MQ = function(){ } MQ.prototype.AddConsumer = function (conn, topics, options, handler){ let client = new kafka.KafkaClient(conn); let consumer = new kafka.Consumer(client, topics, options); if(!!handler){ consumer.on('message', handler); } consumer.on('error', function(err){ console.error('consumer error ',err.stack); }); } var mq = new MQ(); mq.AddConsumer(conn, consumers[0].topic, consumers[0].options, function (message){ console.log(message.value); });
4.执行生产者脚本
root@FM:/home/MQ/Kafka# node producer.js [Function: MQ] 增加生产者 { kafkaHost: '127.0.0.1:9092' } MQ { mq_producers: {} } ..... ..... ..... ..... ..... ..... ..... ..... ..... ..... .....
执行消费者脚本
root@FM:/home/MQ/Kafka# node consumer.js {"cmd":"testRpc","value":"Hello World"} {"cmd":"testRpc","value":"Hello World"} {"cmd":"testRpc","value":"Hello World"} {"cmd":"testRpc","value":"Hello World"} {"cmd":"testRpc","value":"Hello World"} {"cmd":"testRpc","value":"Hello World"} {"cmd":"testRpc","value":"Hello World"} {"cmd":"testRpc","value":"Hello World"}
可以看到每隔两秒生产者往主题topic(broadcast) 上生产消息,消费者从主题上拉取消息
这篇关于Node.js 操作kafka的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-30React Native常用组件-点击组件
- 2024-05-30uniapp+vue3+uv-ui手机端后台OA管理模板
- 2024-05-29Python网络爬虫的时候json=就是让你少写个json.dumps()
- 2024-05-27React Native常用组件-展示组件
- 2024-05-27React Native常用组件-列表组件
- 2024-05-09vue3开发前端表单缓存自定义指令,移动端h5必备插件
- 2024-05-09React Hooks在class组件中的使用方式
- 2024-03-30[OIDC in Action] 2. 基于OIDC(OpenID Connect)的SSO(纯JS客户端)
- 2024-03-29terraform jsonencode
- 2024-03-13vuex-persist