RabbitMQ--Work Queues

2022/1/24 6:04:53

本文主要是介绍RabbitMQ--Work Queues,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Work Queues

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

image-20220124002633222

image-20220124002857138

轮训发送消息

启动两个线程,一个消息发送线程,来看看这两个工作线程是如何工作的。

抽取工具类

package com.uin;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author wanglufei
 * @description: 工具类
 * @date 2022/1/24/12:29 AM
 */
public class RabbitMQUtils {
    public static Channel getChannel() throws IOException, TimeoutException {
        //引入连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

启动两个工作线程

package com.uin.work_queues;


import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.uin.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author wanglufei
 * @description: TODO
 * @date 2022/1/24/12:40 AM
 */
public class Consumer_work01 {

    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        //接受消息的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("接受到的消息:" + new String(message.getBody()));
        };
        //取消消息的回调
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消息被取消消费者接口的回调逻辑!");
        };
        /**
         * 消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答
         * 3.未成功消费的一个回调
         * 4.消费者取消消费的回调
         */
        System.out.println("第一个工作线程!等待接受消息。。。。");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

image-20220124010642181

package com.uin.work_queues;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.uin.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author wanglufei
 * @description: TODO
 * @date 2022/1/24/1:03 AM
 */
public class Consumer_work02 {

    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        //接受消息的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("接受到的消息:" + new String(message.getBody()));
        };
        //取消消息的回调
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消息被取消消费者接口的回调逻辑!");
        };
        /**
         * 消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答
         * 3.未成功消费的一个回调
         * 4.消费者取消消费的回调
         */
        System.out.println("第二个工作线程!等待接受消息。。。。");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

image-20220124010658569

生产者

package com.uin.work_queues;

import com.rabbitmq.client.Channel;
import com.uin.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * @author wanglufei
 * @description: TODO
 * @date 2022/1/24/1:09 AM
 */
public class Producer_task01 {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //发送消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            /**
             * 发送一个消息
             *  1.发送到那个交换机
             *  2.路由的 key 是哪个
             *  3.其他的参数信息
             *  4.发送消息的消息体
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());//以二进制传输
            System.out.println("发送消息完成:" + message);
        }
    }
}

image-20220124012930108

image-20220124012952307

image-20220124013011771



这篇关于RabbitMQ--Work Queues的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程