kafka

2022/8/16 23:23:40

本文主要是介绍kafka,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

 

另外找一个zk,有客户端命令的
.\zkCli.cmd -server 127.0.0.1:2181
ls /brokers 查看注册信息

1,kafka不支持分布式事务消息 不支持消费失败重试
2,kafka的单机TPS能跑到每秒上百万,是因为Producer端将多个小消息合并,批量发向broker
3,RocketMQ写入性能上不如kafka, 主要因为kafka主要应用于日志场景,而RocketMQ应用于业务场景,为了保证消息必达牺牲了性能,且基于线上真实场景没有在RocketMQ层做消息合并,推荐在业务层自己做。
4,没有“中心主节点”的概念,集群中所有的服务器都是对等的,因此,可以在不做任何配置的更改的情况下实现服务器的的添加与删除
https://blog.csdn.net/pengweismile/article/details/117636252

https://kafka.apache.org/quickstart
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties
.\bin\windows\kafka-topics.bat --create --topic topic-xmh --bootstrap-server localhost:9092
.\bin\windows\kafka-topics.bat --describe --topic topic-xmh --bootstrap-server localhost:9092

.\bin\windows\kafka-console-consumer.bat --topic topic-xmh --from-beginning --bootstrap-server localhost:9092 
.\bin\windows\kafka-console-producer.bat --topic topic-xmh --bootstrap-server localhost:9092


https://blog.csdn.net/syc0616/article/details/118156641
producer配置
bootstrap.servers: kafka的地址。
acks:消息的确认机制,默认值是0。
acks=0:如果设置为0,生产者不会等待kafka的响应。
acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。
acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。
retries:配置为大于0的值的话,客户端会在消息发送失败时重新发送。
batch.size:当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率。
key.serializer: 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
value.serializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。

public class ProMy {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");//,slave1:9092,slave2:9092
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 1);//16384
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer producer = new KafkaProducer(props);
        producer.send(new ProducerRecord<String, String>("topic-xmh","xingkey2","xingvalues5"));
        producer.close();
        System.out.println("*************end-procuder");
    }
}

consumer配置
bootstrap.servers: kafka的地址。
group.id:组名 不同组名可以重复消费。例如你先使用了组名A消费了kafka的1000条数据,但是你还想再次进行消费这1000条数据,并且不想重新去产生,那么这里你只需要更改组名就可以重复消费了。
enable.auto.commit:是否自动提交,默认为true。
auto.commit.interval.ms: 从poll(拉)的回话处理时长。
session.timeout.ms:超时时间。
max.poll.records:一次最大拉取的条数。
auto.offset.reset:消费规则,默认earliest 。
earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 。
latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 。
none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
key.serializer: 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
value.deserializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");//,slave1:9092,slave2:9092
        props.put("group.id", "group_x3");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
//        props.put("max.poll.records", 1);
        props.put("auto.offset.reset", "earliest");
//        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList("topic-xmh"));
        //for(;;) {
            ConsumerRecords<String, String> msgList = consumer.poll(1000);
//            System.out.println("consumer*******:" + msgList);
            for (ConsumerRecord<String, String> record:msgList){
                System.out.println("**************consumer*******record1:"+record+","+record.key()+","+record.value());
            }
        //}

//        consumer.close();
    }

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>1.0.0</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.0</version>
</dependency>

  

 



这篇关于kafka的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程