当前位置: 首页 > news >正文

丘里奇网站排名网站建设十年杜绝模板

丘里奇网站排名,网站建设十年杜绝模板,室内设计网站有哪些比较好,南京做网站优化的公司NameServer 作为注册中心#xff0c;提供路由注册、路由踢出、路由发现功能#xff0c;舍弃强一致#xff0c;保证高可用#xff0c;集群中各个节点不会实时通讯#xff0c;其中一个节点下线之后#xff0c;会提供另外一个节点保证路由功能。 启动入口 org.apache.rock… NameServer 作为注册中心提供路由注册、路由踢出、路由发现功能舍弃强一致保证高可用集群中各个节点不会实时通讯其中一个节点下线之后会提供另外一个节点保证路由功能。 启动入口 org.apache.rocketmq.namesrv.NamesrvStartup#main0 public static void main0(String[] args) {try {//启动namesrv之前的准备命令行准备、parseCommandlineAndConfigFile(args);//创建namesrv控制器createAndStartNamesrvController();} catch (Throwable e) {e.printStackTrace();System.exit(-1);} }public static void parseCommandlineAndConfigFile(String[] args) throws Exception {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//PackageConflictDetect.detectFastjson();// 1.构建命令行参数Options options ServerUtil.buildCommandlineOptions(new Options());CommandLine commandLine ServerUtil.parseCmdLine(mqnamesrv, args, buildCommandlineOptions(options), new PosixParser());if (null commandLine) {System.exit(-1);return;}// 2.创建namesrvconfig对象 namesrvConfig new NamesrvConfig();// 3、创建netty配置监听9876端口nettyServerConfig new NettyServerConfig();nettyClientConfig new NettyClientConfig();nettyServerConfig.setListenPort(9876);controllerConfig new ControllerConfig();//4、解析启动命令-c参数if (commandLine.hasOption(c)) {String file commandLine.getOptionValue(c);if (file ! null) {InputStream in new BufferedInputStream(Files.newInputStream(Paths.get(file)));properties new Properties();properties.load(in);MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, controllerConfig);namesrvConfig.setConfigStorePath(file);System.out.printf(load config properties file OK, %s%n, file);in.close();}}//5、解析启动命令-p参数if (commandLine.hasOption(p)) {MixAll.printObjectProperties(null, namesrvConfig);MixAll.printObjectProperties(null, nettyServerConfig);MixAll.printObjectProperties(null, nettyClientConfig);MixAll.printObjectProperties(null, controllerConfig);System.exit(0);}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);if (null namesrvConfig.getRocketmqHome()) {System.out.printf(Please set the %s variable in your environment to match the location of the RocketMQ installation%n, MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}LoggerContext lc (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator new JoranConfigurator();configurator.setContext(lc);lc.reset();//处理配置信息configurator.doConfigure(namesrvConfig.getRocketmqHome() /conf/logback_namesrv.xml);log InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);} public static void createAndStartNamesrvController() throws Exception {NamesrvController controller createNamesrvController();start(controller);String tip The Name Server boot success. serializeType RemotingCommand.getSerializeTypeConfigInThisServer();log.info(tip);System.out.printf(%s%n, tip); } org.apache.rocketmq.namesrv.NamesrvController#initializepublic boolean initialize() {loadConfig();//实例化netty服务端和客户端initiateNetworkComponents();initiateThreadExecutors();registerProcessor();startScheduleService();initiateSslContext();initiateRpcHooks();return true;} org.apache.rocketmq.namesrv.NamesrvController#startpublic void start() throws Exception {//this.remotingServer.start();// In test scenarios where it is up to OS to pick up an available port, set the listening port back to configif (0 nettyServerConfig.getListenPort()) {nettyServerConfig.setListenPort(this.remotingServer.localListenPort());}this.remotingClient.updateNameServerAddressList(Collections.singletonList(RemotingUtil.getLocalAddress() : nettyServerConfig.getListenPort()));this.remotingClient.start();if (this.fileWatchService ! null) {this.fileWatchService.start();}this.routeInfoManager.start();} //启动bootstrap、channel、bind org.apache.rocketmq.remoting.netty.NettyRemotingServer#startOverridepublic void start() {this.defaultEventExecutorGroup new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private final AtomicInteger threadIndex new AtomicInteger(0);Overridepublic Thread newThread(Runnable r) {return new Thread(r, NettyServerCodecThread_ this.threadIndex.incrementAndGet());}});prepareSharableHandlers();serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).childOption(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).localAddress(new InetSocketAddress(this.nettyServerConfig.getBindAddress(),this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializerSocketChannel() {Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0,nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});addCustomConfig(serverBootstrap);try {ChannelFuture sync serverBootstrap.bind().sync();InetSocketAddress addr (InetSocketAddress) sync.channel().localAddress();if (0 nettyServerConfig.getListenPort()) {this.nettyServerConfig.setListenPort(addr.getPort());}log.info(RemotingServer started, listening {}:{}, this.nettyServerConfig.getBindAddress(),this.nettyServerConfig.getListenPort());this.remotingServerTable.put(this.nettyServerConfig.getListenPort(), this);} catch (Exception e) {throw new IllegalStateException(String.format(Failed to bind to %s:%d, nettyServerConfig.getBindAddress(),nettyServerConfig.getListenPort()), e);}if (this.channelEventListener ! null) {this.nettyEventExecutor.start();}this.timer.scheduleAtFixedRate(new TimerTask() {Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error(scanResponseTable exception, e);}}}, 1000 * 3, 1000);} //启动客户端 org.apache.rocketmq.remoting.netty.NettyRemotingClient#startpublic void start() {if (this.defaultEventExecutorGroup null) {this.defaultEventExecutorGroup new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex new AtomicInteger(0);Overridepublic Thread newThread(Runnable r) {return new Thread(r, NettyClientWorkerThread_ this.threadIndex.incrementAndGet());}});}Bootstrap handler this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, false).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).handler(new ChannelInitializerSocketChannel() {Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null ! sslContext) {pipeline.addFirst(defaultEventExecutorGroup, sslHandler, sslContext.newHandler(ch.alloc()));LOGGER.info(Prepend SSL handler);} else {LOGGER.warn(Connections are insecure as SSLContext is null!);}}ch.pipeline().addLast(nettyClientConfig.isDisableNettyWorkerGroup() ? null : defaultEventExecutorGroup,new NettyEncoder(),new NettyDecoder(),new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),new NettyConnectManageHandler(),new NettyClientHandler());}});if (nettyClientConfig.getClientSocketSndBufSize() 0) {LOGGER.info(client set SO_SNDBUF to {}, nettyClientConfig.getClientSocketSndBufSize());handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());}if (nettyClientConfig.getClientSocketRcvBufSize() 0) {LOGGER.info(client set SO_RCVBUF to {}, nettyClientConfig.getClientSocketRcvBufSize());handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());}if (nettyClientConfig.getWriteBufferLowWaterMark() 0 nettyClientConfig.getWriteBufferHighWaterMark() 0) {LOGGER.info(client set netty WRITE_BUFFER_WATER_MARK to {},{},nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));}if (nettyClientConfig.isClientPooledByteBufAllocatorEnable()) {handler.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}this.timer.scheduleAtFixedRate(new TimerTask() {Overridepublic void run() {try {NettyRemotingClient.this.scanResponseTable();} catch (Throwable e) {LOGGER.error(scanResponseTable exception, e);}}}, 1000 * 3, 1000);// this.timer.scheduleAtFixedRate(new TimerTask() { // Override // public void run() { // try { // NettyRemotingClient.this.scanChannelTablesOfNameServer(); // } catch (Exception e) { // LOGGER.error(scanChannelTablesOfNameServer exception, e); // } // } // }, 1000 * 3, 10 * 1000);this.timer.scheduleAtFixedRate(new TimerTask() {Overridepublic void run() {try {NettyRemotingClient.this.scanAvailableNameSrv();} catch (Exception e) {LOGGER.error(scanAvailableNameSrv exception, e);}}}, 0, this.nettyClientConfig.getConnectTimeoutMillis());} 路由注册 1、Broker服务每隔30秒向Namesrv发送一个心跳包。 org.apache.rocketmq.broker.BrokerController#start public void start() throws Exception {this.shouldStartTime System.currentTimeMillis() messageStoreConfig.getDisappearTimeAfterStart();if (messageStoreConfig.getTotalReplicas() 1 this.brokerConfig.isEnableSlaveActingMaster() || this.brokerConfig.isEnableControllerMode()) {isIsolated true;}if (this.brokerOuterAPI ! null) {this.brokerOuterAPI.start();}startBasicService();if (!isIsolated !this.messageStoreConfig.isEnableDLegerCommitLog() !this.messageStoreConfig.isDuplicationEnable()) {changeSpecialServiceStatus(this.brokerConfig.getBrokerId() MixAll.MASTER_ID);this.registerBrokerAll(true, false, true);}scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {Overridepublic void run2() {try {if (System.currentTimeMillis() shouldStartTime) {BrokerController.LOG.info(Register to namesrv after {}, shouldStartTime);return;}if (isIsolated) {BrokerController.LOG.info(Skip register for broker is isolated);return;}BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {BrokerController.LOG.error(registerBrokerAll Exception, e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));if (this.brokerConfig.isEnableSlaveActingMaster()) {scheduleSendHeartbeat();scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {Overridepublic void run2() {try {BrokerController.this.syncBrokerMemberGroup();} catch (Throwable e) {BrokerController.LOG.error(sync BrokerMemberGroup error. , e);}}}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));}if (this.brokerConfig.isEnableControllerMode()) {scheduleSendHeartbeat();}if (brokerConfig.isSkipPreOnline()) {startServiceWithoutCondition();}} org.apache.rocketmq.broker.BrokerController#scheduleSendHeartbeatprotected void scheduleSendHeartbeat() {scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {Overridepublic void run2() {if (isIsolated) {return;}try {BrokerController.this.sendHeartbeat();} catch (Exception e) {BrokerController.LOG.error(sendHeartbeat Exception, e);}}}, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS));} 2、broker向namesrv注册服务信息topic信息、queue信息、集群信息。 2-1、broker利用netty发送服务信息 //org.apache.rocketmq.broker.BrokerController#registerBrokerAll public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {TopicConfigAndMappingSerializeWrapper topicConfigWrapper new TopicConfigAndMappingSerializeWrapper();topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(entry - new AbstractMap.SimpleImmutableEntry(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {ConcurrentHashMapString, TopicConfig topicConfigTable new ConcurrentHashMap();for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {TopicConfig tmp new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),topicConfig.getPerm() this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());topicConfigTable.put(topicConfig.getTopicName(), tmp);}topicConfigWrapper.setTopicConfigTable(topicConfigTable);}if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.brokerConfig.getRegisterBrokerTimeoutMills(),this.brokerConfig.isInBrokerContainer())) {doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);}}protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,TopicConfigSerializeWrapper topicConfigWrapper) {if (shutdown) {BrokerController.LOG.info(BrokerController#doResterBrokerAll: broker has shutdown, no need to register any more.);return;}ListRegisterBrokerResult registerBrokerResultList this.brokerOuterAPI.registerBrokerAll(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.getHAServerAddr(),topicConfigWrapper,this.filterServerManager.buildNewFilterServerList(),oneway,this.brokerConfig.getRegisterBrokerTimeoutMills(),this.brokerConfig.isEnableSlaveActingMaster(),this.brokerConfig.isCompressedRegister(),this.brokerConfig.isEnableSlaveActingMaster() ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,this.getBrokerIdentity());handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);} 将broker的信息通过netty发送到namesrv上。 //org.apache.rocketmq.broker.BrokerController#handleRegisterBrokerResult protected void handleRegisterBrokerResult(ListRegisterBrokerResult registerBrokerResultList,boolean checkOrderConfig) {for (RegisterBrokerResult registerBrokerResult : registerBrokerResultList) {if (registerBrokerResult ! null) {if (this.updateMasterHAServerAddrPeriodically registerBrokerResult.getHaServerAddr() ! null) {this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());}this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());if (checkOrderConfig) {this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());}break;}}} org.apache.rocketmq.broker.out.BrokerOuterAPI#sendHeartbeatToController org.apache.rocketmq.broker.out.BrokerOuterAPI#sendHeartbeatViaDataVersion org.apache.rocketmq.broker.out.BrokerOuterAPI#sendHeartbeat/*** Send heartbeat to controller*/public void sendHeartbeatToController(final String controllerAddress,final String clusterName,final String brokerAddr,final String brokerName,final Long brokerId,final int timeoutMills,final boolean isInBrokerContainer,final int epoch,final long maxOffset,final long confirmOffset) {if (StringUtils.isEmpty(controllerAddress)) {return;}final BrokerHeartbeatRequestHeader requestHeader new BrokerHeartbeatRequestHeader();requestHeader.setClusterName(clusterName);requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerName(brokerName);requestHeader.setEpoch(epoch);requestHeader.setMaxOffset(maxOffset);requestHeader.setConfirmOffset(confirmOffset);brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {Overridepublic void run2() {RemotingCommand request RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, requestHeader);try {//使用netty客户端发送数据BrokerOuterAPI.this.remotingClient.invokeOneway(controllerAddress, request, timeoutMills);} catch (Exception e) {LOGGER.error(Error happen when send heartbeat to controller {}, controllerAddress, e);}}});} org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOnewayOverridepublic void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {final Channel channel this.getAndCreateChannel(addr);if (channel ! null channel.isActive()) {try {doBeforeRpcHooks(addr, request);this.invokeOnewayImpl(channel, request, timeoutMillis);} catch (RemotingSendRequestException e) {LOGGER.warn(invokeOneway: send request exception, so close the channel[{}], addr);this.closeChannel(addr, channel);throw e;}} else {this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}} org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeOnewayImplpublic void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {request.markOnewayRPC();boolean acquired this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {final SemaphoreReleaseOnlyOnce once new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);try {channel.writeAndFlush(request).addListener((ChannelFutureListener) f - {once.release();if (!f.isSuccess()) {log.warn(send a request command to channel channel.remoteAddress() failed.);}});} catch (Exception e) {once.release();log.warn(write send a request command to channel channel.remoteAddress() failed.);throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);}} else {if (timeoutMillis 0) {throw new RemotingTooMuchRequestException(invokeOnewayImpl invoke too fast);} else {String info String.format(invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d,timeoutMillis,this.semaphoreOneway.getQueueLength(),this.semaphoreOneway.availablePermits());log.warn(info);throw new RemotingTimeoutException(info);}}}2-2、namesrv接受并且注册服务信息到列表  org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final String zoneName,final Long timeoutMillis,final Boolean enableActingMaster,final TopicConfigSerializeWrapper topicConfigWrapper,final ListString filterServerList,final Channel channel) {RegisterBrokerResult result new RegisterBrokerResult();try {this.lock.writeLock().lockInterruptibly();//init or update the cluster infoSetString brokerNames ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMapString, SetString) this.clusterAddrTable, clusterName, k - new HashSet());brokerNames.add(brokerName);boolean registerFirst false;BrokerData brokerData this.brokerAddrTable.get(brokerName);if (null brokerData) {registerFirst true;brokerData new BrokerData(clusterName, brokerName, new HashMap());this.brokerAddrTable.put(brokerName, brokerData);}boolean isOldVersionBroker enableActingMaster null;brokerData.setEnableActingMaster(!isOldVersionBroker enableActingMaster);brokerData.setZoneName(zoneName);MapLong, String brokerAddrsMap brokerData.getBrokerAddrs();boolean isMinBrokerIdChanged false;long prevMinBrokerId 0;if (!brokerAddrsMap.isEmpty()) {prevMinBrokerId Collections.min(brokerAddrsMap.keySet());}if (brokerId prevMinBrokerId) {isMinBrokerIdChanged true;}//Switch slave to master: first remove 1, IP:PORT in namesrv, then add 0, IP:PORT//The same IP:PORT must only have one record in brokerAddrTablebrokerAddrsMap.entrySet().removeIf(item - null ! brokerAddr brokerAddr.equals(item.getValue()) brokerId ! item.getKey());//If Local brokerId stateVersion bigger than the registering one,String oldBrokerAddr brokerAddrsMap.get(brokerId);if (null ! oldBrokerAddr !oldBrokerAddr.equals(brokerAddr)) {BrokerLiveInfo oldBrokerInfo brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));if (null ! oldBrokerInfo) {long oldStateVersion oldBrokerInfo.getDataVersion().getStateVersion();long newStateVersion topicConfigWrapper.getDataVersion().getStateVersion();if (oldStateVersion newStateVersion) {log.warn(Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.,clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);//Remove the rejected brokerAddr from brokerLiveTable.brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));return result;}}}if (!brokerAddrsMap.containsKey(brokerId) topicConfigWrapper.getTopicConfigTable().size() 1) {log.warn(Cant register topicConfigWrapper{} because broker[{}]{} has not registered.,topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);return null;}String oldAddr brokerAddrsMap.put(brokerId, brokerAddr);registerFirst registerFirst || (StringUtils.isEmpty(oldAddr));boolean isMaster MixAll.MASTER_ID brokerId;boolean isPrimeSlave !isOldVersionBroker !isMaster brokerId Collections.min(brokerAddrsMap.keySet());if (null ! topicConfigWrapper (isMaster || isPrimeSlave)) {ConcurrentMapString, TopicConfig tcTable topicConfigWrapper.getTopicConfigTable();if (tcTable ! null) {for (Map.EntryString, TopicConfig entry : tcTable.entrySet()) {if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,topicConfigWrapper.getDataVersion(), brokerName,entry.getValue().getTopicName())) {final TopicConfig topicConfig entry.getValue();if (isPrimeSlave) {// Wipe write perm for prime slavetopicConfig.setPerm(topicConfig.getPerm() (~PermName.PERM_WRITE));}this.createAndUpdateQueueData(brokerName, topicConfig);}}}if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);MapString, TopicQueueMappingInfo topicQueueMappingInfoMap mappingSerializeWrapper.getTopicQueueMappingInfoMap();//the topicQueueMappingInfoMap should never be null, but can be emptyfor (Map.EntryString, TopicQueueMappingInfo entry : topicQueueMappingInfoMap.entrySet()) {if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {topicQueueMappingInfoTable.put(entry.getKey(), new HashMap());}//Note asset brokerName equal entry.getValue().getBname()//here use the mappingDetail.bnametopicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());}}}BrokerAddrInfo brokerAddrInfo new BrokerAddrInfo(clusterName, brokerAddr);BrokerLiveInfo prevBrokerLiveInfo this.brokerLiveTable.put(brokerAddrInfo,new BrokerLiveInfo(System.currentTimeMillis(),timeoutMillis null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,topicConfigWrapper null ? new DataVersion() : topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null prevBrokerLiveInfo) {log.info(new broker registered, {} HAService: {}, brokerAddrInfo, haServerAddr);}if (filterServerList ! null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddrInfo);} else {this.filterServerTable.put(brokerAddrInfo, filterServerList);}}if (MixAll.MASTER_ID ! brokerId) {String masterAddr brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr ! null) {BrokerAddrInfo masterAddrInfo new BrokerAddrInfo(clusterName, masterAddr);BrokerLiveInfo masterLiveInfo this.brokerLiveTable.get(masterAddrInfo);if (masterLiveInfo ! null) {result.setHaServerAddr(masterLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}if (isMinBrokerIdChanged namesrvConfig.isNotifyMinBrokerIdChanged()) {notifyMinBrokerIdChanged(brokerAddrsMap, null,this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());}} catch (Exception e) {log.error(registerBroker Exception, e);} finally {this.lock.writeLock().unlock();}return result;} 路由剔除 NameServer和每个Broker保持长连接每隔30秒接收Broker发送的心跳包同时自身每个10秒扫描BrokerLiveTable比较上次收到心跳时间和当前时间比较是否大于120秒如果超过那么认为Broker不可用剔除路由表中该Broker相关信息。 org.apache.rocketmq.namesrv.NamesrvController#startScheduleService定时扫描 public void scanNotActiveBroker() {try {log.info(start scanNotActiveBroker);for (EntryBrokerAddrInfo, BrokerLiveInfo next : this.brokerLiveTable.entrySet()) {long last next.getValue().getLastUpdateTimestamp();long timeoutMillis next.getValue().getHeartbeatTimeoutMillis();if ((last timeoutMillis) System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());log.warn(The broker channel expired, {} {}ms, next.getKey(), timeoutMillis);this.onChannelDestroy(next.getKey());}}} catch (Exception e) {log.error(scanNotActiveBroker exception, e);} }public void onChannelDestroy(BrokerAddrInfo brokerAddrInfo) {UnRegisterBrokerRequestHeader unRegisterRequest new UnRegisterBrokerRequestHeader();boolean needUnRegister false;if (brokerAddrInfo ! null) {try {try {this.lock.readLock().lockInterruptibly();needUnRegister setupUnRegisterRequest(unRegisterRequest, brokerAddrInfo);} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error(onChannelDestroy Exception, e);}}if (needUnRegister) {//unregistrationQueue.offer(unRegisterRequest);boolean result this.submitUnRegisterBrokerRequest(unRegisterRequest);log.info(the brokers channel destroyed, submit the unregister request at once, broker info: {}, submit result: {}, unRegisterRequest, result);}}private boolean setupUnRegisterRequest(UnRegisterBrokerRequestHeader unRegisterRequest,BrokerAddrInfo brokerAddrInfo) {unRegisterRequest.setClusterName(brokerAddrInfo.getClusterName());unRegisterRequest.setBrokerAddr(brokerAddrInfo.getBrokerAddr());for (EntryString, BrokerData stringBrokerDataEntry : this.brokerAddrTable.entrySet()) {BrokerData brokerData stringBrokerDataEntry.getValue();if (!brokerAddrInfo.getClusterName().equals(brokerData.getCluster())) {continue;}for (EntryLong, String entry : brokerData.getBrokerAddrs().entrySet()) {Long brokerId entry.getKey();String brokerAddr entry.getValue();if (brokerAddr.equals(brokerAddrInfo.getBrokerAddr())) {unRegisterRequest.setBrokerName(brokerData.getBrokerName());unRegisterRequest.setBrokerId(brokerId);return true;}}}return false;} 路由发现 路由发现不是实时的路由变化后NameServer不主动推给客户端等待producer定期拉取最新路由信息。这样的设计方式降低了NameServer实现的复杂性当路由发生变化时通过在消息发送端的容错机制来保证消息发送的高可用
http://wiki.neutronadmin.com/news/415366/

相关文章:

  • 宿迁宿豫网站建设视频制作表情包
  • 如何分析网站竞争对手徐州领航装饰工程有限公司
  • 百度电脑版登录网站冯耀宗seo课程
  • 苏州建网站用路由器建设网站
  • 建立自己的网站费用长沙百度百科
  • 校园网网站建设黑龙江网站建设
  • 上海免费网站建设模板推荐判断网站的好坏
  • 装饰公司怎么做网站河南建达工程建设监理公司网站
  • 顺德网站开发网站建设规划面试技巧
  • 永城城乡建设局网站网站怎么做镜像
  • 电商网站设计系统阳江市新增确诊病例
  • 桂林北站防疫电话营销网站的建设流程
  • 昆山网站建设书生商友二级网站怎么建
  • 英文网站域名注册刚做的网站为什么搜索不到
  • 钓鱼转转网站在线生成软件东莞做一个企业网站
  • 青岛公司建设网站移动惠生活app下载网址
  • 大同网站建设熊掌号wordpress腾讯cos
  • 网站开发 上海wordpress可以自动采集吗
  • 有用cc域名做网站的福建网站开发公司电话
  • 怎么引导做淘宝的客户做官方网站网站架构师工资
  • Wordpress在中国建站网站精美排版代码
  • 重庆建设工程信息网官网中苏业盛网站怎样做优化调整
  • 网站在哪里找广元网站设计
  • 网站建设支出账务处理flash在线制作网站
  • 网站编辑知识开发微信公众号需要多少钱
  • 设计网站如何推广企业网站脚本语言
  • 如何建设一个专业的网站公司域名备案全部过程
  • 网站建设的用途是什么意思网页设计实训总结心得体会
  • 广州优秀网站设计瑞安网站建设优化推广
  • seo在线优化工具seo就业前景