Java代码测试Kafka集群收发消息
2021/4/15 20:25:30
本文主要是介绍Java代码测试Kafka集群收发消息,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
consumer:
package cn.miaoying.consumer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; public class TestConsumer { private final ConsumerConnector consumer; private TestConsumer() { Properties props = new Properties(); // zookeeper 配置 props.put("zookeeper.connect", "20.21.1.xxx:2182,20.21.1.xxx:2183,20.21.1.xxx:2184"); // group 代表一个消费组 props.put("group.id", "jd-group"); // zk连接超时 props.put("zookeeper.session.timeout.ms", "40000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); // 序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); ConsumerConfig config = new ConsumerConfig(props); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); } void consume() { Map topicCountMap = new HashMap(); //topicCountMap.put(KafkaProducerDemo.TOPIC, new Integer(1)); topicCountMap.put("test_miaoying", new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder); KafkaStream stream = consumerMap.get("test_miaoying").get(0); ConsumerIterator it = stream.iterator(); while (it.hasNext()) System.out.println(it.next().message()); } public static void main(String[] args) { new TestConsumer().consume(); } }
provider:
package cn.miaoying.consumer; import java.util.Date; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class TestProvider { public static void main(String[] args) { long events = Long.parseLong("1"); Properties properties = new Properties(); properties.put("metadata.broker.list", "20.21.1.xxx:9093,20.21.1.xxx:9091,20.21.1.xxx:9092"); properties.put("serializer.class", "kafka.serializer.StringEncoder"); properties.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(properties); Producer producer = new Producer(config); for (int i = 0; i < 5; i++) { long runtime = new Date().getTime(); String ip = "127.0.0.1"; String msg = "test~test~test"; KeyedMessage keyedMessage = new KeyedMessage("test_miaoying", ip, msg); System.out.println(events + "---" + runtime); producer.send(keyedMessage); } producer.close(); } }
这篇关于Java代码测试Kafka集群收发消息的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-06-05做软件测试需要懂代码吗?
- 2024-06-0514-ShardingSphere的分布式主键实现
- 2024-06-03为什么以及如何要进行架构设计权衡?
- 2024-05-31全网首发第二弹!软考2024年5月《软件设计师》真题+解析+答案!(11-20题)
- 2024-05-31全网首发!软考2024年5月《软件设计师》真题+解析+答案!(21-30题)
- 2024-05-30【Java】百万数据excel导出功能如何实现
- 2024-05-30我们小公司,哪像华为一样,用得上IPD(集成产品开发)?
- 2024-05-30java excel上传--poi
- 2024-05-30安装笔记本应用商店的pycharm,再安排pandas等模块,说是没有打包工具?
- 2024-05-29java11新特性