MQ底层原理教程:初学者快速入门指南
2024/11/26 23:03:57
本文主要是介绍MQ底层原理教程:初学者快速入门指南,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
本文详细介绍了消息队列(Message Queue,简称MQ)的工作机制,包括消息模型、发布/订阅模式和请求/响应模式等核心概念。文章深入探讨了MQ系统的核心组件,如消息队列、主题和代理,并解释了消息的发送和接收流程。此外,教程还提供了MQ的常见应用场景和部署配置方法,确保读者全面理解MQ的底层原理。
消息队列(MQ)是一种中间件,用于在不同组件或服务之间进行异步通信。MQ的主要功能是管理和转发消息,以确保消息从发送方传输到接收方,同时提供可靠性和灵活性。MQ在现代应用程序架构中扮演着重要角色,使得系统能够更好地解耦、扩展和处理异步任务。
- 异步通信:MQ使得生产者和消费者之间的通信异步化,生产者发送消息后无需等待消费者做出响应,提高了系统的响应速度和可用性。
- 解耦:通过引入消息队列,可以将不同的服务解耦,使得它们之间不再直接依赖,提升了系统的灵活性和可维护性。
- 流量削峰:在高并发场景下,MQ可以通过缓冲来削峰填谷,避免系统因瞬时流量过大而崩溃。
- 可靠传输:MQ提供了消息持久化和确认机制,确保消息不会因系统崩溃或网络问题而丢失。
- 消息顺序:在需要按顺序处理消息的场景中,MQ可以保证消息的有序传递。
- 负载均衡:MQ可以根据负载情况将消息分发到多个消费者,实现负载均衡。
- 可扩展性:MQ使得系统可以方便地扩展,增加新的生产者或消费者而不会影响现有系统的运行。
- 实时数据处理:MQ可以用于实时数据流处理,比如日志收集、用户行为分析和在线数据分析等场景。
- 系统解耦:在微服务架构中,服务之间通过MQ进行通信,可以实现服务间的解耦,方便各个服务的独立部署和扩展。
- 异步通信:在需要异步通信的场景中,比如用户请求发送邮件或短信,通过MQ可以实现异步处理。
- 流量削峰:在高并发场景下,MQ可以作为缓冲,减少系统压力,避免因瞬时流量过大而导致系统崩溃。
- 日志收集:通过MQ收集各个服务的日志,集中处理和存储,便于日志分析和审计。
- 监控报警:通过MQ发送监控数据或报警信息,确保系统监控的实时性和准确性。
消息队列(MQ)的核心概念包括消息模型、发布/订阅模式、请求/响应模式等,这些概念是理解MQ工作原理的基础。
消息模型描述了消息在MQ中的基本流程。消息是从发送方(生产者)发送到接收方(消费者)的。生产者将消息发布到指定的队列或主题,消费者从队列或主题中订阅并接收消息。消息模型包括以下几个关键部分:
- 消息(Message):消息是携带数据的信息单元,包含消息头和消息体。
- 生产者(Producer):生产者负责生成消息并将其发送到消息队列或主题。
- 消费者(Consumer):消费者从消息队列或主题中接收消息,并对消息进行处理。
- 队列(Queue):队列用于存储消息,确保消息有序传递。
- 主题(Topic):主题用于广播消息,多个消费者可以订阅同一个主题。
示例代码
以下是一个简单的消息模型的示例代码,展示了生产者发送消息和消费者接收消息的过程。
# 生产者发送消息 import pika def send_message(queue_name, message): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queue_name) channel.basic_publish(exchange='', routing_key=queue_name, body=message) print(f"Sent message: {message}") connection.close() # 消费者接收消息 import pika def callback(ch, method, properties, body): print("Received message:", body.decode()) def consume_messages(queue_name): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queue_name) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) print('Waiting for messages. To exit press CTRL+C') channel.start_consuming() # 使用示例 send_message('example_queue', 'Hello, world!') consume_messages('example_queue')
发布/订阅模式是一种消息传递模式,其中生产者(发布者)将消息发送到一个或多个订阅了该主题的消费者(订阅者)。
发布者
发布者将消息发送到一个主题,所有订阅该主题的消费者都会接收到消息。
订阅者
订阅者通过订阅一个或多个主题来接收消息。一个主题可以有多个订阅者,当发布者发送消息到该主题时,所有订阅者都会接收到消息。
示例代码
以下是一个发布/订阅模式的示例代码,展示了如何创建一个主题并订阅该主题,然后发布消息到该主题。
# 发布者发送消息到主题 import pika def send_message_to_topic(exchange_name, routing_key, message): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange=exchange_name, exchange_type='topic') channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=message) print(f"Sent message to topic {exchange_name} with routing key {routing_key}: {message}") connection.close() # 订阅者订阅主题并接收消息 import pika def callback(ch, method, properties, body): print("Received message:", body.decode()) def subscribe_to_topic(exchange_name, routing_key): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange=exchange_name, exchange_type='topic') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=routing_key) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) print(f'Waiting for messages on topic {exchange_name} with routing key {routing_key}. To exit press CTRL+C') channel.start_consuming() # 使用示例 send_message_to_topic('example_exchange', 'user.log', 'User logged in') subscribe_to_topic('example_exchange', 'user.log')
请求/响应模式是一种同步消息传递模式,其中一个生产者发送请求消息,一个或多个消费者处理请求并返回响应消息。
生产者
生产者发送请求消息到一个队列,并等待消费者的响应。
消费者
消费者从队列中接收请求消息,处理请求,并将响应消息发送回生产者。
示例代码
以下是一个请求/响应模式的示例代码,展示了如何发送请求消息并接收响应消息。
# 发送请求消息 import pika def send_request(queue_name, message): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queue_name) correlation_id = '12345' properties = pika.BasicProperties(correlation_id=correlation_id) channel.basic_publish(exchange='', routing_key=queue_name, body=message, properties=properties) print(f"Sent request message: {message}") connection.close() return correlation_id # 处理请求并返回响应 import pika def handle_request(ch, method, properties, body): print(f"Received request: {body.decode()}") response = f"Response to request: {body.decode()}" ch.basic_publish(exchange='', routing_key=properties.reply_to, properties=pika.BasicProperties(correlation_id=properties.correlation_id), body=response) ch.basic_ack(delivery_tag=method.delivery_tag) def consume_requests(queue_name): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queue_name) channel.basic_consume(queue=queue_name, on_message_callback=handle_request) print(f'Waiting for requests on queue {queue_name}. To exit press CTRL+C') channel.start_consuming() # 返回响应 import pika def receive_response(correlation_id, response_queue): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() method_frame, header_frame, body = channel.basic_get(queue=response_queue, no_ack=True) if method_frame: print(f"Received response: {body.decode()}") channel.basic_ack(method_frame.delivery_tag) else: print("No message received") connection.close() # 使用示例 correlation_id = send_request('request_queue', 'Hello, service') consume_requests('request_queue') receive_response(correlation_id, 'response_queue')
消息队列(MQ)系统由多个核心组件构成,包括消息队列、消息主题和消息代理等。这些组件协同工作,确保消息的可靠传输和处理。
消息队列是消息的暂存容器,消息发送者将消息发送到队列中,消息接收者从队列中读取消息。队列是先进先出(FIFO)的数据结构,确保消息按顺序传递。队列可以设置持久化属性,确保消息在系统崩溃后仍然存在。此外,队列可以设置最大容量,当队列满时,新的消息会被拒绝。
示例代码
以下是一个创建队列并发送消息的示例代码,展示了如何发送和接收消息。
# 创建队列并发送消息 import pika def create_and_send_message(queue_name, message): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queue_name) channel.basic_publish(exchange='', routing_key=queue_name, body=message) print(f"Sent message to queue {queue_name}: {message}") connection.close() # 接收队列中的消息 import pika def callback(ch, method, properties, body): print("Received message:", body.decode()) def consume_queue(queue_name): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queue_name) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) print(f'Waiting for messages on queue {queue_name}. To exit press CTRL+C') channel.start_consuming() # 使用示例 create_and_send_message('example_queue', 'Hello, queue!') consume_queue('example_queue')
消息主题用于广播消息到多个订阅者。主题是消息的逻辑分类,多个订阅者可以订阅同一个主题。当发布者发送消息到主题时,所有订阅该主题的订阅者都会接收到消息。
示例代码
以下是一个订阅主题并接收消息的示例代码,展示了如何创建主题并订阅主题。
# 发布者发送消息到主题 import pika def send_message_to_topic(exchange_name, routing_key, message): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange=exchange_name, exchange_type='topic') channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=message) print(f"Sent message to topic {exchange_name} with routing key {routing_key}: {message}") connection.close() # 订阅者订阅主题并接收消息 import pika def callback(ch, method, properties, body): print("Received message:", body.decode()) def subscribe_to_topic(exchange_name, routing_key): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange=exchange_name, exchange_type='topic') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=routing_key) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) print(f'Waiting for messages on topic {exchange_name} with routing key {routing_key}. To exit press CTRL+C') channel.start_consuming() # 使用示例 send_message_to_topic('example_exchange', 'user.log', 'User logged in') subscribe_to_topic('example_exchange', 'user.log')
消息代理是MQ系统的核心组件,负责管理和转发消息。代理接收来自生产者的消息,并将消息路由到相应的队列或主题。代理还负责管理和维护队列、主题等资源,并提供消息持久化、负载均衡等功能。
示例代码
以下是一个简单的消息代理的示例代码,展示了代理如何接收消息并将消息路由到队列。
# 简单的消息代理 import pika def handle_message(ch, method, properties, body): print(f"Received message: {body.decode()}") queue_name = 'example_queue' ch.basic_publish(exchange='', routing_key=queue_name, body=body) print(f"Forwarded message to queue {queue_name}: {body.decode()}") def setup_message_agent(): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='input_queue') channel.basic_consume(queue='input_queue', on_message_callback=handle_message, auto_ack=True) print('Waiting for messages. To exit press CTRL+C') channel.start_consuming() # 使用示例 setup_message_agent()
消息队列(MQ)的工作原理包括消息的发送流程、消息的接收流程和消息的可靠性保障机制。这些机制确保消息在传输过程中不会丢失,并且能够可靠地传递到目的地。
- 建立连接:生产者通过网络连接到MQ代理。
- 声明队列或主题:生产者需要声明要发送消息的目标队列或主题。
- 发送消息:生产者将消息发送到指定的队列或主题。
- 确认发送:生产者等待MQ代理的确认,确认消息已成功发送。
- 关闭连接:生产者断开与MQ代理的连接。
示例代码
以下是一个发送消息的示例代码,展示了如何通过MQ发送消息。
import pika def send_message(queue_name, message): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queue_name) channel.basic_publish(exchange='', routing_key=queue_name, body=message) print(f"Sent message to queue {queue_name}: {message}") connection.close() # 使用示例 send_message('example_queue', 'Hello, world!')
- 建立连接:消费者通过网络连接到MQ代理。
- 声明队列或主题:消费者需要声明要接收消息的队列或主题。
- 接收消息:消费者从队列或主题中接收消息。
- 处理消息:消费者处理接收到的消息。
- 确认接收:消费者发送确认消息给MQ代理,确认消息已成功接收。
- 关闭连接:消费者断开与MQ代理的连接。
示例代码
以下是一个接收消息的示例代码,展示了如何通过MQ接收消息。
import pika def callback(ch, method, properties, body): print("Received message:", body.decode()) def consume_queue(queue_name): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queue_name) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) print(f'Waiting for messages on queue {queue_name}. To exit press CTRL+C') channel.start_consuming() # 使用示例 consume_queue('example_queue')
MQ提供了多种机制来确保消息的可靠性,包括持久化、确认机制和死信队列等。
持久化
持久化是MQ系统中一种重要的机制,它确保消息在磁盘上持久化存储,即使在系统崩溃后消息也不会丢失。持久化消息需要满足以下条件:
- 生产者声明消息持久化:生产者在发送消息时指定消息为持久化。
- MQ代理将消息持久化到磁盘:MQ代理将持久化消息存储到磁盘上。
- 消费者确认消息接收:消费者在接收并处理完消息后发送确认消息,MQ代理删除持久化消息。
示例代码
以下是一个发送持久化消息的示例代码,展示了如何发送持久化消息。
import pika def send_persistent_message(queue_name, message): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queue_name, durable=True) properties = pika.BasicProperties(delivery_mode=pika.DeliveryMode.Transient) channel.basic_publish(exchange='', routing_key=queue_name, body=message, properties=properties) print(f"Sent persistent message to queue {queue_name}: {message}") connection.close() # 使用示例 send_persistent_message('persistent_queue', 'This is a persistent message')
确认机制
确认机制是MQ系统中另一种重要的机制,它确保消息被成功接收和处理。消费者在接收到消息后需要发送确认消息给MQ代理,确认消息已成功接收和处理。如果消费者在处理消息时出现异常,消息会被重新发送给消费者,确保消息不会丢失。
示例代码
以下是一个接收并确认消息的示例代码,展示了如何接收并确认消息。
import pika def callback(ch, method, properties, body): print("Received message:", body.decode()) ch.basic_ack(delivery_tag=method.delivery_tag) def consume_queue(queue_name): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queue_name, durable=True) channel.basic_consume(queue=queue_name, on_message_callback=callback) print(f'Waiting for messages on queue {queue_name}. To exit press CTRL+C') channel.start_consuming() # 使用示例 consume_queue('persistent_queue')
死信队列
死信队列是MQ系统中的一种特殊队列,用于存储无法处理的消息。当消息在队列中等待的时间超过指定的时限,或者消息被拒绝接收,这些消息会被移动到死信队列中。死信队列可以用于监控和处理无法处理的消息。
示例代码
以下是一个创建死信队列的示例代码,展示了如何创建死信队列。
import pika def setup_dead_letter_queue(queue_name, dead_letter_queue_name): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queue_name, arguments={ 'x-dead-letter-exchange': '', 'x-dead-letter-routing-key': dead_letter_queue_name }) channel.queue_declare(queue=dead_letter_queue_name) print(f"Setup dead letter queue for {queue_name} with dead letter queue {dead_letter_queue_name}") connection.close() # 使用示例 setup_dead_letter_queue('example_queue', 'dead_letter_queue')
消息队列(MQ)在现代应用程序架构中有着广泛的应用场景,包括实时数据处理、系统解耦和异步通信等。
在实时数据处理场景中,MQ可以用于收集和处理来自各种来源的数据,确保数据的实时性。例如,在日志收集系统中,可以通过MQ将来自不同服务的日志收集到一个集中位置,进行实时分析和处理。
示例代码
以下是一个日志收集的示例代码,展示了如何通过MQ收集日志。
import pika import logging def send_log(log_message): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='logs') channel.basic_publish(exchange='', routing_key='logs', body=log_message) print(f"Sent log message: {log_message}") connection.close() # 日志收集示例 logger = logging.getLogger('example_logger') logger.setLevel(logging.INFO) logger.addHandler(logging.StreamHandler()) def log_example(): logger.info('This is an info message') logger.error('This is an error message') send_log('This is a custom log message') # 使用示例 log_example()
在微服务架构中,服务之间通过MQ实现解耦,确保各个服务可以独立部署和扩展。通过引入MQ,服务之间不再直接依赖,提高了系统的灵活性和可维护性。
示例代码
以下是一个服务之间通过MQ通信的示例代码,展示了如何通过MQ实现服务的解耦。
import pika def send_order(order_details): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='orders') channel.basic_publish(exchange='', routing_key='orders', body=order_details) print(f"Sent order details: {order_details}") connection.close() def handle_order(ch, method, properties, body): print(f"Received order details: {body.decode()}") # 处理订单 print("Order processed") def consume_orders(): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='orders') channel.basic_consume(queue='orders', on_message_callback=handle_order, auto_ack=True) print('Waiting for order details. To exit press CTRL+C') channel.start_consuming() # 使用示例 send_order('Order 甥員') consume_orders()
在需要异步通信的场景中,MQ可以用于实现异步处理。例如,在发送邮件或短信的场景中,可以通过MQ实现异步处理,确保用户请求发送邮件或短信后立即返回,而不需要等待邮件或短信发送完成。
示例代码
以下是一个发送邮件的示例代码,展示了如何通过MQ实现异步发送邮件。
import pika import smtplib from email.mime.text import MIMEText def send_email_task(ch, method, properties, body): print(f"Received email task: {body.decode()}") email_body = body.decode() sender = 'sender@example.com' recipient = 'recipient@example.com' message = MIMEText(email_body) message['From'] = sender message['To'] = recipient message['Subject'] = 'Test Email' try: server = smtplib.SMTP('smtp.example.com', 587) server.starttls() server.login('username', 'password') server.sendmail(sender, recipient, message.as_string()) server.quit() print("Email sent successfully") except Exception as e: print(f"Error sending email: {e}") ch.basic_ack(delivery_tag=method.delivery_tag) def consume_email_tasks(): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='email_tasks') channel.basic_consume(queue='email_tasks', on_message_callback=send_email_task, auto_ack=False) print('Waiting for email tasks. To exit press CTRL+C') channel.start_consuming() # 使用示例 send_message('email_tasks', 'This is a test email body') consume_email_tasks()
部署和配置消息队列(MQ)是实现消息传递系统的关键步骤。正确的部署和配置可以确保消息的可靠传输和处理。本节将介绍MQ的安装与配置,并提供一些常见问题的解决方案。
安装RabbitMQ
RabbitMQ是一个流行的开源消息代理实现,支持多种消息协议。以下是安装RabbitMQ的步骤:
-
安装RabbitMQ:
- 在Ubuntu上安装RabbitMQ:
sudo apt-get update sudo apt-get install rabbitmq-server
- 在CentOS上安装RabbitMQ:
sudo yum install rabbitmq-server
- 在Windows上安装RabbitMQ:
- 下载RabbitMQ Windows安装包:https://www.rabbitmq.com/download.html
- 运行安装包,按照提示完成安装。
- 在Ubuntu上安装RabbitMQ:
-
启动RabbitMQ:
- 启动RabbitMQ服务:
sudo systemctl start rabbitmq-server
- 设置RabbitMQ服务开机启动:
sudo systemctl enable rabbitmq-server
- 启动RabbitMQ服务:
- 管理RabbitMQ:
- 使用RabbitMQ管理插件:
sudo rabbitmq-plugins enable rabbitmq_management
- 访问RabbitMQ管理界面:http://localhost:15672
- 使用RabbitMQ管理插件:
配置RabbitMQ
RabbitMQ可以通过配置文件进行详细的配置,以下是一些常用的配置选项:
-
配置文件:
- RabbitMQ的配置文件位于
/etc/rabbitmq/rabbitmq.conf
或/etc/rabbitmq/rabbitmq-env.conf
。 - 可以通过编辑配置文件来设置MQ的参数,例如配置虚拟主机、用户权限等。
- RabbitMQ的配置文件位于
-
虚拟主机:
- 创建虚拟主机:
sudo rabbitmqctl add_vhost my_vhost
- 添加用户并设置权限:
sudo rabbitmqctl add_user my_user my_password sudo rabbitmqctl set_permissions -p my_vhost my_user ".*" ".*" ".*"
- 创建虚拟主机:
-
队列和主题配置:
-
创建队列:
import pika def create_queue(queue_name): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queue_name) print(f"Queue {queue_name} created") connection.close() # 使用示例 create_queue('example_queue')
-
创建主题:
import pika def create_topic(exchange_name): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange=exchange_name, exchange_type='topic') print(f"Topic {exchange_name} created") connection.close() # 使用示例 create_topic('example_exchange')
-
常见问题
-
连接问题:
- 无法连接到RabbitMQ服务器。
- RabbitMQ服务未启动或防火墙阻止了连接。
-
队列和主题管理问题:
- 创建队列或主题失败。
- 权限不足,无法访问虚拟主机或队列。
- 消息处理问题:
- 消息丢失或未正确处理。
- 消费者未接收到消息。
解决方案
-
连接问题:
- 检查RabbitMQ服务是否已启动:
sudo systemctl status rabbitmq-server
- 检查防火墙设置,确保端口15672(管理界面)和5672(默认RabbitMQ端口)是开放的。
- 检查RabbitMQ服务是否已启动:
-
队列和主题管理问题:
- 确保有足够的权限访问虚拟主机和队列:
sudo rabbitmqctl add_user my_user my_password sudo rabbitmqctl set_permissions -p my_vhost my_user ".*" ".*" ".*"
- 使用RabbitMQ管理界面检查队列和主题的状态。
- 确保有足够的权限访问虚拟主机和队列:
-
消息处理问题:
-
确保消息的持久化和消费者确认机制已启用:
import pika def send_persistent_message(queue_name, message): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queue_name, durable=True) properties = pika.BasicProperties(delivery_mode=pika.DeliveryMode.Transient) channel.basic_publish(exchange='', routing_key=queue_name, body=message, properties=properties) print(f"Sent persistent message to queue {queue_name}: {message}") connection.close() def consume_queue(queue_name): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queue_name, durable=True) channel.basic_consume(queue=queue_name, on_message_callback=callback) print(f'Waiting for messages on queue {queue_name}. To exit press CTRL+C') channel.start_consuming() # 使用示例 send_persistent_message('persistent_queue', 'This is a persistent message') consume_queue('persistent_queue')
-
这篇关于MQ底层原理教程:初学者快速入门指南的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-26MQ消息中间件教程:初学者快速入门指南
- 2024-11-26手写消息队列项目实战:从零开始的入门教程
- 2024-11-26MQ底层原理教程:新手入门必备指南
- 2024-11-26MQ项目开发教程:初学者必备指南
- 2024-11-26MQ消息队教程:新手入门指南
- 2024-11-26MQ消息队列教程:从入门到实践
- 2024-11-26MQ消息中间件教程:新手入门详解
- 2024-11-26MQ源码教程:从入门到实践
- 2024-11-26MQ消息队列入门教程
- 2024-11-26MQ入门教程:轻松掌握消息队列基础知识