Kafka Java API + Custom Partitioners
2021/6/2 20:22:44
This article walks through the Kafka Java API and custom partitioning. It should be a useful reference for programmers tackling this problem, so follow along and learn with us!
Step one for using the Kafka API: import the Kafka client jar into your project.
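If the project is built with Maven, the client jar can be pulled in with a dependency along these lines (the version number here is only an example, not from the original article; pick one compatible with your brokers):

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- example version; match it to your cluster -->
    <version>2.4.1</version>
</dependency>
```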
Kafka producer
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;

@Test
public void kafkaProducer() throws Exception {
    // 1. Prepare the configuration
    Properties props = new Properties();
    props.put("bootstrap.servers", "hadoop-001:9092,hadoop-002:9092,hadoop-003:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // 2. Create the KafkaProducer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++) {
        // 3. Send data ("yun01" matches the topic the consumer below subscribes to)
        kafkaProducer.send(new ProducerRecord<>("yun01", Integer.toString(i), "message-" + i));
    }
    kafkaProducer.close();
}
Kafka consumer
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Test;

@Test
public void kafkaConsum() throws Exception {
    // 1. Prepare the configuration
    Properties props = new Properties();
    props.put("bootstrap.servers", "hadoop-001:9092,hadoop-002:9092,hadoop-003:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // 2. Create the KafkaConsumer
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
    // 3. Subscribe; the list may contain more than one topic
    kafkaConsumer.subscribe(Arrays.asList("yun01"));
    // 4. Poll for data
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, offset = %d, key = %s, value = %s%n",
                    record.topic(), record.offset(), record.key(), record.value());
        }
    }
}
Custom partitioning in Kafka
Option one: specify the partition directly
Use the ProducerRecord constructor that takes an explicit partition number (topic, partition, key, value):

kafkaProducer.send(new ProducerRecord<>("yun01", 1, Integer.toString(i), "message-" + i));
Option two: a custom partitioner
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class KafkaCustomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Pick a random partition among this topic's partitions
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int partitionNum = partitions.size();
        Random random = new Random();
        return random.nextInt(partitionNum);
    }

    @Override
    public void close() {
    }
}
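Note that a random partitioner spreads load evenly but loses per-key ordering, because the same key can land on different partitions on successive sends. A common alternative (a sketch of our own, not from the original article) is to hash the key bytes modulo the partition count, so equal keys always map to the same partition. The selection logic can be demonstrated without a live cluster; the class and method names below are ours:

```java
import java.util.Arrays;

public class KeyHashPartitionDemo {

    // Choose a partition by hashing the key bytes, so equal keys always
    // map to the same partition. Arrays.hashCode stands in here for the
    // murmur2 hash that Kafka's default partitioner uses.
    static int choosePartition(byte[] keyBytes, int partitionCount) {
        int hash = Arrays.hashCode(keyBytes);
        // Mask off the sign bit so the modulo result is non-negative
        return (hash & 0x7fffffff) % partitionCount;
    }

    public static void main(String[] args) {
        byte[] key = "order-42".getBytes();
        int p1 = choosePartition(key, 3);
        int p2 = choosePartition(key, 3);
        // The same key always yields the same partition
        System.out.println(p1 == p2);
        System.out.println(p1 >= 0 && p1 < 3);
    }
}
```

Dropping this logic into the `partition` method above (in place of the `Random` call) would give key-sticky routing with the same `Partitioner` interface.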
Then register the partitioner in the producer configuration:
@Test
public void kafkaProducer() throws Exception {
    // 1. Prepare the configuration
    Properties props = new Properties();
    props.put("bootstrap.servers", "hadoop-001:9092,hadoop-002:9092,hadoop-003:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    // Fully-qualified name of the custom Partitioner implementation above
    props.put("partitioner.class", "com.gec.kafkaclient.KafkaCustomPartitioner");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // 2. Create the KafkaProducer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++) {
        // 3. Send data
        kafkaProducer.send(new ProducerRecord<>("yun01", Integer.toString(i), "message-" + i));
    }
    kafkaProducer.close();
}
That concludes this article on the Kafka Java API and custom partitioners. We hope the articles we recommend are helpful, and we hope you will keep supporting 为之网!