Kafka [Low-Level API vs. High-Level API vs. New API | Choosing auto.offset.reset] _ CodingPark编程公园
2021/4/13 20:29:12
Article Overview
This article covers the differences between Kafka's low-level API, high-level API, and new API, and how to choose the auto.offset.reset parameter.
The figure at the beginning of the article illustrates the relationship among Broker, Partition, and Replica.
Differences between the Low-Level API, High-Level API, and New API
Characteristics of the Low-Level API
Advantages
● You control the offset yourself and can start reading from any position you choose.
● You control which partitions you connect to and can implement your own load balancing across partitions.
● Reduced dependency on ZooKeeper (offsets do not have to be stored in ZooKeeper; you can store them yourself, for example in a file or in memory).
Disadvantages
● It is complex: you must track offsets yourself, decide which partition to connect to, find the partition leader, and so on.
package com.csdn.kafka.consumer;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import util.ZkUtil;
import java.nio.ByteBuffer;
import java.util.*;
/**
* Created by ag on 2020/5/9.
*/
public class SimpleConsumerAPI {
public static void main(String[] args)throws Exception {
String zkString = "192.168.1.115:2181,192.168.1.116:2181,192.168.1.117:2181";
String broker = "192.168.1.115";
int port = 9092;
int buffersize = 64*1024;
String clientId = "clientId";
String topic = "test";
long whichTime = kafka.api.OffsetRequest.EarliestTime();// earliest-offset marker for getReadOffset(); this main reads its start offset from ZooKeeper instead
int timeout = 6000;
// connect to ZooKeeper to discover the topic's partitions and the offsets saved by this demo
ZooKeeper zk = new ZooKeeper(zkString, timeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent);
}
});
// list the partition ids registered under /brokers/topics/test/partitions
List<String> partitions = zk.getChildren("/brokers/topics/test/partitions", true);
System.out.println(partitions);
for(String p:partitions){
int partition = Integer.valueOf(p);
// find the leader broker for this partition, then read the offset previously
// saved by this demo under /consumers/test/testgroup/<partition>
String leader = getLeader(timeout, broker, port, partition,
buffersize, clientId, topic);
byte[] data = ZkUtil.getData("/consumers/test/testgroup/" + partition);
long readOffset = Long.valueOf(new String(data).trim());
System.out.println(readOffset);
// start one reader thread per partition, consuming from readOffset up to offset 14110
new Thread(new ReadDataTask(timeout, port, partition, buffersize,
clientId, topic, leader, readOffset,14110)).start();
}
}
// perform a single fetch from the partition leader starting at readOffset
// (not called from main; kept for reference)
private static void fetchData(int timeout, int port, int partition, int buffersize,
String clientId, String topic, String leader, long readOffset) {
SimpleConsumer simpleConsumer = new SimpleConsumer(leader,port,timeout,buffersize,clientId);
kafka.api.FetchRequest request = new FetchRequestBuilder()
.addFetch(topic,partition,readOffset,100000)
.clientId(clientId)
.build();
FetchResponse fetch = simpleConsumer.fetch(request);
ByteBufferMessageSet messageAndOffsets = fetch.messageSet(topic, partition);
Iterator<MessageAndOffset> iterator = messageAndOffsets.iterator();
while(iterator.hasNext()){
MessageAndOffset next = iterator.next();
long offset = next.offset();
long nextoffset = next.nextOffset();
Message message = next.message();
ByteBuffer payload = message.payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(partition
+"\t"+new String (bytes)+"\t"+offset+"\t"+nextoffset);
}
}
// ask the partition leader for the earliest (or latest) available offset,
// depending on whichTime (not called from main; kept for reference)
private static long getReadOffset(long whichTime,int timeout, int port,
int partition, int buffersize, String clientId, String topic, String leader) {
SimpleConsumer offsetConsumer = new SimpleConsumer(leader,port,timeout,buffersize,clientId);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo
= new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,partition);
PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(whichTime,1);
requestInfo.put(topicAndPartition,partitionOffsetRequestInfo);
OffsetRequest offsetRequest =
new OffsetRequest(requestInfo,kafka.api.OffsetRequest.CurrentVersion(),clientId);
OffsetResponse offsetsBefore = offsetConsumer.getOffsetsBefore(offsetRequest);
long[] offsets = offsetsBefore.offsets(topic, partition);
return offsets[0];
}
// look up the leader broker host of the given partition via a TopicMetadataRequest
private static String getLeader(int timeout, String broker, int port,
int partition, int buffersize, String clientId, String topic) {
String leader = "";
SimpleConsumer leaderConsumer = new SimpleConsumer(broker,port,timeout,buffersize,clientId);
TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Collections.singletonList(topic));
TopicMetadataResponse topicMetadataResponse = leaderConsumer.send(topicMetadataRequest);
List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();
for(TopicMetadata topicMetadata:topicMetadatas){
List<PartitionMetadata> partitionMetadatas = topicMetadata.partitionsMetadata();
for(PartitionMetadata partitionMetadata:partitionMetadatas){
if(partitionMetadata.partitionId() == partition){
leader = partitionMetadata.leader().host();
}
}
}
return leader;
}
}
// one reader thread per partition: fetch from the leader in a loop, print messages,
// and write the latest consumed offset back to ZooKeeper until stopOffset is reached
class ReadDataTask implements Runnable{
int timeout;
int port;
int partition;
int buffersize;
String clientId;
String topic;
String leader;
long readOffset;
long stopOffset;
public ReadDataTask(int timeout, int port, int partition,
int buffersize, String clientId,
String topic, String leader, long readOffset,long stopOffset) {
this.timeout = timeout;
this.port = port;
this.partition = partition;
this.buffersize = buffersize;
this.clientId = clientId;
this.topic = topic;
this.leader = leader;
this.readOffset = readOffset;
this.stopOffset= stopOffset;
}
public void run() {
boolean flag = true;
int count = 0;
while(flag){
// note: a new SimpleConsumer is created on every iteration and never closed;
// in real code create it once outside the loop and close it when done
SimpleConsumer simpleConsumer = new SimpleConsumer(leader,port,timeout,buffersize,clientId);
kafka.api.FetchRequest request = new FetchRequestBuilder()
.addFetch(topic,partition,readOffset,100000)
.clientId(clientId)
.build();
FetchResponse fetch = simpleConsumer.fetch(request);
ByteBufferMessageSet messageAndOffsets = fetch.messageSet(topic, partition);
Iterator<MessageAndOffset> iterator = messageAndOffsets.iterator();
while(iterator.hasNext()){
count ++;
MessageAndOffset next = iterator.next();
long offset = next.offset();
if(offset>stopOffset){
flag = false;
break;
}
long nextoffset = next.nextOffset();
readOffset = nextoffset;
Message message = next.message();
ByteBuffer payload = message.payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(Thread.currentThread().getName()+"\t"+partition
+"\t"+new String (bytes)+"\t"+offset+"\t"+nextoffset);
}
// persist the latest consumed offset back to ZooKeeper
try {
ZkUtil.setData("/consumers/test/testgroup/"+partition,readOffset+"");
} catch (Exception e) {
e.printStackTrace();
}
// nothing was fetched this round: back off briefly before polling again
if(count ==0){
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+":sleep 1000ms");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
count = 0;
}
}
}
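The example above imports util.ZkUtil, a small helper whose source is not included in the article. The sketch below shows what the two methods the example needs might look like; it is only an illustration and assumes the same ZooKeeper ensemble as above and that the parent path /consumers/test/testgroup already exists.

package util;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
// Minimal sketch of the ZkUtil helper referenced above; the original is not shown in the article.
public class ZkUtil {
    private static ZooKeeper zk;
    static {
        try {
            zk = new ZooKeeper("192.168.1.115:2181,192.168.1.116:2181,192.168.1.117:2181",
                    6000, new Watcher() {
                        public void process(WatchedEvent event) {
                        }
                    });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    // read the raw bytes stored at the given znode
    public static byte[] getData(String path) throws Exception {
        return zk.getData(path, false, null);
    }
    // create the znode if it does not exist yet, then overwrite its data
    public static void setData(String path, String data) throws Exception {
        if (zk.exists(path, false) == null) {
            zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            zk.setData(path, data.getBytes(), -1);
        }
    }
}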
Relationship between threads and partitions
3 partitions, 2 threads ——> one thread consumes two partitions and the other consumes one (once a thread is consuming a partition, it does not switch partitions mid-way)
3 partitions, 3 threads ——> each thread consumes one partition
3 partitions, 5 threads ——> all 5 threads start, but only 3 of them actually do any work (see the sketch below)
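This assignment rule is easy to observe with the new consumer API introduced later in the article. The following is only a sketch under assumptions (the class name ThreadPartitionDemo and the group id "thread-demo" are hypothetical; the brokers and the 3-partition "test" topic are reused from the other examples): it starts 5 consumer threads in one group and prints each thread's partition assignment, and only 3 of the 5 end up holding partitions.

package com.csdn.kafka.consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
// Sketch: 5 consumers in one group on a 3-partition topic; after the rebalance,
// 2 of the 5 threads are left with an empty assignment and never receive data.
public class ThreadPartitionDemo {
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            final int id = i;
            new Thread(new Runnable() {
                public void run() {
                    Properties props = new Properties();
                    props.put("bootstrap.servers", "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
                    props.put("group.id", "thread-demo"); // hypothetical group id for this sketch
                    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
                    consumer.subscribe(Collections.singletonList("test"));
                    // the first few polls join the group and give the rebalance time to finish
                    for (int n = 0; n < 10; n++) {
                        consumer.poll(100);
                    }
                    System.out.println("thread-" + id + " assigned: " + consumer.assignment());
                    consumer.close();
                }
            }).start();
        }
    }
}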
Characteristics of the High-Level API
Advantages
● The high-level API is simple to write.
● You do not need to manage offsets yourself; they are managed automatically via ZooKeeper.
● You do not need to manage partitions, replicas, and so on; the framework handles them automatically.
● When a consumer reconnects, it resumes from the offset last recorded in ZooKeeper (by default the stored offset is refreshed every 5 s); this description applies to version 0.10.2.
● Consumer groups can be used to separate different programs that read the same topic (each group records its own offset, so programs reading the same topic do not interfere with one another's offsets).
Disadvantages
● You cannot control the offset yourself (a problem for some special requirements).
● No fine-grained control over partitions, replicas, ZooKeeper, and so on.
package com.csdn.kafka.consumer;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
Before running, the group's starting offsets can be reset in ZooKeeper, for example:
set /consumers/testgroup2/offsets/test/0 14000
set /consumers/testgroup2/offsets/test/1 14000
set /consumers/testgroup2/offsets/test/2 14000
*/
public class HighLevelConsumer {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("zookeeper.connect", "192.168.1.115:2181,192.168.1.116:2181,192.168.1.117:2181");
props.put("group.id", "last");
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "5000");
props.put("auto.offset.reset", "largest");//largest,smallest
props.put("auto.commit.enable", "true");
ConsumerConfig config = new ConsumerConfig(props);
ConsumerConnector javaConsumerConnector =
Consumer.createJavaConsumerConnector(config);
// request 3 streams for the topic; with 3 partitions, each stream (and its thread) gets one partition
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
String topic = "test";
topicCountMap.put(topic,3);
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
javaConsumerConnector.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> kafkaStreams = messageStreams.get(topic);
for(KafkaStream<byte[], byte[]> kafkaStream:kafkaStreams){
new Thread(new ReadDataHigh(kafkaStream)).start();
}
// Thread.sleep(12000);
// javaConsumerConnector.shutdown();
// System.out.println("------------------------------------------------------------");
}
}
// reads one KafkaStream and prints the partition, offset, and message payload
class ReadDataHigh implements Runnable{
KafkaStream<byte[], byte[]> kafkaStream;
public ReadDataHigh(KafkaStream<byte[], byte[]> kafkaStream) {
this.kafkaStream = kafkaStream;
}
public void run() {
ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
System.out.println(Thread.currentThread().getName()+"==============================");
while(iterator.hasNext()){
// sleep 1 s between messages to slow down consumption
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
MessageAndMetadata<byte[], byte[]> next = iterator.next();
int partition = next.partition();
long offset = next.offset();
byte[] message = next.message();
System.out.println(Thread.currentThread().getName()+"\t"
+partition+"\t"+offset+"\t"+new String(message));
}
}
}
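The New API
The new consumer API (org.apache.kafka.clients.consumer.KafkaConsumer) talks to the brokers directly and commits offsets to Kafka itself rather than to ZooKeeper. The example below shows both usage styles: autoOffset() subscribes to the topic and lets the group manage partition assignment with automatic offset commits, while manualOffset() assigns partitions explicitly, seeks to a chosen starting offset, and commits offsets manually with commitSync().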
package com.csdn.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class NewConsumer {
public static void main(String[] rgs) throws InterruptedException {
// autoOffset();
manualOffset();
// int partition = Math.abs("testasdf1".hashCode()) % 50 ;// which __consumer_offsets partition holds this group's offsets (50 = default offsets.topic.num.partitions)
// System.out.println(partition);
}
// manual control: assign partitions explicitly, seek to a starting offset, and commit offsets manually
private static void manualOffset() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
props.put("group.id", "testasdf1");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");//默认是lastest; latest, earliest
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(props);
String topic = "test";
// kafkaConsumer.subscribe(Collections.singletonList(topic));
TopicPartition topicPartiton0 = new TopicPartition(topic,0);
TopicPartition topicPartiton1 = new TopicPartition(topic,1);
TopicPartition topicPartiton2 = new TopicPartition(topic,2);
kafkaConsumer.assign(Arrays.asList(topicPartiton0,topicPartiton1,topicPartiton2));
// start reading each partition from offset 21000
kafkaConsumer.seek(topicPartiton0,21000);
kafkaConsumer.seek(topicPartiton1,21000);
kafkaConsumer.seek(topicPartiton2,21000);
int count = 100;
List<String> values = new ArrayList<String>();
while(true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for(ConsumerRecord<String,String> record:records){
int partition = record.partition();
long offset = record.offset();
String value = record.value();
values.add(value);
System.out.println(partition+"\t"+offset+"\t"+value);
}
// commit the consumed offsets manually once roughly 100 records have been read
if(values.size()>count){
kafkaConsumer.commitSync();
values.clear();
System.out.println("manual commit offset");
}
}
}
// subscribe-based consumption: the group manages partition assignment and offsets are committed automatically
private static void autoOffset() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
props.put("group.id", "testasdfg");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");//默认是lastest; latest, earliest
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(props);
String topic = "test";
kafkaConsumer.subscribe(Collections.singletonList(topic));
while(true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for(ConsumerRecord<String,String> record:records){
int partition = record.partition();
long offset = record.offset();
String value = record.value();
System.out.println(partition+"\t"+offset+"\t"+value);
}
}
}
}
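On choosing auto.offset.reset: for the old ZooKeeper-based consumer the valid values are largest (the default, start from the end of the log) and smallest (replay from the earliest retained message); for the new consumer they are latest (the default), earliest, and none, and the setting only takes effect when the group has no committed offset or the committed offset is out of range. The following is a configuration-only sketch that collects the choices used in the examples above (class and method names are illustrative; broker/ZooKeeper addresses and group ids are reused from the examples).

package com.csdn.kafka.consumer;
import java.util.Properties;
// Configuration sketch only: the auto.offset.reset choices used in the examples above.
public class OffsetResetConfig {
    // Old ZooKeeper-based high-level consumer: "largest" (default) starts from the end
    // of the log, "smallest" replays from the earliest retained message.
    static Properties oldConsumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.1.115:2181,192.168.1.116:2181,192.168.1.117:2181");
        props.put("group.id", "last");
        props.put("auto.offset.reset", "smallest");
        return props;
    }
    // New consumer: "latest" (default), "earliest", or "none" (throw instead of resetting).
    // The value only matters when the group has no committed offset or it is out of range.
    static Properties newConsumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
        props.put("group.id", "testasdfg");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        return props;
    }
}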