Python RabbitMQ基础知识
2021/9/19 11:06:16
本文主要是介绍Python RabbitMQ基础知识,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
rabbitmq
-
概念
消息队列是一种异步的服务间通信方式,是分布式系统中重要的组件,在很多生产环境中需要控制并发量的场景下用到。消息队列可为这些分布式应用程序提供通信和协调。当前使用较多的消息队列有RabbitMQ、RocketMQ、ActivateMQ、Kafka等。
- Broker:简单来说就是消息队列服务器实体
- Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
- Queue:消息队列载体,每个消息都会被投入到一个或多个队列
- Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来
- Routing Key:路由关键字,exchange根据这个关键字进行消息投递
- vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离
- producer:消息生产者,就是投递消息的程序
- consumer:消息消费者,就是接受消息的程序
- channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
-
应用场景
- 应用解耦:多用用间通过消息队列对同一个消息处理,避免调用接口失败导致整个过程失败。
- 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间
- 限流削峰:双十一,618等抢购活动。
- 消息驱动系统:业务体量不断扩大,采用微服务设计思想,分布式的部署方式。
成本:
- 应用复杂度:需要对消息队列进行管理
- 暂时不一致性
使用消息队列满足条件:
- 生产者不需要立刻从消费者出获得反馈
- 容许短暂的不一致性
- 起到解耦,提速,广播,削峰等作用
消息队列的使用场景是怎样的?
Docker安装
rabbitmq是在portain平台上安装rabbitmq-management。
- 访问dockerhub,搜索rabbitmq,点击进去rabbitmq获取rabbitmq-management的dockerfile链接。
FROM rabbitmq:3.9 RUN set eux; \ rabbitmq-plugins enable --offline rabbitmq_management; \ # make sure the metrics collector is re-enabled (disabled in the base image for Prometheus-style metrics by default) rm -f /etc/rabbitmq/conf.d/management_agent.disable_metrics_collector.conf; \ # grab "rabbitmqadmin" from inside the "rabbitmq_management-X.Y.Z" plugin folder # see https://github.com/docker-library/rabbitmq/issues/207 cp /plugins/rabbitmq_management-*/priv/www/cli/rabbitmqadmin /usr/local/bin/rabbitmqadmin; \ [ -s /usr/local/bin/rabbitmqadmin ]; \ chmod +x /usr/local/bin/rabbitmqadmin; \ apt-get update; \ apt-get install -y --no-install-recommends python3; \ rm -rf /var/lib/apt/lists/*; \ rabbitmqadmin --version EXPOSE 15671 15672
- 在Protainer上创建镜像。
-
运行镜像,主要是将容器的15672(management端口)和5672(amqp端口映射出来)。
-
最后访问http://[服务器ip]:15672即可到rabbitmq管理界面,输入默认账号密码guest/guest即可访问。
-
关于rabbitmq管理界面
点击任意一个Exchange:
点击任意一个queue:
用一个邮局的例子来说明各自的作用。首先邮局表示一个队列,邮筒就是一个channel。channel的作用是建立会话任务。每个地方建立一个邮局很“贵”类似每次建立TCP/IP链接非常“贵”且耗时,用户也无需每次跑到邮局,只需要把信放在邮筒即可。邮局收到用户的信后,根据信封上的地址(exchange)投递给收信方。
python实现
- 简单消费者生产者模式
import pika import json credentials = pika.PlainCredentials(user, user) # mq用户名和密码 # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = host,port = 5672,virtual_host = '/',credentials = credentials)) channel=connection.channel() # 声明消息队列,消息将在这个队列传递,如不存在,则创建 result = channel.queue_declare(queue = 'python-test') for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 向队列插入数值 routing_key是队列名 channel.basic_publish(exchange = '',routing_key = 'python-test',body = message) print('send:'+message) connection.close() credentials = pika.PlainCredentials('guest', 'guest') connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.3.130',port = 5672,virtual_host = '/',credentials = credentials)) channel = connection.channel() # 申明消息队列,消息在这个队列传递,如果不存在,则创建队列 channel.queue_declare(queue = 'python-test', durable = False) # 定义一个回调函数来处理消息队列中的消息,这里是打印出来 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print('receive:'+body.decode()) # 告诉rabbitmq,用callback来接收消息 channel.basic_consume('python-test',callback) # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理 channel.start_consuming()
把消费的代码注释掉,我们在rabbitmq management看看
import pika import json credentials = pika.PlainCredentials(user, user) # mq用户名和密码 # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = host,port = 5672,virtual_host = '/',credentials = credentials)) channel=connection.channel() # 声明消息队列,消息将在这个队列传递,如不存在,则创建 result = channel.queue_declare(queue = 'python-test') for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 向队列插入数值 routing_key是队列名 channel.basic_publish(exchange = '',routing_key = 'python-test',body = message) print('send:'+message) connection.close() # credentials = pika.PlainCredentials('guest', 'guest') # connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.3.130',port = 5672,virtual_host = '/',credentials = credentials)) # channel = connection.channel() # # 申明消息队列,消息在这个队列传递,如果不存在,则创建队列 # channel.queue_declare(queue = 'python-test', durable = False) # # 定义一个回调函数来处理消息队列中的消息,这里是打印出来 # def callback(ch, method, properties, body): # ch.basic_ack(delivery_tag = method.delivery_tag) # print('receive:'+body.decode()) # # 告诉rabbitmq,用callback来接收消息 # channel.basic_consume('python-test',callback) # # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理 # channel.start_consuming()
Ack mode选择Automatic Ack模式后,消费消息自动确认
- 工作模式
一对多模式,一个生产者,多个消费者,一个队列,每个消费者从队列中获取唯一的消息。有两种消息分发机制,轮询分发和公平分发:轮询分发的特点是将消息轮流发送给每个消费者,在实际情况中,多个消费者,难免有的处理得快,有的处理得慢,如果都要等到一个消费者处理完,才把消息发送给下一个消费者,效率就大大降低了。而公平分发的特点是,只要有消费者处理完,就会把消息发送给目前空闲的消费者,这样就提高消费效率了。
# producer import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent %r" % message) connection.close() # consumer import pika import time connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) # 公平分发 channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) channel.start_consuming()
- Publish/Subscribe模式
publish.py
import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
Subscribe.py
import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
- 路由模式
producer.py
import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish( exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
consumer.py
import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind( exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
- Topic模式
import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish( exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') result = channel.queue_declare('', exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind( exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
- 路由模式
import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='rpc_queue', on_message_callback=on_request) print(" [x] Awaiting RPC requests") channel.start_consuming()
import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(queue='', exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume( queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish( exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
参考
RabbbitMq官网
消息队列rabbitmq
python实现rabbitmq六种模式
pika-Api文档
这篇关于Python RabbitMQ基础知识的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-26Python基础编程
- 2024-11-25Python编程基础:变量与类型
- 2024-11-25Python编程基础与实践
- 2024-11-24Python编程基础详解
- 2024-11-21Python编程基础教程
- 2024-11-20Python编程基础与实践
- 2024-11-20Python编程基础与高级应用
- 2024-11-19Python 基础编程教程
- 2024-11-19Python基础入门教程
- 2024-11-17在FastAPI项目中添加一个生产级别的数据库——本地环境搭建指南