阿里云RocketMQ定时/延迟消息队列实现
2021/8/4 23:09:52
本文主要是介绍阿里云RocketMQ定时/延迟消息队列实现,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
新的阅读体验:http://www.zhouhong.icu/post/157
一、业务需求
需要实现一个提前二十分钟通知用户去做某件事的一个业务,拿到这个业务首先想到的最简单得方法就是使用Redis监控Key值:在排计划时候计算当前时间与提前二十分钟这个时间差,然后使用一个唯一的业务Key压入Redis中并设定好过期时间,然后只需要让Redis监控这个Key值即可,当这个Key过期后就可以直接拿到这个Key的值然后实现发消息等业务。
关于Redis实现该业务的具体实现在之前我已经记过一篇笔记,有兴趣的可以直接去瞅瞅,但是现在感觉有好多不足之处。
Redis实现定时: http://www.zhouhong.icu/post/144
二、Redis实现定时推送等功能的不足之处
由于Redis不止你一个使用,其他业务也会使用Redis,那么最容易想到的一个缺点就是:1、如果在提醒的那一刻有大量的其他业务的Key也过期了,那么就会很长时间都轮不到你的这个Key,就会出现消息推送延迟等缺点;2、还有一个缺点就是像阿里云他们的Redis根本就不支持对 Redis 的 Key值得监控(我也是因为公司使用阿里云的Redis没法对Key监控才从之前使用Redis监控转移到使用RocketMQ的延时消息推送的。。。)
三、阿里云RocketMQ定时/延迟消息队列实现
其实在实现上非常简单
1、首先去阿里云控制台创建所需消息队列资源,包括消息队列 RocketMQ 的实例、Topic、Group ID (GID),以及鉴权需要的 AccessKey(AK),一般公司都有现成的可以直接使用。
2、在springboot项目pom.xml添加需要的依赖。
<!--阿里云MQ TCP--> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.7.1.Final</version> </dependency>
3、在对应环境的application.properties文件配置参数
console: rocketmq: tcp: accessKey: XXXXXXXX使用自己的 secretKey: XXXXXXXXXXXXX使用自己的 nameSrvAddr: XXXXXXXXXXXXXXXX使用自己的 topic: XXXXXXX使用自己的 groupId: XXXXXXX使用自己的 tag: XXXXXXXXX使用自己的
4、封装MQ配置类
import com.aliyun.openservices.ons.api.PropertyKeyConst; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import java.util.Properties; /** * @Description: MQ配置类 * @Author: zhouhong * @Date: 2021/8/4 */ @Configuration @EnableConfigurationProperties({PatrolMqConfig.class}) @ConfigurationProperties(prefix = "console.rocketmq.tcp") @Primary public class PatrolMqConfig { private String accessKey; private String secretKey; private String nameSrvAddr; private String topic; private String groupId; private String tag; private String orderTopic; private String orderGroupId; private String orderTag; public Properties getMqPropertie() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey); properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr); return properties; } public String getAccessKey() { return accessKey; } public void setAccessKey(String accessKey) { this.accessKey = accessKey; } public String getSecretKey() { return secretKey; } public void setSecretKey(String secretKey) { this.secretKey = secretKey; } public String getNameSrvAddr() { return nameSrvAddr; } public void setNameSrvAddr(String nameSrvAddr) { this.nameSrvAddr = nameSrvAddr; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getGroupId() { return groupId; } public void setGroupId(String groupId) { this.groupId = groupId; } public String getTag() { return tag; } public void setTag(String tag) { this.tag = tag; } public String getOrderTopic() { return orderTopic; } public void setOrderTopic(String orderTopic) { this.orderTopic = orderTopic; } public String getOrderGroupId() { return orderGroupId; } public void setOrderGroupId(String orderGroupId) { this.orderGroupId = orderGroupId; } public String getOrderTag() { return orderTag; } public void setOrderTag(String orderTag) { this.orderTag = orderTag; } }
5、配置生产者
import com.aliyun.openservices.ons.api.bean.ProducerBean; import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class PatrolProducerClient { @Autowired private PatrolMqConfig mqConfig; @Bean(name = "ConsoleProducer", initMethod = "start", destroyMethod = "shutdown") public ProducerBean buildProducer() { ProducerBean producer = new ProducerBean(); producer.setProperties(mqConfig.getMqPropertie()); return producer; } }
6、消费者订阅
import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.bean.ConsumerBean; import com.aliyun.openservices.ons.api.bean.Subscription; import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; import java.util.Properties; //项目中加上 @Configuration 注解,这样服务启动时consumer也启动了 @Configuration @Slf4j public class PatrolConsumerClient { @Autowired private PatrolMqConfig mqConfig; @Autowired private MqTimeMessageListener messageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean buildConsumer() { ConsumerBean consumerBean = new ConsumerBean(); //配置文件 Properties properties = mqConfig.getMqPropertie(); properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId()); //将消费者线程数固定为20个 20为默认值 properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); consumerBean.setProperties(properties); //订阅关系 Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>(); Subscription subscription = new Subscription(); subscription.setTopic(mqConfig.getTopic()); subscription.set); subscriptionTable.put(subscription, messageListener); //订阅多个topic如上面设置 consumerBean.setSubscriptionTable(subscriptionTable); System.err.println("订阅成功!"); return consumerBean; } }
7、定时延时MQ消息监听消费
/** * @Description: 定时/延时MQ消息监听消费 * @Author: zhouhong * @Create: 2021-08-03 09:16 **/ @Component public class MqTimeMessageListener implements MessageListener { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public Action consume(Message message, ConsumeContext context) { System.err.println("收到消息啦!!"); logger.info("接收到MQ消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}", message.getTopic(),message.getTag(),message.getMsgID(),message.getKey(),new String(message.getBody())); try { String msgTag = message.getTag(); // 消息类型 String msgKey = message.getKey(); // 业务唯一id switch (msgTag) { case "XXXX": // TODO 具体业务实现,比如发消息等操作 System.err.println("推送成功!!!!"); break; } return Action.CommitMessage; } catch (Exception e) { logger.error("消费MQ消息失败! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage()); //消费失败,告知服务器稍后再投递这条消息,继续消费其他消息 return Action.ReconsumeLater; } } }
8、封装一个发延时/定时消息的工具类
/** * @Description: MQ发送消息助手 * @Author: zhouhong * @Create: 2021-08-03 09:06 **/ @Component public class ProducerUtil { private Logger logger = LoggerFactory.getLogger(ProducerUtil.class); @Autowired private PatrolMqConfig config; @Resource(name = "ConsoleProducer") ProducerBean producerBean; public SendResult sendTimeMsg(String msgTag,byte[] messageBody,String msgKey,long delayTime) { Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody); msg.setStartDeliverTime(delayTime); return this.send(msg,Boolean.FALSE); } /** * 普通消息发送发放 * @param msg 消息 * @param isOneWay 是否单向发送 */ private SendResult send(Message msg,Boolean isOneWay) { try { if(isOneWay) { //由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失。 //若数据不可丢,建议选用同步或异步发送方式。 producerBean.sendOneway(msg); success(msg, "单向消息MsgId不返回"); return null; }else { //可靠同步发送 SendResult sendResult = producerBean.send(msg); //获取发送结果,不抛异常即发送成功 if (sendResult != null) { success(msg, sendResult.getMessageId()); return sendResult; }else { error(msg,null); return null; } } } catch (Exception e) { error(msg,e); return null; } } private ExecutorService threads = Executors.newFixedThreadPool(3); private void error(Message msg,Exception e) { logger.error("发送MQ消息失败-- Topic:{}, Key:{}, tag:{}, body:{}" ,msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody())); logger.error("errorMsg --- {}",e.getMessage()); } private void success(Message msg,String messageId) { logger.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}" ,msg.getTopic(),messageId,msg.getKey(),msg.getTag(),new String(msg.getBody())); } }
9、接口测试(10000表示延迟10秒,可以根据自己的业务计算出)
// 测试MQ延时 @Autowired ProducerUtil producerUtil; @PostMapping("/patrolTaskTemp/mqtest") public void mqTime(){ producerUtil.sendTimeMsg( "SMARTPATROL", "你好鸭!!!".getBytes(), "红红火火恍恍惚惚!!", System.currentTimeMillis() + 10000 ); }
10、结果
2021-08-04 22:07:12.677 INFO 17548 --- [nio-8498-exec-2] c.h.i.i.s.m.common.util.ProducerUtil : 发送MQ消息成功 -- Topic:TID_COMMON ,msgId:C0A80168448C2F0E140B14322CB30000 , Key:红红火火恍恍惚惚!!, tag:SMARTPATROL, body:你好鸭!!! 收到消息啦!! 推送成功!!!! 2021-08-04 22:07:22.179 INFO 17548 --- [MessageThread_1] c.h.i.i.s.m.m.t.n.MqTimeMessageListener : 接收到MQ消息 -- Topic:TID_COMMON, tag:SMARTPATROL,msgId:0b17f2e71ebd1b054c2c156f6d1d1655 , Key:红红火火恍恍惚惚!!, body:你好鸭!!!
这篇关于阿里云RocketMQ定时/延迟消息队列实现的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23增量更新怎么做?-icode9专业技术文章分享
- 2024-11-23压缩包加密方案有哪些?-icode9专业技术文章分享
- 2024-11-23用shell怎么写一个开机时自动同步远程仓库的代码?-icode9专业技术文章分享
- 2024-11-23webman可以同步自己的仓库吗?-icode9专业技术文章分享
- 2024-11-23在 Webman 中怎么判断是否有某命令进程正在运行?-icode9专业技术文章分享
- 2024-11-23如何重置new Swiper?-icode9专业技术文章分享
- 2024-11-23oss直传有什么好处?-icode9专业技术文章分享
- 2024-11-23如何将oss直传封装成一个组件在其他页面调用时都可以使用?-icode9专业技术文章分享
- 2024-11-23怎么使用laravel 11在代码里获取路由列表?-icode9专业技术文章分享
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享