RabbitMQ快速使用代码手册
2023/6/17 1:22:50
本文主要是介绍RabbitMQ快速使用代码手册,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
本篇博客的内容为RabbitMQ在开发过程中的快速上手使用,侧重于代码部分,几乎没有相关概念的介绍,相关概念请参考以下csdn博客,两篇都是我找的精华帖,供大家学习。本篇博客也持续更新~~~
内容代码部分由于word转md格式有些问题,可以直接查看我的有道云笔记,链接:https://note.youdao.com/s/Ab7Cjiu
参考文档
csdn博客:
基础部分:https://blog.csdn.net/qq_35387940/article/details/100514134
高级部分:https://blog.csdn.net/weixin_49076273/article/details/124991012
application.yml
server: port: 8021 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.1 port: 5672 username: root password: root #虚拟host 可以不设置,使用server默认host virtual-host: JCcccHost #确认消息已发送到交换机(Exchange) #publisher-confirms: true publisher-confirm-type: correlated #确认消息已发送到队列(Queue) publisher-returns: true
完善更多信息
spring: rabbitmq: host: localhost port: 5672 virtual-host: / username: guest password: guest publisher-confirm-type: correlated publisher-returns: true template: mandatory: true retry: #发布重试,默认false enabled: true #重试时间 默认1000ms initial-interval: 1000 #重试最大次数 最大3 max-attempts: 3 #重试最大间隔时间 max-interval: 10000 #重试的时间隔乘数,比如配2,0 第一次等于10s,第二次等于20s,第三次等于40s multiplier: 1 listener: \# 默认配置是simple type: simple simple: \# 手动ack Acknowledge mode of container. auto none acknowledge-mode: manual #消费者调用程序线程的最小数量 concurrency: 10 #消费者最大数量 max-concurrency: 10 #限制消费者每次只处理一条信息,处理完在继续下一条 prefetch: 1 #启动时是否默认启动容器 auto-startup: true #被拒绝时重新进入队列 default-requeue-rejected: true
相关注解说明
@RabbitListener 注解是指定某方法作为消息消费的方法,例如监听某 Queue
里面的消息。
@RabbitListener标注在方法上,直接监听指定的队列,此时接收的参数需要与发送市类型一致。
\@Component public class PointConsumer { //监听的队列名 \@RabbitListener(queues = \"point.to.point\") public void processOne(String name) { System.out.println(\"point.to.point:\" + name); } }
@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给
@RabbitHandler 的方法处理,根据接受的参数类型进入具体的方法中。
\@Component \@RabbitListener(queues = \"consumer_queue\") public class Receiver { \@RabbitHandler public void processMessage1(String message) { System.out.println(message); } \@RabbitHandler public void processMessage2(byte\[\] message) { System.out.println(new String(message)); } }
@Payload
可以获取消息中的 body 信息
\@RabbitListener(queues = \"debug\") public void processMessage1(@Payload String body) { System.out.println(\"body:\"+body); }
@Header,@Headers
可以获得消息中的 headers 信息
\@RabbitListener(queues = \"debug\") public void processMessage1(@Payload String body, \@Header String token) { System.out.println(\"body:\"+body); System.out.println(\"token:\"+token); } \@RabbitListener(queues = \"debug\") public void processMessage1(@Payload String body, \@Headers Map\<String,Object\> headers) { System.out.println(\"body:\"+body); System.out.println(\"Headers:\"+headers); }
快速使用
配置xml文件
<dependency\> \<groupId\>org.springframework.boot\</groupId\> \<artifactId\>spring-boot-starter-amqp\</artifactId\> \</dependency\>
配置exchange、queue
注解快速创建版本
\@Configuration public class RabbitmqConfig { //创建交换机 //通过ExchangeBuilder能创建direct、topic、Fanout类型的交换机 \@Bean(\"bootExchange\") public Exchange bootExchange() { return ExchangeBuilder.topicExchange(\"zx_topic_exchange\").durable(true).build(); } //创建队列 \@Bean(\"bootQueue\") public Queue bootQueue() { return QueueBuilder.durable(\"zx_queue\").build(); } /\*\* \* 将队列与交换机绑定 \* \* \@param queue \* \@param exchange \* \@return \*/ \@Bean public Binding bindQueueExchange(@Qualifier(\"bootQueue\") Queue queue, \@Qualifier(\"bootExchange\") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(\"boot.#\").noargs(); } }
Direct
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /\*\* \* \@Author : JCccc \* \@CreateTime : 2019/9/3 \* \@Description : \*\*/ \@Configuration public class DirectRabbitConfig { //队列 起名:TestDirectQueue \@Bean public Queue TestDirectQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue(\"TestDirectQueue\",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue(\"TestDirectQueue\",true); } //Direct交换机 起名:TestDirectExchange \@Bean DirectExchange TestDirectExchange() { // return new DirectExchange(\"TestDirectExchange\",true,true); return new DirectExchange(\"TestDirectExchange\",true,false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting \@Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(\"TestDirectRouting\"); } \@Bean DirectExchange lonelyDirectExchange() { return new DirectExchange(\"lonelyDirectExchange\"); } }
Fanout
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /\*\* \* \@Author : JCccc \* \@CreateTime : 2019/9/3 \* \@Description : \*\*/ \@Configuration public class FanoutRabbitConfig { /\*\* \* 创建三个队列 :fanout.A fanout.B fanout.C \* 将三个队列都绑定在交换机 fanoutExchange 上 \* 因为是扇型交换机, 路由键无需配置,配置也不起作用 \*/ \@Bean public Queue queueA() { return new Queue(\"fanout.A\"); } \@Bean public Queue queueB() { return new Queue(\"fanout.B\"); } \@Bean public Queue queueC() { return new Queue(\"fanout.C\"); } \@Bean FanoutExchange fanoutExchange() { return new FanoutExchange(\"fanoutExchange\"); } \@Bean Binding bindingExchangeA() { return BindingBuilder.bind(queueA()).to(fanoutExchange()); } \@Bean Binding bindingExchangeB() { return BindingBuilder.bind(queueB()).to(fanoutExchange()); } \@Bean Binding bindingExchangeC() { return BindingBuilder.bind(queueC()).to(fanoutExchange()); } }
Topic
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /\*\* \* \@Author : JCccc \* \@CreateTime : 2019/9/3 \* \@Description : \*\*/ \@Configuration public class TopicRabbitConfig { //绑定键 public final static String man = \"topic.man\"; public final static String woman = \"topic.woman\"; \@Bean public Queue firstQueue() { return new Queue(TopicRabbitConfig.man); } \@Bean public Queue secondQueue() { return new Queue(TopicRabbitConfig.woman); } \@Bean TopicExchange exchange() { return new TopicExchange(\"topicExchange\"); } //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man //这样只要是消息携带的路由键是topic.man,才会分发到该队列 \@Bean Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(man); } //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.# // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列 \@Bean Binding bindingExchangeMessage2() { return BindingBuilder.bind(secondQueue()).to(exchange()).with(\"topic.#\"); } }
生产者发送消息
直接发送给队列
//指定消息队列的名字,直接发送消息到消息队列中 \@Test public void testSimpleQueue() { // 队列名称 String queueName = \"simple.queue\"; // 消息 String message = \"hello, spring amqp!\"; // 发送消息 rabbitTemplate.convertAndSend(queueName, message); }
发送给交换机,然后走不同的模式
////指定交换机的名字,将消息发送给交换机,然后不同模式下,消息队列根据key得到消息 \@Test public void testSendDirectExchange() { // 交换机名称,有三种类型 String exchangeName = \"itcast.direct\"; // 消息 String message = \"红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!\"; // 发送消息,red为队列的key,因此此队列会得到消息 rabbitTemplate.convertAndSend(exchangeName, \"red\", message); }
也可以将发送的消息封装到HashMap中然后发送给交换机
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.UUID; /\*\* \* \@Author : JCccc \* \@CreateTime : 2019/9/3 \* \@Description : \*\*/ \@RestController public class SendMessageController { \@Autowired RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法 \@GetMapping(\"/sendDirectMessage\") public String sendDirectMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = \"test message, hello!\"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern(\"yyyy-MM-dd HH:mm:ss\")); Map\<String,Object\> map=new HashMap\<\>(); map.put(\"messageId\",messageId); map.put(\"messageData\",messageData); map.put(\"createTime\",createTime); //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange rabbitTemplate.convertAndSend(\"TestDirectExchange\", \"TestDirectRouting\", map); return \"ok\"; } }
消费者接收消息
//使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。 \@Component public class MessageListener { \@RabbitListener(queues = \"direct_queue\") public void receive(String id){ System.out.println(\"已完成短信发送业务(rabbitmq direct),id:\"+id); } } 参数用Map接收也可以 \@Component \@RabbitListener(queues = \"TestDirectQueue\")//监听的队列名称 TestDirectQueue public class DirectReceiver { \@RabbitHandler public void process(Map testMessage) { System.out.println(\"DirectReceiver消费者收到消息 : \" + testMessage.toString()); } }
高级特性
消息可靠性传递
有confirm和return两种
在application.yml中添加以下配置项:
server: port: 8021 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.1 port: 5672 username: root password: root #虚拟host 可以不设置,使用server默认host virtual-host: JCcccHost #确认消息已发送到交换机(Exchange) #publisher-confirms: true publisher-confirm-type: correlated #确认消息已发送到队列(Queue) publisher-returns: true
有两种配置方法:
写到配置类中
写到工具类或者普通类中,但是这个类得实现那两个接口
写法一
编写消息确认回调函数
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; \@Configuration public class RabbitConfig { \@Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { \@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println(\"ConfirmCallback: \"+\"相关数据:\"+correlationData); System.out.println(\"ConfirmCallback: \"+\"确认情况:\"+ack); System.out.println(\"ConfirmCallback: \"+\"原因:\"+cause); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { \@Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println(\"ReturnCallback: \"+\"消息:\"+message); System.out.println(\"ReturnCallback: \"+\"回应码:\"+replyCode); System.out.println(\"ReturnCallback: \"+\"回应信息:\"+replyText); System.out.println(\"ReturnCallback: \"+\"交换机:\"+exchange); System.out.println(\"ReturnCallback: \"+\"路由键:\"+routingKey); } }); return rabbitTemplate; } }
写法二
\@Component \@Slf4j public class SmsRabbitMqUtils implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { \@Resource private RedisTemplate\<String, String\> redisTemplate; \@Resource private RabbitTemplate rabbitTemplate; private String finalId = null; private SmsDTO smsDTO = null; /\*\* \* 发布者确认的回调 \* \* \@param correlationData 回调的相关数据。 \* \@param b ack为真,nack为假 \* \@param s 一个可选的原因,用于nack,如果可用,否则为空。 \*/ \@Override public void confirm(CorrelationData correlationData, boolean b, String s) { // 消息发送成功,将redis中消息的状态(status)修改为1 if (b) { redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX + finalId, \"status\", 1); } else { // 发送失败,放入redis失败集合中,并删除集合数据 log.error(\"短信消息投送失败:{}\--\>{}\", correlationData, s); redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId); redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId, this.smsDTO); } } /\*\* \* 发生异常时的消息返回提醒 \* \* \@param returnedMessage \*/ \@Override public void returnedMessage(ReturnedMessage returnedMessage) { log.error(\"发生异常,返回消息回调:{}\", returnedMessage); // 发送失败,放入redis失败集合中,并删除集合数据 redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId); redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId, this.smsDTO); } \@PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } }
消息确认机制
手动确认
yml配置 #手动确认 manual listener: simple: acknowledge-mode: manual
写法一
首先在消费者项目中创建MessageListenerConfig
import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; \@Configuration public class MessageListenerConfig { \@Autowired private CachingConnectionFactory connectionFactory; \@Autowired private MyAckReceiver myAckReceiver;//消息接收处理类 \@Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息 //设置一个队列 container.setQueueNames(\"TestDirectQueue\"); //如果同时设置多个如下: 前提是队列都是必须已经创建存在的 // container.setQueueNames(\"TestDirectQueue\",\"TestDirectQueue2\",\"TestDirectQueue3\"); //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues //container.setQueues(new Queue(\"TestDirectQueue\",true)); //container.addQueues(new Queue(\"TestDirectQueue2\",true)); //container.addQueues(new Queue(\"TestDirectQueue3\",true)); container.setMessageListener(myAckReceiver); return container; } }
然后创建手动确认监听类MyAckReceiver(手动确认模式需要实现ChannelAwareMessageListener)
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.util.Map; \@Component public class MyAckReceiver implements ChannelAwareMessageListener { \@Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { byte\[\] body = message.getBody(); ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body)); Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject(); String messageId = msgMap.get(\"messageId\"); String messageData = msgMap.get(\"messageData\"); String createTime = msgMap.get(\"createTime\"); ois.close(); System.out.println(\" MyAckReceiver messageId:\"+messageId+\" messageData:\"+messageData+\" createTime:\"+createTime); System.out.println(\"消费的主题消息来自:\"+message.getMessageProperties().getConsumerQueue()); channel.basicAck(deliveryTag, true); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息 //channel.basicReject(deliveryTag, true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝 } catch (Exception e) { channel.basicReject(deliveryTag, false); e.printStackTrace(); } } }
如果想实现不同的队列,有不同的监听确认处理机制,做不同的业务处理,那么这样做:
首先需要在配置类中绑定队列,然后只需要根据消息来自不同的队列名进行区分处理即可
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.util.Map; \@Component public class MyAckReceiver implements ChannelAwareMessageListener { \@Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { byte\[\] body = message.getBody(); ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body)); Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject(); String messageId = msgMap.get(\"messageId\"); String messageData = msgMap.get(\"messageData\"); String createTime = msgMap.get(\"createTime\"); ois.close(); if (\"TestDirectQueue\".equals(message.getMessageProperties().getConsumerQueue())){ System.out.println(\"消费的消息来自的队列名为:\"+message.getMessageProperties().getConsumerQueue()); System.out.println(\"消息成功消费到 messageId:\"+messageId+\" messageData:\"+messageData+\" createTime:\"+createTime); System.out.println(\"执行TestDirectQueue中的消息的业务处理流程\...\...\"); } if (\"fanout.A\".equals(message.getMessageProperties().getConsumerQueue())){ System.out.println(\"消费的消息来自的队列名为:\"+message.getMessageProperties().getConsumerQueue()); System.out.println(\"消息成功消费到 messageId:\"+messageId+\" messageData:\"+messageData+\" createTime:\"+createTime); System.out.println(\"执行fanout.A中的消息的业务处理流程\...\...\"); } channel.basicAck(deliveryTag, true); //channel.basicReject(deliveryTag, true);//为true会重新放回队列 } catch (Exception e) { channel.basicReject(deliveryTag, false); e.printStackTrace(); } } }
写法二
\@Component \@Slf4j public class SendSmsListener { \@Resource private RedisTemplate\<String, String\> redisTemplate; \@Resource private SendSmsUtils sendSmsUtils; /\*\* \* 监听发送短信普通队列 \* \@param smsDTO \* \@param message \* \@param channel \* \@throws IOException \*/ \@RabbitListener(queues = SMS_QUEUE_NAME) public void sendSmsListener(SmsDTO smsDTO, Message message, Channel channel) throws IOException { String messageId = message.getMessageProperties().getMessageId(); int retryCount = (int) redisTemplate.opsForHash().get(RedisConstant.SMS_MESSAGE_PREFIX + messageId, \"retryCount\"); if (retryCount \> 3) { //重试次数大于3,直接放到死信队列 log.error(\"短信消息重试超过3次:{}\", messageId); //basicReject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。 //该方法reject后,该消费者还是会消费到该条被reject的消息。 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId); return; } try { String phoneNum = smsDTO.getPhoneNum(); String code = smsDTO.getCode(); if(StringUtils.isAnyBlank(phoneNum,code)){ throw new RuntimeException(\"sendSmsListener参数为空\"); } // 发送消息 SendSmsResponse sendSmsResponse = sendSmsUtils.sendSmsResponse(phoneNum, code); SendStatus\[\] sendStatusSet = sendSmsResponse.getSendStatusSet(); SendStatus sendStatus = sendStatusSet\[0\]; if(!\"Ok\".equals(sendStatus.getCode()) \|\|!\"send success\".equals(sendStatus.getMessage())){ throw new RuntimeException(\"发送验证码失败\"); } //手动确认消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info(\"短信发送成功:{}\",smsDTO); redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId); } catch (Exception e) { redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX+messageId,\"retryCount\",retryCount+1); channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } } /\*\* \* 监听到发送短信死信队列 \* \@param sms \* \@param message \* \@param channel \* \@throws IOException \*/ \@RabbitListener(queues = SMS_DELAY_QUEUE_NAME) public void smsDelayQueueListener(SmsDTO sms, Message message, Channel channel) throws IOException { try{ log.error(\"监听到死信队列消息==\>{}\",sms); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }catch (Exception e){ channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } } }
消费端限流
#配置RabbitMQ spring: rabbitmq: host: 192.168.126.3 port: 5672 username: guest password: guest virtual-host: / #开启自动确认 none 手动确认 manual listener: simple: #消费端限流机制必须开启手动确认 acknowledge-mode: manual #消费端最多拉取的消息条数,签收后不满该条数才会继续拉取 prefetch: 5
消息存活时间TTL
可以设置队列的存活时间,也可以设置具体消息的存活时间
设置队列中所有消息的存活时间
return QueueBuilder
.durable(QUEUE_NAME)//队列持久化
.ttl(10000)//设置队列的所有消息存活10s
.build();
即在创建队列时,设置存活时间
设置某条消息的存活时间
//发送消息,并设置该消息的存活时间
\@Test public void testSendMessage() { //1.创建消息属性 MessageProperties messageProperties = new MessageProperties(); //2.设置存活时间 messageProperties.setExpiration(\"10000\"); //3.创建消息对象 Message message = new Message(\"sendMessage\...\".getBytes(),messageProperties); //4.发送消息 rabbitTemplate.convertAndSend(\"my_topic_exchange1\",\"my_routing\",message); }
若设置中间的消息的存活时间,当过期时,该消息不会被移除,但是该消息已经不会被消费了,需要等到该消息到队里顶端才会被移除。因为队列是头出,尾进,故而要移除它需要等到它在顶端时才可以。
在队列设置存活时间,也在单条消息设置存活时间,则以时间短的为准
死信队列
死信队列和普通队列没有任何区别,只需要将普通队列需要绑定死信交换机和死信队列就能够实现功能
import org.springframework.amqp.core.\*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; \@Configuration//Rabbit配置类 public class RabbitConfig4 { private final String DEAD_EXCHANGE = \"dead_exchange\"; private final String DEAD_QUEUE = \"dead_queue\"; private final String NORMAL_EXCHANGE = \"normal_exchange\"; private final String NORMAL_QUEUE = \"normal_queue\"; //创建死信交换机 \@Bean(DEAD_EXCHANGE) public Exchange deadExchange() { return ExchangeBuilder .topicExchange(DEAD_EXCHANGE)//交换机类型 ;参数为名字 topic为通配符模式的交换机 .durable(true)//是否持久化,true即存到磁盘,false只在内存上 .build(); } //创建死信队列 \@Bean(DEAD_QUEUE) public Queue deadQueue() { return QueueBuilder .durable(DEAD_QUEUE)//队列持久化 //.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源 .build(); } //死信交换机绑定死信队列 \@Bean //@Qualifier注解,使用名称装配进行使用 public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange, \@Qualifier(DEAD_QUEUE) Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .with(\"dead_routing\") .noargs(); } //创建普通交换机 \@Bean(NORMAL_EXCHANGE) public Exchange normalExchange() { return ExchangeBuilder .topicExchange(NORMAL_EXCHANGE)//交换机类型 ;参数为名字 topic为通配符模式的交换机 .durable(true)//是否持久化,true即存到磁盘,false只在内存上 .build(); } //创建普通队列 \@Bean(NORMAL_QUEUE) public Queue normalQueue() { return QueueBuilder .durable(NORMAL_QUEUE)//队列持久化 //.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源 .deadLetterExchange(DEAD_EXCHANGE)//绑定死信交换机 .deadLetterRoutingKey(\"dead_routing\")//死信队列路由关键字 .ttl(10000)//消息存活10s .maxLength(10)//队列最大长度为10 .build(); } //普通交换机绑定普通队列 \@Bean //@Qualifier注解,使用名称装配进行使用 public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange, \@Qualifier(NORMAL_QUEUE) Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .with(\"my_routing\") .noargs(); } }
延迟队列
RabbitMQ并未实现延迟队列功能,所以可以通过死信队列实现延迟队列的功能
即给普通队列设置存活时间30分钟,过期后发送至死信队列,在死信消费者监听死信队列消息,查看订单状态,是否支付,未支付则取消订单,回退库存即可。
消费者监听延迟队列
\@Component public class ExpireOrderConsumer { //监听过期订单队列 \@RabbitListener(queues = \"expire_queue\") public void listenMessage(String orderId) { //模拟处理数据库等业务 System.out.println(\"查询\"+orderId+\"号订单的状态,如果已支付无需处理,如果未支付则回退库存\"); } } 控制层代码 \@RestController public class OrderController { \@Autowired private RabbitTemplate rabbitTemplate; \@RequestMapping(value = \"/place/{orderId}\",method = RequestMethod.GET) public String placeOrder(@PathVariable String orderId) { //模拟service层处理 System.out.println(\"处理订单数据\...\"); //将订单id发送到订单队列 rabbitTemplate.convertAndSend(\"order_exchange\",\"order_routing\",orderId); return \"下单成功,修改库存\"; } }
这篇关于RabbitMQ快速使用代码手册的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-10-27[开源] 一款轻量级的kafka可视化管理平台
- 2024-10-23Kafka消息丢失资料详解:初学者必看教程
- 2024-10-23Kafka资料新手入门指南
- 2024-10-23Kafka解耦入门:新手必读教程
- 2024-10-23Kafka入门:新手必读的简单教程
- 2024-10-23Kafka入门:新手必读的简单教程
- 2024-10-23Kafka消息丢失入门:新手必读指南
- 2024-10-23Kafka消息队列入门:新手必看的简单教程
- 2024-10-23Kafka消息队列入门与应用
- 2024-10-23Kafka重复消费入门:轻松掌握Kafka重复消息处理技巧