PPXu

RocketMQ调试环境配置

2019-05-18

调试环境及工具

  1. JDK 1.8+
  2. Maven
  3. IDE工具(Eclipse/IntelliJ IDEA)

拉取源码

RocketMQ的官方仓库 https://github.com/apache/rocketmq ,为方便调试,可以 Fork 出属于自己的仓库。
使用Eclipse从仓库里拉取代码。拉取并以Maven项目形式导入IDE工具,完成后,Maven会自动下载依赖包。

搭建之前先来一张RocketMQ物理部署架构图(集群):

下文只介绍最小化的RocketMQ环境部署,暂时不考虑Namesrv集群、Broker集群、Comsumer集群。
本文所涉及RocketMQ源码基于 4.5.1 版本(master分支)。
搭建调试环境的过程:
1. 启动 RocketMQ Namesrv (元数据管理,Broker、Producer、Consumer各组件的服务注册与发现)
2. 启动 RocketMQ Broker (真正意义上的MQ服务端,接收消息、推送消费)
3. 启动 RocketMQ Producer (消息生产者,发送消息至Broker)
4. 启动 RocketMQ Consumer (消息消费者,消费Broker推送过来的消息)

消息服务端(Namesrv及Broker)的启动方式:
1. 配置文件方式(配置文件多,路径多,配置项多而繁琐,不推荐×)
2. 代码启动方式(代码简单可控,推荐√)

启动Namesrv

配置文件方式

以Eclipse为例简单说明一下,以配置文件方式来启动Namesrv模块

  1. 打开namesrv模块下的NamesrvStartup.java,选择Debug As,配置Debug Configuration,弹出De bug Configuration s 对话框。
  2. 选中Java Application 条目并单击右键,选择New 弹出Debug Configurations 对话框
  3. 设置RocketMQ 运行主目录。选择Environment 选项卡,添加环境变量ROCKET_HOME
  4. 在RocketMQ 运行主目录中创建conf 、logs 、store 三个文件夹
  5. 从RocketMQ distribution 部署目录中将broker.conf、logback_ broker.xml 文件复制到conf 目录中, logback_ namesrv.xml 文件则只需修改日志文件的目录, broker.conf 文件内容如下所示。定义一个自己的RocketMQ store目录路径,替换即可。
    


6. 在Eclipse Debug 中运行NamesrvStartup ,并输出“ The Name Server boot success.Serializetype=JSON ” 。

代码启动方式(推荐√)

打开 org.apache.rocketmq.namesrv.NameServerInstanceTest 单元测试类,参考 #startup() 方法,我们编写一个自己的启动器,NamesrvStarter, 包含 #main(String[] args) 静态方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) throws Exception {
// NamesrvConfig 配置
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// NettyServerConfig 配置
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876); // 设置端口,不设置的话默认9876端口
// 创建 NamesrvController 对象,并启动
NamesrvController namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
namesrvController.initialize();
namesrvController.start();
// 休眠1天,一直hold住线程不退出
Thread.sleep(DateUtils.MILLIS_PER_DAY);
}

核心配置均在代码里体现,右键运行,RocketMQ Namesrv 就启动完成。输出日志如下:

1
2
07:54:03.354 [NettyEventExecutor] INFO  RocketmqRemoting - NettyEventExecutor service started
07:54:03.355 [FileWatchService] INFO RocketmqCommon - FileWatchService service started

启动Broker

配置文件方式

以Eclipse为例简单说明一下,以配置文件方式来启动 Broker 模块
1. 打开BrokerStartup.java,移动到Debug As ,选中Debug Configurations ,在弹出的对话框,选择arguments 选项卡,配置 -c 属性指定 broker 配置文件路径。

2. 切换选项卡Environment ,配置RocketMQ 主目录。

3. 以Debug 模式运行BrokerStartup.java ,查看${ROCKET_HOME} /logs/ broker.log 文件,没报错则表示启动成功。

代码启动方式(推荐√)

打开 org.apache.rocketmq.broker.BrokerControllerTest 单元测试类,参考 #testBrokerRestart() 方法,我们编写一个BrokerStarter类,包含#main(String[] args) 方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) throws Exception {
// 设置版本号
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
// NettyServerConfig 配置
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(10911);
// BrokerConfig 配置
final BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerName("broker-a");
brokerConfig.setNamesrvAddr("127.0.0.1:9876");
// MessageStoreConfig 配置
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setDeleteWhen("04");
messageStoreConfig.setFileReservedTime(48);
messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
messageStoreConfig.setDuplicationEnable(false);

BrokerPathConfigHelper.setBrokerConfigPath("D:/Users/xieruxu669/rockemq/conf/broker.conf"); // 指定broker配置文件的路径
// 创建 BrokerController 对象,并启动
BrokerController brokerController = new BrokerController(//
brokerConfig, //
nettyServerConfig, //
new NettyClientConfig(), //
messageStoreConfig);
brokerController.initialize();
brokerController.start();
// 休眠1天,一直hold住线程不退出持续1天
System.out.println("Broker started...休眠1天");
Thread.sleep(DateUtils.MILLIS_PER_DAY);
}

然后,右键运行,RocketMQ Broker 就启动完成了。输出日志如下:

1
Broker started...休眠1天

莫惊讶,Broker启动过程其实是没有自己的INFO日志的,但是,Broker起来必然会到Namesrv上面注册,所以,打开Namesrv的控制台,会看到输出如下日志:

1
2
3
4
5
6
7
8
9
10
11
12
07:57:55.033 [NettyServerCodecThread_1] INFO  RocketmqRemoting - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:2597
07:57:55.035 [NettyServerCodecThread_1] INFO RocketmqRemoting - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:2597]
07:57:55.231 [RemotingExecutorThread_1] DEBUG RocketmqNamesrv - receive request, 103 127.0.0.1:2597 RemotingCommand [code=103, language=JAVA, version=315, opaque=0, flag(B)=0, remark=null, extFields={brokerId=0, bodyCrc32=1180178866, clusterName=DefaultCluster, brokerAddr=10.188.32.24:10911, haServerAddr=10.188.32.24:10912, compressed=false, brokerName=broker-a}, serializeTypeCurrentRPC=JSON]
07:57:55.263 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new topic registered, %RETRY%please_rename_unique_group_name_4 QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
07:57:55.264 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new topic registered, BenchmarkTest QueueData [brokerName=broker-a, readQueueNums=1024, writeQueueNums=1024, perm=6, topicSynFlag=0]
07:57:55.264 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new topic registered, OFFSET_MOVED_EVENT QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
07:57:55.264 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new topic registered, TopicTest QueueData [brokerName=broker-a, readQueueNums=4, writeQueueNums=4, perm=6, topicSynFlag=0]
07:57:55.264 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new topic registered, broker-a QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=7, topicSynFlag=0]
07:57:55.264 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new topic registered, TBW102 QueueData [brokerName=broker-a, readQueueNums=8, writeQueueNums=8, perm=7, topicSynFlag=0]
07:57:55.265 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new topic registered, SELF_TEST_TOPIC QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
07:57:55.266 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new topic registered, DefaultCluster QueueData [brokerName=broker-a, readQueueNums=16, writeQueueNums=16, perm=7, topicSynFlag=0]
07:57:55.270 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new broker registered, 10.188.32.24:10911 HAServer: 10.188.32.24:10912b

new broker registered, 10.188.32.24:10911 HAServer: 10.188.32.24:10912b 这说明,Broker已经注册到了Namesrv上,也就是妥妥滴起来了。

可以用telnet命令测试一下连接端口

1
telnet 127.0.0.1 10911

启动 Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}.
*/
public class Producer {

public static void main(String[] args) throws MQClientException, InterruptedException {

/*
* Instantiate with a producer group name.
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/

/*
* Launch the instance.
*/
producer.setNamesrvAddr("127.0.0.1:9876"); // 设定 Namesrv 的地址与端口
producer.start();

for (int i = 0; i < 1000; i++) {
try {

/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);

/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);

System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}

/*
* Shut down once the producer instance is not longer in use.
*/
producer.shutdown();
}

}

这里第 28 行,增加了 producer.setNamesrvAddr(“127.0.0.1:9876”) 代码块,指明 Producer 使用的 Namesrv。
送命题
这里有道送命题:为什么 Producer 这里指明的是 Namesrv 的地址跟端口,而不是 Broker 呢?

启动 Consumer

打开 org.apache.rocketmq.example.quickstart.Consumer 示例类,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {

/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/

/*
* Specify where to start in case the specified consumer group is a brand new one.
*/
consumer.setNamesrvAddr("127.0.0.1:9876"); // 设定 Namesrv 的地址与端口
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

/*
* Subscribe one more more topics to consume.
*/
consumer.subscribe("TopicTest", "*");

/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

/*
* Launch the consumer instance.
*/
consumer.start();

System.out.printf("Consumer Started.%n");
}

}

在 25 行,我们还增加了 consumer.setNamesrvAddr(“127.0.0.1:9876”) 代码块,指明 Consumer 使用的 RocketMQ Namesrv 。

然后,右键运行,RocketMQ Consumer 就启动完成。输入日志如下:

1
2
08:17:20.362 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.

如有消息堆积,则会在启动时刷刷出现消费消息的日志
再来一问,这里为何指定的不是Broker,而是Namesrv?

扫描二维码,分享此文章