rocketmq中broker启动同步topic到namesrv过程.
broker端
topic的获取
broker启动会初始化一些系统或者用户自建的topic,在TopicConfigManager中实现,构造TopicConfigManager的时候初始化系统内置的topic,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
/**
 * Creates the manager and pre-registers the broker's built-in topics.
 *
 * <p>For every built-in topic the name is added to {@code systemTopicList} and a
 * {@link TopicConfig} is stored in {@code topicConfigTable} under its topic name.
 * Registration order: {@code MixAll.SELF_TEST_TOPIC}, {@code MixAll.DEFAULT_TOPIC}
 * (only when auto-create is enabled in the broker config),
 * {@code MixAll.BENCHMARK_TOPIC}, a topic named after the broker cluster, a topic
 * named after the broker, and {@code MixAll.OFFSET_MOVED_EVENT}.
 *
 * @param brokerController controller whose broker config supplies the names and
 *                         switches read here
 */
public TopicConfigManager(BrokerController brokerController) {
    this.brokerController = brokerController;

    // MixAll.SELF_TEST_TOPIC: a single read queue and a single write queue.
    final String selfTestTopic = MixAll.SELF_TEST_TOPIC;
    final TopicConfig selfTestConfig = new TopicConfig(selfTestTopic);
    this.systemTopicList.add(selfTestTopic);
    selfTestConfig.setReadQueueNums(1);
    selfTestConfig.setWriteQueueNums(1);
    this.topicConfigTable.put(selfTestConfig.getTopicName(), selfTestConfig);

    // MixAll.DEFAULT_TOPIC: registered only when topic auto-creation is enabled.
    if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
        final String defaultTopic = MixAll.DEFAULT_TOPIC;
        final TopicConfig defaultConfig = new TopicConfig(defaultTopic);
        this.systemTopicList.add(defaultTopic);
        defaultConfig.setReadQueueNums(
            this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());
        defaultConfig.setWriteQueueNums(
            this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());
        defaultConfig.setPerm(
            PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE);
        this.topicConfigTable.put(defaultConfig.getTopicName(), defaultConfig);
    }

    // MixAll.BENCHMARK_TOPIC: 1024 read queues and 1024 write queues.
    final String benchmarkTopic = MixAll.BENCHMARK_TOPIC;
    final TopicConfig benchmarkConfig = new TopicConfig(benchmarkTopic);
    this.systemTopicList.add(benchmarkTopic);
    benchmarkConfig.setReadQueueNums(1024);
    benchmarkConfig.setWriteQueueNums(1024);
    this.topicConfigTable.put(benchmarkConfig.getTopicName(), benchmarkConfig);

    // Topic named after the broker cluster; read/write bits are added only when the
    // cluster topic is enabled. Queue counts are left at TopicConfig's own defaults.
    final String clusterTopic = this.brokerController.getBrokerConfig().getBrokerClusterName();
    final TopicConfig clusterConfig = new TopicConfig(clusterTopic);
    this.systemTopicList.add(clusterTopic);
    final int clusterPerm = this.brokerController.getBrokerConfig().isClusterTopicEnable()
        ? (PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE)
        : PermName.PERM_INHERIT;
    clusterConfig.setPerm(clusterPerm);
    this.topicConfigTable.put(clusterConfig.getTopicName(), clusterConfig);

    // Topic named after the broker itself; read/write bits are added only when the
    // broker topic is enabled.
    final String brokerTopic = this.brokerController.getBrokerConfig().getBrokerName();
    final TopicConfig brokerTopicConfig = new TopicConfig(brokerTopic);
    this.systemTopicList.add(brokerTopic);
    final int brokerPerm = this.brokerController.getBrokerConfig().isBrokerTopicEnable()
        ? (PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE)
        : PermName.PERM_INHERIT;
    brokerTopicConfig.setReadQueueNums(1);
    brokerTopicConfig.setWriteQueueNums(1);
    brokerTopicConfig.setPerm(brokerPerm);
    this.topicConfigTable.put(brokerTopicConfig.getTopicName(), brokerTopicConfig);

    // MixAll.OFFSET_MOVED_EVENT: a single read queue and a single write queue.
    final String offsetMovedTopic = MixAll.OFFSET_MOVED_EVENT;
    final TopicConfig offsetMovedConfig = new TopicConfig(offsetMovedTopic);
    this.systemTopicList.add(offsetMovedTopic);
    offsetMovedConfig.setReadQueueNums(1);
    offsetMovedConfig.setWriteQueueNums(1);
    this.topicConfigTable.put(offsetMovedConfig.getTopicName(), offsetMovedConfig);
}
|
用户自建的topic会通过TopicConfigManager的父类ConfigManager.load方法把本地存储的${user.home}/store/config/topic.json 加载出来,然后通过decode方法把json的数据转换出来保存到topicConfigTable表里面去。
这样topic数据就获取到了。
当然第一次运行broker的时候,只有系统内建的topic。
同步到namesrv
启动broker服务中,通过调用BrokerController.start()方法启动broker。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
/**
 * Starts the broker's components and registers the broker with the name servers.
 *
 * <p>Each component is started only if it is non-null, in this order: message store,
 * remoting server, fast remoting server, broker outer API, pull-request hold service,
 * client housekeeping service, filter server manager. The broker then registers once
 * via {@code registerBrokerAll(true, false)} and schedules the same call at a fixed
 * rate (first run after 10 seconds, then every 30 seconds). Finally the stats manager
 * and the fast-failure component are started if present.
 *
 * @throws Exception if any of the started components fails to start
 */
public void start() throws Exception {
    if (null != this.messageStore) {
        this.messageStore.start();
    }

    if (null != this.remotingServer) {
        this.remotingServer.start();
    }

    if (null != this.fastRemotingServer) {
        this.fastRemotingServer.start();
    }

    if (null != this.brokerOuterAPI) {
        this.brokerOuterAPI.start();
    }

    if (null != this.pullRequestHoldService) {
        this.pullRequestHoldService.start();
    }

    if (null != this.clientHousekeepingService) {
        this.clientHousekeepingService.start();
    }

    if (null != this.filterServerManager) {
        this.filterServerManager.start();
    }

    // Register right away, then keep re-registering on a fixed schedule.
    this.registerBrokerAll(true, false);

    final long firstRunDelayMillis = 1000 * 10;
    final long repeatPeriodMillis = 1000 * 30;
    final Runnable periodicRegistration = new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false);
            } catch (Throwable t) {
                // A throwable escaping run() would cancel the fixed-rate task.
                log.error("registerBrokerAll Exception", t);
            }
        }
    };
    this.scheduledExecutorService.scheduleAtFixedRate(
        periodicRegistration, firstRunDelayMillis, repeatPeriodMillis, TimeUnit.MILLISECONDS);

    if (null != this.brokerStatsManager) {
        this.brokerStatsManager.start();
    }

    if (null != this.brokerFastFailure) {
        this.brokerFastFailure.start();
    }
}
|
启动过程中调用了registerBrokerAll方法,注册broker到namesrv上去。
然后又启动一个定时注册任务来保证broker在namesrv上的可用性。
registerBrokerAll定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { TopicConfig tmp = new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), this.brokerConfig.getBrokerPermission()); topicConfigTable.put(topicConfig.getTopicName(), tmp); } topicConfigWrapper.setTopicConfigTable(topicConfigTable); }
RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll( this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.getHAServerAddr(), topicConfigWrapper, this.filterServerManager.buildNewFilterServerList(), oneway, this.brokerConfig.getRegisterBrokerTimeoutMills());
if (registerBrokerResult != null) { if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); }
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) { this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); } } }
|
brokerOuterAPI.registerBrokerAll定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
|
/**
 * Registers the broker on every name server address known to the remoting client.
 *
 * <p>A failure against one name server is logged and does not stop the remaining
 * registrations.
 *
 * @return the last non-null result returned by a name server, or {@code null} when
 *         the address list is null or no registration produced a result
 */
public RegisterBrokerResult registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills
) {
    RegisterBrokerResult lastResult = null;

    final List<String> addresses = this.remotingClient.getNameServerAddressList();
    if (null == addresses) {
        return lastResult;
    }

    for (String namesrvAddr : addresses) {
        try {
            final RegisterBrokerResult current = this.registerBroker(
                namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
                haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
            if (null != current) {
                lastResult = current;
            }

            log.info("register broker to name server {} OK", namesrvAddr);
        } catch (Exception e) {
            log.warn("registerBroker Exception, " + namesrvAddr, e);
        }
    }

    return lastResult;
}
|
真实的注册逻辑在registerBroker方法中实现,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
|
/**
 * Sends one {@code REGISTER_BROKER} request to a single name server.
 *
 * <p>The request header carries the broker address, id, name, cluster name and HA
 * server address; the body carries the topic config wrapper and the filter server
 * list.
 *
 * @param oneway when true the request is sent with {@code invokeOneway} and no
 *               response is read
 * @return {@code null} in oneway mode; otherwise a result holding the master address,
 *         the HA server address and, when the response has a body, the decoded KV table
 * @throws MQBrokerException if the name server answers with a code other than
 *                           {@code ResponseCode.SUCCESS}
 */
private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills
) throws RemotingCommandException, MQBrokerException, RemotingConnectException,
    RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
    RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    requestHeader.setHaServerAddr(haServerAddr);

    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);

    RegisterBrokerBody requestBody = new RegisterBrokerBody();
    requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
    requestBody.setFilterServerList(filterServerList);
    request.setBody(requestBody.encode());

    if (oneway) {
        try {
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Oneway registration stays best-effort, but the failure is now logged
            // so a dropped registration is visible.
            log.warn("registerBroker oneway Exception, " + namesrvAddr, e);
        }
        return null;
    }

    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    assert response != null;

    if (response.getCode() == ResponseCode.SUCCESS) {
        RegisterBrokerResponseHeader responseHeader =
            (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);

        RegisterBrokerResult result = new RegisterBrokerResult();
        result.setMasterAddr(responseHeader.getMasterAddr());
        result.setHaServerAddr(responseHeader.getHaServerAddr());
        if (response.getBody() != null) {
            result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
        }
        return result;
    }

    // Any other response code is reported to the caller together with the remark.
    throw new MQBrokerException(response.getCode(), response.getRemark());
}
|
broker端注册broker主要是告诉namesrv自身的信息和broker所支持的topic信息,以便producer、consumer去namesrv查询broker的时候能够查询到。
namesrv端
在com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor文件中处理请求信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
// Handles REGISTER_BROKER: the handler is chosen from the version carried by the request.
case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion());
// Versions whose ordinal is >= that of V3_0_11 go to registerBrokerWithFilterServer; all others go to registerBroker.
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); }
|
具体的处理逻辑在registerBrokerWithFilterServer中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
|
/**
 * Handles a broker registration request that may carry a filter server list.
 *
 * <p>The request body, when present, is decoded into a {@link RegisterBrokerBody};
 * otherwise a fresh body is used whose data version is reset to counter 0 and
 * timestamp 0. The registration is delegated to the route info manager. The response
 * header carries the HA server address and master address from the result, and the
 * response body is the KV list of the {@code NAMESPACE_ORDER_TOPIC_CONFIG} namespace.
 *
 * @param ctx     channel context; its channel is passed to the route info manager
 * @param request the registration request
 * @return a response with code {@code ResponseCode.SUCCESS}
 * @throws RemotingCommandException if the request header cannot be decoded
 */
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
    final RegisterBrokerResponseHeader responseHeader =
        (RegisterBrokerResponseHeader) response.readCustomHeader();
    final RegisterBrokerRequestHeader requestHeader =
        (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

    final RegisterBrokerBody registerBrokerBody;
    final byte[] rawBody = request.getBody();
    if (null == rawBody) {
        // No body sent: fall back to an empty body with a zeroed data version.
        registerBrokerBody = new RegisterBrokerBody();
        registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
        registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestatmp(0);
    } else {
        registerBrokerBody = RegisterBrokerBody.decode(rawBody, RegisterBrokerBody.class);
    }

    final RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        registerBrokerBody.getTopicConfigSerializeWrapper(),
        registerBrokerBody.getFilterServerList(),
        ctx.channel());

    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());

    final byte[] orderTopicConfig =
        this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(orderTopicConfig);

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
|
com.alibaba.rocketmq.namesrv.routeinfo.RouteInfoManager文件中实现了具体的注册逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
|
/**
 * Records a broker registration in the route tables while holding the write lock.
 *
 * <p>Steps performed under the lock:
 * <ol>
 *   <li>add the broker name to the cluster's name set in {@code clusterAddrTable};</li>
 *   <li>create or update the {@link BrokerData} in {@code brokerAddrTable} and store
 *       the address under {@code brokerId};</li>
 *   <li>for a master ({@code MixAll.MASTER_ID}) with a non-null wrapper, apply every
 *       topic config via {@code createAndUpdateQueueData} when the data version
 *       changed or this is the first registration;</li>
 *   <li>refresh the entry in {@code brokerLiveTable};</li>
 *   <li>replace or remove the entry in {@code filterServerTable} when a filter server
 *       list is supplied;</li>
 *   <li>for a non-master, copy the master's address and HA server address into the
 *       result when the master is present in {@code brokerLiveTable}.</li>
 * </ol>
 *
 * <p>Any exception is logged and not rethrown; the result built so far is returned.
 *
 * @return a result whose master address and HA server address are set only in the
 *         non-master case described above
 */
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel
) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        // Acquire the lock BEFORE the try/finally: if acquisition is interrupted the
        // finally block must not call unlock() on a lock this thread does not hold,
        // which would throw IllegalMonitorStateException and mask the interruption.
        this.lock.writeLock().lockInterruptibly();
        try {
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData();
                brokerData.setBrokerName(brokerName);
                HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
                brokerData.setBrokerAddrs(brokerAddrs);

                this.brokerAddrTable.put(brokerName, brokerData);
            }
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            // Also a first registration when this broker id had no address before.
            registerFirst = registerFirst || (null == oldAddr);

            if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentHashMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (TopicConfig topicConfig : tcTable.values()) {
                            this.createAndUpdateQueueData(brokerName, topicConfig);
                        }
                    }
                }
            }

            // NOTE(review): topicConfigWrapper is dereferenced here without the null
            // check used above; a null wrapper ends up in the catch block below.
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can observe it.
        Thread.currentThread().interrupt();
        log.error("registerBroker Exception", e);
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}

/**
 * Tells whether the data version reported by a broker differs from the one stored
 * for that broker address in {@code brokerLiveTable}.
 *
 * @return {@code true} when no entry exists for {@code brokerAddr} or the stored data
 *         version is not equal to {@code dataVersion}
 */
private boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
    BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
    return null == prev || !prev.getDataVersion().equals(dataVersion);
}
|
服务端处理broker注册请求逻辑就完成了。