kafka07-spring整合kafka
2021/8/15 23:37:52
本文主要是介绍kafka07-spring整合kafka,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
版本信息:Kafka 1.0.2
spring-kafka 高版本向下兼容低版本。
pom
<!--
  Dependencies for the Spring + Kafka demo.
  No <version> elements are declared here; presumably they are managed by a
  parent POM / BOM that is not shown in this snippet (TODO confirm).
-->
<dependencies>
    <!-- Spring MVC / embedded web server, used by the REST controller. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring for Apache Kafka (KafkaTemplate, @KafkaListener). -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Test-only: Spring Boot test support. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- Test-only: Spring Kafka test utilities. -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
KafkaProducerController
package com.lew.sp.controller; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.ExecutionException; /** * @Author llewcg * @Description 生产者生产消息 */ @RestController public class KafkaProducerController { @Autowired KafkaTemplate<Integer, String> kafkaTemplate; @RequestMapping("/asyncSendMess/{msg}") public String asyncSendMess(@PathVariable String msg) { ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(new ProducerRecord<Integer, String>("gc_spring_1", 1, msg)); try { SendResult<Integer, String> sendResult = future.get(); RecordMetadata recordMetadata = sendResult.getRecordMetadata(); System.out.println(recordMetadata.topic() + "\t" + recordMetadata.offset()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return "success"; } @RequestMapping("/noAsyncSendMess/{msg}") public String noAsyncSendMess(@PathVariable String msg) { ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(new ProducerRecord<Integer, String>("gc_spring_1", 1, "gc_well_spring")); future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() { @Override public void onFailure(Throwable throwable) { System.out.println("发送失败"); } @Override public void onSuccess(SendResult<Integer, String> sendResult) { RecordMetadata recordMetadata = sendResult.getRecordMetadata(); System.out.println(recordMetadata.topic() + 
"\t" + recordMetadata.offset()); } }); return "success"; } }
CusConsumer
package com.lew.sp.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Kafka listener component that prints every record it receives.
 *
 * @Author llewcg
 */
@Component
public class CusConsumer {

    /**
     * Handles one record from the {@code gc-spring-02} topic by printing its
     * topic, partition, offset, key and value as a single tab-separated line.
     * A {@code null} record is ignored.
     *
     * <p>NOTE(review): the producer controller shown in the same article sends to
     * {@code gc_spring_1}, not to this topic - confirm that the difference is intended.
     *
     * @param consumerRecord the received record; may be {@code null}
     */
    @KafkaListener(topics = "gc-spring-02")
    public void consumerMess(ConsumerRecord<Integer, String> consumerRecord) {
        if (consumerRecord == null) {
            return;
        }

        StringBuilder line = new StringBuilder();
        line.append(consumerRecord.topic()).append("\t");
        line.append(consumerRecord.partition()).append("\t");
        line.append(consumerRecord.offset()).append("\t");
        line.append(consumerRecord.key()).append("\t");
        line.append(consumerRecord.value());
        System.out.println(line.toString());
    }
}
KafkaConfig
修改自动注入的配置
package com.lew.sp.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Configuration class that customises the default Kafka setup by declaring
 * {@link NewTopic} beans.
 *
 * <p>Presumably these beans are picked up by the auto-configured Kafka admin so
 * the topics get created at startup - confirm against the Spring Kafka version in use.
 *
 * @Author lewcg
 */
@Configuration
public class KafkaConfig {

    /** Replication factor shared by both topic declarations. */
    private static final short REPLICATION_FACTOR = (short) 1;

    /**
     * Declares topic {@code ntp-1} with 2 partitions.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic topic1() {
        final String name = "ntp-1";
        final int partitions = 2;
        return new NewTopic(name, partitions, REPLICATION_FACTOR);
    }

    /**
     * Declares topic {@code ntp-02} with 3 partitions.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic topic2() {
        final String name = "ntp-02";
        final int partitions = 3;
        return new NewTopic(name, partitions, REPLICATION_FACTOR);
    }

    /*
    @Bean
    public KafkaAdmin newAdmin(){
        Map<String, Object> config = new HashMap<>();
        config.put("xxx","xxx");
        return new KafkaAdmin(config);
    }*/

    /*
    @Bean
    public KafkaTemplate<Integer, String> newTemplate(ProducerFactory<Integer, String> producerFactory){
        Map<String, Object> config = new HashMap<>();
        // override the existing settings
        config.put("xxx","xxx");
        return new KafkaTemplate<Integer, String>(producerFactory, config);
    }*/
}
演示效果
这篇关于kafka07-spring整合kafka的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-10-05小米13T Pro系统合集:性能与摄影的极致融合,值得你升级的系统ROM
- 2024-10-01基于Python+Vue开发的医院门诊预约挂号系统
- 2024-10-01基于Python+Vue开发的旅游景区管理系统
- 2024-10-01RestfulAPI入门指南:打造简单易懂的API接口
- 2024-10-01初学者指南:了解和使用Server Action
- 2024-10-01Server Component入门指南:搭建与配置详解
- 2024-10-01React 中使用 useRequest 实现数据请求
- 2024-10-01使用 golang 将ETH账户的资产平均分散到其他账户
- 2024-10-01JWT用户校验课程:从入门到实践
- 2024-10-01Server Component课程入门指南