手写RocketMQ学习:从零开始的RocketMQ入门教程

2024/10/16 4:03:25

本文主要是介绍手写RocketMQ学习:从零开始的RocketMQ入门教程,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

概述

手写RocketMQ学习涉及RocketMQ的基本概念、安装配置、环境变量设置、消息的创建与发送、接收与消费,以及集群搭建和监控等详细步骤,帮助读者全面掌握RocketMQ的使用。

RocketMQ简介与安装
RocketMQ基本概念

RocketMQ是一个分布式消息中间件,由阿里巴巴开源,目前在Apache软件基金会下作为顶级项目孵化。它具有高吞吐量、高可用性、可扩展性强等特性,广泛应用于大规模分布式系统中。RocketMQ的核心组件包括Name Server、Broker和Producer/Consumer。

  • Name Server:Name Server是消息中间件中的服务发现模块,负责维护Broker的路由信息。每个Broker都会向Name Server注册,Name Server则负责接收并维护这些信息,并提供给客户端进行查询。

  • Broker:Broker是消息的存储和转发中心,负责存储消息和转发消息。RocketMQ支持多个Broker集群,每个Broker集群可以分布在网络的不同节点上,实现消息的分布式存储和分发。

  • Producer:Producer是消息的发送者,负责向指定的Topic发送消息。发送消息时,Producer会将消息发送到Broker,同时也可以设置消息的属性,如消息的优先级、延迟级别等。

  • Consumer:Consumer是消息的接收者,负责从指定的Topic接收消息并进行处理。RocketMQ支持多种消费模式,如顺序消费、广播消费、集群消费等。
安装RocketMQ环境

安装RocketMQ环境需要先下载RocketMQ的安装包,可以从其GitHub仓库或者官网下载。下面以Windows环境为例,介绍RocketMQ的安装步骤,其他操作系统类似。

下载RocketMQ

访问RocketMQ的GitHub仓库,下载最新版本的安装包,解压到指定目录。

# 下载最新版本的RocketMQ
wget https://github.com/apache/rocketmq/releases/download/v4.9.1/rocketmq-release-4.9.1-bin.tar.gz
# 解压安装包
tar -zxvf rocketmq-release-4.9.1-bin.tar.gz
cd rocketmq-release-4.9.1

启动NameServer

RocketMQ的NameServer是分布式系统中的服务发现模块,负责维护Broker的路由信息。启动NameServer前,确保已经下载并解压RocketMQ的安装包。

# 启动NameServer
nohup sh bin/mqnamesrv &

启动NameServer后,可以在控制台看到启动成功的信息,同时NameServer会将路由信息保存到本地的配置文件中,以备后续使用。

启动Broker

在启动NameServer之后,需要启动Broker以提供消息的存储和转发服务。

# 启动Broker
nohup sh bin/mqbroker -n localhost:9876 &

启动Broker时,需要指定NameServer的地址,以便Broker能够向NameServer注册自己。启动成功后,控制台会输出启动成功的信息。

配置RocketMQ环境变量

为了方便使用RocketMQ的命令行工具,建议配置环境变量。以下是配置环境变量的步骤:

设置环境变量

在系统的环境变量设置中,添加RocketMQ的安装目录到系统的PATH中,确保可以使用RocketMQ的命令行工具。

export PATH=$PATH:/path/to/rocketmq/bin

验证配置

配置完成后,可以通过命令行工具验证设置是否成功。

# 检查RocketMQ命令行工具是否可用
mqadmin

如果配置正确,将会显示RocketMQ命令行工具的帮助信息。

创建与发送消息
创建生产者实例

生产者是消息的发送者,负责将消息发送到指定的Topic。在RocketMQ中,生产者需要通过NameServer获取Broker的路由信息,然后将消息发送到Broker。

创建生产者代码示例

创建一个生产者实例,用于发送消息。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
        producer.shutdown();
    }
}

在上述代码中,首先创建了一个生产者实例,设置了NameServer的地址,然后启动生产者。接着,创建了一个消息对象,指定了Topic和Tag,然后发送消息。最后,关闭生产者。

发送同步消息

同步消息是指生产者发送消息后,等待Broker返回消息是否发送成功的响应。发送同步消息可以确保消息已被成功发送。

发送同步消息代码示例

SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);

上述代码中,producer.send(msg)方法用于发送消息,该方法会阻塞直到收到Broker的响应。如果消息发送成功,sendResult对象将包含消息的发送状态和消息ID等信息。

发送异步消息

异步消息是指生产者发送消息后,不等待Broker返回响应,而是通过回调函数处理发送结果。这种方式可以提高生产者的发送效率。

发送异步消息代码示例

producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.printf("%s%n", sendResult);
    }

    @Override
    public void onException(Throwable e) {
        e.printStackTrace();
    }
});

在上述代码中,producer.send(msg, new SendCallback())方法用于异步发送消息。当消息发送成功时,会调用onSuccess方法,当发送失败时,会调用onException方法。

接收与消费消息
创建消费者实例

消费者是消息的接收者,负责从指定的Topic接收消息并进行处理。在RocketMQ中,消费者需要通过NameServer获取Broker的路由信息,然后注册到指定的Topic上。

创建消费者代码示例

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("Receive New Message: %s %n", new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

在上述代码中,首先创建了一个消费者实例,设置了NameServer的地址,并注册到指定的Topic上。接着,设置消息模型为集群消费模式,设置从上次消费的位置开始消费。最后,注册消息监听器,用于处理接收到的消息。

接收并消费消息

消费者接收到消息后,会触发消息监听器中的回调函数,消费者在回调函数中对消息进行处理。

消费者消息处理代码示例

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    for (MessageExt msg : msgs) {
        System.out.printf("Receive New Message: %s %n", new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

在上述代码中,msg.getBody()用于获取消息的内容。消费者将接收到的消息打印到控制台上。

消息处理逻辑编写

在实际的应用场景中,消息的处理逻辑可能非常复杂,可能需要进行业务逻辑处理、数据库操作、日志记录等。

消息处理逻辑代码示例

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    for (MessageExt msg : msgs) {
        String body = new String(msg.getBody());
        System.out.println("Consume Message: " + body);
        // 业务逻辑处理
        // processBusinessLogic(body);
        // 数据库操作
        // saveToDatabase(body);
        // 日志记录
        // logMessage(body);
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

在上述代码中,可以根据具体业务需求编写消息的处理逻辑。例如,可以将消息内容保存到数据库,或者进行日志记录。

RocketMQ集群搭建
配置Nameserver与Broker

在生产环境中,通常需要配置多个NameServer和Broker,以提高系统的可靠性和可扩展性。

配置NameServer

配置多个NameServer可以使用多个实例,并在配置文件中指定多个NameServer地址。例如:

# 配置文件示例
namesrv.addr=192.168.1.100:9876;192.168.1.101:9876

配置Broker

配置多个Broker可以使用多个实例,并在配置文件中指定多Broker地址和集群名称。例如:

# 配置文件示例
broker.name=BrokerA
broker.id=0
broker.clusterName=DefaultCluster
broker.addr=192.168.1.102:10911
broker.allowPollIfQueueEmpty=true
namesrv.addr=192.168.1.100:9876;192.168.1.101:9876

启动RocketMQ集群

启动RocketMQ集群需要启动多个NameServer和多个Broker实例,并确保所有实例都能正常启动。

启动NameServer

# 启动NameServer实例1
nohup sh bin/mqnamesrv &
# 启动NameServer实例2
nohup sh bin/mqnamesrv -n 192.168.1.101:9876 &

启动Broker

# 启动Broker实例1
nohup sh bin/mqbroker -n 192.168.1.100:9876;192.168.1.101:9876 -c brokerA.properties &
# 启动Broker实例2
nohup sh bin/mqbroker -n 192.168.1.100:9876;192.168.1.101:9876 -c brokerB.properties &

集群中的消息路由

在集群环境中,RocketMQ通过路由信息来确定消息发送到哪个Broker。每个Broker都会向NameServer注册自己的路由信息,NameServer则维护这些信息,并提供给客户端进行查询。

路由信息示例

# BrokerA的路由信息示例
brokerAddr: 192.168.1.102:10911
brokerName: BrokerA
brokerId: 0
clusterName: DefaultCluster

消息路由示例

当生产者发送消息到Topic时,会从NameServer获取Topic对应的路由信息,并根据路由信息将消息发送到指定的Broker。

Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);

在上述代码中,生产者会根据NameServer返回的路由信息,将消息发送到指定的Broker。

消息的过滤与重试机制
消息过滤规则

消息过滤规则用于在消费者端筛选消息,只消费满足特定条件的消息。

消息过滤规则示例

consumer.subscribe("TopicTest", "*");

在上述代码中,使用*表示订阅所有Tag的消息。如果需要订阅特定Tag的消息,可以指定Tag名称,例如:

consumer.subscribe("TopicTest", "TagA");
消息重试策略

消息重试策略用于处理发送失败的消息,确保消息最终能够被成功发送和消费。

消息重试策略示例

producer.setMessageQueueSelector(new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object o) {
        int index = (int) (o);
        return mqs.get(index);
    }
});

在上述代码中,MessageQueueSelector用于选择消息队列,可以实现自定义的消息队列选择逻辑。

消息的幂等性处理

消息的幂等性处理用于避免重复消费消息,确保消息只被消费一次。

消息幂等性处理示例

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    for (MessageExt msg : msgs) {
        String msgId = msg.getMsgId();
        if (!context.getMessageIDs().contains(msgId)) {
            System.out.printf("Receive New Message: %s %n", new String(msg.getBody()));
            context.getMessageIDs().add(msgId);
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

在上述代码中,msg.getMsgId()用于获取消息的唯一标识,context.getMessageIDs()用于存储已经消费过的消息ID。通过检查消息ID是否已经消费过,可以避免重复消费消息。

日志与监控
RocketMQ的日志配置

RocketMQ的日志配置用于记录系统运行状态和错误信息,帮助排查问题。

日志配置文件示例

logFile=logs/rocketmqlogs
logLevel=WARN
fileAppend=true
consoleLog=false

在上述配置文件中,logFile用于指定日志文件的路径,logLevel用于指定日志级别,fileAppend用于指定是否追加日志到文件,consoleLog用于指定是否输出日志到控制台。

日志配置代码示例

import org.apache.rocketmq.remoting.common.RemotingContext;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRemotingServerFactory;

public class LogConfigurationExample {
    public static void main(String[] args) {
        // 获取日志配置
        Properties properties = RemotingContext.getContext().getProperties();
        String logFile = properties.getProperty("rocketmq.client.logFile");
        String logLevel = properties.getProperty("rocketmq.client.logLevel");
        System.out.println("Log File: " + logFile);
        System.out.println("Log Level: " + logLevel);
    }
}
使用工具监控RocketMQ

RocketMQ提供了多种监控工具,帮助监控系统运行状态和性能指标。

监控工具示例

RocketMQ自带的监控工具包括Dashboard和JMX。

Dashboard监控工具

Dashboard是一个Web界面,提供RocketMQ的监控视图和操作工具。

# 启动Dashboard
nohup sh bin/mqdashboard &

启动Dashboard后,可以在浏览器中访问http://localhost:8080来查看RocketMQ的监控视图。

JMX监控工具

JMX用于监控Java应用程序的运行状态和性能指标。

import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;

public class JMXExample {
    public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("com.taobao.rocketmq:type=BrokerStats");
        System.out.println(mbs.getAttribute(name, "MessagePutTotal"));
    }
}

在上述代码中,通过JMX获取Broker的统计信息。

检查RocketMQ运行状态

检查RocketMQ运行状态可以帮助确保系统正常运行。

检查RocketMQ运行状态示例

# 检查NameServer状态
mqadmin clusterList
# 检查Broker状态
mqadmin brokerList
# 检查Topic状态
mqadmin topicList

在上述命令中,mqadmin clusterList用于检查NameServer的状态,mqadmin brokerList用于检查Broker的状态,mqadmin topicList用于检查Topic的状态。

通过上述命令,可以获取系统的运行状态信息,帮助排查问题。



这篇关于手写RocketMQ学习:从零开始的RocketMQ入门教程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程