主从同步(HA)机制
RocketMQ高可用方案
RocketMQ的Broker分为Master与Slave两个角色,一般高可用方案,采取主从架构,具体也有一主一丛、多主多从(多对主从对)等部署方式,如图
主从架构的工作流程,就是在每台Broker配置他们的主从角色,同一个BrokerName的主从Broker集群中,以BrokerId作为主从角色的奠定,BrokerId为0代表Master,大于0则表示Slave。主从之间会维持一个连接,HAConnection,在Master Broker接收到Producer发过来的消息后,会把消息同步到Slave Broker上,这样一旦Master宕机,Slave依然可以提供服务。这就是RocketMQ保障服务高可用的原理。
主从同步模式
主从Broker之间的数据同步包括以下两种:
同步双写
写入消息时,master先写入,之后复制到slave,确认slave也存储了消息后才向producer答复返回成功。
- 优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
- 缺点:性能比异步复制模式略低,大约低10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。
异步复制
先答复producer,再去向salve复制。
- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
- 缺点:Master宕机,磁盘损坏情况,会丢失少量消息。
基于不同的同步模式,Broker的角色类型如下:
org.apache.rocketmq.store.config.BrokerRole1 2 3 4 5
| public enum BrokerRole { ASYNC_MASTER, SYNC_MASTER, SLAVE; }
|
主从同步实现
同步元数据
Slave需要和Master同步的不仅是消息内容,还有一些元数据也需要同步,譬如:
Topic配置信息( syncTopicConfig)、Consumer偏移量信息(syncConsumerOffset)、延迟队列偏移量(syncDelayOffset) 和 订阅组配置信息(syncSubscriptionGroupConfig)。
Broker启动时,会判断自己是否Slave角色,如果是就会启动定时同步任务,从Master复制元数据过来。
org.apache.rocketmq.broker.BrokerController.initialize()1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) { this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress()); this.updateMasterHAServerAddrPeriodically = false; } else { this.updateMasterHAServerAddrPeriodically = true; }
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override public void run() { try { BrokerController.this.slaveSynchronize.syncAll(); } catch (Throwable e) { log.error("ScheduledTask syncAll slave exception", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); } else { ...
|
这里的 BrokerController.this.slaveSynchronize.syncAll()方法里边,就调用了syncTopicConfig(),syncConsumerOffset(),syncDelayOffset(),syncSubscriptionGroupConfig()
org.apache.rocketmq.broker.slave.SlaveSynchronize.syncAll()1 2 3 4 5 6
| public void syncAll() { this.syncTopicConfig(); this.syncConsumerOffset(); this.syncDelayOffset(); this.syncSubscriptionGroupConfig(); }
|
这里的各种元数据的复制,最终都是通过Netty走网络请求去Master broker获取到元数据到本地,然后走本地文件写入做的持久化操作。
以其中一个为例子,syncConsumerOffset走一遍同步过程:
org.apache.rocketmq.broker.slave.SlaveSynchronize.syncConsumerOffset()1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private void syncConsumerOffset() { String masterAddrBak = this.masterAddr; if (masterAddrBak != null) { try { ConsumerOffsetSerializeWrapper offsetWrapper = this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak); this.brokerController.getConsumerOffsetManager().getOffsetTable() .putAll(offsetWrapper.getOffsetTable()); this.brokerController.getConsumerOffsetManager().persist(); log.info("Update slave consumer offset from master, {}", masterAddrBak); } catch (Exception e) { log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e); } } }
|
基本逻辑就是 getAllConsumerOffset 方法里边组装一个RemotingCommand,底层通过Netty将请求发送到Master角色的Broker,表示请求获取offset,然后Master那端响应请求,返回Offset给Slave这端,Slave这端本地拿到offset后,接下来就是用一个map存本地内存,以及调persist方法持久化到文件里。
同步元数据的方法基本与上面差不多,相对比较简单,这里省略。
同步消息体
同步消息体是主从同步的核心,也就是同步commitLog内容。CommitLog和元数据的同步不一样:首先,CommitLog的数据量比元数据要大;其次,对实时性和可靠性的要求,CommitLog同步的要求要高一些。元数据的同步是定时任务进行的,在两次同步的时间查里,如果出现异常可能会造成Master上的元数据内容和Slave上的元数据内容不一样,不过这种情况还可以补救(手动调整offset,重启consumer等措施)。但是CommitLog在高可靠性场景下如果没有及时同步,一旦Master机器出现故障,消息就彻底丢失了。
HAService 是实现commitLog 同步的主体,它在Master 机器和Slave 机器上执行的逻辑不同, 默认是在Master 机器上执行。
RocketMQ的消息主从同步的实现主要在 store 工程下的 org.apache.rocketmq.store.ha包,包含三个类:HAService、HAConnection、WaitNotifyObject。
HAService是主从同步的核心实现类,其内部类包括:
- AcceptSocketService: HA Master端监听HA Client端(Slave端)连接的实现类
- GroupTransferService: 主从同步通知实现类
- HAClient: HA Client端(Slave端)实现类
HAConnection是HA Master服务端HA连接对象的封装,与Broker Slave服务器之间的读写实现类,内部类包括:
- ReadSocketService: 网络读实现类
- WriteSocketService: 网络写实现类
HAService的start入口
org.apache.rocketmq.store.ha.HAService.start()1 2 3 4 5 6
| public void start() throws Exception { this.acceptSocketService.beginAccept(); this.acceptSocketService.start(); this.groupTransferService.start(); this.haClient.start(); }
|
大致过程包括:
- 主服务器启动,建立在特定端口上监听从服务器的socket连接
- 启动监听socket连接线程
- 启动主从同步通知线程,等待同步数据的传输
- 启动HA Client端线程(针对Slave端)
消息体的主从复制方式分为两种:同步双写、异步复制。
同步双写
同步双写方式,就是在Master Broker处理Producer发送消息的时候,会同时写入到Slave Broker,待Slave写入成功并返回,Producer才算完成本次消息发送。在消息存储方法org.apache.rocketmq.store.CommitLog.putMessage(MessageExtBrokerInner)中会调handleHA(…)方法来进行同步双写。
org.apache.rocketmq.store.CommitLog.handleHA(AppendMessageResult, PutMessageResult,MessageExt)1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { HAService service = this.defaultMessageStore.getHaService(); if (messageExt.isWaitStoreMsgOK()) { if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); service.getWaitNotifyObject().wakeupAll(); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); } } else { putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); } } }
}
|
HAService会向其groupTransferService添加一个GroupCommitRequest任务,然后唤醒WriteSocketService通知master进行commitLog同步,然后
master收到通知开始把commitLog发送到slave,request.waitForFlush等待slave获取master的commitLog同步,在slave broker获取到master
broker的通知后,其HAService的GroupTransferService中的requestsRead列表中就有数据,就可以在GroupTransferService中进行处理。
org.apache.rocketmq.store.ha.HAService.GroupTransferService.doWaitTransfer()1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| private void doWaitTransfer() { synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { for (CommitLog.GroupCommitRequest req : this.requestsRead) { boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); for (int i = 0; !transferOK && i < 5; i++) { this.notifyTransferObject.waitForRunning(1000); transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); }
if (!transferOK) { log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); }
req.wakeupCustomer(transferOK); }
this.requestsRead.clear(); } } }
|
如上文HAService的start()方法代码,HAService在启动的时候,也会启动HAService中的GroupTransferService,它会启动线程去循环执行doWaitTransfer()方法,处理GroupTransferService中的master已同步的GroupCommitRequest,req.wakeupCustomer(transferOK)唤醒前面等待的request.waitForFlush线程,让其handleHA得以返回。
异步复制
同步双写是Master主动往Slave去写入消息,而异步复制则是Slave端对Master的主动复制,我们直接看haClient:
org.apache.rocketmq.store.ha.HAService.HAClient.run()1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public void run() { log.info(this.getServiceName() + " service started");
while (!this.isStopped()) { try { if (this.connectMaster()) { if (this.isTimeToReportOffset()) { boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset); if (!result) { this.closeMaster(); } }
this.selector.select(1000);
boolean ok = this.processReadEvent(); if (!ok) { this.closeMaster(); }
if (!reportSlaveMaxOffsetPlus()) { continue; }
long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp; if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig() .getHaHousekeepingInterval()) { log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress + "] expired, " + interval); this.closeMaster(); log.warn("HAClient, master not response some time, so close connection"); } } else { this.waitForRunning(1000 * 5); } } catch (Exception e) { log.warn(this.getServiceName() + " service has exception. ", e); this.waitForRunning(1000 * 5); } }
log.info(this.getServiceName() + " service end"); }
|
如上文中的HAService的start()方法所示,在HAService启动时,会去创建一个HAClient实例,它是一个后台线程(ServiceThread),异步复制master消息体就是在slave第一次启动的时候会上报自己的maxOffset,后续默认每隔5秒向master上报自己的maxOffset,上报失败会关闭连接。也就是说,slave和master之间的互相传输,是slave采取的主动迈出第一步,因为 broker 并不知道slave broker的maxOffset,不知道从哪里同步起。
org.apache.rocketmq.store.ha.HAService.AcceptSocketService.run()1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public void run() { log.info(this.getServiceName() + " service started");
while (!this.isStopped()) { try { this.selector.select(1000); Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) { for (SelectionKey k : selected) { if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) { HAService.log.info("HAService receive new connection, " + sc.socket().getRemoteSocketAddress());
try { HAConnection conn = new HAConnection(HAService.this, sc); conn.start(); HAService.this.addConnection(conn); } catch (Exception e) { log.error("new HAConnection exception", e); sc.close(); } } } else { log.warn("Unexpected ops in select " + k.readyOps()); } }
selected.clear(); } } catch (Exception e) { log.error(this.getServiceName() + " service has exception.", e); } }
log.info(this.getServiceName() + " service end"); }
|
HAService在创建的时候也会创建AcceptSocketService对象,也是一个后台线程,用来接收处理socket请求的,然后创建一个HAConnection封装对应的socket连接进行请求处理。