kafka java api
2021/6/2 12:22:38
本文主要是介绍kafka java api,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
1、消费者

package com.asiainfo.group.kafka.consumer;

import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

/**
 * Kafka consumer demo: subscribes to topic "okk" on a background thread,
 * polls in a loop, prints each record, and commits offsets asynchronously.
 * After 10 seconds the main thread calls {@code wakeup()} to break the poll
 * loop so the consumer can be closed cleanly.
 */
public class ConsumerDemo {

    private static final String PATH = "D:/development-software/eclipse-mars/default/java_test/src/main/java/com/asiainfo/group/kafka/consumer/";
    private static KafkaConsumer<String, String> consumer;

    static {
        try {
            Properties p = new Properties();
            // NOTE(review): FileReader uses the platform default charset — fine for
            // an ASCII properties file, but worth confirming if keys ever hold non-ASCII.
            p.load(new FileReader(PATH + "consumer.properties"));
            consumer = new KafkaConsumer<String, String>(p);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                List<String> topics = new ArrayList<String>();
                topics.add("okk");
                consumer.subscribe(topics);
                try {
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(3000);
                        System.err.println("收到了" + records.count() + "条消息");
                        for (ConsumerRecord<String, String> record : records) {
                            System.err.println("topic:" + record.topic());
                            System.err.println("partition:" + record.partition());
                            System.err.println("offset:" + record.offset());
                            System.err.println("key:" + record.key());
                            System.err.println("value:" + record.value());
                        }
                        // Synchronous commit: retries until it succeeds, blocking the loop.
                        //consumer.commitSync();
                        // Asynchronous commit: deliberately never retries, because a retry
                        // could overwrite a larger offset committed in the meantime.
                        consumer.commitAsync(new OffsetCommitCallback() {
                            @Override
                            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                                   Exception exception) {
                                // BUG FIX: this callback fires on failure too — only report
                                // success when no exception was passed in.
                                if (exception == null) {
                                    System.err.println("异步提交偏移量成功!");
                                } else {
                                    exception.printStackTrace();
                                }
                            }
                        });
                        // Combining sync and async commits:
                        // while running normally, commit asynchronously — even if one commit
                        // fails, a later one will succeed. On shutdown there is no "later",
                        // so fall back to a synchronous commit that retries until success:
                        /*try{
                            consumer.commitSync();
                        }
                        finally{
                            consumer.close();
                        }*/
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    consumer.close();
                }
            }
        }).start();

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // wakeup() is the only thread-safe way to interrupt consumer.poll: it makes poll
        // throw WakeupException, breaking out of while(true) into the finally block that
        // closes the consumer.
        consumer.wakeup();
    }
}
2、生产者
package com.asiainfo.group.kafka.producer; import java.io.FileReader; import java.util.Properties; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class ProductDemo { private static final String PATH="D:/development-software/eclipse-mars/default/java_test/src/main/java/com/asiainfo/group/kafka/producer/"; private static KafkaProducer<String, String> producer; static{ try { Properties p = new Properties(); p.load(new FileReader(PATH+"producer.properties")); producer = new KafkaProducer<String,String>(p); } catch (Exception e) { e.printStackTrace(); } } /** * 发送并忘记(不关心是否到达) * @throws ExecutionException * @throws InterruptedException */ public void sendAndForget() throws Exception{ for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("okk", "key"+i, "sendAndForget"+i); producer.send(record); } producer.close(); } public void syncSend() throws Exception{ long start = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key"+i, "syncSend"+i); RecordMetadata recordMetadata = producer.send(record).get(); System.err.println("同步发送成功!"); } System.err.println("同步发送耗时:"+(System.currentTimeMillis()-start)); producer.close(); } public void asyncSend(){ long start = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key"+i, "asyncSend"+i); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata arg0, Exception arg1) { System.err.println("异步发送成功!"); } }); } System.err.println("异步发送耗时:"+(System.currentTimeMillis()-start)); producer.close(); } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ new 
ProductDemo().sendAndForget(); //new ProductDemo().syncSend(); //new ProductDemo().asyncSend(); } }3、生产者配置文件
bootstrap.servers=192.168.0.108:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

4、消费者配置文件
bootstrap.servers=192.168.0.108:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=groupByJava1
enable.auto.commit=false
这篇关于kafka java api的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-10-05小米13T Pro系统合集:性能与摄影的极致融合,值得你升级的系统ROM
- 2024-10-01基于Python+Vue开发的医院门诊预约挂号系统
- 2024-10-01基于Python+Vue开发的旅游景区管理系统
- 2024-10-01RestfulAPI入门指南:打造简单易懂的API接口
- 2024-10-01初学者指南:了解和使用Server Action
- 2024-10-01Server Component入门指南:搭建与配置详解
- 2024-10-01React 中使用 useRequest 实现数据请求
- 2024-10-01使用 golang 将ETH账户的资产平均分散到其他账户
- 2024-10-01JWT用户校验课程:从入门到实践
- 2024-10-01Server Component课程入门指南