PPXu

RocketMQ-事务消息

2019-06-15

RocketMQ 4.3 新版事务消息

关于RocketMQ事务消息,之前有过一篇总结,此处再从编码实现上详细捋一捋。
总体流程如下:


实现思路上就是两阶段提交(2PC)的思想,第一阶段是发送HalfMessage(或者说PreparedMessage),这种消息的特性是虽存储到Broker但未经二次确定前会被标记为“暂时无法投递消费”的状态,主要是Producer在本地事务中完成业务数据落库的操作同时同步调用RocketMQ消息发送接口,使用消息的一个属性(PROPERTY_TRANSACTION_PREPARED)标记该消息先提交到Broker但暂时不能对Consumer可见,从而达到二次最终确认提交、消费生效,消息发送成功后,Producer会回调事件监听器(TransactionListener)记录消息的本地事务状态,从而确保消息发送与本地事务的原子性。


实现先提交Broker落地存储,又暂时不可见,关键在于不创建消息对应的索引信息。因为Consumer是通过索引来读取消息从而进行消费的。也就是说,第一阶段仅写入文件存储而不在IndexFile上写入该消息即可。


除此之外,具体实现还有一些策略,譬如Broker在对事务消息的发送请求响应处理接收消息时,会先备份消息的原Topic和原ConsumeQueue,继而替换Topic为指定专门的Topic(RMQ_SYS_TRANS_HALF_TOPIC),然后将消息像普通消息一样地存储在RMQ_SYS_TRANS_HALF_TOPIC这个Topic下的CommitLog及其对应的ConsumeQueue,从而与普通消息进行区分隔离。而上文所述的不构建IndexFile,正是因为没有提交到原Topic(普通消息Topic),而Consumer端的正常消费只对普通Topic有效,那么事务消息就不会被消费。


第二阶段,根据本地事务状态,决定提交或回滚消息。既然替换了Topic来达到两阶段提交,自然要有一个操作去处理该Topic的消息,然后将该消息在满足可以提交的条件下恢复到普通消息Topic,进而被consumer正常消费。而RocketMQ的实现就是Broker端开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC这个Topic下的消息,向Producer发起消息事务状态回查,事务状态目前有commit、rollback、unknown,如果是commit或rollback,则broker对消息进行提交或回滚,并使用将消息写入到另一个Topic(RMQ_SYS_TRANS_OP_HALF_TOPIC)的方式来记录HalfMessage消费操作已进行,如果是unknown,则等待下一次回查,RocketMQ支持设置消息的回查间隔与回查次数。这种使用定时任务预处理特殊Topic消息再决定替换回普通Topic的实现,其实类似于RocketMQ定时消息的处理过程。


大致流程如上,接下来,就是源码分析为主了。

TransactionProducer官方Demo

在example模块里,transaction包里有官方的事务消息生产者TransactionProducer的Demo

org.apache.rocketmq.example.transaction.TransactionProducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});

producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();

String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);

Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}

for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}

可以对以上代码做一些精简,提取出最核心的部分

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws MQClientException, InterruptedException {
// 创建一个本地事务回调器
TransactionListener transactionListener = new TransactionListenerImpl();
// 事务消息生产者初始化
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
// 设置生产者的本地事务回调器
producer.setTransactionListener(transactionListener);
producer.start();
// 使用事务方式发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
producer.shutdown();
}

对比普通消息的Producer,关键在于多了本地事务回调实现,并且需要注册到TransactionProducer中。

事务回调接口及其实现TransactionListenerImpl,代码如下:

org.apache.rocketmq.client.producer.TransactionListener
1
2
3
4
5
public interface TransactionListener {
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
org.apache.rocketmq.example.transaction.TransactionListenerImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);

private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}

事务消息发送

从 producer.sendMessageInTransaction(msg, null) 消息发送入口,走起,最终会调到 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendMessageInTransaction(Message, LocalTransactionExecuter, Object),这个方法实现了事务消息发送的关键逻辑:发送消息 → 回调执行本地事务 → commit/rollback消息

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendMessageInTransaction(Message, LocalTransactionExecuter, Object)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener(); //获取之前注册的transactionListener本地事务回调器
if (null == localTransactionExecuter && null == transactionListener) { // 本地事务执行器是旧的实现方式,下面的executeLocalTransactionBranch可以看到是注册了@Deprecated的方法,预期即将在新版中废除,这里4.5.1版本暂时还有保留,估计是为了兼容旧版。
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);

SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg); //发送half消息(prepare)
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: { // 消息发送成功
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) { // 实际上我们localTransactionExecuter传进来为null,不走这个分支,这里估计是为兼容旧版而保留
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg); // 回调执行本地事务
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}

if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}

try { //根据本地事务执行的结果来决定commit消息或者rollback消息,最终结束事务消息
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}

TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}

这里的 this.send(msg) 是按同步发送消息方式来走的,逻辑大致就是从DefaultMQProducerImpl .send()方法最终会调MQClientAPIImpl的sendMessage(),底层网络通信就是封装的Netty组件NettyRemotingClient,调它的invokeSync()方法进行处理,通过NIO方式走网络把消息发送到MQ服务器Broker。
Broker端同步地响应请求,接收消息,然后返回Producer。
Producer拿到SendResult后,根据这个发送结果,往下走到 switch (sendResult.getSendStatus()) 这一行,对发送状态(成功与否)来执行不同的处理过程。
如果是发送成功(SEND_OK),那么会执行本地事务(回调TransactionListenerImpl的executeLocalTransaction()方法),本地事务的返回值则是一个枚举 LocalTransactionState(COMMIT_MESSAGE,ROLLBACK_MESSAGE,UNKNOW)
如果halfMessage发送失败,则直接设置本地事务状态为回滚(localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE),最终执行endTransaction()方法时,会二次确认。

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.endTransaction(SendResult, LocalTransactionState, Throwable)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
...
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

}

this.brokerController.getTransactionalMessageService().prepareMessage(msgInner)里边,最终会调到 org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge.putHalfMessage(MessageExtBrokerInner)

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge.putHalfMessage(MessageExtBrokerInner)
1
2
3
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}

一目了然,里边就是调store(MessageStore)来存储消息(putMessage)了,但在这之前,先要做一层转换,先调parseHalfMessageInner方法。

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge.parseHalfMessageInner(MessageExtBrokerInner)
1
2
3
4
5
6
7
8
9
10
11
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}

这里边就有设置事务消息的真实Topic,MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()) 以及即将被替换的Topic,msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()),buildHalfTopic()里边就是返回事务消息的Topic名称RMQ_SYS_TRANS_HALF_TOPIC

事务消息的二次确认

这个阶段其实就是对应2PC分布式事务的第二个阶段:提交或回滚事务。Producer端对应的处理主要是DefaultMQProducerImpl.endTransaction方法

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.endTransaction(SendResult, LocalTransactionState, Throwable)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}

就是根据本地事务状态通过网络告知Broker端执行不同的操作:COMMIT_MESSAGE–投递消息消费,ROLLBACK_MESSAGE–回滚消息,UNKNOWN–不作为(继续下一次回查事务状态)

至于Broker端,则是“事务终结处理器”–EndTransactionProcessor。

org.apache.rocketmq.broker.processor.EndTransactionProcessor.processRequest(ChannelHandlerContext, RemotingCommand)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
...
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
  1. 无论回滚抑或提交事务消息,都要做事务消息的“删除”,所以在两条逻辑分支里,都会看到deletePrepareMessage方法的调用,而这个所谓删除处理,在RocketMQ里边并非真实删除,而是将 prepare 消息存储到另一个名为 RMQ_SYS_TRANS_OP_HALF_TOPIC 的 Topic 中,表示该事务消息(prepare状态的消息)已经处理过(提交或回滚),这么做估计一来是在commitLog文件上面进行操作删除一行数据也不现实,二来保留事务消息的存根,方便消息查找,譬如方便为未处理的事务进行事务回查提供查找依据。
  2. 事务的回滚与提交的唯一差别是无需将消息恢复到普通Topic,而是直接“删除”即可。
  3. 提交的处理主要是多出来以下几个流程:
    3.1 从请求头部获取到物理偏移量(commitLogOffset),之后根据偏移量获取消息实体,这部分逻辑由TransactionalMessageServiceImpl.commitMessage(EndTransactionRequestHeader)实现。
    3.2 恢复消息到普通的Topic、ConsumeQueue,构建新的消息对象,由EndTransactionProcessor.endMessageTransaction(MessageExt)实现。
    3.3 恢复为普通消息后,再次存储到普通Topic的commitLog文件中,之后就能被consumer端正常消费了。这部分由EndTransactionProcessor.sendFinalMessage(MessageExtBrokerInner)实现。

事务状态回查

Broker在启动时会启动线程回查的服务,在TransactionMessageCheckService的run方法中,该方法会执行到onWaitEnd方法:

org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService.run()
1
2
3
4
5
6
7
8
public void run() {
log.info("Start transaction check service thread!");
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService.onWaitEnd()
1
2
3
4
5
6
7
8
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); //获取配置的事务执行超时时间(6s),也就是说,broker端存储消息时间+此值 大于当前时间,才会触发起回查
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); //获取配置的最大检测次数(15次),如超出这个次数,默认丢弃消息(回滚消息)
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener()); //开始检测
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

这里的check(…)方法里边逻辑还是比较多的,包括偏移量的获取及下次处理的起始偏移量、避免重复处理的控制、判断是否needDiscard/needSkip、各种操作时间的对比等等,然而,这些不是主流程要了解的重点,此处略过。
最终会通过netty传递消息给Producer端,让Producer调用到TransactionListenerImpl的checkLocalTransaction()方法来检查本地事务的状态。

org.apache.rocketmq.example.transaction.TransactionListenerImpl.checkLocalTransaction(MessageExt)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}

关于对事务消息不创建索引

上文中提到,如果是事务消息,在还没投递到普通Topic之前,是不会创建索引的,这一点,我们其实可以直接到创建索引的方法里边去看

org.apache.rocketmq.store.index.IndexService.buildIndex(DispatchRequest)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
   public void buildIndex(DispatchRequest req) {
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
...

final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
break;
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
return;
}
...
}

小结

总而言之,RocketMQ的事务消息就是基于两阶段提交(2PC)和事务状态回查机制来实现的,也是最终一致的解决方案。所谓两阶段提交,就是先发Half(Prepare)消息,后回调本地事务,保证俩操作原子性,及后待事务提交或回滚时发送commit/rollback命令给Broker,同时提供补偿机制,就是结合定时任务,以专门的线程以特定的频率对Broker上的half消息进行处理,回查Producer端的事务状态从而决定提交或回滚消息。
另外,RocketMQ这样的分布式事务,依然存在因为网络问题或者消费端本身异常导致消费一直失败,最终不能完成整个事务的情况,针对这种情况,目前RocketMQ提供的方案是人工解决。

扫描二维码,分享此文章