rabbitmq学习记录
2022/2/14 23:11:39
本文主要是介绍rabbitmq学习记录,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/
部署
1。拉取安装包
docker pull rabbitmq:3-management
2.加载镜像
docker load -i mq.tar
3.安装rabbitMQ
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
15672是浏览器直接访问的管理员地址
5672是代码访问rabbitmq的地址
RabbitMQ中的几个概念: channel:操作MQ的工具 ; exchange:路由消息到队列中 ; queue:缓存消息 ; virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
MQ的官方文档中给出了5个MQ的Demo示例,对应了几种不同的用法:
基本消息队列(BasicQueue) 工作消息队列(WorkQueue)
发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种: Fanout Exchange:广播 Direct Exchange:路由 Topic Exchange:主题
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
开发步骤
1.导包
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 2.加入配置文件
spring: rabbitmq: host: 192.168.190.129 # rabbitMQ的ip地址 port: 5672 # 端口 username: itcast password: 123321 virtual-host: / 3.开发代码
Basic Queue 简单队列模型
生产者:
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage2SimpleQueue() { String queueName = "simple.queue"; String message = "hello, spring amqp!"; rabbitTemplate.convertAndSend(queueName, message); } 消费者:
@RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg) { System.out.println("消费者接收到simple.queue的消息:【" + msg + "】"); }
Work Queue 工作队列模型
生产者:
@Test public void testSendMessage2WorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello, message__"; for (int i = 1; i <= 50; i++) { rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } } 消费者
@Test public void testSendMessage2WorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello, message__"; for (int i = 1; i <= 50; i++) { rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } } @Test public void testSendFanoutExchange() { // 交换机名称 String exchangeName = "itcast.fanout"; // 消息 String message = "hello, every one!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "", message); } 注意:此时要在yml文件中增加配置
spring: rabbitmq: host: 192.168.190.129 # rabbitMQ的ip地址 port: 5672 # 端口 username: itcast password: 123321 virtual-host: / listener: simple: prefetch: 1 如果不加listener属性,多个消费者绑定同一个队列则会类似轮询机制,浪费服务器资源,加上后,处理完成才能获取下一个消息(类似能者多劳);
发布、订阅模型-Fanout
@Test public void testSendFanoutExchange() { // 交换机名称 String exchangeName = "itcast.fanout"; // 消息 String message = "hello, every one!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "", message); }
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】"); } 当前模式为广播模式,只要是绑定当前队列都会收到消息。
发布、订阅模型-Direct
@Test public void testSendDirectExchange() { // 交换机名称 String exchangeName = "itcast.direct"; // 消息 String message = "hello, red!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "red", message); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】"); } 只有和RoutingKey相同的才能接受到消息
发布、订阅模型-Topic
@Test public void testSendTopicExchange() { // 交换机名称 String exchangeName = "itcast.topic"; // 消息 String message = "今天天气不错,我的心情好极了!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.weather", message); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】"); } 主题模式,支持表达式。 #:代指0个或多个单词 *:代指一个单词 消息转换器 默认spring使用jkd的消息转换器,传递复杂类型时会被序列化 如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> 同时在代码中声明
@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } spring在启动过程中自动会加载当前bean,替换默认的。 部署mq时,可能出现的问题
docker启动rabbitmq报错
WARNING: IPv4 forwarding is disabled. Networking will not work.
解决方法:
docker stop 镜像id
sudo docker rm 镜像id
systemctl restart network && systemctl restart docker
执行完上面的命令后,在重新启动
这篇关于rabbitmq学习记录的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-10-27[开源] 一款轻量级的kafka可视化管理平台
- 2024-10-23Kafka消息丢失资料详解:初学者必看教程
- 2024-10-23Kafka资料新手入门指南
- 2024-10-23Kafka解耦入门:新手必读教程
- 2024-10-23Kafka入门:新手必读的简单教程
- 2024-10-23Kafka入门:新手必读的简单教程
- 2024-10-23Kafka消息丢失入门:新手必读指南
- 2024-10-23Kafka消息队列入门:新手必看的简单教程
- 2024-10-23Kafka消息队列入门与应用
- 2024-10-23Kafka重复消费入门:轻松掌握Kafka重复消息处理技巧