PPXu

RocketMQ相关总结

2018-11-18

总结RocketMQ相关知识点,便于回顾记忆…

RocketMQ相关网址

官网:https://rocketmq.incubator.apache.org/

源码:https://github.com/apache/rocketmq

RocketMQ总体特点

  1. 能够保证严格的消息顺序

  2. 提供丰富的消息拉取模式

  3. 高效的订阅者水平扩展能力

  4. 实时的消息订阅机制

  5. 亿级消息堆积能力

核心原理

数据结构

主要以commitLog为消息存储的数据结构。

(1)所有数据单独储存到commit Log ,完全顺序写,随机读

(2)对最终用户展现的队列实际只储存消息在Commit Log 的位置信息,并且串行方式刷盘

(3)按照MessageId查询消息

(4)根据查询的key的hashcode%slotNum得到具体的槽位置

(5)根据slotValue(slot对应位置的值)查找到索引项列表的最后一项

(6)遍历索引项列表返回查询时间范围内的结果集

刷盘策略


作为一款纯 Java 语言开发的消息引擎,RocketMQ 自主研发的存储组件,依赖 Page Cache 进行加速和堆积,意味着它的性能会受到 JVM、 GC、内核、Linux 内存管理机制、文件 IO 等因素的影响。Rocketmq中的所有消息都是持久化到硬盘的,但会使用系统PageCache加速访问,消息的落地方式是先写PageCache后刷盘,可以保证内存与磁盘都有一份数据,访问时,可以直接从内存读取。如图所示,一条消息从客户端发送出,到最终落盘持久化,每个环节都有产生延迟的风险。

《不一样的技术创新-阿里巴巴2016双十一背后的技术》一书中提到,有线上数据显示,RocketMQ 写消息链路存在偶发的高达数秒的延迟

  • 同步刷盘
    同步刷盘是指,broker在收到每个消息后,都是先要保存到硬盘上,然后再给producer确认。

  • 异步刷盘
    异步刷盘就是先回复确认,然后批量保存到硬盘上。异步刷盘有更好的性能,当然也有更大的丢失消息的风险。

角色关系图

架构图

架构特点

所有的集群都具有水平扩展能力,无单点障碍。

  • NameServer以轻量级的方式提供服务发现和路由功能,每个NameServer存有全量的路由信息,提供对等的读写服务,是一个几乎无状态节点,可集群部署,节点之间无任何信息同步,支持快速扩缩容。
  • Broker为实际的消息队列服务器(MQ Server),在整体架构中,可以看作是Producer与Comsumer之间的驳脚者,消息通过它从Producer接收,并存储,后转发给Consumer。以Topic为纬度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型,具备多副本容错机制(2副本或3副本)、强大的削峰填谷以及上亿级消息堆积能力,同时可严格保证消息的有序性。

    除此之外,Broker还提供了同城异地容灾能力,丰富的Metrics统计以及告警机制。这些都是传统消息系统无法比拟的。

  • Producer由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。
  • Consumer也由用户部署,支持PUSH和PULL两种消费模式(推模式的实现也是用的拉方式),支持集群消费和广播消息,提供实时的消息订阅机制,满足大多数消费场景。

RocketMQ亮点-支持多种消费模式

RocketMQ最初还未正式称为RocketMQ,一开始v1.0还是叫metaQ,经历了3代的重要演进,v3.0开始改名RocketMQ,其重要改进包括消息获取模式。

  1. 第一代,推模式,数据存储采用关系型数据库。在这种模式下,消息具有很低的延迟特性,并且很容易支持分布式事务。尤其在阿里淘宝这种高频交易场景中,具有非常广泛地应用。典型代表包括Notify、Napoli。
  2. 第二代,拉模式,自研的专有消息存储。在日志处理方面能够媲美Kafka的吞吐性能,但考虑到淘宝的应用场景,尤其是其交易链路的高可靠需求,消息引擎并没有一味的追求吞吐,而是将稳定可靠放在首位。因为采用了长连接拉模式,在消息的实时方面丝毫不逊推模式。典型代表MetaQ。
  3. 第三代,以拉模式为主,兼有推模式的高性能、低延迟消息引擎RocketMQ,在二代功能特性的基础上,为电商金融领域添加了可靠重试、基于文件存储的分布式事务等特性,并做了大量优化。从2012年开始,经历了历次双11核心交易链路检验。目前已经捐赠给Apache基金会。

不难看出,RocketMQ其实是伴随着阿里巴巴整个生态的成长,逐渐衍生出来的高性能,高可用,兼具高吞吐量和低延迟、能够同时满足电商领域和金融领域的极尽苛刻场景的消息中间件。

Broker部署方式

单Master

这种方式风险较大,一旦Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。

多Master模式

多台Broker,全是Master

优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。

HA方案:多Master/Slave对模式

每个 Master 配对一个 Slave,有多对Master-Slave。

Master/Slave复制方式

  • 同步双写
    写入消息时,master先写入,之后复制到slave,确认slave也存储了消息后才向producer答复返回成功。

    优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
    缺点:性能比异步复制模式略低,大约低10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。

  • 异步复制
    先答复producer,再去向salve复制。

    优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
    缺点:Master宕机,磁盘损坏情况,会丢失少量消息。

通过同步复制技术可以完全避免单点,同步复制势必会影响性能,适合应用于消息可靠性要求极高的场合。RocketMQ从3.0版本开始支持同步双写。

两种消息消费的交互方式的区别

留意源码可以得知:
consumer被分为2类:MQPullConsumer和MQPushConsumer,本质都是拉模式(pull),即consumer轮询从broker拉取消息。
区别在于:

  • push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对客户端而言,感觉消息是被推送(push)过来的。

  • pull方式里,取消息的过程,RocketMQ交给了用户自己实现,首先通过待消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class Consumer {

// Java缓存
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

private static final String nameServAndAddr = "172.16.235.77:9876;172.16.235.78:9876";

private static final String consumerGroupName ="ConsumerGroupName";

private static final String consumber ="Consumber";

/**
* 主动拉取方式消费
*
* @throws MQClientException
*/
public static void main(String[] args) throws MQClientException {
/**
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ConsumerGroupName需要由应用来保证唯一 ,最好使用服务的包名区分同一服务,一类Consumer集合的名称,
* 这类Consumer通常消费一类消息,且消费逻辑一致
* PullConsumer:Consumer的一种,应用通常主动调用Consumer的拉取消息方法从Broker拉消息,主动权由应用控制
*/
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroupName);
// //nameserver服务
consumer.setNamesrvAddr(nameServAndAddr);
consumer.setInstanceName(consumber);
consumer.start();

// 拉取订阅主题的队列,默认队列大小是4
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
for (MessageQueue mq : mqs) {
System.out.println("Consume from the queue: " + mq);
SINGLE_MQ: while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(
mq, null, getMessageQueueOffset(mq), 32);
List<MessageExt> list = pullResult.getMsgFoundList();
if (list != null && list.size() < 100) {
for (MessageExt msg : list) {
System.out.println(new String(msg.getBody()));
}
}
System.out.println(pullResult.getNextBeginOffset());
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}

private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}

private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null) {
System.out.println(offset);
return offset;
}
return 0;
}
}

RocketMQ使用长轮询Pull方式,可保证消息非常实时,消息实时性不低于Push

长轮询Pull:
建立长连接,每隔一定时间,客户端向服务端发起请求询问数据,如有则返回数据,如无则返回空,然后关闭请求。
长轮询与普通轮询的不同之处在于,哪怕服务端此时没有数据,连接还是保持的,等到有数据时可以立即返回(也就模拟push),或者超时返回。
长轮询好处在于可以减少无效请求,保证消息实时性获取,又不会造成积压。

推拉模式的具体选取视乎实际情况而定,在一些离线大批量数据处理系统中,消息获取的需求强调的更多是吞吐量,而非低延迟,此时拉模式可能更优。

RocketMQ 高可用保障

通过可用性计算公式可以看出,要提升系统的可用性,需要在保障系统健壮性以延长平均无故障时间的基础上,进一步加强系统的故障自动恢复能力以缩短平均故障修复时间。 RocketMQ 高可用架构设计并实现了 Controller 组件,按照单主状态、异步复制状态、半同步状态以及最终的同步复制状态的有限状态机进行转换。在最终的同步复制状态下,Master 和 Slave 任一节点故障时,其它节点能够在秒级时间内切换到单主状态继续提供服务。相比于之前人工介入重启来恢复服务,RokcetMQ 高可用架构赋予了系统故障自动恢复的能力,能极大缩短平均故障恢复时间,提升系统的可用性。

下图描述了 RocketMQ 高可用架构中有限状态机的转换:

1) 第一个节点启动后,Controller 控制状态机切换为单主状态,通知启动节点以 Master 角色提供服务。
2) 第二个节点启动后, Controller 控制状态机切换成异步复制状态。Master 通过异步方式向 Slave 复制数据。
3) 当 Slave 的数据即将赶上 Master,Controller 控制状态机切换成半同步状态,此时命中 Master 的写请求会被 Hold 住,直到 Master以异步方式向 Slave 复制了所有差异的数据。
4) 当半同步状态下 Slave 的数据完全赶上 Master 时,Controller控制状态机切换成同步复制模式,Mater 开始以同步方式向 Slave 复制数据。该状态下任一节点出现故障,其它节点能够在秒级内切换到单主状态继续提供服务。
Controller 组件控制 RocketMQ 按照单主状态,异步复制状态,半同步状态,同步复制状态的顺序进行状态机切换。中间状态的停留时间与主备之间的数据差异以及网络带宽有关,但最终都会稳定在同步复制状态下。

如何保证消息有序消费?

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照这个顺序消费才有意义。但同时订单之间又是可以并行消费的。

假如生产者产生了2条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?你脑中想到的可能是这样:

M1发送到S1后,M2发送到S2,如果要保证M1先于M2被消费,那么需要M1到达消费端后,通知S2,然后S2再将M2发送到消费端。

这个模型存在的问题是,如果M1和M2分别发送到两台Server上,就不能保证M1先达到,也就不能保证M1被先消费,那么就需要在MQ Server集群维护消息的顺序。那么如何解决?一种简单的方式就是将M1、M2发送到同一个Server上:

这样可以保证M1先于M2到达MQServer(客户端等待M1成功后再发送M2),根据先达到先被消费的原则,M1会先于M2被消费,这样就保证了消息的顺序。

这个模型,理论上可以保证消息的顺序,但在实际运用中你应该会遇到下面的问题:

只要将消息从一台服务器发往另一台服务器,就会存在网络延迟问题。如上图所示,如果发送M1耗时大于发送M2的耗时,那么M2就先被消费,仍然不能保证消息的顺序。即使M1和M2同时到达消费端,由于不清楚消费端1和消费端2的负载情况,仍然有可能出现M2先于M1被消费。如何解决这个问题?将M1和M2发往同一个消费者即可,且发送M1后,需要消费端响应成功后才能发送M2。

但又会引入另外一个问题,如果发送M1后,消费端1没有响应,那是继续发送M2呢,还是重新发送M1?一般为了保证消息一定被消费,肯定会选择重发M1到另外一个消费端2,就如下图所示。

这样的模型就严格保证消息的顺序,细心的你仍然会发现问题,消费端1没有响应Server时有两种情况,一种是M1确实没有到达,另外一种情况是消费端1已经响应,但是Server端没有收到。如果是第二种情况,重发M1,就会造成M1被重复消费。也就是我们后面要说的第二个问题,消息重复问题。

回过头来看消息顺序问题,严格的顺序消息非常容易理解,而且处理问题也比较容易,要实现严格的顺序消息,简单且可行的办法就是:

保证生产者 - MQServer - 消费者是一对一对一的关系

但是这样设计,并行度就成为了消息系统的瓶颈(吞吐量不够),也会导致更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。

但我们的最终目标是要集群的高容错性和高吞吐量。这似乎是一对不可调和的矛盾,那么阿里是如何解决的?

有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来规避。如果硬要把时间花在解决它们身上,实际上是浪费的,效率低下的。从这个角度来看消息的顺序问题,我们可以得出两个结论:

  1. 不关注乱序的应用实际大量存在
  2. 队列无序并不意味着消息无序

最后我们从源码角度分析RocketMQ怎么实现发送顺序消息。

一般消息是通过轮询所有队列来发送的(负载均衡策略),顺序消息可以根据业务,比如说订单号相同的消息发送到同一个队列。下面的示例中,OrderId相同的消息,会发送到同一个队列:

1
2
3
4
5
6
7
8
9
// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);

在获取到路由信息以后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的队列是同一个队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
private SendResult send()  {
// 获取topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
// 根据我们的算法,选择一个发送队列
// 这里的arg = orderId
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
}
}

消息重复–如何保证幂等性

上面在解决消息顺序问题时,引入了一个新的问题,就是消息重复。那么RocketMQ是怎样解决消息重复的问题呢?还是“恰好”不解决。

造成消息的重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是不解决,转而绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

  1. 消费端处理消息的业务逻辑保持幂等性
  2. 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

我们可以看到第1条的解决方式,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率不一定大,且由消息系统实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。

RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

事务消息

RocketMQ除了支持普通消息,顺序消息,另外还支持事务消息。

旧版事务消息:

参考 RocketMQ总结整理-事务消息

RocketMQ 4.3 新版事务消息:

这张图说明了事务消息的大致方案,分为两个逻辑:正常事务消息的发送及提交、事务消息的补偿流程。

  • 事务消息发送及提交:

    发送消息(half消息)
    服务端响应消息写入结果
    根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
    根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

  • 补偿流程:

    对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
    Producer收到回查消息,检查回查消息对应的本地事务的状态
    根据本地事务状态,重新Commit或者Rollback
    补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

新版事务消息的设计原理

RocketMQ事务消息的提交方式是2PC,一阶段消息可以理解为Prepared Message或者Pending Message,实际上就是说,消息要先提交并落地到Broker,但不能是对用户可见的。


如何做到写入了消息但是对用户不可见?——写入消息数据,但是不创建对应的消息的索引信息。

RocketMQ消息在服务端的存储结构如上,每条消息都会有对应的索引信息,Consumer通过索引读取消息。
那么实现一阶段写入的消息不被用户消费(需要在Commit后才能消费),只需要写入Storage Queue,但是不构建Index Queue即可。

RocketMQ中具体实现策略是:写入的如果是事务消息,则对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中。
代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(),
MessageSysFlag.TRANSACTION_NOT_TYPE));
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}

替换属性后这条消息被写入到TransactionalMessageUtil.buildHalfTopic()的Queue 0中:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TransactionalMessageUtil {
public static final String REMOVETAG = "d";
public static Charset charset = Charset.forName("utf-8");

...

public static String buildHalfTopic() {
return MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
}

...

}

在完成Storage Queue的写入后,在appendCallback中,普通消息会去构建消息索引,而如果发现是事务消息,则跳过了创建索引的逻辑。

RocketMQ将事务消息一阶段发送的消息称为Half消息,我们可以理解为,这条消息相对普通消息的操作只做了一半(只落地而未索引),不算是一条完整的普通消息

在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。

先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。

但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。

RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息是否状态已经确定(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。

引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。

Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。

Half消息的索引构建

在执行二阶段的Commit操作时,需要构建出Half消息的索引。

一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。

所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。

如何处理二阶段失败的消息

如果二阶段失败了,比如在Commit操作时出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。

RocketMQ采用了一种补偿机制,称为“回查”。

Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。

Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。

值得注意的一点是具体实现中,在回查前,系统会执行putBackHalfMsgQueue操作,即将Half消息重新写一遍到Half消息的Queue中。这么做其实是为了能有效的推进上面的CheckPoint。

新版事务消息设计总结

  • 通过写Half消息的方式来实现一阶段消息对用户不可见
  • 通过Op消息来标记事务消息的状态
  • 通过读取Half消息来生成一条新的Normal消息来完成二阶段Commit之后消息对Consumer可见
  • 通过Op消息来执行回查

Reference

扫描二维码,分享此文章