RabbitMQ路由
2021/6/29 6:20:34
本文主要是介绍RabbitMQ路由,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
在前面的教程中,我们实现了一个简单的日志系统。可以把日志消息广播给多个接受者。
本篇教程中,我们打算新增一个功能--使得它能够只订阅消息的一个子集。例如,我们只需要去把严重的错误日志信息写入日志文件(存储到磁盘), 但同时仍然把所有的日志信息输出到控制台.
绑定(Bindings)
前面的例子,我们已经创建绑定(Bindings), 代码如下
绑定是指交换机和队列的关系,可以简单理解为这个队列(queue)对这个交换机(exchange)的消息感兴趣
绑定的时候可以带上一个额外的routing_key参数, 为了避免与basic_publish的参数混淆, 我们把它叫做绑定键(binding key)
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
绑定键的意义取决于交换机(exchange)的类型。我们之前使用过的扇形交换机(fanout exchanges)会忽略这个值
直连交换机(Direct exchange)
我们的日志系统广播所有的消息给所有的消费者(consumers)。我们打算扩展它,使其基于日志的严重程度进行消息过滤。例如我们也许只是希望将比较严重的错误(error)日志写入磁盘, 以免在警告(warning)或者信息(info)日志上浪费磁盘空间
我们使用的扇形交换机没有足够的灵活性--它能做的仅仅是广播
我们将会使用直连交换机(direct exchange)来代替。路由的算法很简单--交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息分发到哪个队列
下图很好的描述这个场景
在这个场景中, 我们可以看到直连交换机X和两个队列进行了绑定。第一个队列使用orange作为绑定键,第二个队列有两个绑定, 一个使用black作为绑定键,另外一个使用green
这样,当路由键为orange的消息发布到交换机,就会被路由到队列Q1。路由键为black或者green的消息就会路由到Q2。其他的所有消息都将被丢弃
例子
发布者:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() exchange_name = 'logs2' channel.exchange_declare(exchange=exchange_name, exchange_type='direct') message = (' '.join(sys.argv[1:]) or "info: Hello World!") channel.basic_publish(exchange=exchange_name, routing_key='black1', body=message) print(" [x] Sent %r" % (message,)) connection.close()
消费者1:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() exchange_name = 'logs2' channel.exchange_declare(exchange=exchange_name, exchange_type='direct') queue_name = 'test1' result = channel.queue_declare(queue=queue_name, exclusive=True) # queue_name = result.method.queue channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black') print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % (body,)) channel.basic_consume(queue=queue_name, on_message_callback=callback, exclusive=True, auto_ack=False) # 在用完此队列后立即删除 channel.start_consuming()
消费者2:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() exchange_name = 'logs2' channel.exchange_declare(exchange=exchange_name, exchange_type='direct') queue_name = 'test2' result = channel.queue_declare(queue=queue_name, exclusive=True) channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black1') # 只接收到直连交换机logs2的路由键为black1的消息 print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % (body,)) channel.basic_consume(queue=queue_name, on_message_callback=callback, exclusive=True, auto_ack=False) # 在用完此队列后立即删除 channel.start_consuming()
多个绑定(Multiple bindings)
多个队列使用相同的绑定键是合法的,这个例子中,我们可以添加一个X和Q1之间的绑定,使用black绑定键。这样,直连交换机和扇形交换机的行为一样,会将消息广播到所有匹配的队列。带有black路由键的消息会同时发送到Q1和Q2。
发送日志
我们将会发送消息到一个直连交换机,把日志级别作为路由键。这样接收日志的脚本就可以根据严重级别选择想要处理的日志
订阅
处理接收消息的方式和之前查不到,只有一个例外,我们将会为我们感兴趣的每个严重级别分别创建一个新的绑定
代码整合
代码和上面类似
这篇关于RabbitMQ路由的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-29AutoMQ 产品动态 | 企业版正式上线阿里云、AWS 中国区云市场
- 2024-05-29盘点 AutoMQ 深度使用的阿里云云原生技术
- 2024-05-29盘点 AutoMQ 深度使用的阿里云云原生技术
- 2024-05-29AutoMQ 社区双周精选第十期
- 2024-05-08「布道师系列文章」解析 AutoMQ 对象存储中的文件存储格式
- 2024-05-08「布道师系列文章」小红书黄章衡:AutoMQ Serverless 基石-秒级分区迁移
- 2024-05-08AutoMQ 系统测试体系揭秘
- 2024-03-14AutoMQ 携手阿里云共同发布新一代云原生 Kafka,帮助得物有效压缩 85% Kafka 云支出!
- 2024-02-22kafka partitioner
- 2024-01-24AutoMQ生态集成 - 将数据从 AutoMQ Kafka 导入 RisingWave 数据库