rocketmq中broker启动同步topic到namesrv过程

本文分析rocketmq中broker启动时把topic同步到namesrv的过程。

broker端

topic的获取

broker启动会初始化一些系统或者用户自建的topic,在TopicConfigManager中实现,构造TopicConfigManager的时候初始化系统内置的topic,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
 * Creates the manager and seeds {@code topicConfigTable} with the built-in
 * system topics. Every topic name seeded here is also added to
 * {@code systemTopicList}.
 *
 * @param brokerController controller whose broker configuration supplies the
 *                         cluster name, broker name, default queue count and
 *                         the feature switches consulted below
 */
public TopicConfigManager(BrokerController brokerController) {
    this.brokerController = brokerController;

    // MixAll.SELF_TEST_TOPIC: one read queue, one write queue.
    final String selfTestName = MixAll.SELF_TEST_TOPIC;
    final TopicConfig selfTestConfig = new TopicConfig(selfTestName);
    selfTestConfig.setReadQueueNums(1);
    selfTestConfig.setWriteQueueNums(1);
    this.systemTopicList.add(selfTestName);
    this.topicConfigTable.put(selfTestConfig.getTopicName(), selfTestConfig);

    // MixAll.DEFAULT_TOPIC: only registered when automatic topic creation is enabled.
    if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
        final String defaultName = MixAll.DEFAULT_TOPIC;
        final TopicConfig defaultConfig = new TopicConfig(defaultName);
        defaultConfig.setReadQueueNums(
            this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());
        defaultConfig.setWriteQueueNums(
            this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());
        defaultConfig.setPerm(PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE);
        this.systemTopicList.add(defaultName);
        this.topicConfigTable.put(defaultConfig.getTopicName(), defaultConfig);
    }

    // MixAll.BENCHMARK_TOPIC: 1024 read queues and 1024 write queues.
    final String benchmarkName = MixAll.BENCHMARK_TOPIC;
    final TopicConfig benchmarkConfig = new TopicConfig(benchmarkName);
    benchmarkConfig.setReadQueueNums(1024);
    benchmarkConfig.setWriteQueueNums(1024);
    this.systemTopicList.add(benchmarkName);
    this.topicConfigTable.put(benchmarkConfig.getTopicName(), benchmarkConfig);

    // Topic named after the broker's cluster; read/write permission is added
    // on top of PERM_INHERIT only when the cluster topic is enabled.
    final String clusterTopicName = this.brokerController.getBrokerConfig().getBrokerClusterName();
    final TopicConfig clusterTopicConfig = new TopicConfig(clusterTopicName);
    final int clusterPerm = this.brokerController.getBrokerConfig().isClusterTopicEnable()
        ? PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE
        : PermName.PERM_INHERIT;
    clusterTopicConfig.setPerm(clusterPerm);
    this.systemTopicList.add(clusterTopicName);
    this.topicConfigTable.put(clusterTopicConfig.getTopicName(), clusterTopicConfig);

    // Topic named after the broker itself: one read queue, one write queue;
    // read/write permission is added only when the broker topic is enabled.
    final String brokerTopicName = this.brokerController.getBrokerConfig().getBrokerName();
    final TopicConfig brokerTopicConfig = new TopicConfig(brokerTopicName);
    final int brokerPerm = this.brokerController.getBrokerConfig().isBrokerTopicEnable()
        ? PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE
        : PermName.PERM_INHERIT;
    brokerTopicConfig.setReadQueueNums(1);
    brokerTopicConfig.setWriteQueueNums(1);
    brokerTopicConfig.setPerm(brokerPerm);
    this.systemTopicList.add(brokerTopicName);
    this.topicConfigTable.put(brokerTopicConfig.getTopicName(), brokerTopicConfig);

    // MixAll.OFFSET_MOVED_EVENT: one read queue, one write queue.
    final String offsetMovedName = MixAll.OFFSET_MOVED_EVENT;
    final TopicConfig offsetMovedConfig = new TopicConfig(offsetMovedName);
    offsetMovedConfig.setReadQueueNums(1);
    offsetMovedConfig.setWriteQueueNums(1);
    this.systemTopicList.add(offsetMovedName);
    this.topicConfigTable.put(offsetMovedConfig.getTopicName(), offsetMovedConfig);
}

用户自建的topic会通过TopicConfigManager的父类ConfigManager.load方法把本地存储的${user.home}/store/config/topic.json 加载出来,然后通过decode方法把json的数据转换出来保存到topicConfigTable表里面去。

这样topic数据就获取到了。
当然第一次运行broker的时候,只有系统内建的topic。

同步到namesrv

启动broker服务中,通过调用BrokerController.start()方法启动broker。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
 * Starts the broker service.
 *
 * Each optional component (message store, remoting servers, outer API, pull
 * request hold service, client housekeeping, filter server manager, stats
 * manager, fast-failure guard) is started only when it is non-null. In between,
 * the broker registers itself with the name server once and schedules a
 * periodic re-registration.
 *
 * @throws Exception if any component fails to start
 */
public void start() throws Exception {
    if (null != this.messageStore) {
        this.messageStore.start();
    }
    if (null != this.remotingServer) {
        this.remotingServer.start();
    }
    if (null != this.fastRemotingServer) {
        this.fastRemotingServer.start();
    }
    if (null != this.brokerOuterAPI) {
        this.brokerOuterAPI.start();
    }
    if (null != this.pullRequestHoldService) {
        this.pullRequestHoldService.start();
    }
    if (null != this.clientHousekeepingService) {
        this.clientHousekeepingService.start();
    }
    if (null != this.filterServerManager) {
        this.filterServerManager.start();
    }

    // Register this broker with the name server right away.
    this.registerBrokerAll(true, false);

    // Periodic re-registration: first run 10 seconds after start-up, then
    // every 30 seconds. Any failure is logged so that it is not propagated
    // out of the scheduled task.
    final long initialDelayMillis = 1000 * 10;
    final long periodMillis = 1000 * 30;
    final Runnable periodicRegistration = new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false);
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    };
    this.scheduledExecutorService.scheduleAtFixedRate(
        periodicRegistration, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);

    if (null != this.brokerStatsManager) {
        this.brokerStatsManager.start();
    }
    if (null != this.brokerFastFailure) {
        this.brokerFastFailure.start();
    }
}

启动过程中调用了registerBrokerAll方法,注册broker到namesrv上去。
然后又启动一个定时注册任务来保证broker在namesrv上的可用性。
registerBrokerAll定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
 * Registers this broker, together with its topic configuration, on the name
 * server(s) and applies the registration result locally.
 *
 * @param checkOrderConfig when true, the KV table returned by the name server
 *                         is used to update the order-topic configuration
 * @param oneway           forwarded to the outer API to select oneway sending
 */
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
    // Serializable snapshot of the topic configuration.
    final TopicConfigSerializeWrapper topicConfigWrapper =
        this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    // Unless the broker is both writable and readable, report every topic
    // with the broker's own permission instead of the topic's permission.
    final boolean fullyAccessible =
        PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            && PermName.isReadable(this.getBrokerConfig().getBrokerPermission());
    if (!fullyAccessible) {
        final ConcurrentHashMap<String, TopicConfig> restrictedTable =
            new ConcurrentHashMap<String, TopicConfig>();
        for (TopicConfig source : topicConfigWrapper.getTopicConfigTable().values()) {
            restrictedTable.put(
                source.getTopicName(),
                new TopicConfig(
                    source.getTopicName(),                      // topic name
                    source.getReadQueueNums(),                  // read queue count
                    source.getWriteQueueNums(),                 // write queue count
                    this.brokerConfig.getBrokerPermission()));  // broker permission
        }
        topicConfigWrapper.setTopicConfigTable(restrictedTable);
    }

    // Send the registration through the outer API.
    final RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),            // cluster name
        this.getBrokerAddr(),                                // broker address
        this.brokerConfig.getBrokerName(),                   // broker name
        this.brokerConfig.getBrokerId(),                     // broker id
        this.getHAServerAddr(),                              // HA server address
        topicConfigWrapper,                                  // topic configuration
        this.filterServerManager.buildNewFilterServerList(), // filter server list
        oneway,                                              // oneway sending or not
        this.brokerConfig.getRegisterBrokerTimeoutMills());  // timeout

    if (registerBrokerResult == null) {
        return;
    }

    // Apply what the name server returned: HA master address (only when
    // periodic updates are enabled and an address was returned), the master
    // address for slave synchronization, and optionally the order-topic config.
    if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
        this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
    }

    this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

    if (checkOrderConfig) {
        this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
    }
}

brokerOuterAPI.registerBrokerAll定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
 * Registers this broker with every name server address known to the remoting
 * client.
 *
 * A failure against one name server is logged and does not prevent the
 * remaining name servers from being tried. If the calling thread is
 * interrupted, the interrupt status is restored and no further name servers
 * are attempted.
 *
 * @param clusterName        cluster name
 * @param brokerAddr         broker address
 * @param brokerName         broker name
 * @param brokerId           broker id
 * @param haServerAddr       HA server address
 * @param topicConfigWrapper topic configuration to publish
 * @param filterServerList   filter server list
 * @param oneway             whether to send the request oneway
 * @param timeoutMills       request timeout in milliseconds
 * @return the last non-null result obtained from a name server, or
 *         {@code null} when there is no name server address, every attempt
 *         failed, or every attempt returned {@code null}
 */
public RegisterBrokerResult registerBrokerAll(//
    final String clusterName, // 1 cluster name
    final String brokerAddr, // 2 broker address
    final String brokerName, // 3 broker name
    final long brokerId, // 4 broker id
    final String haServerAddr, // 5 HA server address
    final TopicConfigSerializeWrapper topicConfigWrapper, // 6 topic configuration
    final List<String> filterServerList, // 7 filter servers
    final boolean oneway, // 8 oneway or not
    final int timeoutMills // 9 timeout
) {
    RegisterBrokerResult registerBrokerResult = null;

    // Addresses of all name servers known to the remoting client.
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();

    if (nameServerAddressList != null) {
        for (String namesrvAddr : nameServerAddressList) {
            try {
                // Send the register-broker request to this name server.
                RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
                    haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
                if (result != null) {
                    registerBrokerResult = result;
                }

                log.info("register broker to name server {} OK", namesrvAddr);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the thread's interrupt
                // status; restore it so the caller can still observe the
                // interruption, and stop instead of starting further blocking calls.
                Thread.currentThread().interrupt();
                log.warn("registerBroker interrupted, " + namesrvAddr, e);
                break;
            } catch (Exception e) {
                log.warn("registerBroker Exception, " + namesrvAddr, e);
            }
        }
    }

    return registerBrokerResult;
}

真正的注册逻辑在registerBroker方法中实现,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/**
 * Sends a single register-broker request to one name server.
 *
 * @param namesrvAddr        name server address
 * @param clusterName        cluster name
 * @param brokerAddr         broker address
 * @param brokerName         broker name
 * @param brokerId           broker id
 * @param haServerAddr       HA server address
 * @param topicConfigWrapper topic configuration placed in the request body
 * @param filterServerList   filter server list placed in the request body
 * @param oneway             when true the request is sent oneway and no response is read
 * @param timeoutMills       request timeout in milliseconds
 * @return the decoded result on a SUCCESS response; {@code null} when sent oneway
 * @throws RemotingCommandException     declared by the remoting/decoding calls
 * @throws MQBrokerException            when the response code is not SUCCESS
 * @throws RemotingConnectException     declared by the remoting calls
 * @throws RemotingSendRequestException declared by the remoting calls
 * @throws RemotingTimeoutException     declared by the remoting calls
 * @throws InterruptedException         declared by the remoting calls
 */
private RegisterBrokerResult registerBroker(//
    final String namesrvAddr, //
    final String clusterName, // 1
    final String brokerAddr, // 2
    final String brokerName, // 3
    final long brokerId, // 4
    final String haServerAddr, // 5
    final TopicConfigSerializeWrapper topicConfigWrapper, // 6
    final List<String> filterServerList, // 7
    final boolean oneway, // 8
    final int timeoutMills // 9
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    InterruptedException
{
    // Request header: the broker's identity and addresses.
    RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    requestHeader.setHaServerAddr(haServerAddr);

    // Request command carrying the header.
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);

    // Request body: the broker's topic information and filter server list,
    // serialized and attached to the request.
    RegisterBrokerBody requestBody = new RegisterBrokerBody();
    requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
    requestBody.setFilterServerList(filterServerList);
    request.setBody(requestBody.encode());

    // Oneway: send and return immediately without reading a response.
    if (oneway) {
        try {
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Oneway registration stays best-effort, but a dropped request is
            // logged instead of being swallowed silently.
            log.warn("registerBroker oneway request rejected, " + namesrvAddr, e);
        }

        return null;
    }

    // Synchronous call.
    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);

    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            RegisterBrokerResponseHeader responseHeader =
                (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);

            RegisterBrokerResult result = new RegisterBrokerResult();
            // Master address and HA server address, relevant when this broker is a slave.
            result.setMasterAddr(responseHeader.getMasterAddr());
            result.setHaServerAddr(responseHeader.getHaServerAddr());

            if (response.getBody() != null) {
                result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
            }
            return result;
        }
        default:
            break;
    }

    // Any non-SUCCESS response code is reported to the caller as an exception.
    throw new MQBrokerException(response.getCode(), response.getRemark());
}

broker端注册broker主要是告诉namesrv自身的信息和broker所支持的topic信息,以便producer、consumer去namesrv查询的时候能够查询到该broker。

namesrv端

在com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor文件中处理请求信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Broker registration request.
*/
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
/**
* Filter-server support was added in 3.0.11: brokers at or above that version
* use the filter-server-aware handler, older brokers use the plain one.
*/
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
}
else {
return this.registerBroker(ctx, request);
}

具体的处理逻辑在registerBrokerWithFilterServer方法中,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * Handles a broker registration request whose body carries the topic
 * configuration and the filter server list.
 *
 * @param ctx     channel context; its channel is recorded for the broker
 * @param request the register-broker request
 * @return a SUCCESS response whose header carries the HA server and master
 *         addresses from the registration result and whose body is the
 *         order-topic-config KV list
 * @throws RemotingCommandException if the request header cannot be decoded
 */
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
    final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
    final RegisterBrokerRequestHeader requestHeader =
        (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

    RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
    if (request.getBody() == null) {
        // No body sent: use an empty body whose data version is reset to counter 0 / timestamp 0.
        registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
        registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestatmp(0);
    } else {
        registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
    }

    // Register the broker in the route info and maintain its topic relations.
    final RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        registerBrokerBody.getTopicConfigSerializeWrapper(),
        registerBrokerBody.getFilterServerList(),
        ctx.channel());

    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());

    // The response body is the KV list of the order-topic-config namespace.
    final byte[] orderTopicConfig =
        this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(orderTopicConfig);

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

com.alibaba.rocketmq.namesrv.routeinfo.RouteInfoManager文件中实现了具体的注册逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
/**
 * Registers a broker on the name server and updates the cluster, broker
 * address, topic queue, live-broker and filter-server tables under the write lock.
 *
 * @param clusterName        cluster name
 * @param brokerAddr         broker address
 * @param brokerName         broker name
 * @param brokerId           broker id; compared with {@code MixAll.MASTER_ID}
 *                           to tell master from slave
 * @param haServerAddr       HA server address
 * @param topicConfigWrapper topic configuration reported by the broker
 * @param filterServerList   filter server addresses; {@code null} leaves the
 *                           filter server table untouched, an empty list
 *                           removes the broker's entry
 * @param channel            channel of the registering broker
 * @return the registration result; for a non-master broker whose master is
 *         present in the live table it carries the master address and the
 *         master's HA server address. Any exception is logged and the result
 *         built so far is returned.
 */
public RegisterBrokerResult registerBroker(//
    final String clusterName, // 1
    final String brokerAddr, // 2
    final String brokerName, // 3
    final long brokerId, // 4
    final String haServerAddr, // 5
    final TopicConfigSerializeWrapper topicConfigWrapper, // 6
    final List<String> filterServerList, // 7
    final Channel channel // 8
) {
    RegisterBrokerResult result = new RegisterBrokerResult();

    try {
        // Acquire the lock BEFORE entering try/finally: if acquisition is
        // interrupted the lock is not held, and unlocking in finally would
        // fail and mask the original InterruptedException.
        this.lock.writeLock().lockInterruptibly();
        try {
            // Broker names already known for this cluster; create the set on first use.
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            // Assume this is not a first registration until shown otherwise.
            boolean registerFirst = false;

            // Broker data for this broker name; create it when missing.
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData();
                brokerData.setBrokerName(brokerName);
                HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
                brokerData.setBrokerAddrs(brokerAddrs);

                this.brokerAddrTable.put(brokerName, brokerData);
            }

            // Record brokerId -> address; the id distinguishes master from slave.
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);

            // Also a first registration when this id had no address before.
            registerFirst = registerFirst || (null == oldAddr);

            // Only a master broker with a topic configuration updates queue data,
            // and only when the configuration changed or on first registration.
            if (null != topicConfigWrapper //
                    && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
                        || registerFirst) {
                    ConcurrentHashMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            // Create or update the queue data of this topic for the broker.
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            // Put the broker into the live table with the current timestamp.
            // NOTE(review): topicConfigWrapper is null-checked above but
            // dereferenced unconditionally here; a null wrapper would end up
            // in the catch below. Confirm callers never pass null.
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, //
                new BrokerLiveInfo(//
                    System.currentTimeMillis(), //
                    topicConfigWrapper.getDataVersion(), //
                    channel, //
                    haServerAddr));

            // No previous live entry: this address is newly registered.
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            // A non-null filter list either clears (empty) or replaces the broker's entry.
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            // Slave broker: hand back the master's address and HA server
            // address when the master is registered and present in the live table.
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}
/**
 * Tells whether the topic configuration reported by a broker differs from the
 * one recorded in the live table.
 *
 * @param brokerAddr  broker address used as the live-table key
 * @param dataVersion data version reported by the broker
 * @return {@code true} when the broker has no live entry or its recorded data
 *         version is not equal to {@code dataVersion}
 */
private boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
    final BrokerLiveInfo liveInfo = this.brokerLiveTable.get(brokerAddr);
    return null == liveInfo || !liveInfo.getDataVersion().equals(dataVersion);
}

服务端处理broker注册请求逻辑就完成了。

文章目录
  1. 1. broker端
    1. 1.1. topic的获取
    2. 1.2. 同步到namesrv
  2. 2. namesrv端