RocketMQ消息生产者和消息消费者演示
2021/5/9 18:57:35
本文主要是介绍RocketMQ消息生产者和消息消费者演示,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
小提示:阿里云打开namesrv和broker的端口
创建maven项目演示
0、pom依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>mq3</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <parent> <artifactId>spring-boot-starter-parent</artifactId> <groupId>org.springframework.boot</groupId> <version>2.4.4</version> </parent> <dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency> </dependencies> </project>
1、启动namesrv
[root@iZ2ze5v2vdwv6veyksylhxZ /]# cd /usr/local/ [root@iZ2ze5v2vdwv6veyksylhxZ local]# ls aegis bin include libexec nginx rocketmq src apache-maven-3.6.3 curl jdk1.8.0_281 libiconv openssl rocketmq-console-ng-1.0.1.jar tengine apache-tomcat-7.0.61 etc lib man redis_bloom sbin yum-3.2.28 apache-tomcat-7.0.61.tar.gz games lib64 mysql redisbloom.so share yum-3.2.28.tar.gz [root@iZ2ze5v2vdwv6veyksylhxZ local]# cd rocketmq [root@iZ2ze5v2vdwv6veyksylhxZ rocketmq]# ls benchmark bin conf lib LICENSE NOTICE README.md [root@iZ2ze5v2vdwv6veyksylhxZ rocketmq]# cd bin/ [root@iZ2ze5v2vdwv6veyksylhxZ bin]# ls cachedog.sh mqadmin mqbroker.numanode1 mqshutdown play.sh runserver.sh cleancache.sh mqadmin.cmd mqbroker.numanode2 mqshutdown.cmd README.md setcache.sh cleancache.v1.sh mqbroker mqbroker.numanode3 nohup.out runbroker.cmd startfsrv.sh dledger mqbroker.cmd mqnamesrv os.sh runbroker.sh tools.cmd hs_err_pid20448.log mqbroker.numanode0 mqnamesrv.cmd play.cmd runserver.cmd tools.sh [root@iZ2ze5v2vdwv6veyksylhxZ bin]# ./mqnamesrv Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release. The Name Server boot success. serializeType=JSON
2、启动broker
[root@iZ2ze5v2vdwv6veyksylhxZ bin]# ./mqbroker -n 8.131.84.120:9876 -c ../conf/broker.conf The broker[broker-a, 8.131.84.120:10911] boot success. serializeType=JSON and name server is 8.131.84.120:9876
3、启动控制台
[root@iZ2ze5v2vdwv6veyksylhxZ ~]# cd /usr/local/ [root@iZ2ze5v2vdwv6veyksylhxZ local]# ls aegis bin include libexec nginx rocketmq src apache-maven-3.6.3 curl jdk1.8.0_281 libiconv openssl rocketmq-console-ng-1.0.1.jar tengine apache-tomcat-7.0.61 etc lib man redis_bloom sbin yum-3.2.28 apache-tomcat-7.0.61.tar.gz games lib64 mysql redisbloom.so share yum-3.2.28.tar.gz [root@iZ2ze5v2vdwv6veyksylhxZ local]# java -jar rocketmq-console-ng-1.0.1.jar
4、生产者演示demo
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; /** * 生产者、消费者、namesrv、broker 之间关系 * * 1、namesrv 类似注册中心,生产者、消费者、broker 都会注册到namesrv上 * 2、生产者 发送消息给 namesrv * 3、namesrv 将消息转发给 broker的topic上 * 4、broker的topic 再将消息传给 消费者 */ public class Producer { public static void main(String[] args) throws Exception{ //1、先创建了生产者 DefaultMQProducer producer = new DefaultMQProducer("mygroup"); //2、生产者要主动联系namesrvAddr producer.setNamesrvAddr("8.131.84.120:9876"); //3、连接成功后要启动生产者 producer.start(); //4、创建消息类,包含topic和body Message message = new Message("mytopic","hello world".getBytes()); //5、生产者将消息发送出去 System.out.println(producer.send(message)); //6、关闭生产者 producer.shutdown(); } }
5、控制台查看结果
6、消费者演示demo
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class Consumer { public static void main(String[] args) throws Exception { //1、创建DefaultMQPushConsumer DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("demo-consumer-group"); //2、设置namesrv地址 mqPushConsumer.setNamesrvAddr("8.131.84.120:9876"); //3、设置subscribe读取主题信息 /** * 生产者类似一个作者,namesrv类似杂志社,消费者必须先订阅某家报社,才可以收到生产者给报社写的文章 * 每个消费者只能订阅一个topic * topic:关注消息的地址 * 过滤器 * :表示不过滤 */ mqPushConsumer.subscribe("mytopic","*"); //4、消费者注册个监听器,这样namesrv里传进来生产者提供的消息后,就可以及时知道了 //MessageListenerConcurrently 是普通消息接收,MessageListenerOrderly 是顺序消息接收 mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : msgs) { try { //获取主题 System.out.println(msg.getTopic()); //获取标签 System.out.println(msg.getTags()); //获取消息 System.out.println(msg.getBody().toString()); } catch (Exception e) { e.printStackTrace(); //重新再消费一次 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } //5、默认情况下,这条消息只会被一个consumer消费到,点对点消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //开启Consumer mqPushConsumer.start(); System.out.println("消费者启动..."); } }
7、控制台查看结果
这篇关于RocketMQ消息生产者和消息消费者演示的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-26MATLAB 中 A(7)=[];什么意思?-icode9专业技术文章分享
- 2024-11-26UniApp 中如何实现使用输入法时保持页面列表不动的效果?-icode9专业技术文章分享
- 2024-11-26在 UniApp 中怎么实现输入法弹出时禁止页面向上滚动?-icode9专业技术文章分享
- 2024-11-26WebSocket是什么,怎么使用?-icode9专业技术文章分享
- 2024-11-26页面有多个ref 要动态传入怎么实现?-icode9专业技术文章分享
- 2024-11-26在 UniApp 中实现一个底部输入框的常见方法有哪些?-icode9专业技术文章分享
- 2024-11-26RocketMQ入门指南:搭建与使用全流程详解
- 2024-11-26RocketMQ入门教程:轻松搭建与使用指南
- 2024-11-26手写RocketMQ:从入门到实践的简单教程
- 2024-11-25【机器学习(二)】分类和回归任务-决策树(Decision Tree,DT)算法-Sentosa_DSML社区版