RabbitMQ-任务模式
2022/4/26 23:43:30
本文主要是介绍RabbitMQ-任务模式,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
概述
Work Queues,也被称为(Task Queues)任务模型
。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work
模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
以上的角色分别为如下所解释的:
- P:生产者:任务的发布者
- C1:消费者1,领取任务并且完成任务,假设完成速度较慢
- C2:消费者2:领取任务并完成任务,假设完成速度较快
创建生产者
代码如下所示:
java/** * @author: BNZeng **/ public class Producer { @Test public void sendMessage() throws Exception { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); for (int i = 1; i <= 100; i++) { channel.basicPublish("", "hello", null, ("hello rabbitmq Work Queue → " + i).getBytes()); } // 关闭通道和连接 RabbitMQUtil.closeChannelAndConnection(channel, connection); System.out.println("消息发送成功"); } }
创建消费者 1
代码如下所示:
java/** * @author BNZeng */ public class Consumer1 { @Test public void receiveMessage() throws Exception { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); channel.basicConsume("hello", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者【1】收到消息 → " + new String(body)); } }); System.out.println("消费者【1】启动成功"); // 不能让程序结束 System.in.read(); // 释放资源 RabbitMQUtil.closeChannelAndConnection(channel, connection); } }
创建消费者 2
代码如下所示:
java/** * @author BNZeng */ public class Consumer2 { @Test public void receiveMessage() throws Exception { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); // 把签收模式变成 false channel.basicConsume("hello", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者【2】收到消息 → " + new String(body)); } }); System.out.println("消费者【2】启动成功"); // 不能让程序结束 System.in.read(); // 释放资源 RabbitMQUtil.closeChannelAndConnection(channel, connection); } }
进行测试
先启动消费者1和消费者2,再启动消息生产者发送消息,发现结果如下:
他们是平均消费的,官网有说明:https://www.rabbitmq.com/tutorials/tutorial-two-java.html
那么实际开发中可能有消费者处理的慢,有的处理的快,那么如何配置呢,引入自动确认机制,
消息的自动确认机制
- 官方的说明
完成一项任务可能需要几秒钟。您可能想知道,如果一个消费者开始了一项很长的任务,但只完成了一部分就去世了,会发生什么。在我们当前的代码中,一旦 RabbitMQ 向使用者传递了一条消息,它就会立即将其标记为删除。在这种情况下,如果你杀死一个工人,我们就会丢失它正在处理的信息。我们还将丢失所有发送给这个特定 worker 但尚未处理的消息。
但我们不想失去任何任务。如果一个工人死亡,我们希望任务被交付给另一个工人。
接下来改造消费者2用来模拟一下某一个消费者消费慢的情况下会怎么样,改造之后的代码如下:
java/** * @author BNTang */ public class Consumer2 { @Test public void receiveMessage() throws Exception { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); // 一次只处理一条消息 channel.basicQos(1); channel.queueDeclare("hello", false, false, false, null); // 把签收模式变成 false channel.basicConsume("hello", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } // 手动签收 channel.basicAck(envelope.getDeliveryTag(), false); System.out.println("消费者【2】收到消息 → " + new String(body)); } }); System.out.println("消费者【2】启动成功"); // 不能让程序结束 System.in.read(); // 释放资源 RabbitMQUtil.closeChannelAndConnection(channel, connection); } }
运行起来进行测试,结果如下所示:
这篇关于RabbitMQ-任务模式的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-10-27[开源] 一款轻量级的kafka可视化管理平台
- 2024-10-23Kafka消息丢失资料详解:初学者必看教程
- 2024-10-23Kafka资料新手入门指南
- 2024-10-23Kafka解耦入门:新手必读教程
- 2024-10-23Kafka入门:新手必读的简单教程
- 2024-10-23Kafka入门:新手必读的简单教程
- 2024-10-23Kafka消息丢失入门:新手必读指南
- 2024-10-23Kafka消息队列入门:新手必看的简单教程
- 2024-10-23Kafka消息队列入门与应用
- 2024-10-23Kafka重复消费入门:轻松掌握Kafka重复消息处理技巧