PPXu

RocketMQ-高可用

2019-06-22

主从同步(HA)机制

RocketMQ高可用方案

RocketMQ的Broker分为Master与Slave两个角色,一般高可用方案,采取主从架构,具体也有一主一丛、多主多从(多对主从对)等部署方式,如图

主从架构的工作流程,就是在每台Broker配置他们的主从角色,同一个BrokerName的主从Broker集群中,以BrokerId作为主从角色的奠定,BrokerId为0代表Master,大于0则表示Slave。主从之间会维持一个连接,HAConnection,在Master Broker接收到Producer发过来的消息后,会把消息同步到Slave Broker上,这样一旦Master宕机,Slave依然可以提供服务。这就是RocketMQ保障服务高可用的原理。

主从同步模式

主从Broker之间的数据同步包括以下两种:

同步双写

写入消息时,master先写入,之后复制到slave,确认slave也存储了消息后才向producer答复返回成功。

  • 优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
  • 缺点:性能比异步复制模式略低,大约低10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。

异步复制

先答复producer,再去向salve复制。

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
  • 缺点:Master宕机,磁盘损坏情况,会丢失少量消息。

基于不同的同步模式,Broker的角色类型如下:

org.apache.rocketmq.store.config.BrokerRole
1
2
3
4
5
public enum BrokerRole {
ASYNC_MASTER,
SYNC_MASTER,
SLAVE;
}

主从同步实现

同步元数据

Slave需要和Master同步的不仅是消息内容,还有一些元数据也需要同步,譬如:
Topic配置信息( syncTopicConfig)、Consumer偏移量信息(syncConsumerOffset)、延迟队列偏移量(syncDelayOffset) 和 订阅组配置信息(syncSubscriptionGroupConfig)。
Broker启动时,会判断自己是否Slave角色,如果是就会启动定时同步任务,从Master复制元数据过来。

org.apache.rocketmq.broker.BrokerController.initialize()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
//slave 定期同步
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else {
...

这里的 BrokerController.this.slaveSynchronize.syncAll()方法里边,就调用了syncTopicConfig(),syncConsumerOffset(),syncDelayOffset(),syncSubscriptionGroupConfig()

org.apache.rocketmq.broker.slave.SlaveSynchronize.syncAll()
1
2
3
4
5
6
public void syncAll() {
this.syncTopicConfig();
this.syncConsumerOffset();
this.syncDelayOffset();
this.syncSubscriptionGroupConfig();
}

这里的各种元数据的复制,最终都是通过Netty走网络请求去Master broker获取到元数据到本地,然后走本地文件写入做的持久化操作。
以其中一个为例子,syncConsumerOffset走一遍同步过程:

org.apache.rocketmq.broker.slave.SlaveSynchronize.syncConsumerOffset()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void syncConsumerOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
ConsumerOffsetSerializeWrapper offsetWrapper =
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
this.brokerController.getConsumerOffsetManager().getOffsetTable()
.putAll(offsetWrapper.getOffsetTable());
this.brokerController.getConsumerOffsetManager().persist();
log.info("Update slave consumer offset from master, {}", masterAddrBak);
} catch (Exception e) {
log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
}
}
}

基本逻辑就是 getAllConsumerOffset 方法里边组装一个RemotingCommand,底层通过Netty将请求发送到Master角色的Broker,表示请求获取offset,然后Master那端响应请求,返回Offset给Slave这端,Slave这端本地拿到offset后,接下来就是用一个map存本地内存,以及调persist方法持久化到文件里。

同步元数据的方法基本与上面差不多,相对比较简单,这里省略。

同步消息体

同步消息体是主从同步的核心,也就是同步commitLog内容。CommitLog和元数据的同步不一样:首先,CommitLog的数据量比元数据要大;其次,对实时性和可靠性的要求,CommitLog同步的要求要高一些。元数据的同步是定时任务进行的,在两次同步的时间查里,如果出现异常可能会造成Master上的元数据内容和Slave上的元数据内容不一样,不过这种情况还可以补救(手动调整offset,重启consumer等措施)。但是CommitLog在高可靠性场景下如果没有及时同步,一旦Master机器出现故障,消息就彻底丢失了。

HAService 是实现commitLog 同步的主体,它在Master 机器和Slave 机器上执行的逻辑不同, 默认是在Master 机器上执行。
RocketMQ的消息主从同步的实现主要在 store 工程下的 org.apache.rocketmq.store.ha包,包含三个类:HAService、HAConnection、WaitNotifyObject。

HAService是主从同步的核心实现类,其内部类包括:

  • AcceptSocketService: HA Master端监听HA Client端(Slave端)连接的实现类
  • GroupTransferService: 主从同步通知实现类
  • HAClient: HA Client端(Slave端)实现类

HAConnection是HA Master服务端HA连接对象的封装,与Broker Slave服务器之间的读写实现类,内部类包括:

  • ReadSocketService: 网络读实现类
  • WriteSocketService: 网络写实现类

HAService的start入口

org.apache.rocketmq.store.ha.HAService.start()
1
2
3
4
5
6
public void start() throws Exception {
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
this.groupTransferService.start();
this.haClient.start();
}

大致过程包括:

  1. 主服务器启动,建立在特定端口上监听从服务器的socket连接
  2. 启动监听socket连接线程
  3. 启动主从同步通知线程,等待同步数据的传输
  4. 启动HA Client端线程(针对Slave端)
    消息体的主从复制方式分为两种:同步双写、异步复制。

同步双写

同步双写方式,就是在Master Broker处理Producer发送消息的时候,会同时写入到Slave Broker,待Slave写入成功并返回,Producer才算完成本次消息发送。在消息存储方法org.apache.rocketmq.store.CommitLog.putMessage(MessageExtBrokerInner)中会调handleHA(…)方法来进行同步双写。

org.apache.rocketmq.store.CommitLog.handleHA(AppendMessageResult, PutMessageResult,MessageExt)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
// Determine whether to wait
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}

}

HAService会向其groupTransferService添加一个GroupCommitRequest任务,然后唤醒WriteSocketService通知master进行commitLog同步,然后
master收到通知开始把commitLog发送到slave,request.waitForFlush等待slave获取master的commitLog同步,在slave broker获取到master
broker的通知后,其HAService的GroupTransferService中的requestsRead列表中就有数据,就可以在GroupTransferService中进行处理。

org.apache.rocketmq.store.ha.HAService.GroupTransferService.doWaitTransfer()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void doWaitTransfer() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
for (int i = 0; !transferOK && i < 5; i++) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}

if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}

req.wakeupCustomer(transferOK);
}

this.requestsRead.clear();
}
}
}

如上文HAService的start()方法代码,HAService在启动的时候,也会启动HAService中的GroupTransferService,它会启动线程去循环执行doWaitTransfer()方法,处理GroupTransferService中的master已同步的GroupCommitRequest,req.wakeupCustomer(transferOK)唤醒前面等待的request.waitForFlush线程,让其handleHA得以返回。

异步复制

同步双写是Master主动往Slave去写入消息,而异步复制则是Slave端对Master的主动复制,我们直接看haClient:

org.apache.rocketmq.store.ha.HAService.HAClient.run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void run() {
log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
if (this.connectMaster()) {
// 先汇报最大物理Offset || 定时心跳方式汇报
if (this.isTimeToReportOffset()) {
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);//上报master
if (!result) {
this.closeMaster();
}
}

// 等待应答
this.selector.select(1000);

// 接收数据
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}

// 只要本地有更新,就汇报最大物理Offset
if (!reportSlaveMaxOffsetPlus()) {
continue;
}

// 检查Master的反向心跳
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}

log.info(this.getServiceName() + " service end");
}

如上文中的HAService的start()方法所示,在HAService启动时,会去创建一个HAClient实例,它是一个后台线程(ServiceThread),异步复制master消息体就是在slave第一次启动的时候会上报自己的maxOffset,后续默认每隔5秒向master上报自己的maxOffset,上报失败会关闭连接。也就是说,slave和master之间的互相传输,是slave采取的主动迈出第一步,因为 broker 并不知道slave broker的maxOffset,不知道从哪里同步起。

org.apache.rocketmq.store.ha.HAService.AcceptSocketService.run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public void run() {
log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();

if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());

try {
HAConnection conn = new HAConnection(HAService.this, sc);
conn.start();
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}

selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}

log.info(this.getServiceName() + " service end");
}

HAService在创建的时候也会创建AcceptSocketService对象,也是一个后台线程,用来接收处理socket请求的,然后创建一个HAConnection封装对应的socket连接进行请求处理。

扫描二维码,分享此文章