rocketmq源码解析(rocketmq源码部署)

本文主要分析RocketMQ中如何保证消息有序的。

RocketMQ的版本为:4.2.0 release。

一.时序图

还是老规矩,先把分析过程的时序图摆出来:

1.Producer发送顺序消息

rocketmq源码解析(rocketmq源码部署)

2.Consumer接收顺序消息(一)

rocketmq源码解析(rocketmq源码部署)

3.Consumer接收顺序消息(二)

rocketmq源码解析(rocketmq源码部署)

二.源码分析 – Producer发送顺序消息

1 DefaultMQProducer#send:发送消息,入参中有自定义的消息队列选择器。

 // DefaultMQProducer#send
 public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
 throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
 return this.defaultMQProducerImpl.send(msg, selector, arg);
 }

1.1 DefaultMQProducerImpl#makeSureStateOK:确保Producer的状态是运行状态-ServiceState.RUNNING。

 // DefaultMQProducerImpl#makeSureStateOK
 private void makeSureStateOK() throws MQClientException {
 if (this.serviceState != ServiceState.RUNNING) {
 throw new MQClientException("The producer service state not OK, "+ this.serviceState
 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
 null);
 }
 }

1.2 DefaultMQProducerImpl#tryToFindTopicPublishInfo:根据Topic获取发布Topic用到的路由信息。

 // DefaultMQProducerImpl#tryToFindTopicPublishInfo
 private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
 if (null == topicPublishInfo || !topicPublishInfo.ok()) {
 this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);// 为空则从 NameServer更新获取,false,不传入 defaultMQProducer
 topicPublishInfo = this.topicPublishInfoTable.get(topic);
 }
 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {// 有了路由信息而且状态OK,则返回
 return topicPublishInfo;
 } else {
 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
 topicPublishInfo = this.topicPublishInfoTable.get(topic);
 return topicPublishInfo;
 }
 }

1.3 调用自定义消息队列选择器的select方法。

 // DefaultMQProducerImpl#sendSelectImpl
 MessageQueue mq = null;
 try {
 mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
 } catch (Throwable e) {
 throw new MQClientException("select message queue throwed exception.", e);
 }
 // Producer#main
 SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
 @Override
 public MessageQueue select(List mqs, Message msg, Object arg) {
 Integer id = (Integer) arg;
 int index = id % mqs.size();
 return mqs.get(index);
 }
 }, orderId);

1.4 DefaultMQProducerImpl#sendKernelImpl:发送消息的核心实现方法。

 // DefaultMQProducerImpl#sendKernelImpl
 ......
 switch (communicationMode) {
 case SYNC:
 long costTimeSync = System.currentTimeMillis() - beginStartTime;
 if (timeout 

1.4.1 MQClientAPIImpl#sendMessage:发送消息。

 // MQClientAPIImpl#sendMessage
 ......
 switch (communicationMode) {// 根据发送消息的模式(同步/异步)选择不同的方式,默认是同步
 case SYNC:
 long costTimeSync = System.currentTimeMillis() - beginStartTime;
 if (timeoutMillis 

1.4.1.1 MQClientAPIImpl#sendMessageSync:发送同步消息。

 // MQClientAPIImpl#sendMessageSync
 private SendResult sendMessageSync(
 final String addr,
 final String brokerName,
 final Message msg,
 final long timeoutMillis,
 final RemotingCommand request
 ) throws RemotingException, MQBrokerException, InterruptedException {
 RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
 assert response != null;
 return this.processSendResponse(brokerName, msg, response);
 }

1.4.1.1.1 NettyRemotingClient#invokeSync:构造RemotingCommand,调用的方式是同步。

 // NettyRemotingClient#invokeSync 
 RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
 if (this.rpcHook != null) {
 this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
 }
 return response;

三.源码分析 – Consumer接收顺序消息(一)

1 DefaultMQPushConsumer#registerMessageListener:把Consumer传入的消息监听器加入到messageListener中。

 // DefaultMQPushConsumer#registerMessageListener
 public void registerMessageListener(MessageListenerOrderly messageListener) {
 this.messageListener = messageListener;
 this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
 }

1.1 DefaultMQPushConsumerImpl#registerMessageListener:把Consumer传入的消息监听器加入到messageListenerInner中。

 // DefaultMQPushConsumerImpl#registerMessageListener
 public void registerMessageListener(MessageListener messageListener) {
 this.messageListenerInner = messageListener;
 }

2 DefaultMQPushConsumer#start:启动Consumer。

 // DefaultMQPushConsumer#start
 public void start() throws MQClientException {
 this.defaultMQPushConsumerImpl.start();
 }

2.1 DefaultMQPushConsumerImpl#start:启动ConsumerImpl。

 // DefaultMQPushConsumerImpl#start
 switch (this.serviceState) {
 case CREATE_JUST:// 刚刚创建
 ......
 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {// 有序消息服务
 this.consumeOrderly = true;
 this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {// 并发无序消息服务
 this.consumeOrderly = false;
 this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
 }
 ......
 this.consumeMessageService.start();// 启动消息服务
 ......
 mQClientFactory.start();// 启动MQClientInstance
 ......

2.1.1 new
ConsumeMessageOrderlyService():构造顺序消息服务。

 // ConsumeMessageOrderlyService#ConsumeMessageOrderlyService
 public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
 MessageListenerOrderly messageListener) {
 this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
 this.messageListener = messageListener;
 this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
 this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
 this.consumeRequestQueue = new LinkedBlockingQueue();
 this.consumeExecutor = new ThreadPoolExecutor(// 主消息消费线程池,正常执行收到的ConsumeRequest。多线程
 this.defaultMQPushConsumer.getConsumeThreadMin(),
 this.defaultMQPushConsumer.getConsumeThreadMax(),
 1000 * 60,
 TimeUnit.MILLISECONDS,
 this.consumeRequestQueue,
 new ThreadFactoryImpl("ConsumeMessageThread_"));
 this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
 }

2.1.2
ConsumeMessageOrderlyService#start:启动消息队列客户端实例。

 // DefaultMQPushConsumerImpl#start
 this.consumeMessageService.start();
 // ConsumeMessageOrderlyService#start
 public void start() {
 if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 @Override
 public void run() {
 ConsumeMessageOrderlyService.this.lockMQPeriodically();// 定时向broker发送批量锁住当前正在消费的队列集合的消息
 }
 }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
 }
 }

2.1.2.1
ConsumeMessageOrderlyService#lockMQPeriodically:定时向broker发送批量锁住当前正在消费的队列集合的消息。

2.1.2.1.1 RebalanceImpl#lockAll:锁住所有正在消息的队列。

 // ConsumeMessageOrderlyService#lockMQPeriodically
 if (!this.stopped) {
 this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
 }
 // RebalanceImpl#lockAll
 HashMap> brokerMqs = this.buildProcessQueueTableByBrokerName();// 根据brokerName从processQueueTable获取正在消费的队列集合
 ......
 Set lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);// 向Broker发送锁住消息队列的指令
 for (MessageQueue mq : lockOKMQSet) {
 ProcessQueue processQueue = this.processQueueTable.get(mq);
 if (processQueue != null) {
 if (!processQueue.isLocked()) {
 log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
 }
 processQueue.setLocked(true);
 processQueue.setLastLockTimestamp(System.currentTimeMillis());
 }
 }
 ......

2.1.3 MQClientInstance#start:启动MQClientInstance。过程较复杂,放到大标题四中分析。

 // DefaultMQPushConsumerImpl#start
 mQClientFactory.start();

四.源码分析 – Consumer接收顺序消息(二)

1 MQClientInstance#start:启动客户端实例MQClientInstance。

 // MQClientInstance#start
 synchronized (this) {
 switch (this.serviceState) {
 case CREATE_JUST:
 ......
 // Start pull service 启动拉取消息服务
 this.pullMessageService.start();
 // Start rebalance service 启动消费端负载均衡服务
 this.rebalanceService.start();
 ......

1.1 PullMessageService#run:启动拉取消息服务。实际调用的是DefaultMQPushConsumerImpl的pullMessage方法。

 // PullMessageService#run
 public void run() {
 log.info(this.getServiceName() + " service started");
 while (!this.isStopped()) {
 try {
 PullRequest pullRequest = this.pullRequestQueue.take();
 this.pullMessage(pullRequest);
 } catch (InterruptedException ignored) {
 } catch (Exception e) {
 log.error("Pull Message Service Run Method exception", e);
 }
 }
 log.info(this.getServiceName() + " service end");
 }
 // PullMessageService#pullMessage
 private void pullMessage(final PullRequest pullRequest) {
 final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
 if (consumer != null) {
 DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
 impl.pullMessage(pullRequest);// 调用DefaultMQPushConsumerImpl的pullMessage
 } else {
 log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
 }
 }

1.1.1.1 DefaultMQPushConsumerImpl#pullMessage:拉取消息。提交到
ConsumeMessageOrderlyService的线程池consumeExecutor中执行。

 // DefaultMQPushConsumerImpl#pullMessage
 ......
 PullCallback pullCallback = new PullCallback() {
 @Override
 public void onSuccess(PullResult pullResult) {
 switch (pullResult.getPullStatus()) {
 case FOUND:
 long prevRequestOffset = pullRequest.getNextOffset();
 pullRequest.setNextOffset(pullResult.getNextBeginOffset());
 long pullRT = System.currentTimeMillis() - beginTimestamp;
 ......
 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
 pullResult.getMsgFoundList(),
 processQueue,
 pullRequest.getMessageQueue(),
 dispatchToConsume);
 ......

1.1.1.1.1.1.1 ConsumeRequest#run:处理消息消费的线程。

 // ConsumeMessageOrderlyService.ConsumeRequest#run
 List msgs = this.processQueue.takeMessags(consumeBatchSize);
 ......
 long beginTimestamp = System.currentTimeMillis();
 ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
 boolean hasException = false;
 try {
 this.processQueue.getLockConsume().lock();
 if (this.processQueue.isDropped()) {
 log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",this.messageQueue);
 break;
 }
 status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);// 实际消费消息的地方,回调消息监听器的consumeMessage方法
 } catch (Throwable e) {
 log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
 RemotingHelper.exceptionSimpleDesc(e),
 ConsumeMessageOrderlyService.this.consumerGroup,
 msgs,messageQueue);
 hasException = true;
 } finally {
 this.processQueue.getLockConsume().unlock();
 }
 ......

1.2 RebalanceService#run:启动消息端负载均衡服务。

 // RebalanceService#run
 public void run() {
 log.info(this.getServiceName() + " service started");
 while (!this.isStopped()) {
 this.waitForRunning(waitInterval);
 this.mqClientFactory.doRebalance();
 }
 log.info(this.getServiceName() + " service end");
 }
 // MQClientInstance#doRebalance
 public void doRebalance() {
 for (Map.Entry entry : this.consumerTable.entrySet()) {
 MQConsumerInner impl = entry.getValue();
 if (impl != null) {
 try {
 impl.doRebalance();
 } catch (Throwable e) {
 log.error("doRebalance exception", e);
 }
 }
 }
 }
 // DefaultMQPushConsumerImpl#doRebalance
 public void doRebalance() {
 if (!this.pause) {
 this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
 }
 }

1.2.1.1.1 RebalanceImpl#doRebalance:负载均衡服务类处理。

 // RebalanceImpl#doRebalance
 public void doRebalance(final boolean isOrder) {
 Map subTable = this.getSubscriptionInner();
 if (subTable != null) {
 for (final Map.Entry entry : subTable.entrySet()) {
 final String topic = entry.getKey();
 try {
 this.rebalanceByTopic(topic, isOrder);
 } catch (Throwable e) {
 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 log.warn("rebalanceByTopic Exception", e);
 }
 }
 }
 }
 this.truncateMessageQueueNotMyTopic();
 }
 // RebalanceImpl#rebalanceByTopic
 switch (messageModel) {
 case BROADCASTING: {
 Set mqSet = this.topicSubscribeInfoTable.get(topic);
 if (mqSet != null) {
 boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);// 根据Toipc去除queue
 if (changed) {
 this.messageQueueChanged(topic, mqSet, mqSet);
 log.info("messageQueueChanged {} {} {} {}",
 consumerGroup,
 topic,
 mqSet,
 mqSet);
 }
 } else {
 ......
 // RebalanceImpl#updateProcessQueueTableInRebalance
 this.dispatchPullRequest(pullRequestList);// RebalancePushImpl分发消息

1.2.1.1.1.1.1.1 RebalancePushImpl#dispatchPullRequest:RebalancePushImpl分发。

 // RebalancePushImpl#dispatchPullRequest
 public void dispatchPullRequest(List pullRequestList) {
 for (PullRequest pullRequest : pullRequestList) {
 this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
 log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
 }
 }

五.总结

相比Producer的发送流程,Consumer的接收流程稍微复杂一点。通过上面的源码分析,可以知道RocketMQ是怎样保证消息的有序的:

1.通过ReblanceImp的lockAll方法,每隔一段时间定时锁住当前消费端正在消费的队列。设置本地队列ProcessQueue的locked属性为true。保证broker中的每个消息队列只对应一个消费端;

2.另外,消费端也是通过锁,保证每个ProcessQueue只有一个线程消费。

秒鲨号所有文章资讯、展示的图片素材等内容均为注册用户上传(部分报媒/平媒内容转载自网络合作媒体),仅供学习参考。用户通过本站上传、发布的任何内容的知识产权归属用户或原始著作权人所有。如有侵犯您的版权,请联系我们反馈!本站将在三个工作日内改正。
(0)

大家都在看

  • 五岳寻仙不辞远下一句("五岳寻仙不辞远)

    曾经还是高中学子时,每逢写“最爱”诗词,我总会写贾岛的《寻隐者不遇》: 松下问童子,言师采药去。 只在此山中,云深不知处。 那时学业繁忙,一心便只向往隐士,觉得竹杖芒鞋逍遥山水间,…

    2022年8月19日 投稿
  • 网上购物的步骤(网上购物的步骤中客户向认证机构)

    双十一倒计时中…… 铺天盖地的购物宣传让你心动了吗? 如何放心买买买? 废话不多说,直接上教程! 宠粉案例“炸”一波 南京铁路运输检察院办理网络销售假冒商品的案件量逐年增加,这些案…

    2022年6月10日
  • 盘点顶级LV精仿包包哪家强?推荐这五个渠道

    顶级精仿LV包包哪里买?淘宝网、拼多多、阿里巴巴,搜索微商或网上的广告都以找到并购买。在哪里入手其实不重要,因为包包货源都是差不多的大同小异,关键还得看人,诚信的商家才是根本。选好…

    2022年5月30日 投稿
  • 多肉植物怎么养(怎么养多肉植物)

    多肉不好养,买回家就养死了,要么就徒长,教你正确养护技巧 秋天是一个养多肉的旺季,由于季节的影响,气温降低了,植物开始进入生长阶段,多肉也不例外,我家里的生石花,最近这几天全部都开…

    2022年6月22日 投稿
  • 人际交往的最高境界是什么?

    人际交往的最高境界是什么? 初次的会面如果让对方回味无穷,自然就盼望有第二次的见面,这就是人际交往的最高境界。然而怎样才能做到这一点呢?最重要的就是善于制造余韵无穷的谈话,让对方在…

    2022年3月15日
  • 名牌双肩电脑包推荐(“名牌双肩电脑包”)

    本内容来源于@什么值得买APP,观点仅代表作者本人 |作者:大鹏鹏 前言 熟悉我的朋友,肯定知道我是一个特别喜欢买包的人,不管是单肩包还是双肩包,只要价格合适,然后款式以及材质到位…

    2022年8月9日 投稿
  • 社保卡和医保卡(医保卡社保卡合一了吗)

    社会保险和医疗保险作为最基础的社会保障,二者之间有什么区别联系呢?社保卡就是医保卡吗?领了社保卡还需要申领医保卡吗?跟着小诺一起来了解! 一、社保就是医保吗 社保是社会保险的全称,…

    2022年6月24日
  • 第一次提出毛东思想这一概念的人是谁(这位功臣你认识吗)

    1945年4月,我党在延安召开了第七次全国代表大会,这次的会议是空前统一,胜利的一次大会,在这次会议中,有一个极大的历史贡献,那就是将“毛泽东思想”正式的写入了党章。 “毛泽东思想…

    2022年5月27日
  • cpi指数是什么意思(cpi上涨好还是下降好)

    最近国家统计局公布了2021年上半年的CPI,同比上涨0.5%,一季度同比持平,其中6月份全国居民消费价格同比上涨1.1%,涨幅比5月份回落0.2个百分点,环比下降0.4%,上半年…

    2022年3月16日
  • 苹果手机抢红包软件哪个最好用(最快0秒自动抢红包神器)

    今年你抢到了多少红包? 2022年春节即将到来,相信不少人都已经摩拳擦掌,做好了抢红包的准备。在前几年微信、QQ、支付宝等平台推出红包功能后,抢红包就成了我们春节里不可或缺的一项活…

    2022年2月22日 投稿
品牌推广 在线咨询
返回顶部