springboot连接rabbit

2021/5/6 10:29:46

本文主要是介绍springboot连接rabbit,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

rabbitMQ连接springboot

(1)父工程引入相关的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>
    <modules>
        <module>product</module>
        <module>consumer</module>
    </modules>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.zmj</groupId>
    <artifactId>springboot-rabbit-parent</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbit-parent</name>
    <description>springboot整合rabbitMQ</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.72</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

(2)相应的生产者和消费的配置

server:
  port: 8888
#rabbit的配置
spring:
  rabbitmq:
    host: 192.168.81.166

(3)生产者

package com.aaa.qy129.controller;

import com.alibaba.fastjson.JSON;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

@RestController
public class HelloController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("hello")
    public String hello(){ //业务层
        System.out.println("下单成功");
        //String exchange, String routingKey, Object message
        Map<String,Object> map=new HashMap<>();
        map.put("productId",1);
        map.put("num",10);
        map.put("price",12);
        rabbitTemplate.convertAndSend("exchange","", JSON.toJSONString(map)); //序列化过程
        return "下单成功";
    }
}

消费者监听消息

package com.aaa.qy129.listener;

import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class MyRabbitListener {

    //队列中存在消息则立即回调该方法
    @RabbitListener(queues = {"queue_fanout"})
    public void listener(String msg){
        Map map = JSON.parseObject(msg, Map.class);
        System.out.println(map);
    }

   
}

消息可靠性投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

(1)确认模式

必须开启确认模式

spring:
  rabbitmq:
    host: 192.168.31.166
    #开启rabbitMQ的生产方确认模式
    publisher-confirm-type: correlated

设置RabbitTemplate的确认回调函数

@Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testConfirm(){

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                if(b==false){//消息没有到达交换机  根据业务需求。
                    System.out.println("继续发现消息");
                    //取消订单
                }
            }
        });
        rabbitTemplate.convertAndSend("exchange","","hello confirm");
    }

(2)退回模式

(1)开启回退机制

server:
  port: 8003

spring:
  rabbitmq:
    host: 192.168.31.166
    #开启rabbitMQ的生产方确认模式
    publisher-confirm-type: correlated
    # 开启发布者退回模式
    publisher-returns: true

(2)设置RabbitTemplate回调的函数

/**
     *  退回模式:
     *     1. 开启退回模式。
     *     2. 设置RabbitTemplate的退回回调函数。
     */
    @Test
    public void testReturn(){
            rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                @Override
                public void returnedMessage(ReturnedMessage returnedMessage) {
                    //只要交换机到队列失败时才会触发该方法。 可以继续发送也可以取消相应的业务功能。
                    System.out.println("消息从交换机到队列失败"+returnedMessage.getReplyText());
                }
            });
        rabbitTemplate.convertAndSend("exchange_direct","error2","hello confirm2");
    }

Consumer ACK

表示消费端收到消息后的确认方式。

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息队列中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

(1)消费端配置手动开启确认模式

spring:
  rabbitmq:
    host: 192.168.31.166
    listener:
      simple:
        #表示手动确认
        acknowledge-mode: manual
      # 表示自动确认模式
        # acknowledge-mode: none
 @RabbitListener(queues = "queue_direct01")
    public void listener(Message message, Channel channel) throws Exception{
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        byte[] body = message.getBody();
        String msg=new String(body);

        System.out.println(msg);
        try {
//            int c = 10 / 0;
            System.out.println("处理业务逻辑");
            //消费端手动确认消息
            //long deliveryTag, 表示的标识。
            // boolean multiple:是否允许多确认
            channel.basicAck(deliveryTag,true); //从队列中删除该消息。
        }catch (Exception e){
            //(long deliveryTag, boolean multiple, boolean requeue: 是否让队列再次发送该消息。
            channel.basicNack(deliveryTag,true,true);
        }

    }

消费端限流

1. 必须为手动确认模式。
2. 必须配置限流的个数。
spring:
  rabbitmq:
    host: 192.168.31.166
    listener:
      simple:
        #表示手动确认
        acknowledge-mode: manual
      # 表示自动确认模式
        # acknowledge-mode: none
        # 设置每次消费的个数。
        prefetch: 100
package com.aaa.qy129.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyListener {

    @RabbitListener(queues = "queue_direct01")
    public void listener(Message message, Channel channel) throws Exception{
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        byte[] body = message.getBody();
        String msg=new String(body);

        System.out.println(msg);

        try {
//            int c = 10 / 0;
            //System.out.println("处理业务逻辑");
            //消费端手动确认消息
            //long deliveryTag, 表示的标识。
            // boolean multiple:是否允许多确认
            channel.basicAck(deliveryTag,true); //从队列中删除该消息。
        }catch (Exception e){
            //(long deliveryTag, boolean multiple, boolean requeue: 是否让队列再次发送该消息。
            channel.basicNack(deliveryTag,true,true);
        }

    }
}

TTL

1.设置队列过期;
2.设置消息的过期;该消息必须在队列的头部时才会被移除。
 //为队列设置过期时间  相当于该队列里面的消息都由过期时间
    @Test
    public void testSend(){
        rabbitTemplate.convertAndSend("exchange","","hello xiaoxi");
    }

    //设置消息的过期时间 如果由设置了队列的过期时间 也设置了消息的过期时间 谁的过期时间短 以谁为准。
    //该消息必须在头部才能从队列中移除。
    @Test
    public void testSend02(){

        for(int i=0;i<10;i++) {
            if(i==3){
                MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setExpiration("20000");
                        return message;
                    }
                };
                rabbitTemplate.convertAndSend("exchange", "", "hello xiaoxi"+i, messagePostProcessor);
            }else {
                rabbitTemplate.convertAndSend("myexchange", "", "hello xiaoxi"+i);
            }
        }
    }

8. 通过代码创建队列和交换机以及绑定。

@Configuration
public class RabbitConfig {

    private final String exchange_name="exchange";
    private final String queue_name="queue";
    //创建交换机对象
    @Bean
    public Exchange exchange(){
        Exchange exchange= ExchangeBuilder.fanoutExchange(exchange_name).durable(true).build();
        return exchange;
    }

    //创建队列
    @Bean(value = "queue")
    public Queue queue(){
        Queue queue= QueueBuilder.durable(queue_name).withArgument("x-message-ttl",20000).build();
        return queue;
    }

    //绑定交换机和队列
    @Bean
    public Binding binding(Queue queue,Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }
}



这篇关于springboot连接rabbit的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程