mqtt模式--Work 模式--公平分发
2022/7/1 23:21:53
本文主要是介绍mqtt模式--Work 模式--公平分发,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
rabbitmq 使用带有 perfetchCount = 1 设置的 basicQos 方法。当消费者接受处理并确认前一条消息前,不向此消费者发送新消息,会分配给其他空闲的消费者。
package com.tszr.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Productor { public static void main(String[] args){ // 1、创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2、获取连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); // 3、向 Queue1 发布20个消息 for (int i = 0; i < 20; i++) { String msg = "晴天: " + i; channel.basicPublish("", "queue1", null, msg.getBytes(StandardCharsets.UTF_8)); } System.out.println("消息发送成功!"); } catch (IOException | TimeoutException e) { e.printStackTrace(); System.out.println("消息发送异常"); } finally { // 关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } // 关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
package com.tszr.work; import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Worker1 { public static void main(String[] args) { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 获取连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); Channel finalChannel = channel; // Channel 使用 Qos 机制 finalChannel.basicQos(16); finalChannel.basicConsume("queue1", false, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { System.out.println("Worker1" + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }); System.out.println("Worker1 开始接收消息"); System.in.read(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { // 关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } // 关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
package com.tszr.work; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Worker2 { public static void main(String[] args) { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 获取连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); Channel finalChannel = channel; // Channel 使用 Qos 机制 finalChannel.basicQos(5); finalChannel.basicConsume("queue1", false, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { System.out.println("Worker2" + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }); System.out.println("Worker2 开始接收消息"); System.in.read(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { // 关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } // 关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
相较于轮询分发模式,添加了 Qos 机制,设置值为1,代表消费者每次从队列中获取几条消息
这篇关于mqtt模式--Work 模式--公平分发的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-10-27[开源] 一款轻量级的kafka可视化管理平台
- 2024-10-23Kafka消息丢失资料详解:初学者必看教程
- 2024-10-23Kafka资料新手入门指南
- 2024-10-23Kafka解耦入门:新手必读教程
- 2024-10-23Kafka入门:新手必读的简单教程
- 2024-10-23Kafka入门:新手必读的简单教程
- 2024-10-23Kafka消息丢失入门:新手必读指南
- 2024-10-23Kafka消息队列入门:新手必看的简单教程
- 2024-10-23Kafka消息队列入门与应用
- 2024-10-23Kafka重复消费入门:轻松掌握Kafka重复消息处理技巧