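Contents
Introduction
Understanding the Registry
Nacos Architecture
Overview of How Nacos Implements Service Registration and Discovery
The Spring Cloud Service Registration Standard
Service Registration
Heartbeats and Health Checks
Service Discovery
Comparison of Mainstream Registries
A Few Takeaways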
Introduction
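This article takes a coarse-grained, end-to-end look at how Nacos implements a service registry. Many details are left out, but I hope it offers some useful insight. The source code shown is from Nacos 1.x; 2.x differs in places, but the overall approach to building a registry is similar. Once you understand how one registry works and what its technical essence is, learning other registries becomes much easier, because their design ideas are shared and they solve the same problems. If you are interested in more implementation details, feel free to leave a comment so we can discuss them. Let's begin.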
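Understanding the Registry
Without a registry, things would likely look like this: every service consumer maintains a local list of provider nodes, and whenever a provider brings a new node online or takes an old one offline, every consumer has to update its list promptly. A registry manages all service node information centrally and automates all of this.
In a microservice architecture, the registry is mainly responsible for:
Service address management
Service registration
Dynamic awareness of service changes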
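To make these three responsibilities concrete, here is a minimal in-memory sketch of the bookkeeping a registry does: service name mapped to a set of node addresses, with register, deregister and lookup. `SimpleRegistry` and its methods are hypothetical names for illustration, not a Nacos API; a real registry such as Nacos adds health checking, replication and change notification on top of this.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical toy registry: serviceName -> set of "ip:port" addresses.
class SimpleRegistry {
    private final Map<String, Set<String>> services = new ConcurrentHashMap<>();

    // Service registration: a provider announces one of its nodes.
    void register(String serviceName, String ip, int port) {
        services.computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet())
                .add(ip + ":" + port);
    }

    // A node going offline is removed centrally, so consumers never edit local lists.
    void deregister(String serviceName, String ip, int port) {
        Set<String> nodes = services.get(serviceName);
        if (nodes != null) {
            nodes.remove(ip + ":" + port);
        }
    }

    // Service discovery: consumers ask the registry instead of keeping their own list.
    List<String> lookup(String serviceName) {
        return services.getOrDefault(serviceName, Set.of()).stream()
                .sorted()
                .collect(Collectors.toList());
    }
}
```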
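Nacos Architecture
When learning any technology, it helps to start with the official architecture diagram to get the overall picture. The Nacos architecture is shown below.
[Figure: Nacos architecture diagram]
The core is Nacos Server. Its Naming Service module provides the registry (service management) and exposes OpenAPI endpoints for clients to call. In practice we call these endpoints through the Nacos client SDK, which hides all the details of the calls; we only need to supply the configuration.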
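The core Open API endpoints are:
Service registration: /nacos/v1/ns/instance (POST)
Query service instances: /nacos/v1/ns/instance/list (GET)
Service subscription: /nacos/v1/ns/instance/list (GET; the same endpoint, with the client also passing the UDP port on which it receives server pushes)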
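The registration and query endpoints can also be called without the SDK. A minimal sketch using the JDK's `java.net.http` client (Java 11+), assuming a Nacos 1.x server reachable at the given address and passing `serviceName`, `ip` and `port` as query parameters; `NacosOpenApi` is a hypothetical helper that only builds the requests.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds Open API requests; it does not send them.
class NacosOpenApi {
    // POST /nacos/v1/ns/instance?serviceName=...&ip=...&port=...
    static HttpRequest registerRequest(String serverAddr, String serviceName, String ip, int port) {
        String query = "serviceName=" + URLEncoder.encode(serviceName, StandardCharsets.UTF_8)
                + "&ip=" + URLEncoder.encode(ip, StandardCharsets.UTF_8)
                + "&port=" + port;
        return HttpRequest.newBuilder(URI.create("http://" + serverAddr + "/nacos/v1/ns/instance?" + query))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // GET /nacos/v1/ns/instance/list?serviceName=...
    static HttpRequest listRequest(String serverAddr, String serviceName) {
        String query = "serviceName=" + URLEncoder.encode(serviceName, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(URI.create("http://" + serverAddr + "/nacos/v1/ns/instance/list?" + query))
                .GET()
                .build();
    }
}
```

Against a running server, `HttpClient.newHttpClient().send(NacosOpenApi.registerRequest("127.0.0.1:8848", "order-service", "10.0.0.1", 8080), HttpResponse.BodyHandlers.ofString()).body()` should return `ok` on success, matching the `return "ok"` in the server-side controller.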
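Overview of How Nacos Implements Service Registration and Discovery
1. The service provider registers itself through the Open API.
2. The client and server maintain a heartbeat so that the server can track the status of the service.
3. The client (service consumer) queries the list of provider instances.
4. A scheduled task periodically (every 10s by default) pulls the latest data from the server to the client (service consumer).
5. When the Nacos server detects that a provider has failed, it pushes the update to the clients (service consumers) over UDP.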
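The Spring Cloud Service Registration Standard
The core type is ServiceRegistry, the service registration standard defined by Spring Cloud. Every component that integrates with Spring Cloud to provide service registration implements this interface. It is defined as follows: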
package org.springframework.cloud.client.serviceregistry;
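public interface ServiceRegistry<R extends Registration> {
    // Register the given instance with the registry
    void register(R registration);
    // Remove the given instance from the registry
    void deregister(R registration);
    void close();
    void setStatus(R registration, String status);
    <T> T getStatus(R registration);
}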
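Service Registration
How does Spring Cloud Alibaba Nacos actually start service registration in a project? However Nacos is integrated, registration is triggered the same way: after the application starts, a related event is published, and a Spring event listener calls the register method of ServiceRegistry. Because ServiceRegistry is an interface, once Nacos is integrated the call lands in the register method of its implementation, which brings us to the next core class: NacosServiceRegistry. NacosServiceRegistry does two main things:
1. It uses the Nacos client SDK to call the Open API endpoint provided by the Nacos server (/nacos/v1/ns/instance) to register the service.
2. It sends heartbeats to the server periodically (this is how the server makes sure registered services are healthy).
Let's analyze point 1 first; point 2 gets its own section later. Here are the key parts of the Nacos client source involved in registration: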
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
    } else {
        NamingService namingService = this.namingService();
        String serviceId = registration.getServiceId();
        String group = this.nacosDiscoveryProperties.getGroup();
        Instance instance = this.getNacosInstanceFromRegistration(registration);
        try {
            // Core method: the entry point of service registration
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
        } catch (Exception var7) {
            log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var7});
            ReflectionUtils.rethrowRuntimeException(var7);
        }
    }
}
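The NamingService is created via reflection. NamingService is an interface that wraps all interactions with the Nacos server; its implementation class is NacosNamingService.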
public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        NamingService vendorImpl = (NamingService) constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable var4) {
        throw new NacosException(-400, var4);
    }
}
The NacosNamingService constructor calls an init method:
private void init(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    this.namespace = InitUtils.initNamespaceForNaming(properties);
    InitUtils.initSerialization();
    this.initServerAddr(properties);
    InitUtils.initWebRootContext();
    this.initCacheDir();
    this.initLogName(properties);
    this.eventDispatcher = new EventDispatcher();
    this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
    // The client's scheduled heartbeat task (BeatTask) is run by BeatReactor
    this.beatReactor = new BeatReactor(this.serverProxy, this.initClientBeatThreadCount(properties));
    this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, this.beatReactor, this.cacheDir,
            this.isLoadCacheAtStart(properties), this.initPollingThreadCount(properties));
}
Note that the component that runs the scheduled heartbeat task (BeatReactor) is initialized here.
With all that in place, where is the server's registration endpoint actually called?
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        // Build the heartbeat info used for health checking: Nacos Server must make sure registered
        // instances are healthy, and heartbeats are how it checks.
        BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
        // The scheduled heartbeat task BeatTask is started in this method
        this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    // serverProxy.registerService performs the registration (/nacos/v1/ns/instance)
    this.serverProxy.registerService(groupedServiceName, groupName, instance);
}
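The client-side order of operations in registerInstance can be boiled down to a small sketch: build the grouped service name, start heartbeats only for ephemeral instances, then call the registration endpoint. `BeatScheduler` and `RegistryApi` are hypothetical stand-ins for BeatReactor and NamingProxy; the `group@@service` format is what NamingUtils.getGroupedName produces in Nacos 1.x.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for BeatReactor.addBeatInfo(...)
interface BeatScheduler {
    void addBeat(String groupedServiceName, String ip, int port);
}

// Hypothetical stand-in for NamingProxy.registerService(...)
interface RegistryApi {
    void register(String groupedServiceName, String groupName, String ip, int port);
}

class ClientRegistration {
    private final BeatScheduler beats;
    private final RegistryApi api;

    ClientRegistration(BeatScheduler beats, RegistryApi api) {
        this.beats = beats;
        this.api = api;
    }

    static String groupedName(String serviceName, String groupName) {
        return groupName + "@@" + serviceName;
    }

    void registerInstance(String serviceName, String groupName, String ip, int port, boolean ephemeral) {
        String grouped = groupedName(serviceName, groupName);
        if (ephemeral) {
            // Only ephemeral instances are kept alive by client heartbeats.
            beats.addBeat(grouped, ip, port);
        }
        api.register(grouped, groupName, ip, port);
    }
}
```

Scheduling the heartbeat before the registration call mirrors the order in the source above.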
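That is essentially the client side of service registration. What does the server do after the client sends the registration request? On the server, registration is implemented in the InstanceController class of the nacos-naming module.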
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    final Instance instance = HttpRequestInstanceBuilder.newBuilder()
            .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral())
            .setRequest(request)
            .build();
    getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
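This controller method does two things:
1. It reads namespaceId, serviceName and the instance information (Instance) from the request parameters.
2. It calls registerInstance to register the instance.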
registerInstance is implemented as follows:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    Service service = getService(namespaceId, serviceName);
    checkServiceIsNull(service, namespaceId, serviceName);
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
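Step 1 creates an empty service (the service information shown in the service list of the Nacos console). In practice this initializes an entry in serviceMap, a ConcurrentHashMap with a two-level Map structure: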
/**
 * Map(namespace, Map(group::serviceName, Service)).
 */
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
Step 2: getService looks up the Service object in serviceMap by namespaceId and serviceName.
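Step 3: addInstance adds the service instance.
The work of step 1 is done in createServiceIfAbsent (createEmptyService delegates to it with cluster set to null):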
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    // 1. Look up the Service in the cache by namespaceId and serviceName.
    // 2. If it does not exist, create it and put it into the cache.
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}
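Stripped of validation and initialization, "get the service, create it if absent" is a two-level get-or-create on Map(namespace, Map(group::serviceName, Service)). A sketch using `ConcurrentHashMap.computeIfAbsent`, which makes the create step atomic per key so concurrent registrations of the same service end up with one object. `ServiceEntry` and `ServiceTable` are hypothetical placeholders; this is not the exact Nacos code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical placeholder for Nacos' Service object.
class ServiceEntry {
    final String namespaceId;
    final String name; // e.g. "DEFAULT_GROUP@@order-service"

    ServiceEntry(String namespaceId, String name) {
        this.namespaceId = namespaceId;
        this.name = name;
    }
}

class ServiceTable {
    // namespaceId -> (grouped service name -> service)
    private final Map<String, Map<String, ServiceEntry>> serviceMap = new ConcurrentHashMap<>();

    ServiceEntry getService(String namespaceId, String serviceName) {
        Map<String, ServiceEntry> byName = serviceMap.get(namespaceId);
        return byName == null ? null : byName.get(serviceName);
    }

    // Creates the service only if it does not exist yet; concurrent callers get the same object.
    ServiceEntry createServiceIfAbsent(String namespaceId, String serviceName) {
        return serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, name -> new ServiceEntry(namespaceId, name));
    }
}
```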
Let's focus on the putServiceAndInit method:
private void putServiceAndInit(Service service) throws NacosException {
    // 1. putService caches the service in memory.
    putService(service);
    service = getService(service.getNamespaceId(), service.getName());
    // 2. service.init() sets up heartbeat checking (ClientBeatCheckTask): a scheduled task keeps checking
    //    when each instance of this service last sent a heartbeat.
    //    No heartbeat for 15s: healthy is set to false (unhealthy) and a service change event is published.
    //    No heartbeat for 30s: the instance is removed.
    //    Who updates an instance's last-heartbeat time? The heartbeat endpoint (/nacos/v1/ns/instance/beat),
    //    via service.processClientBeat(clientBeat) -> ClientBeatProcessor.
    service.init();
    // 3. consistencyService.listen registers data-consistency listeners (ephemeral and persistent keys).
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
service.init starts the server-side heartbeat check, ClientBeatCheckTask; its implementation is covered in the Heartbeats and Health Checks section below.
Finally, addInstance stores the service instance:
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    Service service = getService(namespaceId, serviceName);
    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        consistencyService.put(key, instances);
    }
}
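Note that addInstance does not edit a shared list in place: under a lock on the service it builds a new instance list (addIpAddresses) and hands a whole new Instances object to consistencyService.put. This is the copy-on-write idea. A minimal sketch of it, assuming a hypothetical `InstanceList` holder: writers serialize on a lock and publish a fresh immutable list through a `volatile` field, so readers never lock and never see a half-updated list.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical holder illustrating copy-on-write updates of a service's instances.
class InstanceList {
    private volatile List<String> instances = List.of();

    // Readers take the current snapshot without locking.
    List<String> snapshot() {
        return instances;
    }

    // Writers copy, modify the copy, then publish it with a single volatile write.
    synchronized void addInstances(String... ips) {
        Set<String> copy = new LinkedHashSet<>(instances); // keeps insertion order, drops duplicates
        for (String ip : ips) {
            copy.add(ip);
        }
        instances = List.copyOf(copy);
    }

    synchronized void removeInstance(String ip) {
        List<String> copy = new ArrayList<>(instances);
        copy.remove(ip);
        instances = List.copyOf(copy);
    }
}
```

A reader holding an old snapshot keeps seeing a consistent (if slightly stale) list, which is the trade-off a registry read path can usually accept.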
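Service registration summary
1. The client sends the registration request through the Open API (a POST request to /nacos/v1/ns/instance).
2. After receiving the request, the server does the following:
(1) Builds a Service object and stores it in a ConcurrentHashMap.
(2) Sets up heartbeat checking for all instances of the service with a scheduled task (ClientBeatCheckTask).
(3) Synchronizes the service data through a consistency protocol (in Nacos 1.x, Distro for ephemeral instances, which are the default, and Raft for persistent instances).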
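Heartbeats and Health Checks
Heartbeats are the main way Nacos, as a registry, checks whether a service is healthy. Let's look at how the client and the server each implement them.
We already know when the client starts sending heartbeats; here is the core client-side code: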
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    // If a beat for the same instance already exists, stop it before replacing it
    if ((existBeat = (BeatInfo) this.dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    this.dom2Beat.put(key, beatInfo);
    // Schedule the heartbeat (the default period is 5s)
    this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set((double) this.dom2Beat.size());
}
package com.alibaba.nacos.client.naming.beat;

public class BeatReactor implements Closeable {
    private final ScheduledExecutorService executorService;
    private final NamingProxy serverProxy;
    private boolean lightBeatEnabled;
    public final Map<String, BeatInfo> dom2Beat;

    class BeatTask implements Runnable {
        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        public void run() {
            if (!this.beatInfo.isStopped()) {
                long nextTime = this.beatInfo.getPeriod();
                try {
                    // Send a heartbeat to Nacos Server (/nacos/v1/ns/instance/beat); on receipt the server
                    // updates the time of the instance's most recent heartbeat
                    JsonNode result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);
                    long interval = result.get("clientBeatInterval").asLong();
                    boolean lightBeatEnabled = false;
                    if (result.has("lightBeatEnabled")) {
                        lightBeatEnabled = result.get("lightBeatEnabled").asBoolean();
                    }
                    BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                    if (interval > 0L) {
                        nextTime = interval;
                    }
                    int code = 10200;
                    if (result.has("code")) {
                        code = result.get("code").asInt();
                    }
                    if (code == 20404) {
                        Instance instance = new Instance();
                        instance.setPort(this.beatInfo.getPort());
                        instance.setIp(this.beatInfo.getIp());
                        instance.setWeight(this.beatInfo.getWeight());
                        instance.setMetadata(this.beatInfo.getMetadata());
                        instance.setClusterName(this.beatInfo.getCluster());
                        instance.setServiceName(this.beatInfo.getServiceName());
                        instance.setInstanceId(instance.getInstanceId());
                        instance.setEphemeral(true);
                        try {
                            // The server did not find the instance (code 20404): register it with Nacos Server again
                            BeatReactor.this.serverProxy.registerService(this.beatInfo.getServiceName(),
                                    NamingUtils.getGroupName(this.beatInfo.getServiceName()), instance);
                        } catch (Exception var10) {
                        }
                    }
                } catch (NacosException var11) {
                    LogUtils.NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                            new Object[]{JacksonUtils.toJson(this.beatInfo), var11.getErrCode(), var11.getErrMsg()});
                }
                // After each heartbeat, schedule the next one (5s later by default)
                BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
            }
        }
    }
}
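In general terms, a heartbeat mechanism works like this: one side periodically sends a small packet, and the other side checks whether packets keep arriving in time; if none arrives within the configured window, the sender is considered to have failed. In Nacos, the client uses a scheduled task (BeatTask) to send heartbeats to the server, and the server uses these heartbeats to keep each instance's status up to date.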
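The client half of that loop, reduced to its essentials: a task that sends one beat, then reschedules itself using the interval the server returned (falling back to the local period when the server returns nothing usable), and stops once flagged. This is a sketch of the BeatTask pattern with a hypothetical `sendBeat` hook in place of NamingProxy.sendBeat; it leaves out the 20404 re-registration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch of the BeatTask pattern: a self-rescheduling heartbeat task.
class HeartbeatSender implements Runnable {
    private final ScheduledExecutorService scheduler;
    // Hypothetical hook: sends one beat and returns the server-suggested interval in ms (<= 0 means "none").
    private final LongSupplier sendBeat;
    private final long defaultPeriodMillis;
    private volatile boolean stopped;

    HeartbeatSender(ScheduledExecutorService scheduler, LongSupplier sendBeat, long defaultPeriodMillis) {
        this.scheduler = scheduler;
        this.sendBeat = sendBeat;
        this.defaultPeriodMillis = defaultPeriodMillis;
    }

    // Prefer the server's interval when it is positive (mirrors `if (interval > 0L)`).
    static long nextDelay(long defaultPeriodMillis, long serverInterval) {
        return serverInterval > 0 ? serverInterval : defaultPeriodMillis;
    }

    void start() {
        scheduler.schedule(this, defaultPeriodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        stopped = true;
    }

    @Override
    public void run() {
        if (stopped) {
            return; // like BeatInfo.isStopped(): no further beats are scheduled
        }
        long next = defaultPeriodMillis;
        try {
            next = nextDelay(defaultPeriodMillis, sendBeat.getAsLong());
        } catch (RuntimeException e) {
            // A failed beat is not fatal: keep the default period and try again.
        }
        scheduler.schedule(this, next, TimeUnit.MILLISECONDS);
    }
}
```

Rescheduling after each run (instead of `scheduleAtFixedRate`) is what lets the server change the interval on the fly.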
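Once the client is sending heartbeats, how does the server check service health? From the registration analysis we know that the server's scheduled heartbeat check is ClientBeatCheckTask (started when the service is registered). Its implementation is as follows: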
package com.alibaba.nacos.naming.healthcheck;
public class ClientBeatCheckTask implements Runnable {
    @Override
    public void run() {
        try {
            List<Instance> instances = service.allIPs(true);
            // first set health status of instances:
            for (Instance instance : instances) {
                // No heartbeat for more than 15s: mark the instance unhealthy and publish InstanceHeartbeatTimeoutEvent
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                    instance.getIp(), instance.getPort(), instance.getClusterName(),
                                    service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                    instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }
            // then remove obsolete instances:
            for (Instance instance : instances) {
                if (instance.isMarked()) {
                    continue;
                }
                // No heartbeat for more than 30s: remove the instance (/nacos/v1/ns/instance - DELETE)
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    deleteIp(instance);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }

    private void deleteIp(Instance instance) {
        try {
            NamingProxy.Request request = NamingProxy.Request.newRequest();
            request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
                    .appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
                    .appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());
            String url = "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort()
                    + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
            // delete instance asynchronously:
            HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                                instance.toJson(), result.getMessage(), result.getCode());
                    }
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(),
                            throwable);
                }

                @Override
                public void onCancel() {
                }
            });
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
        }
    }
}
The core logic keeps checking, for every instance of the service, when it last sent a heartbeat. After 15s without a heartbeat (getInstanceHeartBeatTimeOut), the instance's health status is set to false (healthy = false) and a service change event is published; after 30s without a heartbeat (getIpDeleteTimeout), the instance is removed. One small question remains: who updates an instance's last-heartbeat time? The server sets it while processing the heartbeat request sent by the client (ClientBeatProcessor):
public class ClientBeatProcessor implements Runnable {
    public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
    private RsInfo rsInfo;
    private Service service;

    @JsonIgnore
    public PushService getPushService() {
        return ApplicationUtils.getBean(PushService.class);
    }

    public RsInfo getRsInfo() {
        return rsInfo;
    }

    public void setRsInfo(RsInfo rsInfo) {
        this.rsInfo = rsInfo;
    }

    public Service getService() {
        return service;
    }

    public void setService(Service service) {
        this.service = service;
    }

    @Override
    public void run() {
        Service service = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }
        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        Cluster cluster = service.getClusterMap().get(clusterName);
        List<Instance> instances = cluster.allIPs(true);
        for (Instance instance : instances) {
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                // Update the time of the most recent heartbeat
                instance.setLastBeat(System.currentTimeMillis());
                // An unhealthy (non-marked) instance becomes healthy again once its heartbeat arrives
                if (!instance.isMarked() && !instance.isHealthy()) {
                    instance.setHealthy(true);
                    Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                            cluster.getService().getName(), ip, port, cluster.getName(),
                            UtilsAndCommons.LOCALHOST_SITE);
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}
To sum up, the Nacos server's health check works as follows: no heartbeat for 15s sets the instance's health status to healthy = false; no heartbeat for 30s deletes the instance.
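Putting the two server-side halves together (ClientBeatProcessor refreshing lastBeat, ClientBeatCheckTask scanning), the state machine is small enough to sketch. This version takes `now` as a parameter so it can be exercised without waiting; the 15s and 30s thresholds are the defaults quoted above, it ignores the `marked` flag, and `HealthTable` is a hypothetical class, not Nacos code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of beat processing plus the periodic timeout scan.
class HealthTable {
    static final long HEARTBEAT_TIMEOUT_MS = 15_000; // default: unhealthy after 15s without a beat
    static final long DELETE_TIMEOUT_MS = 30_000;    // default: removed after 30s without a beat

    private static final class Entry {
        volatile long lastBeat;
        volatile boolean healthy = true;

        Entry(long lastBeat) {
            this.lastBeat = lastBeat;
        }
    }

    private final Map<String, Entry> instances = new ConcurrentHashMap<>();

    void register(String instance, long now) {
        instances.put(instance, new Entry(now));
    }

    // Like ClientBeatProcessor: refresh lastBeat and bring an unhealthy instance back.
    // Returns false for an unknown instance (Nacos answers 20404 and the client re-registers).
    boolean onBeat(String instance, long now) {
        Entry e = instances.get(instance);
        if (e == null) {
            return false;
        }
        e.lastBeat = now;
        e.healthy = true;
        return true;
    }

    // Like ClientBeatCheckTask: first mark timed-out instances unhealthy, then drop stale ones.
    void check(long now) {
        for (Entry e : instances.values()) {
            if (now - e.lastBeat > HEARTBEAT_TIMEOUT_MS) {
                e.healthy = false;
            }
        }
        instances.values().removeIf(e -> now - e.lastBeat > DELETE_TIMEOUT_MS);
    }

    boolean contains(String instance) {
        return instances.containsKey(instance);
    }

    boolean isHealthy(String instance) {
        Entry e = instances.get(instance);
        return e != null && e.healthy;
    }
}
```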
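Service Discovery
1. Active pull by the client
After a provider registers with the registry, how does a consumer get the provider's address? Once the consumer subscribes to the provider, a thread periodically fetches the service list; in this case the client actively pulls the provider information. When analyzing service registration we mentioned the initialization of NacosNamingService, which creates a key class, HostReactor; HostReactor also drives dynamic service discovery. After subscribing, the client actively fetches the provider information: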
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
    this.eventDispatcher.addListener(
            this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")),
            StringUtils.join(clusters, ","),
            listener);
}
This calls HostReactor's getServiceInfo method:
public ServiceInfo getServiceInfo(String serviceName, String clusters) {
    LogUtils.NAMING_LOGGER.debug("failover-mode: " + this.failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (this.failoverReactor.isFailoverSwitch()) {
        return this.failoverReactor.getService(key);
    } else {
        ServiceInfo serviceObj = this.getServiceInfo0(serviceName, clusters);
        if (null == serviceObj) {
            // First request: create an empty entry and fetch the data from the server right away
            serviceObj = new ServiceInfo(serviceName, clusters);
            this.serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            this.updatingMap.put(serviceName, new Object());
            this.updateServiceNow(serviceName, clusters);
            this.updatingMap.remove(serviceName);
        } else if (this.updatingMap.containsKey(serviceName)) {
            // Another thread is fetching it right now: wait up to 5s
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(5000L);
                } catch (InterruptedException var8) {
                    LogUtils.NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, var8);
                }
            }
        }
        // Start the scheduled UpdateTask for this service if it is not running yet
        this.scheduleUpdateIfAbsent(serviceName, clusters);
        return (ServiceInfo) this.serviceInfoMap.get(serviceObj.getKey());
    }
}
Besides fetching the provider information the first time, this method also starts the scheduled UpdateTask, which periodically pulls the provider list.
public class UpdateTask implements Runnable {
    long lastRefTime = Long.MAX_VALUE;
    private final String clusters;
    private final String serviceName;
    private int failCount = 0;

    public UpdateTask(String serviceName, String clusters) {
        this.serviceName = serviceName;
        this.clusters = clusters;
    }

    private void incFailCount() {
        int limit = 6;
        if (this.failCount != limit) {
            this.failCount++;
        }
    }

    private void resetFailCount() {
        this.failCount = 0;
    }

    public void run() {
        long delayTime = 1000L;
        try {
            ServiceInfo serviceObj = (ServiceInfo) HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters));
            if (serviceObj == null) {
                HostReactor.this.updateService(this.serviceName, this.clusters);
                return;
            }
            if (serviceObj.getLastRefTime() <= this.lastRefTime) {
                HostReactor.this.updateService(this.serviceName, this.clusters);
                serviceObj = (ServiceInfo) HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters));
            } else {
                HostReactor.this.refreshOnly(this.serviceName, this.clusters);
            }
            this.lastRefTime = serviceObj.getLastRefTime();
            if (!HostReactor.this.eventDispatcher.isSubscribed(this.serviceName, this.clusters)
                    && !HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters))) {
                LogUtils.NAMING_LOGGER.info("update task is stopped, service:" + this.serviceName + ", clusters:" + this.clusters);
                return;
            }
            if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                this.incFailCount();
                return;
            }
            delayTime = serviceObj.getCacheMillis();
            this.resetFailCount();
        } catch (Throwable var7) {
            this.incFailCount();
            LogUtils.NAMING_LOGGER.warn("[NA] failed to update serviceName: " + this.serviceName, var7);
        } finally {
            // Reschedule itself: the delay doubles with each consecutive failure and is capped at 60s
            HostReactor.this.executor.schedule(this, Math.min(delayTime << this.failCount, 60000L), TimeUnit.MILLISECONDS);
        }
    }
}
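The rescheduling line at the end of UpdateTask is a capped exponential backoff: each consecutive failure doubles the delay (`delayTime << failCount`), failCount stops growing at 6, and the result never exceeds 60s. A tiny sketch of just that arithmetic; `PollBackoff` and `nextDelay` are hypothetical names.

```java
// Hypothetical helper reproducing UpdateTask's delay computation.
class PollBackoff {
    static final long MAX_DELAY_MS = 60_000L;
    static final int MAX_FAIL_COUNT = 6; // incFailCount() stops at 6

    // Delay before the next poll: baseDelay * 2^failCount, capped at 60s.
    static long nextDelay(long baseDelayMs, int failCount) {
        int shift = Math.min(failCount, MAX_FAIL_COUNT);
        return Math.min(baseDelayMs << shift, MAX_DELAY_MS);
    }
}
```

So after a successful poll the next one runs after cacheMillis with failCount reset to 0 (10s by default), while a run of failures, where delayTime stays at 1000 ms, backs off 2s, 4s, 8s, 16s, 32s and then stays at 60s.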
Now let's go back to the actual service discovery request. The client sends:
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
    Map<String, String> params = new HashMap<>(8);
    params.put("namespaceId", this.namespaceId);      // namespace ID
    params.put("serviceName", serviceName);           // service name
    params.put("clusters", clusters);                 // clusters
    params.put("udpPort", String.valueOf(udpPort));   // UDP port on which the client receives server pushes
    params.put("clientIP", NetUtils.localIP());       // client IP
    params.put("healthyOnly", String.valueOf(healthyOnly));
    return this.reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, "GET");
}
The corresponding Controller on the Nacos server is:
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
    // 1. Parse the request parameters
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    // 2. doSrvIpxt builds and returns the service list
    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
            healthyOnly);
}
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
        int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // 1. Look up the Service by namespaceId and serviceName
    Service service = serviceManager.getService(namespaceId, serviceName);
    // 2. Get all instances (IPs) of the service for the requested clusters
    List<Instance> srvedIPs;
    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
    // filter ips using selector:
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }
    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());
    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }
    // 3. Iterate over the instances and assemble the JSON response
    ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
    for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> ips = entry.getValue();
        if (healthyOnly && !entry.getKey()) {
            continue;
        }
        for (Instance instance : ips) {
            // remove disabled instance:
            if (!instance.isEnabled()) {
                continue;
            }
            ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
            ipObj.put("ip", instance.getIp());
            ipObj.put("port", instance.getPort());
            // deprecated since nacos 1.0.0:
            ipObj.put("valid", entry.getKey());
            ipObj.put("healthy", entry.getKey());
            ipObj.put("marked", instance.isMarked());
            ipObj.put("instanceId", instance.getInstanceId());
            ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
            ipObj.put("enabled", instance.isEnabled());
            ipObj.put("weight", instance.getWeight());
            ipObj.put("clusterName", instance.getClusterName());
            if (clientInfo.type == ClientInfo.ClientType.JAVA
                    && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                ipObj.put("serviceName", instance.getServiceName());
            } else {
                ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
            }
            ipObj.put("ephemeral", instance.isEphemeral());
            hosts.add(ipObj);
        }
    }
    result.replace("hosts", hosts);
    if (clientInfo.type == ClientInfo.ClientType.JAVA
            && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
        result.put("dom", serviceName);
    } else {
        result.put("dom", NamingUtils.getServiceName(serviceName));
    }
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}
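The filtering rules in doSrvIpxt come down to three steps: split the instances by health, skip the unhealthy group when healthyOnly is set, and never return disabled instances. A compact sketch with a hypothetical, simplified `Inst` class, using `Collectors.partitioningBy` in place of the hand-built ipMap (it returns healthy instances first, which the original HashMap iteration does not promise).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, simplified instance model.
class Inst {
    final String ip;
    final boolean healthy;
    final boolean enabled;

    Inst(String ip, boolean healthy, boolean enabled) {
        this.ip = ip;
        this.healthy = healthy;
        this.enabled = enabled;
    }
}

class InstanceFilter {
    // Same rules as doSrvIpxt: group by health, drop unhealthy ones if healthyOnly,
    // and always drop disabled instances.
    static List<Inst> select(List<Inst> all, boolean healthyOnly) {
        Map<Boolean, List<Inst>> byHealth = all.stream()
                .collect(Collectors.partitioningBy(i -> i.healthy));
        List<Inst> result = new ArrayList<>(byHealth.get(true));
        if (!healthyOnly) {
            result.addAll(byHealth.get(false));
        }
        result.removeIf(i -> !i.enabled);
        return result;
    }
}
```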
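2. Server push when service instances change (over UDP)
Periodic pulling has a timeliness problem: a consumer only sees a change at its next poll. Nacos as a registry follows the same design idea as Nacos as a configuration center and combines push with pull. Let's look at how the push side is implemented.
Which clients receive the push? In the full Nacos 1.x source, doSrvIpxt also registers the querying client with PushService when the request carries a udpPort greater than 0 (that part is omitted from the excerpt above); these are the clients that PushService later iterates over.
Now recall the earlier analysis: in the server's heartbeat check, if no heartbeat arrives from a provider for 15s, getPushService().serviceChanged(service) is called, which publishes a ServiceChangeEvent.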
package com.alibaba.nacos.naming.healthcheck;
public class ClientBeatCheckTask implements Runnable {
    private Service service;

    @Override
    public void run() {
        try {
            List<Instance> instances = service.allIPs(true);
            // first set health status of instances:
            for (Instance instance : instances) {
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                    instance.getIp(), instance.getPort(), instance.getClusterName(),
                                    service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                    instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            // Publish a ServiceChangeEvent
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }
            // then remove obsolete instances:
            for (Instance instance : instances) {
                if (instance.isMarked()) {
                    continue;
                }
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    deleteIp(instance);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }

    private void deleteIp(Instance instance) {
        try {
            NamingProxy.Request request = NamingProxy.Request.newRequest();
            request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
                    .appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
                    .appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());
            String url = "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort()
                    + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
            // delete instance asynchronously:
            HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                                instance.toJson(), result.getMessage(), result.getCode());
                    }
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(),
                            throwable);
                }

                @Override
                public void onCancel() {
                }
            });
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
        }
    }
}
public void serviceChanged(Service service) {
    // merge some change events to reduce the push frequency:
    if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
        return;
    }
    this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}
PushService implements ApplicationListener and listens for ServiceChangeEvent:
@Override
public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();
    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }
            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            for (PushClient client : clients.values()) {
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }
                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }
                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));
                // Push the data to the client over UDP
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
        } finally {
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }
    }, 1000, TimeUnit.MILLISECONDS);
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
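serviceChanged and onApplicationEvent together coalesce bursts of changes: while a push for a service is already scheduled (its key is in futureMap), further change events are dropped, and the push that runs about 1s later reads the current data anyway. A sketch of the same merge-then-push idea; `ChangeCoalescer` and its `push` hook are hypothetical. It deviates slightly from the original in two places, both noted in comments: `putIfAbsent` makes the check-and-mark step atomic, and the flag is cleared before pushing rather than in `finally`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of merging change events into one delayed push per service.
class ChangeCoalescer {
    private final ScheduledExecutorService scheduler;
    private final long delayMillis;          // Nacos uses 1000 ms
    private final Consumer<String> push;     // hypothetical: pushes the latest data for a service
    private final ConcurrentMap<String, Boolean> pending = new ConcurrentHashMap<>(); // role of futureMap

    ChangeCoalescer(ScheduledExecutorService scheduler, long delayMillis, Consumer<String> push) {
        this.scheduler = scheduler;
        this.delayMillis = delayMillis;
        this.push = push;
    }

    void serviceChanged(String serviceKey) {
        // putIfAbsent turns "already scheduled?" and "mark as scheduled" into one atomic step.
        if (pending.putIfAbsent(serviceKey, Boolean.TRUE) != null) {
            return; // a push is already pending and will read the latest data when it runs
        }
        scheduler.schedule(() -> {
            // Clear the flag before pushing so a change arriving during the push schedules a new one.
            pending.remove(serviceKey);
            push.accept(serviceKey);
        }, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```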
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }
    // Give up after too many retransmissions
    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }
    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        // Keep the entry until the client's ack arrives
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
        Loggers.PUSH.info("send udp packet: " + ackEntry.key);
        udpSocket.send(ackEntry.origin);
        ackEntry.increaseRetryTime();
        // Schedule a retransmission check after ACK_TIMEOUT_NANOS in case no ack arrives
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                ackEntry.origin.getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return null;
    }
}
After receiving the push, the service consumer parses the message with HostReactor's processServiceJson and updates its local list of service addresses.
HostReactor's constructor creates a PushReceiver, which handles the data pushed by the server:
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir,
        boolean loadCacheAtStart, int pollingThreadCount) {
    this.futureMap = new HashMap<>();
    // Pool for the UpdateTask polling tasks; its threads are background (daemon) threads
    this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.client.naming.updater");
            return thread;
        }
    });
    this.eventDispatcher = eventDispatcher;
    this.beatReactor = beatReactor;
    this.serverProxy = serverProxy;
    this.cacheDir = cacheDir;
    if (loadCacheAtStart) {
        this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
    } else {
        this.serviceInfoMap = new ConcurrentHashMap<>(16);
    }
    this.updatingMap = new ConcurrentHashMap<>();
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    // PushReceiver listens for UDP pushes from the server
    this.pushReceiver = new PushReceiver(this);
}
public class PushReceiver implements Runnable, Closeable {
    private static final Charset UTF_8 = Charset.forName("UTF-8");
    private static final int UDP_MSS = 65536;
    private ScheduledExecutorService executorService;
    private DatagramSocket udpSocket;
    private HostReactor hostReactor;
    private volatile boolean closed = false;

    public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            // Bound to a free local port; getUdpPort() returns it
            this.udpSocket = new DatagramSocket();
            this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });
            this.executorService.execute(this);
        } catch (Exception var3) {
            LogUtils.NAMING_LOGGER.error("[NA] init udp socket failed", var3);
        }
    }

    public void run() {
        while (!this.closed) {
            try {
                byte[] buffer = new byte[65536];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                // Receive data pushed by the server
                this.udpSocket.receive(packet);
                String json = (new String(IoUtils.tryDecompress(packet.getData()), UTF_8)).trim();
                LogUtils.NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
                PushReceiver.PushPacket pushPacket = (PushReceiver.PushPacket) JacksonUtils.toObj(json, PushReceiver.PushPacket.class);
                String ack;
                if (!"dom".equals(pushPacket.type) && !"service".equals(pushPacket.type)) {
                    if ("dump".equals(pushPacket.type)) {
                        ack = "{\"type\": \"dump-ack\", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":\""
                                + StringUtils.escapeJavaScript(JacksonUtils.toJson(this.hostReactor.getServiceInfoMap())) + "\"}";
                    } else {
                        ack = "{\"type\": \"unknown-ack\", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":\"\"}";
                    }
                } else {
                    // Parse the pushed service data and update the local cache
                    this.hostReactor.processServiceJson(pushPacket.data);
                    ack = "{\"type\": \"push-ack\", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":\"\"}";
                }
                // Send the acknowledgement back to the server
                this.udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress()));
            } catch (Exception var6) {
                LogUtils.NAMING_LOGGER.error("[NA] error while receiving push data", var6);
            }
        }
    }

    public void shutdown() throws NacosException {
        String className = this.getClass().getName();
        LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className);
        ThreadUtils.shutdownThreadPool(this.executorService, LogUtils.NAMING_LOGGER);
        this.closed = true;
        this.udpSocket.close();
        LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className);
    }

    public int getUdpPort() {
        return this.udpSocket.getLocalPort();
    }

    public static class PushPacket {
        public String type;
        public long lastRefTime;
        public String data;

        public PushPacket() {
        }
    }
}
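The push channel is plain UDP: the server sends a datagram to the udpPort the client reported in its /instance/list query, and PushReceiver answers with an ack datagram to the sender's address. A loopback sketch of one round trip using `java.net.DatagramSocket`; `UdpPushDemo` is hypothetical, the payload is sent uncompressed, and for simplicity the server side blocks for the ack, whereas Nacos handles acks separately and retransmits on timeout.

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical loopback demo of one push/ack round trip.
class UdpPushDemo {
    // Client side, like PushReceiver.run(): receive one datagram and ack it to the sender's address.
    static String receiveAndAck(DatagramSocket clientSocket) throws Exception {
        byte[] buffer = new byte[65536];
        DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
        clientSocket.receive(packet);
        String data = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);
        byte[] ack = "push-ack".getBytes(StandardCharsets.UTF_8);
        clientSocket.send(new DatagramPacket(ack, ack.length, packet.getSocketAddress()));
        return data;
    }

    // Server side, loosely like udpPush(): send to the client's udpPort, then wait for the ack.
    static String pushAndAwaitAck(DatagramSocket serverSocket, int clientPort, String payload) throws Exception {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        serverSocket.send(new DatagramPacket(bytes, bytes.length, InetAddress.getLoopbackAddress(), clientPort));
        DatagramPacket ack = new DatagramPacket(new byte[1024], 1024);
        serverSocket.setSoTimeout(2000); // Nacos retransmits (Retransmitter) instead of failing here
        serverSocket.receive(ack);
        return new String(ack.getData(), 0, ack.getLength(), StandardCharsets.UTF_8);
    }
}
```

UDP keeps the push cheap and connectionless; the ack plus retransmission is what compensates for lost datagrams, and the periodic pull covers whatever the push still misses.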
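Comparison of Mainstream Registries
Here we compare several commonly used registries: Nacos, Eureka, ZooKeeper and Consul. Below is a comparison found online, for reference.
[Figure: comparison table of Nacos, Eureka, ZooKeeper and Consul]
Whether it is a configuration center or the service registry analyzed in this article, as long as it meets our needs there is no need to agonize over the choice. Simply put, follow your team's current technology stack; in most scenarios any of them will do the job. Only in rare cases do we need a specific registry: for example, if strong consistency is a hard requirement, registries that only offer AP (availability over consistency) are ruled out.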
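A Few Takeaways
We have covered a lot about how Nacos implements a service registry. What can we learn from it? Below are some core topics; if you are interested, they are worth studying in more depth.
Spring Boot startup process (you need to know it to find the entry point of service registration) [Important]
Spring Boot auto-configuration
Spring event publishing and listening (used heavily in open-source middleware)
JDK reflection (NamingService is created via reflection)
Thread pools (scheduled heartbeats, scheduled service pulls, etc.) [Important]
The Spring Cloud service registration standard: ServiceRegistry
Asynchronous service registration (one of the ways Nacos achieves high performance)
Registry update mechanism (copy-on-write)
Active push on service changes (UDP via DatagramSocket)
Data consistency algorithms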