RabbitMQ Fanout交换机代码实现
2021/6/19 0:02:47
本文主要是介绍RabbitMQ Fanout交换机代码实现,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
一般情况下,生产者发送消息,先到先得,一个消费者消费之后,该条消息便消失不会再被消费,抢完即止。
那能否生产者发送的消息每个消费者都能接收到,都能消费呢?
Fanout交换机就可以实现。
代码实现:
生产者:
public class FanoutExchange { public void FanoutPublish() { MQHelper mh = new MQHelper(); using (var conn = mh.GetConnection()) { using (IModel channel = conn.CreateModel()) { //声明队列 channel.QueueDeclare(queue: "FanoutAdu001", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "FanoutAdu002", durable: true, exclusive: false, autoDelete: false, arguments: null); //声明交换机 channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); //绑定 channel.QueueBind(queue: "FanoutAdu001", exchange: "FanoutExchange", routingKey: string.Empty,arguments:null); channel.QueueBind(queue: "FanoutAdu002", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null); //发布 int i = 0; while (true) { string message = $"通知{i}"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "FanoutExchange", routingKey: string.Empty, basicProperties: null, body: body); Console.WriteLine($"通知{i}已发送到队列"); Thread.Sleep(2000); i++; } } } } }
可以看到,这里和Direct交换机代码相比,类型发生了变化,同时路由键变成了Empty
消费者:
public class FanoutExchangeConsumer { public void FanoutConsume() { var factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.UserName = "guest"; factory.Password = "guest"; using (var conn = factory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { //声明队列 channel.QueueDeclare(queue: "FanoutAdu002", durable: true, exclusive: false, autoDelete: false, arguments: null); //声明交换机 channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); //绑定 channel.QueueBind(queue: "FanoutAdu002", exchange: "FanoutExchange", routingKey: string.Empty); //消费消息 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"接收成功,[{message}]"); }; //处理消息 channel.BasicConsume(queue: "FanoutAdu002", autoAck: true, consumer: consumer); } } } }
这里每个消费者一条路由,都能够接收生产者发送的所有消息
这篇关于RabbitMQ Fanout交换机代码实现的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-09-10RabbitMQ教程:初学者指南
- 2024-09-10RabbitMQ教程:初学者指南
- 2024-09-01Kafka事务实现原理
- 2024-08-09KubeSphere 部署 Kafka 集群实战指南
- 2024-07-24百行代码实现 Kafka 运行在 S3 之上
- 2024-07-18如何使用观测云监测 AutoMQ 集群状态
- 2024-07-18活动回顾 | AutoMQ 联合 GreptimeDB 共同探讨新能源汽车数据基础设施
- 2024-07-15AutoMQ vs Kafka: 来自小红书的独立深度评测与对比
- 2024-07-15AutoMQ 生态集成 Kafdrop-ui
- 2024-07-15AutoMQ 与蚂蚁数科达成战略合作