stream(2)自定义接口生产者、消费者
2021/7/28 13:05:50
本文主要是介绍stream(2)自定义接口生产者、消费者,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
生产者:
新定义一个接口MySource
package com.itmuch.content.rocketmq; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface MySource { String MY_OUTPUT = "my-output"; @Output(MY_OUTPUT) MessageChannel output(); }
在启动类上添加到EnableBinding上
@EnableBinding({Source.class, MySource.class})
在配置文件中
stream: rocketmq: binder: name-server: 127.0.0.1:9876 #找到borker bindings: output: destination: stream-test-topic #用来指定topic my-output: destination: stream-my-topic #用来指定topic
注意:配置文件中的“my-output”是接口MySource中的MY_OUTPUT的引用,否则注册不上消息队列;
功能代码
@Autowired private MySource mySource; @GetMapping("/test-stream-2") public String testStream2(){ this.mySource.output().send( MessageBuilder.withPayload("消息体").build() ); return "success"; }
消费者:
新建接口MySink
package com.itmuch.usercenter.rocketmq; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface MySink { String MY_INPUT = "my-input"; @Input(MY_INPUT) SubscribableChannel input(); }
在启动类上添加到EnableBinding上
@EnableBinding({Sink.class, MySink.class})
在配置文件中
stream: rocketmq: binder: name-server: 127.0.0.1:9876 #找到borker bindings: input: destination: stream-test-topic #用来指定topic group: test-group #rocketMQ:虽然这个group可以随便写但是要设置,不然无法启动 其他MQ:可留空 my-input: destination: stream-my-topic #用来指定topic group: my-group #rocketMQ:虽然这个group可以随便写但是要设置,不然无法启动 其他MQ:可留空
注意:配置文件中的“my-input”是接口MySink中的MY_INPUT的引用,否则获取不到消息队列上的信息;
package com.itmuch.usercenter.rocketmq; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Service; @Slf4j @Service public class MyTestStreamConsumer { @StreamListener(MySink.MY_INPUT) public void recevice(String mess){ log.info("自定义接口消费:通过stream收到消息{}",mess); } }
当成功访问http://localhost:8010/test-stream-2 时,消费者控制台就会打印日志(意味着消费者已经从消息队列中获取信息并消费了)
消息过滤:https://www.imooc.com/article/290424
stream异常处理手记:https://www.imooc.com/article/290435
eg:
在消费者重定义一个监听器,监听发生的所有异常
package com.itmuch.usercenter.rocketmq; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.messaging.support.ErrorMessage; import org.springframework.stereotype.Service; @Slf4j @Service public class MyTestStreamConsumer { @StreamListener(MySink.MY_INPUT) public void recevice(String mess){ log.info("自定义接口消费:通过stream收到消息{}",mess); throw new IllegalArgumentException("抛异常"); } /** * 全局异常处理 * @param message */ @StreamListener("errorChannel") public void error(Message<?> message) { ErrorMessage errorMessage = (ErrorMessage) message; // System.out.println("Handling ERROR: " + errorMessage); log.warn("反生异常:{}",errorMessage); } }
这篇关于stream(2)自定义接口生产者、消费者的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-20RabbitMQ教程:新手入门指南
- 2024-11-20Redis教程:新手入门指南
- 2024-11-20SaToken教程:新手入门指南
- 2024-11-20SpringBoot教程:从入门到实践
- 2024-11-20Java全栈教程:从入门到实战
- 2024-11-20Java微服务系统教程:入门与实践指南
- 2024-11-20Less教程:初学者快速上手指南
- 2024-11-20MyBatis教程:新手快速入门指南
- 2024-11-20QLExpress教程:初学者快速入门指南
- 2024-11-20订单系统教程:从入门到实践的全面指南