RocketMQ 4.3 新版事务消息 关于RocketMQ事务消息,之前有过一篇总结,此处再从编码实现上详细捋一捋。 总体流程如下:
实现思路上就是两阶段提交(2PC)的思想,第一阶段是发送HalfMessage(或者说PreparedMessage),这种消息的特性是虽存储到Broker但未经二次确定前会被标记为“暂时无法投递消费”的状态,主要是Producer在本地事务中完成业务数据落库的操作同时同步调用RocketMQ消息发送接口,使用消息的一个属性(PROPERTY_TRANSACTION_PREPARED)标记该消息先提交到Broker但暂时不能对Consumer可见,从而达到二次最终确认提交、消费生效,消息发送成功后,Producer会回调事件监听器(TransactionListener)记录消息的本地事务状态,从而确保消息发送与本地事务的原子性。
实现先提交Broker落地存储,又暂时不可见,关键在于不创建消息对应的索引信息。因为Consumer是通过索引来读取消息从而进行消费的。也就是说,第一阶段仅写入文件存储而不在IndexFile上写入该消息即可。
除此之外,具体实现还有一些策略,譬如Broker在对事务消息的发送请求响应处理接收消息时,会先备份消息的原Topic和原ConsumeQueue,继而替换Topic为指定专门的Topic(RMQ_SYS_TRANS_HALF_TOPIC),然后将消息像普通消息一样地存储在RMQ_SYS_TRANS_HALF_TOPIC这个Topic下的CommitLog及其对应的ConsumeQueue,从而与普通消息进行区分隔离。而上文所述的不构建IndexFile,正是因为没有提交到原Topic(普通消息Topic),而Consumer端的正常消费只对普通Topic有效,那么事务消息就不会被消费。
第二阶段,根据本地事务状态,决定提交或回滚消息。既然替换了Topic来达到两阶段提交,自然要有一个操作去处理该Topic的消息,然后将该消息在满足可以提交的条件下恢复到普通消息Topic,进而被consumer正常消费。而RocketMQ的实现就是Broker端开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC这个Topic下的消息,向Producer发起消息事务状态回查,事务状态目前有commit、rollback、unknown,如果是commit或rollback,则broker对消息进行提交或回滚,并使用将消息写入到另一个Topic(RMQ_SYS_TRANS_OP_HALF_TOPIC)的方式来记录HalfMessage消费操作已进行,如果是unknown,则等待下一次回查,RocketMQ支持设置消息的回查间隔与回查次数。这种使用定时任务预处理特殊Topic消息再决定替换回普通Topic的实现,其实类似于RocketMQ定时消息的处理过程。
大致流程如上,接下来,就是源码分析为主了。
TransactionProducer官方Demo 在example模块里,transaction包里有官方的事务消息生产者TransactionProducer的Demo
org.apache.rocketmq.example.transaction.TransactionProducer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class TransactionProducer { public static void main (String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl (); TransactionMQProducer producer = new TransactionMQProducer ("please_rename_unique_group_name" ); ExecutorService executorService = new ThreadPoolExecutor (2 , 5 , 100 , TimeUnit.SECONDS, new ArrayBlockingQueue <Runnable>(2000 ), new ThreadFactory () { @Override public Thread newThread (Runnable r) { Thread thread = new Thread (r); thread.setName("client-transaction-msg-check-thread" ); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String [] {"TagA" , "TagB" , "TagC" , "TagD" , "TagE" }; for (int i = 0 ; i < 10 ; i++) { try { Message msg = new Message ("TopicTest1234" , tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null ); System.out.printf("%s%n" , sendResult); Thread.sleep(10 ); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0 ; i < 100000 ; i++) { Thread.sleep(1000 ); } producer.shutdown(); } }
可以对以上代码做一些精简,提取出最核心的部分
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl (); TransactionMQProducer producer = new TransactionMQProducer ("please_rename_unique_group_name" ); producer.setTransactionListener(transactionListener); producer.start(); SendResult sendResult = producer.sendMessageInTransaction(msg, null ); producer.shutdown(); }
对比普通消息的Producer,关键在于多了本地事务回调实现,并且需要注册到TransactionProducer中。
事务回调接口及其实现TransactionListenerImpl,代码如下:
org.apache.rocketmq.client.producer.TransactionListener 1 2 3 4 5 public interface TransactionListener { LocalTransactionState executeLocalTransaction (final Message msg, final Object arg) ; LocalTransactionState checkLocalTransaction (final MessageExt msg) ; }
org.apache.rocketmq.example.transaction.TransactionListenerImpl 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger (0 ); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap <>(); @Override public LocalTransactionState executeLocalTransaction (Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3 ; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction (MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0 : return LocalTransactionState.UNKNOW; case 1 : return LocalTransactionState.COMMIT_MESSAGE; case 2 : return LocalTransactionState.ROLLBACK_MESSAGE; default : return LocalTransactionState.COMMIT_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }
事务消息发送 从 producer.sendMessageInTransaction(msg, null) 消息发送入口,走起,最终会调到 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendMessageInTransaction(Message, LocalTransactionExecuter, Object),这个方法实现了事务消息发送的关键逻辑:发送消息 → 回调执行本地事务 → commit/rollback消息
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendMessageInTransaction(Message, LocalTransactionExecuter, Object) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 public TransactionSendResult sendMessageInTransaction (final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException ("tranExecutor is null" , null ); } Validators.checkMessage(msg, this .defaultMQProducer); SendResult sendResult = null ; MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true" ); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this .defaultMQProducer.getProducerGroup()); try { sendResult = this .send(msg); } catch (Exception e) { throw new MQClientException ("send message Exception" , e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null ; switch (sendResult.getSendStatus()) { case SEND_OK: { try { if (sendResult.getTransactionId() != null ) { msg.putUserProperty("__transactionId__" , sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"" .equals(transactionId)) { msg.setTransactionId(transactionId); } if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null ) { log.debug("Used new transaction API" ); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}" , localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception" , e); log.info(msg.toString()); localException = e; } } break ; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break ; default : break ; } try { this .endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed" , e); } TransactionSendResult transactionSendResult = new TransactionSendResult (); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult; }
这里的 this.send(msg) 是按同步发送消息方式 来走的,逻辑大致就是从DefaultMQProducerImpl .send()方法最终会调MQClientAPIImpl的sendMessage(),底层网络通信就是封装的Netty组件NettyRemotingClient,调它的invokeSync()方法进行处理,通过NIO方式走网络把消息发送到MQ服务器Broker。 Broker端同步地响应请求,接收消息,然后返回Producer。 Producer拿到SendResult后,根据这个发送结果,往下走到 switch (sendResult.getSendStatus()) 这一行,对发送状态(成功与否)来执行不同的处理过程。 如果是发送成功(SEND_OK),那么会执行本地事务(回调TransactionListenerImpl的executeLocalTransaction()方法) ,本地事务的返回值则是一个枚举 LocalTransactionState(COMMIT_MESSAGE,ROLLBACK_MESSAGE,UNKNOW) 如果halfMessage发送失败,则直接设置本地事务状态为回滚(localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE),最终执行endTransaction()方法时,会二次确认。
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.endTransaction(SendResult, LocalTransactionState, Throwable) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public void endTransaction ( final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { ... String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (traFlag != null && Boolean.parseBoolean(traFlag)) { if (this .brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this .brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden" ); return response; } putMessageResult = this .brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { putMessageResult = this .brokerController.getMessageStore().putMessage(msgInner); } return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); }
this.brokerController.getTransactionalMessageService().prepareMessage(msgInner) 里边,最终会调到 org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge.putHalfMessage(MessageExtBrokerInner)
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge.putHalfMessage(MessageExtBrokerInner) 1 2 3 public PutMessageResult putHalfMessage (MessageExtBrokerInner messageInner) { return store.putMessage(parseHalfMessageInner(messageInner)); }
一目了然,里边就是调store(MessageStore)来存储消息(putMessage)了,但在这之前,先要做一层转换,先调parseHalfMessageInner方法。
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge.parseHalfMessageInner(MessageExtBrokerInner) 1 2 3 4 5 6 7 8 9 10 11 private MessageExtBrokerInner parseHalfMessageInner (MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0 ); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }
这里边就有设置事务消息的真实Topic,MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()) 以及即将被替换的Topic,msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()),buildHalfTopic()里边就是返回事务消息的Topic名称RMQ_SYS_TRANS_HALF_TOPIC 。
事务消息的二次确认 这个阶段其实就是对应2PC分布式事务的第二个阶段:提交或回滚事务。Producer端对应的处理主要是DefaultMQProducerImpl.endTransaction方法
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.endTransaction(SendResult, LocalTransactionState, Throwable) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public void endTransaction ( final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; if (sendResult.getOffsetMsgId() != null ) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); } else { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } String transactionId = sendResult.getTransactionId(); final String brokerAddr = this .mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader (); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break ; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break ; case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break ; default : break ; } requestHeader.setProducerGroup(this .defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null ; this .mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this .defaultMQProducer.getSendMsgTimeout()); }
就是根据本地事务状态通过网络告知Broker端执行不同的操作:COMMIT_MESSAGE–投递消息消费,ROLLBACK_MESSAGE–回滚消息,UNKNOWN–不作为(继续下一次回查事务状态) 。
至于Broker端,则是“事务终结处理器”–EndTransactionProcessor。
org.apache.rocketmq.broker.processor.EndTransactionProcessor.processRequest(ChannelHandlerContext, RemotingCommand) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public RemotingCommand processRequest (ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { ... OperationResult result = new OperationResult (); if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { result = this .brokerController.getTransactionalMessageService().commitMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback())); msgInner.setQueueOffset(requestHeader.getTranStateTableOffset()); msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { this .brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return sendResult; } return res; } } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { result = this .brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { this .brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return res; } } response.setCode(result.getResponseCode()); response.setRemark(result.getResponseRemark()); return response; }
无论回滚抑或提交事务消息,都要做事务消息的“删除”,所以在两条逻辑分支里,都会看到deletePrepareMessage方法的调用,而这个所谓删除处理,在RocketMQ里边并非真实删除,而是将 prepare 消息存储到另一个名为 RMQ_SYS_TRANS_OP_HALF_TOPIC 的 Topic 中,表示该事务消息(prepare状态的消息)已经处理过(提交或回滚),这么做估计一来是在commitLog文件上面进行操作删除一行数据也不现实,二来保留事务消息的存根,方便消息查找,譬如方便为未处理的事务进行事务回查提供查找依据。
事务的回滚与提交的唯一差别是无需将消息恢复到普通Topic,而是直接“删除”即可。
提交的处理主要是多出来以下几个流程: 3.1 从请求头部获取到物理偏移量(commitLogOffset),之后根据偏移量获取消息实体,这部分逻辑由TransactionalMessageServiceImpl.commitMessage(EndTransactionRequestHeader)实现。 3.2 恢复消息到普通的Topic、ConsumeQueue,构建新的消息对象,由EndTransactionProcessor.endMessageTransaction(MessageExt)实现。 3.3 恢复为普通消息后,再次存储到普通Topic的commitLog文件中,之后就能被consumer端正常消费了。这部分由EndTransactionProcessor.sendFinalMessage(MessageExtBrokerInner)实现。
事务状态回查 Broker在启动时会启动线程回查的服务,在TransactionMessageCheckService的run方法中,该方法会执行到onWaitEnd方法:
org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService.run() 1 2 3 4 5 6 7 8 public void run () { log.info("Start transaction check service thread!" ); long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval(); while (!this .isStopped()) { this .waitForRunning(checkInterval); } log.info("End transaction check service thread!" ); }
org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService.onWaitEnd() 1 2 3 4 5 6 7 8 protected void onWaitEnd () { long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); long begin = System.currentTimeMillis(); log.info("Begin to check prepare message, begin time:{}" , begin); this .brokerController.getTransactionalMessageService().check(timeout, checkMax, this .brokerController.getTransactionalMessageCheckListener()); log.info("End to check prepare message, consumed time:{}" , System.currentTimeMillis() - begin); }
这里的check(…)方法里边逻辑还是比较多的,包括偏移量的获取及下次处理的起始偏移量、避免重复处理的控制、判断是否needDiscard/needSkip、各种操作时间的对比等等,然而,这些不是主流程要了解的重点,此处略过。 最终会通过netty传递消息给Producer端,让Producer调用到TransactionListenerImpl的checkLocalTransaction()方法来检查本地事务的状态。
org.apache.rocketmq.example.transaction.TransactionListenerImpl.checkLocalTransaction(MessageExt) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public LocalTransactionState checkLocalTransaction (MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0 : return LocalTransactionState.UNKNOW; case 1 : return LocalTransactionState.COMMIT_MESSAGE; case 2 : return LocalTransactionState.ROLLBACK_MESSAGE; default : return LocalTransactionState.COMMIT_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; }
关于对事务消息不创建索引 上文中提到,如果是事务消息,在还没投递到普通Topic之前,是不会创建索引的,这一点,我们其实可以直接到创建索引的方法里边去看
org.apache.rocketmq.store.index.IndexService.buildIndex(DispatchRequest) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void buildIndex (DispatchRequest req) { IndexFile indexFile = retryGetAndCreateIndexFile(); if (indexFile != null ) { ... final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: break ; case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: return ; } ... }
小结 总而言之,RocketMQ的事务消息就是基于两阶段提交(2PC)和事务状态回查机制来实现的,也是最终一致的解决方案。所谓两阶段提交,就是先发Half(Prepare)消息,后回调本地事务,保证俩操作原子性,及后待事务提交或回滚时发送commit/rollback命令给Broker,同时提供补偿机制,就是结合定时任务,以专门的线程以特定的频率对Broker上的half消息进行处理,回查Producer端的事务状态从而决定提交或回滚消息。 另外,RocketMQ这样的分布式事务,依然存在因为网络问题或者消费端本身异常导致消费一直失败,最终不能完成整个事务的情况,针对这种情况,目前RocketMQ提供的方案是人工解决。