rabbitmq学习记录

2022/2/14 23:11:39

本文主要是介绍rabbitmq学习记录,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/

部署

1。拉取安装包

docker pull rabbitmq:3-management

2.加载镜像

docker load -i mq.tar

3.安装rabbitMQ

docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management

 

15672是浏览器直接访问的管理员地址

5672是代码访问rabbitmq的地址 

 

RabbitMQ中的几个概念: channel:操作MQ的工具 ;  exchange:路由消息到队列中  ; queue:缓存消息 ;  virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

MQ的官方文档中给出了5个MQ的Demo示例,对应了几种不同的用法:

基本消息队列(BasicQueue) 工作消息队列(WorkQueue)

发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种: Fanout Exchange:广播 Direct Exchange:路由 Topic Exchange:主题

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

开发步骤

1.导包

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.加入配置文件
spring:
  rabbitmq:
    host: 192.168.190.129 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: itcast
    password: 123321
    virtual-host: /
3.开发代码

Basic Queue 简单队列模型

生产者:

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSendMessage2SimpleQueue() {
    String queueName = "simple.queue";
    String message = "hello, spring amqp!";
    rabbitTemplate.convertAndSend(queueName, message);
}
消费者:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
    System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
}

Work Queue 工作队列模型

生产者:

@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
    String queueName = "simple.queue";
    String message = "hello, message__";
    for (int i = 1; i <= 50; i++) {
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}
消费者
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
    String queueName = "simple.queue";
    String message = "hello, message__";
    for (int i = 1; i <= 50; i++) {
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

@Test
public void testSendFanoutExchange() {
    // 交换机名称
    String exchangeName = "itcast.fanout";
    // 消息
    String message = "hello, every one!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}
注意:此时要在yml文件中增加配置
spring:
  rabbitmq:
    host: 192.168.190.129 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: itcast
    password: 123321
    virtual-host: /
    listener:
      simple:
        prefetch: 1
如果不加listener属性,多个消费者绑定同一个队列则会类似轮询机制,浪费服务器资源,加上后,处理完成才能获取下一个消息(类似能者多劳);

 

发布、订阅模型-Fanout

@Test
public void testSendFanoutExchange() {
    // 交换机名称
    String exchangeName = "itcast.fanout";
    // 消息
    String message = "hello, every one!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
     System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
     System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
}
当前模式为广播模式,只要是绑定当前队列都会收到消息。

 

发布、订阅模型-Direct

@Test
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "itcast.direct";
    // 消息
    String message = "hello, red!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
只有和RoutingKey相同的才能接受到消息

 

发布、订阅模型-Topic

@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "itcast.topic";
    // 消息
    String message = "今天天气不错,我的心情好极了!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
        key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
主题模式,支持表达式。

#:代指0个或多个单词 *:代指一个单词

消息转换器
默认spring使用jkd的消息转换器,传递复杂类型时会被序列化

如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
同时在代码中声明
@Bean
public MessageConverter messageConverter(){
    return new Jackson2JsonMessageConverter();
}
spring在启动过程中自动会加载当前bean,替换默认的。

部署mq时,可能出现的问题

docker启动rabbitmq报错
WARNING: IPv4 forwarding is disabled. Networking will not work.
解决方法:

docker stop 镜像id

sudo docker rm 镜像id
systemctl restart network && systemctl restart docker 

执行完上面的命令后,在重新启动



这篇关于rabbitmq学习记录的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程