python3使用RabbitMQ
2021/8/8 14:06:28
本文主要是介绍python3使用RabbitMQ,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
简介
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦。
作用:
- 解耦
- 异步
- 削峰
使用
简单模式(直接使用队列不用交换机)
生产者:
import pika import json import time credentials = pika.PlainCredentials('guest', 'guest') # mq用户名和密码 # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/',credentials = credentials)) channel=connection.channel() # 声明消息队列,消息将在这个队列传递,如不存在,则创建 result = channel.queue_declare(queue = 'python-test') result = channel.queue_declare(queue = 'python-test1') # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储 # channel.exchange_declare(exchange = '',durable = True, exchange_type='fanout') for i in range(100): message=json.dumps({'OrderId':"1000%s"%i}) # 向队列插入数值 routing_key是队列名 channel.basic_publish(exchange='', routing_key='python-test', body=message) channel.basic_publish(exchange='', routing_key='python-test1', body=message) print(message) time.sleep(2) #connection.close()
消费者
import pika credentials = pika.PlainCredentials('guest', 'guest') connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/',credentials = credentials)) channel = connection.channel() # 申明消息队列,消息在这个队列传递,如果不存在,则创建队列 channel.queue_declare(queue = 'python-test', durable = False) # 定义一个回调函数来处理消息队列中的消息,这里是打印出来 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) # 告诉rabbitmq,用callback来接收消息 channel.basic_consume('python-test',callback) # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理 channel.start_consuming()
使用exchange
fanout模式
生产者
import pika import json import time credentials = pika.PlainCredentials('guest', 'guest') # mq用户名和密码 # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials)) channel=connection.channel() channel.exchange_declare(exchange='test1', exchange_type='fanout') for i in range(100): message=json.dumps({'OrderId':"1000%s"%i}) # 当使用交换机的时候直接把消息交给交换机就好了 channel.basic_publish(exchange='test1', routing_key='', body=message) body=message) print(message) time.sleep(2) #connection.close()
消费者
import pika import time credentials = pika.PlainCredentials('guest', 'guest') connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials)) channel = connection.channel() # 绑定交换机 channel.exchange_declare(exchange='test1', exchange_type='fanout') # 不指定queue名字,rabbit会随机分配一个名字 # exclusive=True会在使用此queue的消费者断开后,自动将queue删除 result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue print(queue_name) # 将exchange与queue绑定 channel.queue_bind(exchange='test1', queue=queue_name) # 定义一个回调函数来处理消息队列中的消息,这里是打印出来 def callback(ch, method, properties, body): #ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) #time.sleep(0) # 告诉rabbitmq,用callback来接收对应queue的消息 channel.basic_consume(queue_name, callback) # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理 channel.start_consuming()
direct模式
生产者
import pika import json import time credentials = pika.PlainCredentials('guest', 'guest') # mq用户名和密码 # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials)) channel=connection.channel() channel.exchange_declare(exchange='test_direct', exchange_type='direct') for i in range(100): message=json.dumps({'OrderId':"1000%s"%i}) # 当使用交换机的时候直接把消息交给交换机,并指定发布到哪个queue channel.basic_publish(exchange='test_direct', routing_key='con2', body=message) print(message) time.sleep(2) #connection.close()
消费者
import pika import time credentials = pika.PlainCredentials('guest', 'guest') connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials)) channel = connection.channel() # 绑定交换机 channel.exchange_declare(exchange='test_direct', exchange_type='direct') # 不指定queue名字,rabbit会随机分配一个名字 # exclusive=True会在使用此queue的消费者断开后,自动将queue删除 result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue print(queue_name) # 将exchange与queue绑定 channel.queue_bind(exchange='test_direct', queue=queue_name, routing_key='con1') channel.queue_bind(exchange='test_direct', queue=queue_name, routing_key='con2') # 定义一个回调函数来处理消息队列中的消息,这里是打印出来 def callback(ch, method, properties, body): #ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) #time.sleep(0) # 告诉rabbitmq,用callback来接收对应queue的消息 channel.basic_consume(queue_name, callback) # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理 channel.start_consuming()
topic模式
星号(*):匹配一个单词
井号(#):匹配0个或多个单词
direct与topic都是通过消费者指定routing_key来实现绑定
当产生routing_key为con1的消息时,路由到python-test中
当产生routing_key为con1.xxx的消息时,路由到python-test1中
当产生routing_key为con2的消息时,路由到python-test1中
当产生routing_key为con2.xxx的消息时,路由到python-test1中
当产生routing_key为con2.xxx.xxxx的消息时,路由到python-test1中
import pika import json import time credentials = pika.PlainCredentials('guest', 'guest') # mq用户名和密码 # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials)) channel=connection.channel() channel.exchange_declare(exchange='test_topic', exchange_type='topic') for i in range(100): message=json.dumps({'OrderId':"1000%s"%i}) # 当使用交换机的时候直接把消息交给交换机,并指定发布到哪个queue channel.basic_publish(exchange='test_topic', routing_key='con2', body=message) print(message) time.sleep(2) #connection.close()
import pika import time credentials = pika.PlainCredentials('guest', 'guest') connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials)) channel = connection.channel() # 绑定交换机 channel.exchange_declare(exchange='test_topic', exchange_type='topic') # 不指定queue名字,rabbit会随机分配一个名字 # exclusive=True会在使用此queue的消费者断开后,自动将queue删除 result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue print(queue_name) # 将exchange与queue绑定 channel.queue_bind(exchange='test_topic', queue=queue_name, routing_key='con1.*') channel.queue_bind(exchange='test_topic', queue=queue_name, routing_key='con2.#') # 定义一个回调函数来处理消息队列中的消息,这里是打印出来 def callback(ch, method, properties, body): #ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) #time.sleep(0) # 告诉rabbitmq,用callback来接收对应queue的消息 channel.basic_consume(queue_name, callback) # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理 channel.start_consuming()
存在问题
数据的重复消费
数据丢失
相关内容可以参考大佬的这篇文章https://www.jianshu.com/p/5ade5bf0dcd9
这篇关于python3使用RabbitMQ的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-25Python编程基础:变量与类型
- 2024-11-25Python编程基础与实践
- 2024-11-24Python编程基础详解
- 2024-11-21Python编程基础教程
- 2024-11-20Python编程基础与实践
- 2024-11-20Python编程基础与高级应用
- 2024-11-19Python 基础编程教程
- 2024-11-19Python基础入门教程
- 2024-11-17在FastAPI项目中添加一个生产级别的数据库——本地环境搭建指南
- 2024-11-16`PyMuPDF4LLM`:提取PDF数据的神器