rocketmq-spring的consumer设置消费失败最大重试次数
2021/6/18 23:28:51
本文主要是介绍rocketmq-spring的consumer设置消费失败最大重试次数,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
说明
rocketmq-spring的consumer的相关属性配置有两种方式:
- 在配置文件中进行配置
- 类上使用@RocketMQMessageListener注解配置相关属性
关于注解中的属性可以查看:org.apache.rocketmq.spring.annotation.RocketMQMessageListener,而在文件中可以配置的属性只有如下几个(并不遵守spring boot自动配置规范,所以在idea中不会有相关提示)
说明如下:
配置项 | 说明 |
---|---|
rocketmq.name-server | rocketmq的name server地址,格式:`主机:端口;主机:端口`,多个地址以英文分号分隔 |
rocketmq.consumer.secret-key | ACL的secret-key属性 |
rocketmq.consumer.access-key | ACL的access-key属性 |
rocketmq.consumer.customized-trace-topic | 自定义消费轨迹topic,不使用忽略 |
rocketmq.access-channe | 枚举类型,值为:【LOCAL, CLOUD】,值为CLOUD表示设置接入阿里云。忽略。 |
如果想要设置最大重试次数等一些相关初始化参数配置,很明显是不支持的。
同时,看一下构造consumer的源码,可以看到只配置了固定的几个属性:
private void initRocketMQPushConsumer() throws MQClientException { RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey()); boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace(); if (Objects.nonNull(rpcHook)) { consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); consumer.setVipChannelEnabled(false); consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup)); } else { log.debug("Access-key or secret-key not configure in " + this + "."); consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); } String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer()); if (customizedNameServer != null) { consumer.setNamesrvAddr(customizedNameServer); } else { consumer.setNamesrvAddr(nameServer); } if (accessChannel != null) { consumer.setAccessChannel(accessChannel); } consumer.setConsumeThreadMax(consumeThreadMax); if (consumeThreadMax < consumer.getConsumeThreadMin()) { consumer.setConsumeThreadMin(consumeThreadMax); } consumer.setConsumeTimeout(consumeTimeout); switch (messageModel) { case BROADCASTING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); break; case CLUSTERING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); break; default: throw new IllegalArgumentException("Property 'messageModel' was wrong."); } switch (selectorType) { case TAG: consumer.subscribe(topic, selectorExpression); break; case SQL92: consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); break; default: throw new IllegalArgumentException("Property 'selectorType' was wrong."); } switch (consumeMode) { case ORDERLY: consumer.setMessageListener(new DefaultMessageListenerOrderly()); break; case CONCURRENTLY: consumer.setMessageListener(new DefaultMessageListenerConcurrently()); break; default: throw new IllegalArgumentException("Property 'consumeMode' was wrong."); } if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer); } }
但是看代码的最后几行,rocketMQListener如果实现了RocketMQPushConsumerLifecycleListener接口,则会调用RocketMQPushConsumerLifecycleListener的prepareStart(consumer)方法,很明显,可以在这里设置consuemr的参数。
说明:rocketMQListener就是类上带有RocketMQMessageListener的bean。
解决方案
@RocketMQMessageListener(topic = "test_topic", consumerGroup = "test_topic_consumer", selectorExpression = "*") class StringConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(String message) { LOGGER.info("receive message: {}", message); } @Override public void prepareStart(DefaultMQPushConsumer consumer) { // 设置最大重试次数 consumer.setMaxReconsumeTimes(5); // 如下,设置其它consumer相关属性 consumer.setPullBatchSize(16); } }
末语
我是在翻源码的才想到这个解决方案,我想既然提供有这个接口进行自定义配置,官方文档应该会有示例说明,然后翻了下github,是有类似的使用方式的,源码上还有其它示例,如果有其它问题,建议还是先看官方示例是否提供了相关解决方案。github地址:https://github.com/apache/rocketmq-spring/tree/master/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer
这篇关于rocketmq-spring的consumer设置消费失败最大重试次数的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-27消息中间件底层原理资料详解
- 2024-11-27RocketMQ底层原理资料详解:新手入门教程
- 2024-11-27MQ底层原理资料详解:新手入门教程
- 2024-11-27MQ项目开发资料入门教程
- 2024-11-27RocketMQ源码资料详解:新手入门教程
- 2024-11-27本地多文件上传简易教程
- 2024-11-26消息中间件源码剖析教程
- 2024-11-26JAVA语音识别项目资料的收集与应用
- 2024-11-26Java语音识别项目资料:入门级教程与实战指南
- 2024-11-26SpringAI:Java 开发的智能新利器