RockerMQ源码分析——Broker消息发送流程
2021/9/4 12:05:44
本文主要是介绍RockerMQ源码分析——Broker消息发送流程,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
org.apache.rocketmq.example.quickstart.Producer
- 创建一个消息的生产者,且指定一个组
- 设置namesrv地址,可以从此地址获取topic的队列信息
- 启动生产者实例
- 循环中创建消息对象,并指定topic、tag和消息体
- 在循环中发送消息,采用默认的负载策略,
- 调用org.apache.rocketmq.client.producer.DefaultMQProducer#send
- ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send
- ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl:
- Message:发送消息
- CommunicationMode:发送方式
- SendCallback:异步消息发送回调函数
- timeout:消息发送超时时间
- ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo:获取topic的路由信息( Broker负载消息存储,一个topic可以利用负载均衡分布在多台broker上,每个broker包含多个Queue:每个QueueData包含BrokerName,读队列和写队列个数,权限?、同步或异步)
- 先从本地缓存中 ConcurrentMap<String/* topic */, TopicPublishInfo>中尝试获取
- ->org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:尝试从Nameserver中获取Topic路由信息,并更新本地缓存
- 为了避免重复从 NameServer 获取配置信息,添加了锁
- 从默认的Topic或者指定的Topic中获取配置信息(从Nameserver获取)
- 获取到最新的Topic信息后,与本地缓存进行对比,有变化的话,需要同步更新消费者、生产者关于该Topic的缓存,更新前是先复制一份信息
- ->org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:尝试从Nameserver中获取Topic路由信息,并更新本地缓存
- 如果未找到路由信息,则从默认的Topic中寻找路由配置
- 先从本地缓存中 ConcurrentMap<String/* topic */, TopicPublishInfo>中尝试获取
- ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue:根据Topic路由负载算法选择一个消息队列进行消息发送
- ->org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
- 如果开启了消息延时规避
- 首先对Topic所有队列进行验证,因为加入了发送异常延时,确保消息队列(MessageQueue)所在的Broker是正常的
- 关于消息延时机制
- 没有开启的话,就循环向下一个消息队列发送
- 如果开启了消息延时规避
- ->org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
- ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl:向MessageQueue消息发送
- 通过Product与Broker的长连接将消息发送给Broker,然后Broker将消息存储,并返回生产者
- ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem如果失败就更新下容错策略,主要用来规避发生故障的broker
- 如果是同步调用方式(SYNC),则执行失败重试策略,默认重试两次
主要分析的是RocketMQ 以同步方式发送消息的过程,异步模式与单向模式实现原理基本一样,异步只是增加了发送成功或失败的回掉方法。
- ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send
- 调用org.apache.rocketmq.client.producer.DefaultMQProducer#send
broker消息发送的主要执行流程:
DefaultMQProducerImpl#sendDefaultImpl:producer发送消息
private SendResult sendDefaultImpl( Message msg, // 发送消息 final CommunicationMode communicationMode, // 发送方式 final SendCallback sendCallback, // 异步消息发送回调函数 final long timeout // 消息发送超时时间 ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 需要先确保Producer状态正常 this.makeSureStateOK(); // 消息参数校验 Validators.checkMessage(msg, this.defaultMQProducer); final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; // 查询topic路由信息,先尝试从内存中获取,若没有 则从namesrv通过netty远程获取 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; // 最后选择用于发送消息的队列 Exception exception = null; SendResult sendResult = null; // 最后一次发送结果 // 总次数;若是同步模式,则在默认2次的基础上+1,如果是异步和oneway 模式则只有1次,一旦失败就直接返回 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; // 第几次发送 String[] brokersSent = new String[timesTotal]; // 存储每次发送消息时选择的broker名称 for (; times < timesTotal; times++) { // 重试总次数 String lastBrokerName = null == mq ? null : mq.getBrokerName(); // 根据Topic路由负载算法选择一个消息队列进行消息发送 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } // 向MessageQueue 消息发送,消息发送的核心函数 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); // 如果失败就更新下容错策略,主要用来规避发生故障的broker this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: // 异步发送 return null; case ONEWAY: // oneway 模式 return null; case SYNC: /** * 状态有4种: * 发送成功、发送成功但刷盘失败、发送成功但同步到slave失败以及发送成功而slave不可用 */ if (sendResult.getSendStatus() != SendStatus.SEND_OK) { // 状态不是OK,说明同步发送成功,但存储出现问题 // 是否尝试发送到其他Broker上 if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // 同步发送成功但存储有问题时 && 配置存储异常时允许重新发送时,进行重试 continue; } } return sendResult; default: break; } } catch (RemotingException e) { // 打印异常,更新Broker可用性信息,更新继续循环 endTimestamp = System.currentTimeMillis(); // 如果失败就更新下容错策略,主要用来规避发生故障的broker this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQClientException e) { // 打印异常,更新Broker可用性信息,继续循环 endTimestamp = System.currentTimeMillis(); // 如果失败就更新下容错策略,主要用来规避发生故障的broker this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQBrokerException e) { // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环 endTimestamp = System.currentTimeMillis(); // 如果失败就更新下容错策略,主要用来规避发生故障的broker this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; switch (e.getResponseCode()) { // 如出现以下类型的异常,进行消息发送重试 case ResponseCode.TOPIC_NOT_EXIST: case ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SYSTEM_ERROR: case ResponseCode.NO_PERMISSION: case ResponseCode.NO_BUYER_ID: case ResponseCode.NOT_IN_CURRENT_UNIT: continue; // 如果有发送结果,进行返回,否则抛出异常 default: if (sendResult != null) { return sendResult; } throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); // 如果失败就更新下容错策略,主要用来规避发生故障的broker this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { break; } } // 返回发送结果 if (sendResult != null) { return sendResult; } // 根据不同情况,抛出不同异常 String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent)); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); MQClientException mqClientException = new MQClientException(info, exception); if (callTimeout) { throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); } if (exception instanceof MQBrokerException) { mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); } else if (exception instanceof RemotingConnectException) { mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); } else if (exception instanceof RemotingTimeoutException) { mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); } else if (exception instanceof MQClientException) { mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); } throw mqClientException; } // 检查能否找到NameSrv validateNameServerSetting(); // 找不到消息路由的异常 throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); }
- 先获取topic路由信息,详细解析见:DefaultMQProducerImpl#tryToFindTopicPublishInfo
- 计算允许发送直到成功的最大次数,并进行循环。同步默认配置(2+1)次,异步和oneway只有1次
- 根据topic路由负载算法选择一个消息队列用于发送消息,详细解析见:MQFaultStrategy#selectOneMessageQueue
- 调用DefaultMQProducerImpl#sendKernelImpl方法,producer将消息通过和Broker之间建立的长连接发送给Broker,Broker存储接收到的消息,并返回给producer发送结果的状态,这是发送方发送消息的核心方法,详细解析见:DefaultMQProducerImpl#sendKernelImpl
- 更新Broker可用信息,主要是再次选择用于发送消息的消息队列时,会参考broker发送消息的延迟,详细解析见:MQFaultStrategy
DefaultMQProducerImpl#tryToFindTopicPublishInfo 查询路由信息
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { // 先从内存中获取可用的topic路由信息 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { // 当内存没有可用的路由信息,尝试从Nameserver中获取Topic路由信息,并更新本地缓存 this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } // 如果找到可用的路由信息 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { /** * 更新Topic路由信息 */ this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
- 先从缓存中获取可用的路由信息,topicPublishInfoTable是个ConcurrentMap变量,保存了topic和消息队列的映射关系
- 从namesrv中获取topic路由信息
- 如果从缓存和Namesrv中都没有找到有用的路由信息,调用MQClientInstance#updateTopicRouteInfoFromNameServer创建topic路由信息
MQFaultStrategy#selectOneMessageQueue
/** * 选择一个消息队列发送消息 * * @param tpInfo * @param lastBrokerName * @return */ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { try { // SendWhichQueue是个本地线程变量 ThreadLocal,保存上一次发送的消息队列下标 int index = tpInfo.getSendWhichQueue().getAndIncrement(); /** * 对Topic所有队列进行验证,因为加入了发送异常延时,确保消息队列(MessageQueue)所在的Broker是正常的 */ for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); /** * 判断当前的消息队列是否可用 * 一旦一个 MessageQueue 符合条件,即刻返回,但该 Topic 所在的所有Broker全部标记不可用时, * 进入到下一步逻辑处理 */ if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } /** * 根据 Broker 的 startTimestart 进行一个排序,值越小,排前面,然后再选择一个, * 返回(此时不能保证一定可用,会抛出异常,如果消息发送方式是同步调用,则有重试机制)。 */ final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
DefaultMQProducerImpl#sendKernelImpl
/** * 通过Producer与Broker的长连接将消息发送给Broker,然后Broker将消息存储,并返回生产者 * producer 发送消息的核心函数 */ private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); /** * 获取Broker地址 */ String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { /** * 查询Topic路由信息 * 先从内存中获取 */ tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } SendMessageContext context = null; if (brokerAddr != null) { brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); byte[] prevBody = msg.getBody(); try { //for MessageBatch,ID has been set in the generating process if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); } boolean topicWithNamespace = false; if (null != this.mQClientFactory.getClientConfig().getNamespace()) { msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace()); topicWithNamespace = true; } int sysFlag = 0; boolean msgBodyCompressed = false; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; } final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { // 设置消息标记类型为TRANSACTION_PREPARED_TYPE;表示消息为预提交 sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } if (hasCheckForbiddenHook()) { CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); this.executeCheckForbiddenHook(checkForbiddenContext); } if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this.defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } this.executeSendMessageHookBefore(context); } SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } SendResult sendResult = null; switch (communicationMode) { case ASYNC: Message tmpMessage = msg; boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; msg.setBody(prevBody); } if (topicWithNamespace) { if (!messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; } msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; case ONEWAY: case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; } if (this.hasSendMessageHook()) { context.setSendResult(sendResult); this.executeSendMessageHookAfter(context); } return sendResult; } ...... } finally { msg.setBody(prevBody); msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } } throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }
MQFaultStrategy
这篇关于RockerMQ源码分析——Broker消息发送流程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-10Rakuten 乐天积分系统从 Cassandra 到 TiDB 的选型与实战
- 2025-01-09CMS内容管理系统是什么?如何选择适合你的平台?
- 2025-01-08CCPM如何缩短项目周期并降低风险?
- 2025-01-08Omnivore 替代品 Readeck 安装与使用教程
- 2025-01-07Cursor 收费太贵?3分钟教你接入超低价 DeepSeek-V3,代码质量逼近 Claude 3.5
- 2025-01-06PingCAP 连续两年入选 Gartner 云数据库管理系统魔力象限“荣誉提及”
- 2025-01-05Easysearch 可搜索快照功能,看这篇就够了
- 2025-01-04BOT+EPC模式在基础设施项目中的应用与优势
- 2025-01-03用LangChain构建会检索和搜索的智能聊天机器人指南
- 2025-01-03图像文字理解,OCR、大模型还是多模态模型?PalliGema2在QLoRA技术上的微调与应用