手写RocketMQ学习:从零开始的RocketMQ入门教程
2024/10/16 4:03:25
本文主要是介绍手写RocketMQ学习:从零开始的RocketMQ入门教程,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
手写RocketMQ学习涉及RocketMQ的基本概念、安装配置、环境变量设置、消息的创建与发送、接收与消费,以及集群搭建和监控等详细步骤,帮助读者全面掌握RocketMQ的使用。
RocketMQ是一个分布式消息中间件,由阿里巴巴开源,目前在Apache软件基金会下作为顶级项目孵化。它具有高吞吐量、高可用性、可扩展性强等特性,广泛应用于大规模分布式系统中。RocketMQ的核心组件包括Name Server、Broker和Producer/Consumer。
-
Name Server:Name Server是消息中间件中的服务发现模块,负责维护Broker的路由信息。每个Broker都会向Name Server注册,Name Server则负责接收并维护这些信息,并提供给客户端进行查询。
-
Broker:Broker是消息的存储和转发中心,负责存储消息和转发消息。RocketMQ支持多个Broker集群,每个Broker集群可以分布在网络的不同节点上,实现消息的分布式存储和分发。
-
Producer:Producer是消息的发送者,负责向指定的Topic发送消息。发送消息时,Producer会将消息发送到Broker,同时也可以设置消息的属性,如消息的优先级、延迟级别等。
- Consumer:Consumer是消息的接收者,负责从指定的Topic接收消息并进行处理。RocketMQ支持多种消费模式,如顺序消费、广播消费、集群消费等。
安装RocketMQ环境需要先下载RocketMQ的安装包,可以从其GitHub仓库或者官网下载。下面以Windows环境为例,介绍RocketMQ的安装步骤,其他操作系统类似。
下载RocketMQ
访问RocketMQ的GitHub仓库,下载最新版本的安装包,解压到指定目录。
# 下载最新版本的RocketMQ wget https://github.com/apache/rocketmq/releases/download/v4.9.1/rocketmq-release-4.9.1-bin.tar.gz # 解压安装包 tar -zxvf rocketmq-release-4.9.1-bin.tar.gz cd rocketmq-release-4.9.1
启动NameServer
RocketMQ的NameServer是分布式系统中的服务发现模块,负责维护Broker的路由信息。启动NameServer前,确保已经下载并解压RocketMQ的安装包。
# 启动NameServer nohup sh bin/mqnamesrv &
启动NameServer后,可以在控制台看到启动成功的信息,同时NameServer会将路由信息保存到本地的配置文件中,以备后续使用。
启动Broker
在启动NameServer之后,需要启动Broker以提供消息的存储和转发服务。
# 启动Broker nohup sh bin/mqbroker -n localhost:9876 &
启动Broker时,需要指定NameServer的地址,以便Broker能够向NameServer注册自己。启动成功后,控制台会输出启动成功的信息。
为了方便使用RocketMQ的命令行工具,建议配置环境变量。以下是配置环境变量的步骤:
设置环境变量
在系统的环境变量设置中,添加RocketMQ的安装目录到系统的PATH中,确保可以使用RocketMQ的命令行工具。
export PATH=$PATH:/path/to/rocketmq/bin
验证配置
配置完成后,可以通过命令行工具验证设置是否成功。
# 检查RocketMQ命令行工具是否可用 mqadmin
如果配置正确,将会显示RocketMQ命令行工具的帮助信息。
生产者是消息的发送者,负责将消息发送到指定的Topic。在RocketMQ中,生产者需要通过NameServer获取Broker的路由信息,然后将消息发送到Broker。
创建生产者代码示例
创建一个生产者实例,用于发送消息。
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); producer.shutdown(); } }
在上述代码中,首先创建了一个生产者实例,设置了NameServer的地址,然后启动生产者。接着,创建了一个消息对象,指定了Topic和Tag,然后发送消息。最后,关闭生产者。
同步消息是指生产者发送消息后,等待Broker返回消息是否发送成功的响应。发送同步消息可以确保消息已被成功发送。
发送同步消息代码示例
SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult);
上述代码中,producer.send(msg)
方法用于发送消息,该方法会阻塞直到收到Broker的响应。如果消息发送成功,sendResult
对象将包含消息的发送状态和消息ID等信息。
异步消息是指生产者发送消息后,不等待Broker返回响应,而是通过回调函数处理发送结果。这种方式可以提高生产者的发送效率。
发送异步消息代码示例
producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%s%n", sendResult); } @Override public void onException(Throwable e) { e.printStackTrace(); } });
在上述代码中,producer.send(msg, new SendCallback())
方法用于异步发送消息。当消息发送成功时,会调用onSuccess
方法,当发送失败时,会调用onException
方法。
消费者是消息的接收者,负责从指定的Topic接收消息并进行处理。在RocketMQ中,消费者需要通过NameServer获取Broker的路由信息,然后注册到指定的Topic上。
创建消费者代码示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TopicTest", "*"); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.printf("Receive New Message: %s %n", new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
在上述代码中,首先创建了一个消费者实例,设置了NameServer的地址,并注册到指定的Topic上。接着,设置消息模型为集群消费模式,设置从上次消费的位置开始消费。最后,注册消息监听器,用于处理接收到的消息。
消费者接收到消息后,会触发消息监听器中的回调函数,消费者在回调函数中对消息进行处理。
消费者消息处理代码示例
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.printf("Receive New Message: %s %n", new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
在上述代码中,msg.getBody()
用于获取消息的内容。消费者将接收到的消息打印到控制台上。
在实际的应用场景中,消息的处理逻辑可能非常复杂,可能需要进行业务逻辑处理、数据库操作、日志记录等。
消息处理逻辑代码示例
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { String body = new String(msg.getBody()); System.out.println("Consume Message: " + body); // 业务逻辑处理 // processBusinessLogic(body); // 数据库操作 // saveToDatabase(body); // 日志记录 // logMessage(body); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
在上述代码中,可以根据具体业务需求编写消息的处理逻辑。例如,可以将消息内容保存到数据库,或者进行日志记录。
在生产环境中,通常需要配置多个NameServer和Broker,以提高系统的可靠性和可扩展性。
配置NameServer
配置多个NameServer可以使用多个实例,并在配置文件中指定多个NameServer地址。例如:
# 配置文件示例 namesrv.addr=192.168.1.100:9876;192.168.1.101:9876
配置Broker
配置多个Broker可以使用多个实例,并在配置文件中指定多Broker地址和集群名称。例如:
# 配置文件示例 broker.name=BrokerA broker.id=0 broker.clusterName=DefaultCluster broker.addr=192.168.1.102:10911 broker.allowPollIfQueueEmpty=true namesrv.addr=192.168.1.100:9876;192.168.1.101:9876
启动RocketMQ集群
启动RocketMQ集群需要启动多个NameServer和多个Broker实例,并确保所有实例都能正常启动。
启动NameServer
# 启动NameServer实例1 nohup sh bin/mqnamesrv & # 启动NameServer实例2 nohup sh bin/mqnamesrv -n 192.168.1.101:9876 &
启动Broker
# 启动Broker实例1 nohup sh bin/mqbroker -n 192.168.1.100:9876;192.168.1.101:9876 -c brokerA.properties & # 启动Broker实例2 nohup sh bin/mqbroker -n 192.168.1.100:9876;192.168.1.101:9876 -c brokerB.properties &
集群中的消息路由
在集群环境中,RocketMQ通过路由信息来确定消息发送到哪个Broker。每个Broker都会向NameServer注册自己的路由信息,NameServer则维护这些信息,并提供给客户端进行查询。
路由信息示例
# BrokerA的路由信息示例 brokerAddr: 192.168.1.102:10911 brokerName: BrokerA brokerId: 0 clusterName: DefaultCluster
消息路由示例
当生产者发送消息到Topic时,会从NameServer获取Topic对应的路由信息,并根据路由信息将消息发送到指定的Broker。
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult);
在上述代码中,生产者会根据NameServer返回的路由信息,将消息发送到指定的Broker。
消息过滤规则用于在消费者端筛选消息,只消费满足特定条件的消息。
消息过滤规则示例
consumer.subscribe("TopicTest", "*");
在上述代码中,使用*
表示订阅所有Tag的消息。如果需要订阅特定Tag的消息,可以指定Tag名称,例如:
consumer.subscribe("TopicTest", "TagA");
消息重试策略用于处理发送失败的消息,确保消息最终能够被成功发送和消费。
消息重试策略示例
producer.setMessageQueueSelector(new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object o) { int index = (int) (o); return mqs.get(index); } });
在上述代码中,MessageQueueSelector
用于选择消息队列,可以实现自定义的消息队列选择逻辑。
消息的幂等性处理用于避免重复消费消息,确保消息只被消费一次。
消息幂等性处理示例
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { String msgId = msg.getMsgId(); if (!context.getMessageIDs().contains(msgId)) { System.out.printf("Receive New Message: %s %n", new String(msg.getBody())); context.getMessageIDs().add(msgId); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
在上述代码中,msg.getMsgId()
用于获取消息的唯一标识,context.getMessageIDs()
用于存储已经消费过的消息ID。通过检查消息ID是否已经消费过,可以避免重复消费消息。
RocketMQ的日志配置用于记录系统运行状态和错误信息,帮助排查问题。
日志配置文件示例
logFile=logs/rocketmqlogs logLevel=WARN fileAppend=true consoleLog=false
在上述配置文件中,logFile
用于指定日志文件的路径,logLevel
用于指定日志级别,fileAppend
用于指定是否追加日志到文件,consoleLog
用于指定是否输出日志到控制台。
日志配置代码示例
import org.apache.rocketmq.remoting.common.RemotingContext; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.netty.NettyRemotingServer; import org.apache.rocketmq.remoting.netty.NettyRemotingServerFactory; public class LogConfigurationExample { public static void main(String[] args) { // 获取日志配置 Properties properties = RemotingContext.getContext().getProperties(); String logFile = properties.getProperty("rocketmq.client.logFile"); String logLevel = properties.getProperty("rocketmq.client.logLevel"); System.out.println("Log File: " + logFile); System.out.println("Log Level: " + logLevel); } }
RocketMQ提供了多种监控工具,帮助监控系统运行状态和性能指标。
监控工具示例
RocketMQ自带的监控工具包括Dashboard和JMX。
Dashboard监控工具
Dashboard是一个Web界面,提供RocketMQ的监控视图和操作工具。
# 启动Dashboard nohup sh bin/mqdashboard &
启动Dashboard后,可以在浏览器中访问http://localhost:8080
来查看RocketMQ的监控视图。
JMX监控工具
JMX用于监控Java应用程序的运行状态和性能指标。
import javax.management.MBeanServer; import javax.management.ObjectName; import java.lang.management.ManagementFactory; public class JMXExample { public static void main(String[] args) throws Exception { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); ObjectName name = new ObjectName("com.taobao.rocketmq:type=BrokerStats"); System.out.println(mbs.getAttribute(name, "MessagePutTotal")); } }
在上述代码中,通过JMX获取Broker的统计信息。
检查RocketMQ运行状态可以帮助确保系统正常运行。
检查RocketMQ运行状态示例
# 检查NameServer状态 mqadmin clusterList # 检查Broker状态 mqadmin brokerList # 检查Topic状态 mqadmin topicList
在上述命令中,mqadmin clusterList
用于检查NameServer的状态,mqadmin brokerList
用于检查Broker的状态,mqadmin topicList
用于检查Topic的状态。
通过上述命令,可以获取系统的运行状态信息,帮助排查问题。
这篇关于手写RocketMQ学习:从零开始的RocketMQ入门教程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-15在使用平台私钥进行解密时提示 "私钥解密失败" 错误信息是什么原因?-icode9专业技术文章分享
- 2024-11-15Layui框架有哪些方式引入?-icode9专业技术文章分享
- 2024-11-15Layui框架中有哪些减少对全局环境的污染方法?-icode9专业技术文章分享
- 2024-11-15laydate怎么关闭自动的日期格式校验功能?-icode9专业技术文章分享
- 2024-11-15laydate怎么取消初始日期校验?-icode9专业技术文章分享
- 2024-11-15SendGrid 的邮件发送时,怎么设置回复邮箱?-icode9专业技术文章分享
- 2024-11-15使用 SendGrid API 发送邮件后获取到唯一的请求 ID?-icode9专业技术文章分享
- 2024-11-15mailgun 发送邮件 tags标签最多有多少个?-icode9专业技术文章分享
- 2024-11-15mailgun 发送邮件 怎么批量发送给多个人?-icode9专业技术文章分享
- 2024-11-15如何搭建web开发环境并实现 web项目在浏览器中访问?-icode9专业技术文章分享