rocketmq中broker启动同步topic到namesrv过程.
broker端
topic的获取
broker启动会初始化一些系统或者用户自建的topic,在TopicConfigManager中实现,构造TopicConfigManager的时候初始化系统内置的topic,代码如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 
 | public TopicConfigManager(BrokerController brokerController) {this.brokerController = brokerController;
 // Registers the built-in system topics: each block below builds one TopicConfig, records the
 // topic name in systemTopicList and stores the config in topicConfigTable keyed by topic name.
 {
 // Self-test topic: one read queue and one write queue.
 String topic = MixAll.SELF_TEST_TOPIC;
 TopicConfig topicConfig = new TopicConfig(topic);
 this.systemTopicList.add(topic);
 topicConfig.setReadQueueNums(1);
 topicConfig.setWriteQueueNums(1);
 this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
 }
 {
 // Default topic: only registered when the broker config enables automatic topic creation.
 if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
 String topic = MixAll.DEFAULT_TOPIC;
 TopicConfig topicConfig = new TopicConfig(topic);
 this.systemTopicList.add(topic);
 // Read and write queue counts both come from the broker's configured default queue number.
 topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
 .getDefaultTopicQueueNums());
 topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
 .getDefaultTopicQueueNums());
 // Inherit + read + write permission bits.
 int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
 topicConfig.setPerm(perm);
 this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
 }
 }
 {
 // Benchmark topic (MixAll.BENCHMARK_TOPIC): 1024 read queues and 1024 write queues.
 String topic = MixAll.BENCHMARK_TOPIC;
 TopicConfig topicConfig = new TopicConfig(topic);
 this.systemTopicList.add(topic);
 topicConfig.setReadQueueNums(1024);
 topicConfig.setWriteQueueNums(1024);
 this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
 }
 {
 // Topic named after the broker's cluster name. Queue counts are not set in this block.
 String topic = this.brokerController.getBrokerConfig().getBrokerClusterName();
 TopicConfig topicConfig = new TopicConfig(topic);
 this.systemTopicList.add(topic);
 // Starts with the inherit bit only; read/write are added when the cluster topic is enabled.
 int perm = PermName.PERM_INHERIT;
 if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) {
 perm |= PermName.PERM_READ | PermName.PERM_WRITE;
 }
 topicConfig.setPerm(perm);
 this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
 }
 {
 // Topic named after the broker itself: one read queue and one write queue.
 String topic = this.brokerController.getBrokerConfig().getBrokerName();
 TopicConfig topicConfig = new TopicConfig(topic);
 this.systemTopicList.add(topic);
 // Starts with the inherit bit only; read/write are added when the broker topic is enabled.
 int perm = PermName.PERM_INHERIT;
 if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) {
 perm |= PermName.PERM_READ | PermName.PERM_WRITE;
 }
 topicConfig.setReadQueueNums(1);
 topicConfig.setWriteQueueNums(1);
 topicConfig.setPerm(perm);
 this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
 }
 {
 // Offset-moved event topic (MixAll.OFFSET_MOVED_EVENT): one read queue and one write queue.
 String topic = MixAll.OFFSET_MOVED_EVENT;
 TopicConfig topicConfig = new TopicConfig(topic);
 this.systemTopicList.add(topic);
 topicConfig.setReadQueueNums(1);
 topicConfig.setWriteQueueNums(1);
 this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
 }
 }
 
 | 
用户自建的topic会通过TopicConfigManager的父类ConfigManager.load方法把本地存储的${user.home}/store/config/topic.json 加载出来,然后通过decode方法把json的数据转换出来保存到topicConfigTable表里面去。
这样topic数据就获取到了。
当然第一次运行broker的时候,只有系统内建的topic。
同步到namesrv
启动broker服务中,通过调用BrokerController.start()方法启动broker。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 
 | 
 
 /**
  * Brings the broker online.
  *
  * <p>Every component that has been created (non-null) is started in a fixed order, the broker
  * then registers itself with the name servers once, and finally a recurring task is scheduled
  * that repeats the registration (first run after 10 seconds, then every 30 seconds).
  *
  * @throws Exception if any component fails to start or the initial registration throws
  */
 public void start() throws Exception {
     if (messageStore != null) {
         messageStore.start();
     }
     if (remotingServer != null) {
         remotingServer.start();
     }
     if (fastRemotingServer != null) {
         fastRemotingServer.start();
     }
     if (brokerOuterAPI != null) {
         brokerOuterAPI.start();
     }
     if (pullRequestHoldService != null) {
         pullRequestHoldService.start();
     }
     if (clientHousekeepingService != null) {
         clientHousekeepingService.start();
     }
     if (filterServerManager != null) {
         filterServerManager.start();
     }

     // Initial registration: check order-topic config, synchronous (not oneway).
     registerBrokerAll(true, false);

     // Periodic re-registration; failures are logged so one bad run does not cancel the schedule.
     final Runnable periodicRegistration = new Runnable() {
         @Override
         public void run() {
             try {
                 BrokerController.this.registerBrokerAll(true, false);
             } catch (Throwable e) {
                 log.error("registerBrokerAll Exception", e);
             }
         }
     };
     final long initialDelayMillis = 1000 * 10;
     final long periodMillis = 1000 * 30;
     scheduledExecutorService.scheduleAtFixedRate(
         periodicRegistration, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);

     if (brokerStatsManager != null) {
         brokerStatsManager.start();
     }
     if (brokerFastFailure != null) {
         brokerFastFailure.start();
     }
 }
 
 | 
启动过程中调用了registerBrokerAll方法,注册broker到namesrv上去。
然后又启动一个定时注册任务来保证broker在namesrv上的可用性。
registerBrokerAll定义如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 
 | public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
     /*
      * Sends this broker's identity and topic configuration to the name servers and then applies
      * the reply: HA master address (when periodic updates are enabled), master address for
      * slave synchronisation, and - if checkOrderConfig is set - the order-topic KV table.
      * When oneway is true the outer API yields no result and nothing is applied.
      */
     final TopicConfigSerializeWrapper wrapper =
         this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

     // If the broker is not both writable and readable, every topic is re-published with the
     // broker-level permission instead of its own.
     final boolean fullyAccessible =
         PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
             && PermName.isReadable(this.getBrokerConfig().getBrokerPermission());
     if (!fullyAccessible) {
         final ConcurrentHashMap<String, TopicConfig> restricted =
             new ConcurrentHashMap<String, TopicConfig>();
         for (final TopicConfig source : wrapper.getTopicConfigTable().values()) {
             final TopicConfig copy = new TopicConfig(
                 source.getTopicName(),
                 source.getReadQueueNums(),
                 source.getWriteQueueNums(),
                 this.brokerConfig.getBrokerPermission());
             restricted.put(source.getTopicName(), copy);
         }
         wrapper.setTopicConfigTable(restricted);
     }

     final RegisterBrokerResult outcome = this.brokerOuterAPI.registerBrokerAll(
         this.brokerConfig.getBrokerClusterName(),
         this.getBrokerAddr(),
         this.brokerConfig.getBrokerName(),
         this.brokerConfig.getBrokerId(),
         this.getHAServerAddr(),
         wrapper,
         this.filterServerManager.buildNewFilterServerList(),
         oneway,
         this.brokerConfig.getRegisterBrokerTimeoutMills());

     if (outcome == null) {
         return;
     }

     if (this.updateMasterHAServerAddrPeriodically && outcome.getHaServerAddr() != null) {
         this.messageStore.updateHaMasterAddress(outcome.getHaServerAddr());
     }

     this.slaveSynchronize.setMasterAddr(outcome.getMasterAddr());

     if (checkOrderConfig) {
         this.getTopicConfigManager().updateOrderTopicConfig(outcome.getKvTable());
     }
 }
 
 | 
brokerOuterAPI.registerBrokerAll定义如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 
 | 
 
 
 
 
 
 
 
 
 
 
 
 
 
 /**
  * Registers the broker with every name server address known to the remoting client.
  *
  * <p>Each address is tried independently; a failure on one address is logged and does not stop
  * the remaining attempts.
  *
  * @param clusterName cluster the broker belongs to
  * @param brokerAddr address of the broker
  * @param brokerName name of the broker
  * @param brokerId id of the broker
  * @param haServerAddr HA server address of the broker
  * @param topicConfigWrapper topic configuration to publish
  * @param filterServerList filter servers attached to the broker
  * @param oneway whether to send without waiting for a reply
  * @param timeoutMills per-request timeout in milliseconds
  * @return the last non-null result returned by a name server, or {@code null} when there is no
  *         address list or no attempt produced a result
  */
 public RegisterBrokerResult registerBrokerAll(
         final String clusterName,
         final String brokerAddr,
         final String brokerName,
         final long brokerId,
         final String haServerAddr,
         final TopicConfigSerializeWrapper topicConfigWrapper,
         final List<String> filterServerList,
         final boolean oneway,
         final int timeoutMills) {
     final List<String> addresses = this.remotingClient.getNameServerAddressList();
     if (addresses == null) {
         return null;
     }

     RegisterBrokerResult lastResult = null;
     for (final String address : addresses) {
         try {
             final RegisterBrokerResult current = this.registerBroker(
                 address, clusterName, brokerAddr, brokerName, brokerId,
                 haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
             lastResult = (current != null) ? current : lastResult;

             log.info("register broker to name server {} OK", address);
         } catch (Exception e) {
             log.warn("registerBroker Exception, " + address, e);
         }
     }
     return lastResult;
 }
 
 | 
真实的注册逻辑在registerBroker方法中,实现如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 
 | 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 /**
  * Sends one REGISTER_BROKER request to a single name server.
  *
  * <p>The request header carries the broker's address, id, name, cluster name and HA server
  * address; the body carries the topic configuration and the filter server list.
  *
  * @param namesrvAddr address of the name server to contact
  * @param clusterName cluster the broker belongs to
  * @param brokerAddr address of the broker
  * @param brokerName name of the broker
  * @param brokerId id of the broker
  * @param haServerAddr HA server address of the broker
  * @param topicConfigWrapper topic configuration to publish
  * @param filterServerList filter servers attached to the broker
  * @param oneway when {@code true} the request is sent without waiting for a reply
  * @param timeoutMills request timeout in milliseconds
  * @return the registration result built from a SUCCESS response, or {@code null} for oneway sends
  * @throws MQBrokerException if the name server answers with any code other than SUCCESS
  */
 private RegisterBrokerResult registerBroker(
         final String namesrvAddr,
         final String clusterName,
         final String brokerAddr,
         final String brokerName,
         final long brokerId,
         final String haServerAddr,
         final TopicConfigSerializeWrapper topicConfigWrapper,
         final List<String> filterServerList,
         final boolean oneway,
         final int timeoutMills
 ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
         InterruptedException {
     // Header: who is registering.
     RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
     requestHeader.setBrokerAddr(brokerAddr);
     requestHeader.setBrokerId(brokerId);
     requestHeader.setBrokerName(brokerName);
     requestHeader.setClusterName(clusterName);
     requestHeader.setHaServerAddr(haServerAddr);

     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);

     // Body: what the broker serves (topics) and its filter servers.
     RegisterBrokerBody requestBody = new RegisterBrokerBody();
     requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
     requestBody.setFilterServerList(filterServerList);
     request.setBody(requestBody.encode());

     if (oneway) {
         try {
             this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
         } catch (RemotingTooMuchRequestException e) {
             // Oneway registration stays best-effort (no rethrow), but a rejected send is
             // no longer invisible: leave a trace so repeated rejections can be diagnosed.
             log.warn("registerBroker oneway request rejected, " + namesrvAddr, e);
         }

         // Oneway sends never produce a result.
         return null;
     }

     RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);

     assert response != null;
     switch (response.getCode()) {
         case ResponseCode.SUCCESS: {
             RegisterBrokerResponseHeader responseHeader =
                 (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);

             RegisterBrokerResult result = new RegisterBrokerResult();
             result.setMasterAddr(responseHeader.getMasterAddr());
             result.setHaServerAddr(responseHeader.getHaServerAddr());

             // The response body, when present, is decoded as a KVTable.
             if (response.getBody() != null) {
                 result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
             }
             return result;
         }
         default:
             break;
     }

     throw new MQBrokerException(response.getCode(), response.getRemark());
 }
 
 | 
broker端注册broker主要是告诉namesrv自身的信息和broker所支持topic的信息,以便producer、consumer去namesrv查询broker的时候能够查询到。
namesrv端
在com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor文件中处理请求信息。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 
 | 
 
 // Dispatch for REGISTER_BROKER requests: pick the handler based on the sender's version.
 case RequestCode.REGISTER_BROKER:
 // Translate the version number carried by the request into a Version value.
 Version brokerVersion = MQVersion.value2Version(request.getVersion());
 // Senders at V3_0_11 or later (compared by enum ordinal) go to the handler that also
 // forwards the filter server list; earlier versions fall back to registerBroker.
 if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
 return this.registerBrokerWithFilterServer(ctx, request);
 }
 else {
 return this.registerBroker(ctx, request);
 }
 
 | 
具体的处理逻辑在registerBrokerWithFilterServer中
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 
 | 
 
 
 
 
 
 /**
  * Handles a REGISTER_BROKER request whose body may carry topic configuration and a filter
  * server list.
  *
  * <p>The request body is decoded into a {@code RegisterBrokerBody}; when the request has no
  * body, a fresh one is used with its data version counter and timestamp reset to zero. The
  * registration is delegated to the route info manager, and the reply carries the HA server
  * address and master address from the result plus the order-topic KV configuration as body.
  *
  * @param ctx channel context of the incoming request
  * @param request the REGISTER_BROKER command
  * @return a SUCCESS response
  * @throws RemotingCommandException if the request header cannot be decoded
  */
 public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
         throws RemotingCommandException {
     final RemotingCommand reply = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
     final RegisterBrokerResponseHeader replyHeader = (RegisterBrokerResponseHeader) reply.readCustomHeader();
     final RegisterBrokerRequestHeader brokerHeader =
         (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

     RegisterBrokerBody brokerBody = new RegisterBrokerBody();
     if (request.getBody() == null) {
         // No payload: keep the empty body but zero out its data version.
         brokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
         brokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestatmp(0);
     } else {
         brokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
     }

     final RegisterBrokerResult registration = this.namesrvController.getRouteInfoManager().registerBroker(
         brokerHeader.getClusterName(),
         brokerHeader.getBrokerAddr(),
         brokerHeader.getBrokerName(),
         brokerHeader.getBrokerId(),
         brokerHeader.getHaServerAddr(),
         brokerBody.getTopicConfigSerializeWrapper(),
         brokerBody.getFilterServerList(),
         ctx.channel());

     replyHeader.setHaServerAddr(registration.getHaServerAddr());
     replyHeader.setMasterAddr(registration.getMasterAddr());

     // The order-topic configuration is sent back to the broker as the response body.
     final byte[] orderTopicConfig =
         this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
     reply.setBody(orderTopicConfig);

     reply.setCode(ResponseCode.SUCCESS);
     reply.setRemark(null);
     return reply;
 }
 
 | 
com.alibaba.rocketmq.namesrv.routeinfo.RouteInfoManager文件中实现了具体的注册逻辑
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 
 | 
 
 
 
 
 
 
 
 
 
 
 
 
 /**
  * Records or refreshes a broker's registration in the routing tables, under the write lock.
  *
  * <p>Steps, in order: add the broker name to its cluster's set; create the broker's
  * {@code BrokerData} entry if missing and store the address under {@code brokerId}; for a
  * registration whose id equals {@code MixAll.MASTER_ID}, rebuild queue data for every topic
  * when the data version changed or this is a first registration; refresh the live-broker
  * entry; update or remove the filter server list; and, for a non-master id, look up the master
  * address and its HA server address for the result.
  *
  * <p>Any exception raised while doing so is logged and the (possibly empty) result is still
  * returned.
  *
  * @param clusterName cluster the broker belongs to
  * @param brokerAddr address of the broker
  * @param brokerName name of the broker
  * @param brokerId id of the broker; {@code MixAll.MASTER_ID} marks the master
  * @param haServerAddr HA server address of the broker
  * @param topicConfigWrapper topic configuration sent by the broker; may be {@code null}
  * @param filterServerList filter servers of the broker; {@code null} leaves the table untouched,
  *        an empty list removes the broker's entry
  * @param channel channel the registration arrived on
  * @return a result carrying the master address and HA server address when the caller is not the
  *         master and a live master is known; otherwise an empty result
  */
 public RegisterBrokerResult registerBroker(
         final String clusterName,
         final String brokerAddr,
         final String brokerName,
         final long brokerId,
         final String haServerAddr,
         final TopicConfigSerializeWrapper topicConfigWrapper,
         final List<String> filterServerList,
         final Channel channel) {
     RegisterBrokerResult result = new RegisterBrokerResult();

     try {
         // Acquire the lock BEFORE the try/finally: if the acquire is interrupted, the finally
         // block must not run, otherwise unlock() is called on a lock this thread does not hold
         // and the resulting IllegalMonitorStateException hides the InterruptedException.
         this.lock.writeLock().lockInterruptibly();
         try {
             // Cluster -> broker names.
             Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
             if (null == brokerNames) {
                 brokerNames = new HashSet<String>();
                 this.clusterAddrTable.put(clusterName, brokerNames);
             }
             brokerNames.add(brokerName);

             boolean registerFirst = false;

             // Broker name -> addresses keyed by broker id.
             BrokerData brokerData = this.brokerAddrTable.get(brokerName);
             if (null == brokerData) {
                 registerFirst = true;
                 brokerData = new BrokerData();
                 brokerData.setBrokerName(brokerName);

                 HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
                 brokerData.setBrokerAddrs(brokerAddrs);

                 this.brokerAddrTable.put(brokerName, brokerData);
             }

             // A previously unknown broker id also counts as a first registration.
             String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
             registerFirst = registerFirst || (null == oldAddr);

             // Only the master's topic configuration is applied to the queue data.
             if (null != topicConfigWrapper
                 && MixAll.MASTER_ID == brokerId) {
                 if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                     || registerFirst) {
                     ConcurrentHashMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
                     if (tcTable != null) {
                         for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                             this.createAndUpdateQueueData(brokerName, entry.getValue());
                         }
                     }
                 }
             }

             // topicConfigWrapper is allowed to be null (see the check above), so it must not be
             // dereferenced unconditionally here; fall back to a fresh DataVersion in that case.
             BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                 new BrokerLiveInfo(
                     System.currentTimeMillis(),
                     null != topicConfigWrapper ? topicConfigWrapper.getDataVersion() : new DataVersion(),
                     channel,
                     haServerAddr));
             if (null == prevBrokerLiveInfo) {
                 log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
             }

             // Filter servers: an empty list clears the entry, a non-empty list replaces it.
             if (filterServerList != null) {
                 if (filterServerList.isEmpty()) {
                     this.filterServerTable.remove(brokerAddr);
                 } else {
                     this.filterServerTable.put(brokerAddr, filterServerList);
                 }
             }

             // A non-master broker is told where its master and the master's HA server are.
             if (MixAll.MASTER_ID != brokerId) {
                 String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                 if (masterAddr != null) {
                     BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                     if (brokerLiveInfo != null) {
                         result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                         result.setMasterAddr(masterAddr);
                     }
                 }
             }
         } finally {
             this.lock.writeLock().unlock();
         }
     } catch (Exception e) {
         log.error("registerBroker Exception", e);
     }

     return result;
 }
 /**
  * Tells whether the data version reported by a broker differs from the one last recorded.
  *
  * @param brokerAddr address of the broker, used as key into the live-broker table
  * @param dataVersion data version carried by the current registration
  * @return {@code true} when the broker has no live entry yet or its recorded data version is
  *         not equal to {@code dataVersion}; {@code false} otherwise
  */
 private boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
     final BrokerLiveInfo lastSeen = this.brokerLiveTable.get(brokerAddr);
     return lastSeen == null || !lastSeen.getDataVersion().equals(dataVersion);
 }
 
 | 
服务端处理broker注册请求逻辑就完成了。