天呐,更新小小的注册表居然这么复杂?【手撕eureka源码NO.2】
2021/11/17 14:10:56
本文主要是介绍天呐,更新小小的注册表居然这么复杂?【手撕eureka源码NO.2】,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
通过前面的学习,我们已经知道eureka-client向eureka-server主动上报了自己的通信地址
,这一过程是通过调用服务注册的接口来完成的。
现在eureka-server能够获取到所有eureka-client的通信地址了,可eureka-client是怎么获取其他eureka-client的通信地址的呢?本篇揭晓。
eureka-client是怎样初始化的
先来看看eureka-client初始化时做了哪些操作。不知道大家还记不记得,上一篇讲过在eureka-server的初始化过程中,有一个步骤是创建eurekaClient
对象。
为什么eureka-server的初始化
过程,需要创建eurekaClient
对象呢?
如果我们的eureka-server只部署在一台服务器上,并且只部署一个eureka-server,那么当然不需要创建eurekaClient
。可一旦需要将eureka-server部署成一个集群,情况就变得复杂起来了。
因为在集群中eureka-server之间需要相互注册,所以每一个eureka服务端,同时也是eureka的客户端。
这里只是简单的提一下,具体的内容,后面讲解eureka-server集群相关的知识时再做补充。此处我们只是借助eureka-server的启动,来查看eurekaClient
的初始化逻辑。
先贴一小段代码看看,大家脑海里有个大概的印象,后面画一张流程图讲解。
//创建eurekaClient对象 if (eurekaClient == null) { //读取eureka-client.properties中的配置 EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext()) ? new CloudInstanceConfig() : new MyDataCenterInstanceConfig(); //通过instanceConfig和InstanceInfo构造Manager对象 applicationInfoManager = new ApplicationInfoManager( instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get()); EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig(); //创建eurekaClient eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig); }
流程图:
InstanceInfo
上一章里提到过,里面包含了服务的ip、端口号、服务名称、实例id等信息。服务注册时,eureka-client会把InstanceInfo
中的信息发送给eureka-server。
现在大家知道InstanceInfo
是在哪里创建的了:基于我们自己的配置文件+一些默认配置构成。
eureka-client是怎样获取注册表的
上文创建了一个DiscoveryClient
对象,在该对象创建的过程中,eureka-client发起http请求向eureka-server请求到了所有的注册表信息。
老规矩,先上一张DiscoveryClient
创建的源码简图。
拉取注册表的主要逻辑在getAndStoreFullRegistry()
方法中,贴出来看一下(部分代码省略)
private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications apps = null; //构建http请求,发起请求 EurekaHttpResponse<Applications> httpResponse = 略...; if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { //从请求结果中,获取服务列表 apps = httpResponse.getEntity(); } if (apps == null) { logger.error("略..."); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { //保存服务列表 localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("略..."); } }
服务列表被保存在AtomicReference<Applications> localRegionApps
中,稍后我们会通过断点来看看Applications
中的内容。
在此之前,我们先来看看,eureka-server在接收到客户端http请求后做了哪些逻辑处理?你可能会想:这有什么难的?直接把注册表数据返回不就可以了?憋着急,我们来看看源码验证一下你的猜想正不正确。
eureka-server是怎样返回注册表的
eureka-server接收http请求的代码,在eureka-core工程下的ApplicationsResource.java中,目录结构如下:
返回服务列表的方法是getContainers(参数略)
,下面我们通过一张代码简图,来看看方法内部的主要逻辑。
在eureka-server的处理逻辑中,用到了一套多级缓存机制,返回服务列表时,不是直接返回注册表,而是先从只读缓存中
读取,如果没有缓存数据,再从读写缓存
中取,如果读写缓存也没有,则从注册表
读取。
只读缓存:ConcurrentMap<Key, Value> readOnlyCacheMap
读写缓存:LoadingCache<Key, Value> readWriteCacheMap
,基于com.google.common.cache.LoadingCache
实现。
缓存读取流程如图:
既然这里使用了缓存,那么一个新的问题就被引入了,什么时候过期缓存数据呢?
1.创建readWriteCacheMap
时,指定了180s自动过期。(定时过期)
public class ResponseCacheImpl implements ResponseCache { private final LoadingCache<Key, Value> readWriteCacheMap; //读写缓存创建 this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(1000) //从配置中读取缓存过期时间,指定时间单位为-秒 .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS) }
时间配置在DefaultEurekaServerConfig.java
中:
public class DefaultEurekaServerConfig implements EurekaServerConfig { @Override public long getResponseCacheAutoExpirationInSeconds() { //默认配置180 return configInstance.getIntProperty( namespace + "responseCacheAutoExpirationInSeconds", 180).get(); } }
2.有新服务注册时,缓存会过期。(主动过期)
前面的内容里,我们讲解服务注册时,实际上是调用了eureka-server
中AbstractInstanceRegistry.java
的register()
方法,当时我们忽略了一些细节,其中就包括过期缓存的这一小段。现在我们来看一看。
public abstract class AbstractInstanceRegistry implements InstanceRegistry { public void register(略...) { //调用内部的invalidateCache()方法 invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); } private void invalidateCache(略...) { // 调用ResponseCacheImpl.java中过期缓存的方法 responseCache.invalidate(appName, vipAddress, secureVipAddress); } }
从invalidateCache()
可以看出,实际上还是使用的ResponseCacheImpl.java
内部定义的invalidate()
方法。
public void invalidate(Key... keys) { for (Key key : keys) { //使用缓存框架自带的清除方法,清除缓存 readWriteCacheMap.invalidate(key); //略... } }
3.ResponseCacheImpl.java
创建时启动了一个定时任务,每隔30秒就会去过期缓存。(被动过期)
public class ResponseCacheImpl implements ResponseCache { //构造方法 ResponseCacheImpl(略...){ //是否启动只读缓存,默认true if (shouldUseReadOnlyResponseCache) { //具体要执行的任务:getCacheUpdateTask() timer.schedule(getCacheUpdateTask(), //间隔多少毫秒后,首次执行此任务 new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), //每隔多少秒后,重复执行此任务 //config中的配置:"responseCacheUpdateIntervalMs", (30 * 1000) responseCacheUpdateIntervalMs); } } //具体的缓存处理逻辑 private TimerTask getCacheUpdateTask() { return new TimerTask() { @Override public void run() { //遍历readOnlyCacheMap中所有缓存 for (Key key : readOnlyCacheMap.keySet()) { CurrentRequestVersion.set(key.getVersion()); Value cacheValue = readWriteCacheMap.get(key); Value currentCacheValue = readOnlyCacheMap.get(key); //如果'只读缓存'中的数据和'读写缓存'的数据不一致,则用'读写缓存'数据覆盖掉'只读缓存' if (cacheValue != currentCacheValue) { readOnlyCacheMap.put(key, cacheValue); } } } }; } }
一张图总结一下:
最后,我们来看一下eureka-server返回的注册表数据
长什么样。
Applications
大概长什么样。
至此eureka-client获取注册表的机制我们就学习了二分之一了。
为什么说是二分之一?憋着急,继续往后看。
现在eureka-client已经能够获取到注册表了,看上去似乎没什么问题,可只要我们多想一步,就会发现问题所在。比如,一旦有新的服务注册到eureka-server上去,之前已经获取到注册表的eureka-client怎么同步新的数据呢?
eureka-client是怎么更新注册表的
为了获取到eureka-server端发生变动的注册表,在eureka-client初始化时启动了一个定时任务,每隔30秒就向eureka-server请求一次变化的注册表数据。
本文开始的代码简图中,我们梳理了discoverClient
创建过程,其中有一段是initScheduledTasks()
,也就是初始化定时任务的地方,当时让大家暂时忽略,现在我们来看一下。
在initScheduledTasks()
中包含多个定时任务,暂时我们只关注了其中刷新注册表的定时任务
,余下部分依然还是后面用到时再来看。
主要逻辑代码如下:
@Singleton public class DiscoveryClient implements EurekaClient { private final ScheduledExecutorService scheduler; private void initScheduledTasks() { //略... scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, //间隔多久执行一次定时任务,默认30(从默认配置获取) registryFetchIntervalSeconds, //间隔时间的单位 TimeUnit.SECONDS, expBackOffBound, //具体任务逻辑 new CacheRefreshThread() ),registryFetchIntervalSeconds, TimeUnit.SECONDS); } }
这里我们先来看一下eureka-client的默认配置是定义在什么地方的,定义在DefaultEurekaClientConfig.java
中:
public class DefaultEurekaClientConfig implements EurekaClientConfig { @Override public int getRegistryFetchIntervalSeconds() { //默认配置:定时器间隔多久执行 return configInstance.getIntProperty( namespace + REGISTRY_REFRESH_INTERVAL_KEY, 30).get(); //配置的key定义:PropertyBasedClientConfigConstants.java //String REGISTRY_REFRESH_INTERVAL_KEY = "client.refresh.interval"; } }
了解了一下默认配置,接下来我们把思路拉回来,继续看刷新注册表的定时任务。
由于代码内部的方法调用过于复杂,此处就不贴源码了,采用代码简图代替。
最主要的方法是getAndUpdateDelta()
,贴出源码看一下:
private void getAndUpdateDelta(Applications applications) Applications delta = null; //发送获取增量注册表的请求 EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient. getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } //如果返回数据为空,则重新拉取所有的注册表 if (delta == null) { getAndStoreFullRegistry(); }else if(略...){ //将获取到的注册表数据,与本地合并 updateDelta(delta); //将合并后的结果,生成一个HashCode (此处稍后分析) String reconcileHashCode = getReconcileHashCode(applications); //对比http请求返回的HashCode和新生成的HashCode if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()){ //对比结果不一致,重新拉取全量注册表 reconcileAndLogDifference(delta, reconcileHashCode); } }else{ //log.err(..); } )
从源代码中可以看到,eureka-client获取到数据后,本地进行了一个合并和校验数据的过程,我们来看看这一块。
注册表数据合并:eureka-client获取到注册表数据后,会根据一个ActionType
来判断服务实例的变动类型,也就是判断服务实例到底是需要新增到本地的注册表中,还是要从本地注册表删除,还是需要将本地的某个服务实例信息进行更新。
先看一眼源码,稍后通过流程图加以理解:
private void updateDelta(Applications delta) { //遍历获取到的注册表信息 for (Application app : delta.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { if (ActionType.ADDED.equals(instance.getActionType())) { //新增 applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); }else if (ActionType.MODIFIED.equals(instance.getActionType())) { //修改 applications.***.addInstance(instance); }else if (ActionType.DELETED.equals(instance.getActionType())) { //删除 applications.***.removeInstance(instance); } } } }
**数据校验:**在eureka-server更新完本地注册表后,会将本地的注册表信息做一个hash计算
得到一个哈希值,同时eureka-server在返回数据时,也携带了一个哈希值。
从理论上讲经过一轮更新后,eureka-server和eureka-client中的注册表数据是完全一致的,所以得出的hash计算结果也应该是一样的。
如果不一样,说明eureka-client和eureka-server之间的数据同步出现了问题,那么此时eureka-client会重新向eureka-server请求一次全部的注册表数据。然后将新获取到的数据覆盖掉本地的注册表数据,以保证自己和eureka-server的数据一致。
结合流程图,理解合并和数据校验过程:
以上,整个eureka-client端是怎样定时发送http请求的,获取到数据是怎么进行合并和校验的,就已经梳理清楚了。
接下来看看eureka-server的接口是怎样处理的呢?还是通过多级缓存机制返回数据吗?
eureka-server增量注册表数据是怎么维护的
对于增量注册表数据,eureka-server依然是通过多级缓存机制来返回,但是由于注册表信息在不断发生变化,所以eureka-server是不会重复的返回所有的注册表信息的,在这里eureka-server借助了Queue(队列)
来记录变化的那一部分注册表信息。
每当发生服务注册或者服务主动下线时,就将变化的注册表信息发送到一个recentlyChangedQueue
中,同时在需要这部分数据时,直接取queue
中的数据即可。
配合流程图理解:
recentlyChangedQueue
定义在AbstractInstanceRegistry.java
中,如果大家还有印象,就能记得前面代表注册表的ConcurrentHashMap
也是定义在这里。
我们简单来看看recentlyChangedQueue
长什么样:
public abstract class AbstractInstanceRegistry implements InstanceRegistry { //变量定义 private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>(); //队列中存放的数据类型(一个内部类) private static final class RecentlyChangedItem { private long lastUpdateTime; private Lease<InstanceInfo> leaseInfo; } }
其中的Lease
、InstanceInfo
分别代表租约信息和服务实例信息,前面已经讲过了。
接下来我们看一下readWriteCacheMap
从Queue
中数据的逻辑。
public class ResponseCacheImpl implements ResponseCache { private final AbstractInstanceRegistry registry; //readWriteCacheMap根据指定key未获取到数据,则执行此方法 private Value generatePayload(Key key) { //获取全部注册表数据(全量数据) if (ALL_APPS.equals(key.getName())) {略..} //获取变化的注册表数据(增量数据) else if(ALL_APPS_DELTA.equals(key.getName())){ payload = getPayLoad(key, //调用AbstractInstanceRegistry.java的 //getApplicationDeltasFromMultipleRegions()方法获取队列数据 registry.getApplicationDeltasFromMultipleRegions(key.getRegions())); } } }
大家想一想,这里使用队列存储数据会产生一个什么新的问题?随着时间推移,recentlyChangedQueue
中的数据会不断增加,这就导致eureka-client多次定时获取数据时,会获取到重复的数据。
明明之前已经获取过的数据,再反复的重复的获取,完全没有必要。因为eureka-client获取到增量注册表数据后,还需要在本地做一些合并和校验工作,那么随着数据的增多,从网络传输、合并数据、校验数据整条工作路径上的效率都会降低。
所以eureka-server开启了一个定时任务,每隔30秒
就清理一下recentlyChangedQueue
中的数据,确保Queue
中的数据是在180秒
内发生变化的服务实例。
定时任务清除队列数据流程图:
贴一小段简化源码:
public abstract class AbstractInstanceRegistry implements InstanceRegistry { //构造方法 protected AbstractInstanceRegistry(略..){ //开启定时任务 this.deltaRetentionTimer.schedule(getDeltaRetentionTask(), //默认配置30s serverConfig.getDeltaRetentionTimerIntervalInMs(), serverConfig.getDeltaRetentionTimerIntervalInMs()); } //定时任务执行逻辑 private TimerTask getDeltaRetentionTask() { return new TimerTask() { @Override public void run() { //遍历队列中的数据 Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator(); while (it.hasNext()) { //将180s内未发生变动的服务,从队列中删除 if (it.next().getLastUpdateTime() < System.currentTimeMillis() - //默认180s serverConfig.getRetentionTimeInMSInDeltaQueue()) { it.remove(); } else { break; } } } }; } }
到此,整个服务注册和服务发现的核心流程我们就全部明白了,下一篇我们来看看eureka的心跳机制
是如何实现的。
这篇关于天呐,更新小小的注册表居然这么复杂?【手撕eureka源码NO.2】的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-09-28微服务架构中API版本控制的实践
- 2024-09-28AI给的和自己写的Python代码,都无法改变输入框的内容,替换也不行
- 2024-09-27Sentinel配置限流资料:新手入门教程
- 2024-09-27Sentinel配置限流资料详解
- 2024-09-27Sentinel限流资料:新手入门教程
- 2024-09-26Sentinel限流资料入门详解
- 2024-09-26Springboot框架资料:初学者入门教程
- 2024-09-26Springboot框架资料详解:新手入门教程
- 2024-09-26Springboot企业级开发资料:新手入门指南
- 2024-09-26SpringBoot企业级开发资料新手指南