Kafka [low-level API & high-level API & new API differences | choosing auto.offset.reset] _CodingPark编程公园

2021/4/13 20:29:12



Introduction
This article explains the differences between Kafka's low-level API, high-level API, and new API, and how to use the auto.offset.reset parameter.
The figure at the start of the article is included so that readers can clearly see the relationship between Broker, Partition, and replica (rep).


Differences between the low-level API, the high-level API, and the new API

Characteristics of the low-level API

Advantages
● Developers control the offset themselves and can read from wherever they want.
● You control which partitions to connect to and can implement custom load balancing across partitions.
● Less dependence on ZooKeeper (for example, the offset does not have to be stored in ZooKeeper; you can store it yourself, say in a file or in memory).

Disadvantages
● Too complex: you have to manage the offset yourself, decide which partitions to connect to, find each partition's leader, and so on.
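
The SimpleConsumer example below shows what this involves: it reads the topic's partition ids from ZooKeeper, looks up each partition's leader broker, reads the last committed offset from a custom ZooKeeper path (ZkUtil is the author's helper class for reading and writing ZooKeeper nodes), and starts one fetch thread per partition that persists its offset back to ZooKeeper.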

package com.csdn.kafka.consumer;

import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import util.ZkUtil;

import java.nio.ByteBuffer;
import java.util.*;

/**
 * Created by ag on 2020/5/9.
 */
public class SimpleConsumerAPI {


    public static void main(String[] args)throws Exception {

        String zkString = "192.168.1.115:2181,192.168.1.116:2181,192.168.1.117:2181";
        String broker = "192.168.1.115";

        int port = 9092;
        int buffersize = 64*1024;
        String clientId = "clientId";
        String topic = "test";

        long whichTime = kafka.api.OffsetRequest.EarliestTime();
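        // EarliestTime() is the timestamp meant for getReadOffset() below (unused in this example);
        // it would ask the broker for its oldest available offset. Here the starting offset is read from ZooKeeper instead.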

        int timeout = 6000;
        ZooKeeper zk = new ZooKeeper(zkString, timeout, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent);
            }
        });

        // List the topic's partition ids registered in ZooKeeper
        List<String> partitions = zk.getChildren("/brokers/topics/" + topic + "/partitions", true);
        System.out.println(partitions);

        for(String p:partitions){
            int partition = Integer.valueOf(p);
            String leader = getLeader(timeout, broker, port, partition,
                    buffersize, clientId, topic);

            // Read this partition's last committed offset from the author's custom ZooKeeper path
            byte[] data = ZkUtil.getData("/consumers/test/testgroup/" + partition);
            long readOffset = Long.valueOf(new String(data).trim());
            System.out.println(readOffset);

            // One fetch thread per partition; 14110 is the offset at which the thread stops reading
            new Thread(new ReadDataTask(timeout, port, partition, buffersize,
                    clientId, topic, leader, readOffset,14110)).start();

        }

    }

    // Fetch one batch of messages from the partition leader starting at readOffset and print them
    // (not called from main; kept as a reference for a one-shot fetch).
    private static void fetchData(int timeout, int port, int partition, int buffersize,
                                  String clientId, String topic, String leader, long readOffset) {
        SimpleConsumer simpleConsumer = new SimpleConsumer(leader,port,timeout,buffersize,clientId);
        kafka.api.FetchRequest request = new FetchRequestBuilder()
                .addFetch(topic,partition,readOffset,100000)
                .clientId(clientId)
                .build();
        FetchResponse fetch = simpleConsumer.fetch(request);
        ByteBufferMessageSet messageAndOffsets = fetch.messageSet(topic, partition);
        Iterator<MessageAndOffset> iterator = messageAndOffsets.iterator();
        while(iterator.hasNext()){
            MessageAndOffset next = iterator.next();
            long offset = next.offset();
            long nextoffset = next.nextOffset();
            Message message = next.message();
            ByteBuffer payload = message.payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(partition
                    +"\t"+new String (bytes)+"\t"+offset+"\t"+nextoffset);
        }
    }


    // Ask the partition leader for an offset via an OffsetRequest (earliest or latest, depending on whichTime).
    // This is the broker-side alternative to reading the committed offset from ZooKeeper (not called from main).
    private static long getReadOffset(long whichTime,int timeout, int port,
                                      int partition, int buffersize, String clientId, String topic, String leader) {
        SimpleConsumer offsetConsumer = new SimpleConsumer(leader,port,timeout,buffersize,clientId);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo
                = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();

        TopicAndPartition topicAndPartition = new TopicAndPartition(topic,partition);
        PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(whichTime,1);

        requestInfo.put(topicAndPartition,partitionOffsetRequestInfo);

        OffsetRequest offsetRequest =
                new OffsetRequest(requestInfo,kafka.api.OffsetRequest.CurrentVersion(),clientId);
        OffsetResponse offsetsBefore = offsetConsumer.getOffsetsBefore(offsetRequest);
        long[] offsets = offsetsBefore.offsets(topic, partition);
        return offsets[0];
    }

    // Find the leader broker host for the given partition by sending a TopicMetadataRequest to a known broker.
    private static String getLeader(int timeout, String broker, int port,
                                    int partition, int buffersize, String clientId, String topic) {
        String leader = "";
        SimpleConsumer leaderConsumer = new SimpleConsumer(broker,port,timeout,buffersize,clientId);
        TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Collections.singletonList(topic));
        TopicMetadataResponse topicMetadataResponse = leaderConsumer.send(topicMetadataRequest);
        List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();
        for(TopicMetadata topicMetadata:topicMetadatas){
            List<PartitionMetadata> partitionMetadatas = topicMetadata.partitionsMetadata();
            for(PartitionMetadata partitionMetadata:partitionMetadatas){
                if(partitionMetadata.partitionId() == partition){
                    leader =  partitionMetadata.leader().host();
                }
            }
        }
        return leader;
    }
}


// One ReadDataTask per partition: repeatedly fetches from the partition leader, prints each message,
// advances readOffset, persists it to ZooKeeper, and stops once stopOffset has been passed.
class ReadDataTask implements Runnable{

    int timeout;
    int port;
    int partition;
    int buffersize;
    String clientId;
    String topic;
    String leader;
    long readOffset;
    long stopOffset;

    public ReadDataTask(int timeout, int port, int partition,
                        int buffersize, String clientId,
                        String topic, String leader, long readOffset,long stopOffset) {
        this.timeout = timeout;
        this.port = port;
        this.partition = partition;
        this.buffersize = buffersize;
        this.clientId = clientId;
        this.topic = topic;
        this.leader = leader;
        this.readOffset = readOffset;
        this.stopOffset= stopOffset;
    }

    public void run() {
        boolean flag =  true;
        int count = 0;
        while(flag){
            SimpleConsumer simpleConsumer = new SimpleConsumer(leader,port,timeout,buffersize,clientId);
            kafka.api.FetchRequest request = new FetchRequestBuilder()
                    .addFetch(topic,partition,readOffset,100000)
                    .clientId(clientId)
                    .build();
            FetchResponse fetch = simpleConsumer.fetch(request);
            ByteBufferMessageSet messageAndOffsets = fetch.messageSet(topic, partition);
            Iterator<MessageAndOffset> iterator = messageAndOffsets.iterator();
            while(iterator.hasNext()){
                count ++;
                MessageAndOffset next = iterator.next();
                long offset = next.offset();
                if(offset>stopOffset){
                    flag = false;
                    break;
                }
                long nextoffset = next.nextOffset();
                readOffset = nextoffset;
                Message message = next.message();
                ByteBuffer payload = message.payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(Thread.currentThread().getName()+"\t"+partition
                        +"\t"+new String (bytes)+"\t"+offset+"\t"+nextoffset);
            }
            simpleConsumer.close(); // close the connection opened for this fetch iteration

            try {
                ZkUtil.setData("/consumers/test/testgroup/"+partition,readOffset+"");
            } catch (Exception e) {
                e.printStackTrace();
            }


            if(count ==0){
                try {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName()+":sleep 1000ms");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            count = 0;
        }

    }
}


 

Relationship between threads and partitions
3 partitions, 2 threads → one thread consumes two partitions, the other consumes one (once a thread is consuming a partition, it does not switch mid-way)
3 partitions, 3 threads → each thread consumes one partition
3 partitions, 5 threads → all 5 threads come up, but only 3 of them do any work

Characteristics of the high-level API

Advantages
● The high-level API is simple to write.
● No need to manage offsets yourself; they are managed automatically through ZooKeeper.
● No need to manage partitions, replicas, and so on; the system handles them automatically.
● If a consumer disconnects, it automatically resumes from the offset last recorded in ZooKeeper (by default the offset stored in ZooKeeper is updated every 5 s); this applies to version 0.10.2.
● Groups can be used to keep different programs that read the same topic separate (each group records its own offsets, so different programs reading the same topic do not interfere with each other's offsets).

Disadvantages
● You cannot control the offset yourself (a limitation for certain special requirements).
● No fine-grained control over partitions, replicas, ZooKeeper, and so on.
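
The HighLevelConsumer example below connects through ZooKeeper, requests three streams for the topic via topicCountMap (one per partition), and hands each stream to its own thread; offsets are committed automatically every 5 s (auto.commit.interval.ms = 5000).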

package com.csdn.kafka.consumer;


import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * To rewind a consumer group, set its offsets in ZooKeeper (e.g. from zkCli) before starting, for example:
 *   set /consumers/testgroup2/offsets/test/0 14000
 *   set /consumers/testgroup2/offsets/test/1 14000
 *   set /consumers/testgroup2/offsets/test/2 14000
 */
public class HighLevelConsumer {


    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.1.115:2181,192.168.1.116:2181,192.168.1.117:2181");
        props.put("group.id", "last");
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "5000");
        props.put("auto.offset.reset", "largest");//largest,smallest
        props.put("auto.commit.enable", "true");

        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector javaConsumerConnector =
                Consumer.createJavaConsumerConnector(config);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        String topic = "test";
        topicCountMap.put(topic,3);// request 3 streams (consumer threads) for the topic; with 3 partitions, each stream gets one
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
                javaConsumerConnector.createMessageStreams(topicCountMap);

        List<KafkaStream<byte[], byte[]>> kafkaStreams = messageStreams.get(topic);
        for(KafkaStream<byte[], byte[]> kafkaStream:kafkaStreams){
            new Thread(new ReadDataHigh(kafkaStream)).start();
        }

//        Thread.sleep(12000);
//        javaConsumerConnector.shutdown();
//        System.out.println("------------------------------------------------------------");


    }
}


class ReadDataHigh implements Runnable{

    KafkaStream<byte[], byte[]> kafkaStream;

    public ReadDataHigh(KafkaStream<byte[], byte[]> kafkaStream) {
        this.kafkaStream = kafkaStream;
    }

    public void run() {
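        // The stream's iterator blocks in hasNext() until a new message arrives (unless consumer.timeout.ms is set).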
        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
        System.out.println(Thread.currentThread().getName()+"==============================");
        while(iterator.hasNext()){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            MessageAndMetadata<byte[], byte[]> next = iterator.next();
            int partition = next.partition();
            long offset = next.offset();
            byte[] message = next.message();
            System.out.println(Thread.currentThread().getName()+"\t"
                    +partition+"\t"+offset+"\t"+new String(message));
        }
    }
}
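
The new consumer API

The NewConsumer example below uses the new Java client (org.apache.kafka.clients.consumer.KafkaConsumer). autoOffset() subscribes to the topic and lets the client commit offsets automatically, while manualOffset() assigns the three partitions explicitly, seeks each one to offset 21000, and commits offsets manually after roughly every 100 records.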


 

package com.csdn.kafka.consumer;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class NewConsumer {

    public static void main(String[] rgs) throws InterruptedException {

//        autoOffset();
        manualOffset();

//        int partition = Math.abs("testasdf1".hashCode()) % 50 ;
//        System.out.println(partition);


    }

    private static void manualOffset() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
        props.put("group.id", "testasdf1");
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset","earliest");//默认是lastest; latest, earliest
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        String topic = "test";
//        kafkaConsumer.subscribe(Collections.singletonList(topic));

        TopicPartition topicPartiton0 = new TopicPartition(topic,0);
        TopicPartition topicPartiton1 = new TopicPartition(topic,1);
        TopicPartition topicPartiton2 = new TopicPartition(topic,2);
        kafkaConsumer.assign(Arrays.asList(topicPartiton0,topicPartiton1,topicPartiton2));
        // Start each partition at offset 21000; seek() overrides any offset committed for this group
        kafkaConsumer.seek(topicPartiton0,21000);
        kafkaConsumer.seek(topicPartiton1,21000);
        kafkaConsumer.seek(topicPartiton2,21000);

        int count = 100;
        List<String> values = new ArrayList<String>();
        while(true){
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for(ConsumerRecord<String,String> record:records){
                int partition = record.partition();
                long offset = record.offset();
                String value = record.value();
                values.add(value);
                System.out.println(partition+"\t"+offset+"\t"+value);
            }

            // Commit manually once more than 100 records have been consumed since the last commit
            if(values.size()>count){
                kafkaConsumer.commitSync();
                values.clear();
                System.out.println("manual commit offset");
            }
        }
    }

    private static void autoOffset() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
        props.put("group.id", "testasdfg");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset","earliest");//默认是lastest; latest, earliest
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        String topic = "test";
        kafkaConsumer.subscribe(Collections.singletonList(topic));
        while(true){
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for(ConsumerRecord<String,String> record:records){
                int partition = record.partition();
                long offset = record.offset();
                String value = record.value();
                System.out.println(partition+"\t"+offset+"\t"+value);
            }
        }
    }


}
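
Choosing auto.offset.reset

For the new consumer the valid values are earliest, latest (the default), and none, and the setting only takes effect when the consumer group has no committed offset for a partition or the committed offset is out of range; the old ZooKeeper-based consumer uses smallest/largest instead (largest is its default). The snippet below is only a minimal sketch restating those options; the broker address and group id are placeholders, not values from the examples above.

import java.util.Properties;

public class OffsetResetSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.115:9092"); // placeholder broker
        props.put("group.id", "reset-demo");                   // hypothetical group id

        // auto.offset.reset only applies when the group has no committed offset
        // for a partition, or when the committed offset is out of range:
        //   new consumer: "earliest" | "latest" (default) | "none" (throw instead of resetting)
        //   old consumer: "smallest" | "largest" (default)
        props.put("auto.offset.reset", "earliest");

        // pass props to new KafkaConsumer<String, String>(props) exactly as in NewConsumer above
    }
}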




