PPXu

RocketMQ Consumer: Message Consumption

2019-06-08

After a message is sent by the Producer to the Broker, the consumer side can request the Broker to pull it and process it. Consumption is organized in groups (Consumer Groups): a consumer group can contain multiple consumers, and each Consumer Group can subscribe to multiple Topics. A Consumer Group consumes in one of two modes: broadcasting (BROADCASTING) or clustering (CLUSTERING).

  • Clustering mode: each message under a topic is consumed by only one consumer within the group.
  • Broadcasting mode: each message under a topic is consumed once by every consumer in the group. (A minimal consumer setup showing how the mode is selected is sketched below.)
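To make the two modes concrete, here is a minimal push-consumer setup. This is only a sketch: the name server address 127.0.0.1:9876, the group name demo_consumer_group and the topic TopicTest are placeholder assumptions, not values from this article.

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class ConsumerDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // CLUSTERING (the default): each message is consumed by exactly one consumer in the group.
        // Switch to MessageModel.BROADCASTING to have every consumer in the group receive every message.
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            msgs.forEach(msg -> System.out.println("received: " + msg));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}
```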

There are also two ways messages travel between Broker and Consumer:

  • Pull mode: the Consumer actively requests the Broker to pull messages.
  • Push mode: the Broker pushes messages to the Consumer. In fact push mode is built on top of pull mode, wrapping one extra layer around it; the details are shown below.
    Let's first look at the Consumer startup process.

Consumer Startup

public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
// 1. Validate the consumer configuration
this.checkConfig();

this.copySubscription();

if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 2. Instantiate mQClientFactory
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 3. Set rebalance-related properties
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 4. Register message filter hooks on pullAPIWrapper
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// 5. Set the consumer's offsetStore
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
// 6. Instantiate the consumeMessageService that matches the registered MessageListener subtype, then start its background thread
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

this.consumeMessageService.start();
// 7. Register the current consumer
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 8. Start the various background tasks (this also starts the Netty client)
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}

this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately(); // 9. Trigger the rebalance logic right away (i.e. decide the consumer's queue allocation)
}

The concrete steps are as follows:

1. Validate the consumer configuration

This simply checks that the values configured on the consumer are legal. The important consumer parameters are (a configuration sketch follows the list):

  • messageModel: the consumption mode (broadcasting or clustering)
  • consumeFromWhere: how the initial consumption position is chosen
  • allocateMessageQueueStrategy: the strategy class that assigns concrete MessageQueues (the key class behind the load-balancing logic)
  • consumeThreadMin: minimum core thread count of the consume thread pool (default 20)
  • consumeThreadMax: maximum thread count (default 64)
  • pullInterval: interval between pulls, default 0
  • consumeMessageBatchMaxSize: number of messages handed to one listener invocation, default 1
  • pullBatchSize: number of messages requested per pull, default 32
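For reference, a hedged configuration sketch that sets the parameters listed above through the DefaultMQPushConsumer setters; the values merely restate the defaults mentioned here and are not tuning advice, and the group name is a placeholder.

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class ConsumerConfigSketch {
    static DefaultMQPushConsumer build() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setMessageModel(MessageModel.CLUSTERING);                             // messageModel
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);       // consumeFromWhere
        consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()); // queue allocation strategy
        consumer.setConsumeThreadMin(20);          // min consume-pool threads (default 20)
        consumer.setConsumeThreadMax(64);          // max consume-pool threads (default 64)
        consumer.setPullInterval(0);               // pull interval in ms (default 0)
        consumer.setConsumeMessageBatchMaxSize(1); // messages per listener call (default 1)
        consumer.setPullBatchSize(32);             // messages per pull request (default 32)
        return consumer;
    }
}
```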

2. Instantiate mQClientFactory

From the instantiation code we can see that one consumer client maps to exactly one mQClientFactory (factoryTable stores the mQClientFactory instances keyed by client id), which means a single application node holds a single MQClientInstance.

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
// factoryTable holds the client instances, keyed by clientId.
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
} else {
}
}
return instance;
}

3. Set rebalance-related properties

That is, set the parameters the consumer's load-balancing strategy needs, e.g. the messageModel, the allocateMessageQueueStrategy, and the mQClientFactory instance created above. (A simplified allocation sketch follows.)
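To illustrate what an allocation strategy does, here is a simplified, self-contained round-robin allocation in the spirit of RocketMQ's built-in AllocateMessageQueueAveragelyByCircle. It is not the library's actual implementation, only the core idea: every client deterministically computes its own share from the same sorted queue list and consumer-id list.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of round-robin queue allocation: queues and consumer ids are
// sorted consistently on every client, so every client computes the same split.
public final class AverageAllocation {
    public static <Q> List<Q> allocate(List<Q> allQueues, List<String> allConsumerIds, String currentId) {
        List<Q> result = new ArrayList<>();
        int index = allConsumerIds.indexOf(currentId);
        if (index < 0) {
            return result; // current consumer is not yet visible in the group view
        }
        // Deal queues out by index, one per consumer per round.
        for (int i = index; i < allQueues.size(); i += allConsumerIds.size()) {
            result.add(allQueues.get(i));
        }
        return result;
    }
}
```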

4. Register the message filter hook on pullAPIWrapper

This step lets users supply their own message-filtering strategy for the consumer: implement a FilterMessageHook and hand it to the consumer via registerFilterMessageHook. (A hedged example follows.)
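A hedged sketch of such a hook is shown below. It assumes the FilterMessageHook/FilterMessageContext API referenced by this code path and that the hook is registered through the consumer's inner DefaultMQPushConsumerImpl; whether in-place mutation of the message list is honored may depend on the client version, so treat this purely as an illustration.

```java
import org.apache.rocketmq.client.hook.FilterMessageContext;
import org.apache.rocketmq.client.hook.FilterMessageHook;

// Sketch of a client-side filter hook that drops oversized messages before they reach the listener.
public class DropLargeMessagesHook implements FilterMessageHook {
    @Override
    public String hookName() {
        return "dropLargeMessagesHook";
    }

    @Override
    public void filterMessage(FilterMessageContext context) {
        // Assumption: removing entries from context.getMsgList() filters them out of this pull result.
        context.getMsgList().removeIf(msg -> msg.getBody() != null && msg.getBody().length > 64 * 1024);
    }
}

// Registration (assumption: the impl is reachable via getDefaultMQPushConsumerImpl()):
// consumer.getDefaultMQPushConsumerImpl().registerFilterMessageHook(new DropLargeMessagesHook());
```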

5. Set the consumer's offsetStore

That is, decide which class manages the consumer's consumption offset.
Broadcasting mode uses LocalFileOffsetStore;
clustering mode uses RemoteBrokerOffsetStore.

6. Set the consumer's consumeMessageService

The concrete ConsumeMessageService is chosen according to the MessageListener registered on the consumer.
A MessageListenerOrderly yields the ordered-consumption service, ConsumeMessageOrderlyService;
a MessageListenerConcurrently yields the non-ordered service, ConsumeMessageConcurrentlyService.

PS: this step also calls consumeMessageService.start(), which only starts a scheduled thread for the cleanExpireMsg task; it does not start the message-consuming threads yet. (Registering either listener type is sketched below.)
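Which service gets created is therefore decided purely by the listener type the application registers. A minimal sketch of both registrations, assuming an already configured DefaultMQPushConsumer instance:

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;

public class ListenerChoiceDemo {
    static void useConcurrent(DefaultMQPushConsumer consumer) {
        // Non-ordered consumption -> ConsumeMessageConcurrentlyService is created in start()
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) ->
                ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
    }

    static void useOrderly(DefaultMQPushConsumer consumer) {
        // Ordered consumption -> ConsumeMessageOrderlyService is created in start()
        consumer.registerMessageListener((MessageListenerOrderly) (msgs, ctx) ->
                ConsumeOrderlyStatus.SUCCESS);
    }
}
```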

7. Register the current consumer

This merely puts the current consumer into a cache map, keyed by the consumer group name.

8. mQClientFactory.start()

org.apache.rocketmq.client.impl.factory.MQClientInstance.start()
public void start() throws MQClientException {

synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}

this.mQClientAPIImpl.start() starts the Netty client that handles the Consumer's network requests.
this.startScheduledTask() starts a thread pool that schedules various periodic tasks, including the following:

  • MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
  • MQClientInstance.this.updateTopicRouteInfoFromNameServer();
  • MQClientInstance.this.cleanOfflineBroker();
  • MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
  • MQClientInstance.this.persistAllConsumerOffset();
  • MQClientInstance.this.adjustThreadPool();
    As for this.pullMessageService.start() and this.rebalanceService.start(), they launch two standalone service threads: message pulling and rebalancing, respectively.

9. Trigger an immediate rebalance

Stepping into mQClientFactory.rebalanceImmediately(), it simply calls rebalanceService.wakeup(), waking the rebalance thread started in step 8. Stepping into this.rebalanceService.start(), a volatile flag named stopped controls whether the RebalanceService keeps entering doRebalance().

org.apache.rocketmq.common.ServiceThread.start()
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
org.apache.rocketmq.client.impl.consumer.RebalanceService.run()
public void run() {
log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}

log.info(this.getServiceName() + " service end");
}

Message Pull Process

PullMessageService is responsible for pulling messages from the Broker. Its run() logic looks simple: a while loop keeps taking a pullRequest from a blocking queue and then calls pullMessage(). The volatile boolean stopped appears here again; it is a common design trick: declare stopped as volatile, check it before each iteration of the business logic, and let another thread set it to true to stop this one. this.pullRequestQueue is a blocking queue of pull requests; if it is empty, the thread blocks until a request can be taken.

org.apache.rocketmq.client.impl.consumer.PullMessageService.run()
public void run() {
log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}

log.info(this.getServiceName() + " service end");
}

So the question is: when do PullRequests get put into the pullRequestQueue? Let's walk through that next.

PullRequestQueue

This pullRequestQueue is a private field of PullMessageService. What does the PullRequest it stores look like?

org.apache.rocketmq.client.impl.consumer.PullRequest
public class PullRequest {
private String consumerGroup;
private MessageQueue messageQueue;
private ProcessQueue processQueue;
private long nextOffset;
private boolean lockedFirst = false;

...
}

From the PullRequest class we can see that each pullRequest in the queue wraps a consumer group (consumerGroup), the MessageQueue to pull from, that queue's processing snapshot (processQueue), and the next offset to pull.

Next, let's track where its put method is called. Direct calls appear only in PullMessageService.executePullRequestImmediately(PullRequest):

org.apache.rocketmq.client.impl.consumer.PullMessageService.executePullRequestImmediately(PullRequest)
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}

Following the call chain of executePullRequestImmediately reveals two main entry points: RebalanceService.run() and the PullCallback used to handle the result of DefaultMQPushConsumerImpl.pullMessage.

1. RebalanceService.run()

As its name suggests, RebalanceService re-balances which MessageQueues the consumer side should consume. When a node in the consumer cluster dies, a rebalance is needed to redistribute the MessageQueues across the surviving consumer nodes.

org.apache.rocketmq.client.impl.consumer.RebalanceImpl.updateProcessQueueTableInRebalance(String, Set, boolean)
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
...

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}

this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest); // first collect the pullRequests that need (re)dispatching into a list
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}

this.dispatchPullRequest(pullRequestList);

return changed;
}
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); // iterate the pullRequests and add each one to the pullRequestQueue
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}

Since the RebalanceService thread starts as part of consumer.start(), the pullRequestQueue is already populated with pullRequest objects as soon as the consumer starts.

2. PullCallback

DefaultMQPushConsumerImpl.pullMessage defines a PullCallback for the pull result, and both onSuccess and onException end up calling pullRequestQueue.put. In other words, RocketMQ guarantees that after every pull the pullRequestQueue is fed again.

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage(PullRequest)
    PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);

switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);

long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); // triggers pullRequestQueue.put again
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);

if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); // triggers pullRequestQueue.put again (after a delay)
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); // triggers pullRequestQueue.put again
}
}

if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}

break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); // triggers pullRequestQueue.put again
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); // triggers pullRequestQueue.put again
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);

DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}

@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
// triggers pullRequestQueue.put again (after a delay)
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}

From these two entry points we can conclude:
When the consumer starts, RebalanceService seeds the pullRequestQueue; the PullMessageService thread keeps taking requests from the queue, pulls the corresponding MessageQueue, and after processing puts a request back, so the queue is never starved for long.
Put differently, every take is followed by another put of the next pullRequest, while the pulling itself runs in an endless loop. This keeps consumption timely: even though each Consumer node has only one message-pulling thread serving all of its consumers, it does not become a performance bottleneck.

OK, that completes the journey of taking a pullRequest out of the blocking pullRequestQueue; next comes pulling messages with it via pullMessage.

this.pullMessage(pullRequest)

Drilling down, the implementation is org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage(PullRequest). The method is fairly long, so it is broken down piece by piece below.
Fetching the subscription data is treated here as the formal start of the pull, but before that two kinds of checks against the current processing queue (processQueue) run first: 1. verifying the queue's processing state is OK, and 2. flow control.

ProcessQueue State Checks

Why does the ProcessQueue data structure exist? My understanding: MessageQueue consumption happens concurrently, so the consumption progress cannot be read synchronously, yet the next pull has to build on the previous progress; ProcessQueue therefore acts as a consumption-progress snapshot of the MessageQueue. Structurally it is mainly a TreeMap plus a read-write lock: the TreeMap is keyed by the MessageQueue offset with the message reference as the value (hence "snapshot"), holding every message fetched from the MessageQueue but not yet processed, while the read-write lock guards concurrent access to the TreeMap from multiple threads. (A stripped-down analogue is sketched after this paragraph.)
The ProcessQueue is taken from the pullRequest; if it has not been dropped, its lastPullTimestamp is updated to the current time. If the consumer is currently suspended, the pull task is put back into PullMessageService's queue after a delay (PULL_TIME_DELAY_MILLS_WHEN_SUSPEND, 1s) and this pull ends.
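To make the snapshot idea concrete, here is a stripped-down, self-contained analogue of ProcessQueue (not RocketMQ's actual class): a TreeMap keyed by queue offset plus a read-write lock, where removing consumed messages yields the smallest offset that can safely be reported as progress, and the max/min distance is the span used by flow control.

```java
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Minimal stand-in for ProcessQueue: offset -> message, guarded by a read-write lock.
public class MiniProcessQueue<M> {
    private final TreeMap<Long, M> msgTreeMap = new TreeMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile long maxOffsetSeen = -1;

    public void putMessages(List<Long> offsets, List<M> msgs) {
        lock.writeLock().lock();
        try {
            for (int i = 0; i < offsets.size(); i++) {
                msgTreeMap.put(offsets.get(i), msgs.get(i));
                maxOffsetSeen = Math.max(maxOffsetSeen, offsets.get(i));
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Remove consumed messages; return the smallest offset still in flight,
    // i.e. the offset that may safely be reported as the consume progress.
    public long removeMessages(List<Long> consumedOffsets) {
        lock.writeLock().lock();
        try {
            consumedOffsets.forEach(msgTreeMap::remove);
            return msgTreeMap.isEmpty() ? maxOffsetSeen + 1 : msgTreeMap.firstKey();
        } finally {
            lock.writeLock().unlock();
        }
    }

    // The "span" used by flow control: distance between max and min cached offsets.
    public long getMaxSpan() {
        lock.readLock().lock();
        try {
            return msgTreeMap.isEmpty() ? 0 : msgTreeMap.lastKey() - msgTreeMap.firstKey();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```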

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage(PullRequest)
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}

pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

try {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;
}

if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}

Flow Control

RocketMQ's flow control on the pull path lets the consumer adapt its pulling speed to its own processing speed; when triggered, the pull is simply retried after a delay (50ms by default). Flow control is applied along three dimensions: cached message count, cached message size, and offset span.

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage(PullRequest)
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
} else {
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}

pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
  • Message count: if the number of cached, unprocessed messages exceeds the threshold (1000), i.e. cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue(), flow control triggers: this pull is abandoned and the queue's next pull is scheduled after PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL (50ms); a flow-control log line is emitted every 1000 triggers.
  • Message size: if the cached message size exceeds the threshold (100MB), i.e. cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), flow control triggers and is handled the same way as the count dimension.
  • Offset span: the distance between the largest and smallest offsets held in the ProcessQueue (processQueue.getMaxSpan(), i.e. maxOffset - minOffset). When it exceeds the threshold (2000), i.e. processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan(), flow control triggers and is handled the same way as the two dimensions above. This dimension exists to limit the duplicate consumption caused by a single stuck message (detailed below); unlike the others, it only applies to non-ordered consumption (!this.consumeOrderly). (The thresholds themselves are configurable; see the sketch after this list.)
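The three thresholds above are exposed as setters on DefaultMQPushConsumer. A sketch with the default values, assuming the 4.x client setter names (worth double-checking against the client version you actually use):

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

public class FlowControlConfig {
    static void applyDefaults(DefaultMQPushConsumer consumer) {
        consumer.setPullThresholdForQueue(1000);      // cached message count per queue
        consumer.setPullThresholdSizeForQueue(100);   // cached message size per queue, in MiB
        consumer.setConsumeConcurrentlyMaxSpan(2000); // max offset span (non-ordered mode only)
    }
}
```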

For example, suppose 3109 is the largest offset (maxOffset) in the current batch and 1093 the smallest (minOffset); then maxSpan = maxOffset - minOffset = 3109 - 1093 = 2016, which exceeds the threshold (2000) and triggers flow control. However, the situation above can occur: most of the messages after minOffset have in fact been consumed successfully, yet the next consume offset (nextOffset) is exactly this minOffset, so if consumption keeps getting stuck on the same message at minOffset, a large number of messages end up being consumed repeatedly. consumeConcurrentlyMaxSpan merely keeps the span within a reasonable range for flow-control purposes; its contribution to actually solving duplicate consumption is quite limited.

Consider the extreme case: a batch spans more than 2000 offsets, almost all of them already consumed successfully, while the message at minOffset that blocks progress has a genuinely broken consumption path (an infinite loop, say) and can never succeed, so the progress stays pinned at that message. RocketMQ ships a ready-made answer for this: messages that block batch progress past the consume timeout are treated as expired (ExpireMsg), and a scheduled thread pool periodically cleans them up by sending them back to the broker as delayed messages (delayLevel=3, i.e. 10s) to be stored and delivered to the consumer again later. (Delayed messages are covered below.)

1. Fetch the subscription data

Fetch the topic's subscription data; if it is missing, end this pull and schedule the next pull task after PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION (3s).

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage(PullRequest)
...
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
...

2. Build the PullCallback

The PullCallback is built to handle the response of the later pull request; the subscription data fetched in the previous step is used inside it when processing a successful pull.

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage(PullRequest)
...
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);

switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
...

3. Resolve which broker address to pull from

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage(PullRequest)
	...
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
...

org.apache.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(MessageQueue, String, String, long, long, int, int, long, long, long, CommunicationMode, PullCallback)
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
}

4. Build the network request header for pulling messages

Inside PullAPIWrapper.pullKernelImpl(…), MQClientAPIImpl.pullMessage(…) is called to prepare the actual network request for pulling messages.

org.apache.rocketmq.client.impl.MQClientAPIImpl.pullMessage(String, PullMessageRequestHeader, long, CommunicationMode, PullCallback)
public PullResult pullMessage(
final String addr,
final PullMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);

switch (communicationMode) {
case ONEWAY:
assert false;
return null;
case ASYNC:
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}

return null;
}

5. Issue the network request to the broker and run the corresponding callback on the result

Digging into the network layer, the request is essentially handed to Netty worker threads, which pull the messages from the broker and then invoke the pull-result callback asynchronously. The entry point:

org.apache.rocketmq.client.impl.MQClientAPIImpl.pullMessageAsync(String, RemotingCommand, long, PullCallback)
private void pullMessageAsync(
final String addr,
final RemotingCommand request,
final long timeoutMillis,
final PullCallback pullCallback
) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
assert pullResult != null;
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
pullCallback.onException(e);
}
} else {
if (!responseFuture.isSendRequestOK()) {
pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
} else if (responseFuture.isTimeout()) {
pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
responseFuture.getCause()));
} else {
pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
}
}
}
});
}
org.apache.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(String, RemotingCommand, long)
private PullResult pullMessageSync(
final String addr,
final RemotingCommand request,
final long timeoutMillis
) throws RemotingException, InterruptedException, MQBrokerException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processPullResponse(response);
}

6. Run the onSuccess/onException logic of the PullCallback built earlier

onSuccess branches on the different results returned by the broker:

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage(...).new PullCallback() {...}.onSuccess(PullResult)、org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage(...).new PullCallback() {...}.onException(Throwable)
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);

switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset(); // offset of this pull
pullRequest.setNextOffset(pullResult.getNextBeginOffset()); // set the offset for the next pull
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);

long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); // no new messages found: put the pullRequest back into the pullRequestQueue
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); // hand the pulled messages to the processQueue
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume); // hand the processQueue to the ConsumeMessageService, submitting a consume task so the pulled messages get consumed

if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { // if a pull interval is configured, schedule the next pull after that interval
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else { // otherwise put the pullRequest straight back into the pullRequestQueue
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}

if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}

break;
case NO_NEW_MSG:
...
case NO_MATCHED_MSG:
...
case OFFSET_ILLEGAL:
...
default:
break;
}
}
}

@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}

DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}

7. Hand the pulled messages to consumeMessageService

As the code in the previous step shows, the messages are handed to the consume thread pool represented by consumeMessageService. Since there are two consumption styles, there are also two submit entry points:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.submitConsumeRequest(List, ProcessQueue, MessageQueue, boolean)
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}

ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}

this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.submitConsumeRequest(List, ProcessQueue, MessageQueue, boolean)
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}

In other words, once messages are pulled, consumption follows. Before that, as already noted, there is one fixed step: the next pullRequest is put back into the pullRequestQueue.

Message Pull Process: Summary

Each consumer client gets one message-pulling thread (PullMessageService), which keeps taking the MessageQueues to pull from a blocking queue and then asks the network layer to issue the pull request (in practice, Netty worker threads do the pulling); once a worker thread has the messages, the PullCallback handles the result.

Because the network request to the broker is handled by Netty worker threads, which report the result back asynchronously, pullMessage itself does no heavy work, and each request pulls a batch of messages (32 per batch by default). So even though a single consumer client has only one thread pulling for all of its consumer groups, this does not cause noticeable slowness or a significant performance bottleneck.

Message Consumption Process

ConsumeMessageService is the message consumption interface, with two implementations: ordered consumption (ConsumeMessageOrderlyService) and ordinary consumption (ConsumeMessageConcurrentlyService). In both, the core of ConsumeMessageService.submitConsumeRequest(…) is the same: the ConsumeRequest task representing the actual consumption is submitted to a thread pool named ConsumeMessageThread for asynchronous execution.

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl, MessageListenerConcurrently)
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;

this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));

this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}

The interesting part is therefore the run() method of the ConsumeRequest task. In both ConsumeMessageOrderlyService and ConsumeMessageConcurrentlyService the core consumption logic is essentially the same: get the messageListener implemented by the business code, call its consumeMessage method, and obtain the processing result.

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest.run()
// check whether the processQueue has been dropped
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
// get the messageListener, i.e. the business implementation of message consumption
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
// prepare the message consumption context
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}

long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); // invoke the business consumption logic
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp; // time spent in the business consumption code
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { // consumption timed out; record the corresponding return type
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}

if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}

ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
// process the consumption result
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}

Next, let's look at how the result returned by the messageListener is processed:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.processConsumeResult(ConsumeConcurrentlyStatus, ConsumeConcurrentlyContext, ConsumeRequest)
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();

if (consumeRequest.getMsgs().isEmpty())
return;
// the following records consumption statistics
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
// the following handles failed messages differently depending on the message model
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING: // in broadcasting mode the handling is simple: iterate the failed messages, log a warning, and drop them by default
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING: // handling of failed messages in clustering mode
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context); // send the failed message straight back to the broker
if (!result) { // if sending back to the broker also fails, collect the message into msgBackFailed for the next step
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
// if msgBackFailed is not empty, some messages failed to be sent back to the broker; handle them via the other path: retry consuming them a bit later
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);

this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// the following updates the offset; note the consumer group's offset advances whether or not consumption succeeded
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); // this only updates the offset cached in RemoteBrokerOffsetStore.offsetTable; it is pushed to the broker by a scheduled task, see org.apache.rocketmq.client.impl.factory.MQClientInstance.persistAllConsumerOffset()
}
}

From the result handling above we can conclude:

  • After the business consumption logic runs, whether to re-consume is decided solely by the status the messageListener returns, even if consumption timed out.
  • Failed results are handled differently per consumption mode (and the two consumption models, ordered vs. concurrent, each define their own status enums: ConsumeOrderlyStatus and ConsumeConcurrentlyStatus). In broadcasting mode failed messages are simply dropped; in clustering mode they are re-consumed: 1. the message is sent back to the Broker for a later retry, 2. if sending back fails, it is retried for consumption locally later.
  • The offset is updated regardless of whether consumption succeeded, because failed messages go through the retry path and therefore do not block the offset from advancing.

Next comes the analysis of the offset update and of the consumption retry process.

Offset Updates

Whether LocalFileOffsetStore or RemoteBrokerOffsetStore is used, the update logic is the same: the offset each MessageQueue has been consumed to is stored in an in-memory cache map named offsetTable.

org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore.updateOffset(MessageQueue, long, boolean)/org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore.updateOffset(MessageQueue, long, boolean)
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}

if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}
}

So how does this offset update reach the broker afterwards? Going back to the consumer startup we can locate it: the offset is committed to the broker by a scheduled thread. In org.apache.rocketmq.client.impl.factory.MQClientInstance.start(), among the periodic tasks launched by this.startScheduledTask(), one persists every consumer's offset changes on a schedule.

org.apache.rocketmq.client.impl.factory.MQClientInstance.startScheduledTask()
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

That scheduled task eventually calls RemoteBrokerOffsetStore.persistAll(Set), which sends the offset changes to the Broker.

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore.persistAll(Set)
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;

final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
if (!mqs.isEmpty()) {
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
if (mqs.contains(mq)) {
try {
this.updateConsumeOffsetToBroker(mq, offset.get()); // send the offset update to the broker
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {
unusedMQ.add(mq);
}
}
}
}

if (!unusedMQ.isEmpty()) {
for (MessageQueue mq : unusedMQ) {
this.offsetTable.remove(mq);
log.info("remove unused mq, {}, {}", mq, this.groupName);
}
}
}

Because messages are consumed first and the offset is committed afterwards, it is possible to consume a message and then fail to commit its offset, although this is very unlikely, since the commit itself is just an in-memory operation with nothing heavy or slow in it. A far more likely source of lost offset updates is the other path: the offset lives in memory first and is only pushed to the broker by a scheduled task every few seconds, so if the consumer crashes or the network request fails within that window, the offset never reaches the broker, and after the consumer restarts those messages will be consumed again.
In short, the consumer's business consumption code must be idempotent (a sketch follows).
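A minimal sketch of an idempotent listener, deduplicating by a business key held in a local set. In a real system the dedup state would live in a database unique index or a shared cache rather than in process memory; the class name and key choice here are illustrative only.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

// Idempotent listener sketch: skip messages whose business key was already processed.
public class IdempotentListener implements MessageListenerConcurrently {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // Prefer a business key set by the producer; fall back to the message id.
            String key = msg.getKeys() != null ? msg.getKeys() : msg.getMsgId();
            if (!processedKeys.add(key)) {
                continue; // duplicate delivery: already handled, just acknowledge it
            }
            handleBusiness(msg);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private void handleBusiness(MessageExt msg) {
        // actual business logic goes here
    }
}
```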

In fact, committing offsets to the broker does not rely solely on the scheduled task: when the consumer shuts down, it also explicitly persists the offsets to the broker (DefaultMQPushConsumerImpl.shutdown() does the equivalent; the snippet below shows DefaultMQPullConsumerImpl.shutdown()):

org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.shutdown()
public synchronized void shutdown() {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.persistConsumerOffset(); // persist the offset to the broker when the consumer shuts down
this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup());
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}

This ensures that when the application exits normally and the consumer instance is closed properly, the offset updates are still committed to the broker correctly.

Message Consumption Retry

As mentioned above, when consumption fails the failed message is sent back to the broker; if even that fails, a scheduled task retries consuming it locally. Here we look at how the broker actually handles messages that failed consumption.
In consumerSendMessageBack we can see that sending a failed message back to the broker uses the remote request code RequestCode.CONSUMER_SEND_MSG_BACK.

org.apache.rocketmq.client.impl.MQClientAPIImpl.consumerSendMessageBack(String, MessageExt, String, int, long, int)
public void consumerSendMessageBack(
final String addr,
final MessageExt msg,
final String consumerGroup,
final int delayLevel,
final long timeoutMillis,
final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
...

Following that request code, the broker-side entry point for handling messages that failed consumption is SendMessageProcessor.processRequest(ChannelHandlerContext, RemotingCommand):

org.apache.rocketmq.broker.processor.SendMessageProcessor.processRequest(ChannelHandlerContext, RemotingCommand)
  public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request); // handle the request for a message sent back to the broker
...
org.apache.rocketmq.broker.processor.SendMessageProcessor.consumerSendMsgBack(ChannelHandlerContext, RemotingCommand)
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {

ConsumeMessageContext context = new ConsumeMessageContext();
context.setNamespace(namespace);
context.setConsumerGroup(requestHeader.getGroup());
context.setTopic(requestHeader.getOriginTopic());
context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));

this.executeConsumeMessageHookAfter(context);
}

SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return response;
}

if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
return response;
}

if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}

String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}

TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}

if (!PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
return response;
}

MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return response;
}

final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);

int delayLevel = requestHeader.getDelayLevel();

int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}

if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
} else {
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}

msgExt.setDelayTimeLevel(delayLevel);
}

MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
if (putMessageResult != null) {
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}

this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);

response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);

return response;
default:
break;
}

response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(putMessageResult.getPutMessageStatus().name());
return response;
}

response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
}

Here we can see that when the broker receives a failed message sent back by the consumer, it stores it as a delayed message (retries need a time interval), leveraging the delayed-message mechanism: when the delay expires, the broker converts the delayed message into a retry message (its topic is rewritten to %RETRY% + consumerGroup). At that point the message becomes visible to the consumer again, gets pulled, and is re-consumed, achieving delayed re-consumption.

Delayed Messages

To use a delayed message, simply set the message's DelayTimeLevel before sending it.

Message msg = new Message("topic","msg content".getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setDelayTimeLevel(3); // level 3 = 10s delay
SendResult sendResult = producer.send(msg);

The delay durations currently supported by RocketMQ are:

1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h, mapped to delay levels (delayTimeLevel) 1, 2, 3, 4, 5, … in order (see the broker configuration below).
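The mapping from delayTimeLevel to delay time comes from the broker-side messageDelayLevel setting; its default value, which produces exactly the list above, looks like this in the broker configuration (changing it changes the available levels, but arbitrary per-message delays remain unsupported):

```
# broker configuration (default value): level 1 = 1s, level 2 = 5s, ..., level 18 = 2h
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
```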

How It Works

After the Producer sends a delayed message, the Broker stores it under a dedicated topic, SCHEDULE_TOPIC_XXXX, where each delay level corresponds to one consume queue of that topic. When the delay expires, a scheduled task (DeliverDelayedMessageTimerTask) reads the message and converts it back into an ordinary message stored under its real target topic; only then does it become visible to, and consumable by, the consumer.

Implementation Details

public void start() {
// 1. For each supported delay level, schedule a TimerTask with the corresponding delay
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level); // get each delay level's current offset within its consume queue
if (null == offset) {
offset = 0L;
}

if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// add a scheduled task dedicated to persisting the delayed-message offsets
this.timer.scheduleAtFixedRate(new TimerTask() {

@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}

The core implementation is the DeliverDelayedMessageTimerTask class; its main job is to scan the queues of the delayed-message topic SCHEDULE_TOPIC_XXXX and convert due delayed messages into messages of their designated topic.

org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask.executeOnTimeup()
public void executeOnTimeup() {
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel)); // read the SCHEDULE_TOPIC_XXXX queue; each delay level maps to its own queue id (queueId = delayLevel - 1)

long failScheduleOffset = offset;

if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // iterate over the delayed messages
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();

if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}

long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

long countdown = deliverTimestamp - now;

if (countdown <= 0) { // only process the delayed message once its delivery time has arrived
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy); // read the message from the SCHEDULE_TOPIC_XXXX queue by offset

if (msgExt != null) {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); // convert it back into a message of its real topic (i.e. an ordinary message)
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner); // store that message

if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/*
* XXX: warn and notify me



*/
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
// compute the offset from which the next scheduled run reads the delay queue
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); // remember the next read offset of the delay queue in an in-memory cache map
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {

bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {

long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)

ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}

The flow here, roughly: read the delay queue from the stored start offset, convert due delayed messages into ordinary messages of the target topic and store them, advance the next read offset (in memory, not yet in the file), and schedule the next TimerTask for converting delayed messages at this level.

The offset persistence itself happens in another scheduled task, ScheduleMessageService's persist() method:

org.apache.rocketmq.common.ConfigManager.persist()
public synchronized void persist() {
String jsonString = this.encode(true);
if (jsonString != null) {
String fileName = this.configFilePath();
try {
MixAll.string2File(jsonString, fileName);
} catch (IOException e) {
log.error("persist file " + fileName + " exception", e);
}
}
}

In summary, compared with ordinary messages the core of delayed messages is just one extra step: due messages are taken out of the delayed-message topic's queues and re-stored under their ordinary topic; everything else is identical to ordinary messages.

Summary

Message consumption is a module with a great many implementation details, covering consumption modes, queue rebalancing, message pulling, consumption handling, progress storage and synchronization, scheduled (delayed) messages, ordered consumption guarantees, and more.
RocketMQ supports clustering and broadcasting consumption modes, each with its own handling. Pulling reacts differently to different situations, e.g. by triggering a queue rebalance, which assigns queues according to the number of consumers in the group and the number of topic queues using a chosen allocation algorithm; the rule is that one consumer may be assigned several message consume queues, while a given consume queue is assigned to at most one consumer at a time.
Pulling is performed by the PullMessageService thread based on the pull tasks created by the RebalanceService thread, 32 messages per batch by default; after handing a batch to the consumer's consume thread pool it immediately continues with the next pull.
If consumption is too slow and messages pile up, pull-side flow control kicks in; it currently works along three dimensions: cached message count, cached message size, and offset span.
Consumption happens concurrently in the consume thread pool over the same message consume queue; after success, the smallest offset remaining in the processing queue is taken as the consumption progress and persisted: on the Broker in clustering mode, on the Consumer side in broadcasting mode.
If the business returns RECONSUME_LATER, RocketMQ's consumption retry mechanism stores the original topic and queue in the message properties and puts the message into the consume queues of the SCHEDULE_TOPIC_XXXX topic; after the configured delay, RocketMQ re-stores the message in the commitlog and forwards it to the ordinary consume queue of the retry topic, %RETRY% + consumer group name, for the consumer to pull and consume again. Delayed messages do not support arbitrary delay precision, only the predefined delay levels, which can be changed via messageDelayLevel in the broker configuration file. Delayed messages are mostly used in scenarios that assist consumption.
