springCloud-Eureka—服务注册与服务续约(二)
2021/4/22 18:55:13
本文主要是介绍springCloud-Eureka—服务注册与服务续约(二),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
系列文章目录
springCloud实践
springCloud实践之浅谈Feign原理
springCloud-Eureka(一)
springCloud-Eureka—服务注册与服务续约(二)
springCloud-Eureka—服务注销与剔除(三)
springCloud-Eureka—服务获取(四)
springCloud-Eureka—服务同步(五)
文章目录
- 系列文章目录
- 前言
- 一、服务注册机制
- 二、服务心跳续约
前言
上一篇文章介绍了 Eureka 的特点、适用场景以及数据的存储结构;本文承接上文,结合源码讲解 Eureka 的服务注册与续约机制。
一、服务注册机制
服务提供者、服务消费者以及注册中心自身,只要开启了注册配置(即客户端配置 shouldRegisterWithEureka 为 true),启动后都会向注册中心注册服务。
注册中心服务接收到register请求后:
- 保存服务信息,将服务信息保存到registry中;
- 更新队列,将此事件添加到更新队列中,供eureka Client增量同步服务信息使用;
- 清空二级缓存,即readWriteCacheMap,用于保证数据一致性;
- 更新阈值,供剔除服务使用;
- 同步服务信息,将此事件同步至其他的Eureka Server节点。
我使用的是 Netflix 版的 spring-cloud-starter-netflix-eureka-server,它内部整合了 spring-cloud-netflix-eureka-client。
入口:引入依赖后,Spring Boot 会读取 META-INF/spring.factories 中的如下配置:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
由 org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration 完成自动注入和注册:
@Configuration @EnableConfigurationProperties @ConditionalOnClass(EurekaClientConfig.class) @Import(DiscoveryClientOptionalArgsConfiguration.class) @ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class) @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true) @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class }) @AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration", "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration", "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"}) public class EurekaClientAutoConfiguration // 构造实例信息并注入 @Bean public EurekaServiceRegistry eurekaServiceRegistry() { return new EurekaServiceRegistry(); } //通过上面这个注解的自动配置先构建实例信息,然后@AutoConfigureAfter进入下一步 @Configuration @EnableConfigurationProperties @ConditionalOnClass(EurekaClientConfig.class) @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true) public class EurekaDiscoveryClientConfiguration{ class Marker {} @Bean public Marker eurekaDiscoverClientMarker() { return new Marker(); } @Configuration @ConditionalOnClass(RefreshScopeRefreshedEvent.class) protected static class EurekaClientConfigurationRefresher { @Autowired(required = false) private EurekaClient eurekaClient; @Autowired(required = false) private EurekaAutoServiceRegistration autoRegistration; //这个类完成了服务的真正注册 @EventListener(RefreshScopeRefreshedEvent.class) public void onApplicationEvent(RefreshScopeRefreshedEvent event) { //This will force the creation of the EurkaClient bean if not already created //to make sure the client will be reregistered after a refresh event if(eurekaClient != null) { eurekaClient.getApplications(); } if (autoRegistration != null) { // register in case meta data changed this.autoRegistration.stop(); this.autoRegistration.start(); } } } } 
//EurekaAutoServiceRegistration 类 public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered { /** * 省略部分代码 */ @Override public void start() { // only set the port if the nonSecurePort or securePort is 0 and this.port != 0 if (this.port.get() != 0) { if (this.registration.getNonSecurePort() == 0) { this.registration.setNonSecurePort(this.port.get()); } if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) { this.registration.setSecurePort(this.port.get()); } } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (!this.running.get() && this.registration.getNonSecurePort() > 0) { //完成注册 this.serviceRegistry.register(this.registration); this.context.publishEvent( new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig())); this.running.set(true); } } } //EurekaRegistration类 @Bean @org.springframework.cloud.context.config.annotation.RefreshScope @ConditionalOnBean(AutoServiceRegistrationProperties.class) @ConditionalOnProperty( value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager, @Autowired( required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) { // 注册方法跟进去看到的是代理类$Proxy,代理的目标对象是 // interface com.netflix.discovery.EurekaClient return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager) .with(eurekaClient).with(healthCheckHandler).build(); } // 查看EurekaClient代码 @ImplementedBy(DiscoveryClient.class) public interface EurekaClient extends LookupService {
从 @ImplementedBy 注解可以看到,EurekaClient 的默认实现是 DiscoveryClient 类,其构造方法及相关方法如下:
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; //客户端路径唯一标识符:xx-service/120.0.0.1:8080 if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); //获取远程eureka服务注册列表的次数 fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? 
null : remoteRegionsToFetch.get().split(",")); //判断配置是否需要获取远程eureka服务注册列表 if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } //判断是否需要注册到远程eureka服务 if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); //客户端配置为既不注册也不查询数据 if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } //# 开始定义定时任务调度器 scheduler try { // default size of 2 - 1 each for heartbeat and cacheRefresh scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); //通过定时调度每隔30s,处理心跳 //线程池的核心线程数为1,最大线程数默认为2(HeartbeatExecutorThreadPoolSize) //使用同步队列:SynchronousQueue,每次提交都要阻塞等待处理 heartbeatExecutor = new ThreadPoolExecutor( 1, 
clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff //通过定时调度每隔30s,处理eureka提供的服务列表缓存 //线程池的核心线程数为1,最大线程数默认为2(HeartbeatExecutorThreadPoolSize) //使用同步队列:SynchronousQueue,每次提交都要阻塞等待处理 cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff //初始化一个eureka请求传输器 eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } //如果配置需要注册为eureka的服务,并且初始化强制注册,则进行注册 if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { if (!register() ) { throw new IllegalStateException("Registration error at startup. 
Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } //最后,初始化调度任务(例如,集群解析器、心跳、instanceInfo复制器、fetch) // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); } /**重点,开始启动定时任务调度器 scheduler * Initializes all scheduled tasks. * (例如,集群解析器、心跳、instanceInfo复制器、fetch) */ private void initScheduledTasks() { //根据客户端配置是否需要获取eureka的服务列表,需要的话就开启注册表缓存刷新 if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer //注册表缓存刷新定时器 int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); //注册表缓存刷新任务,超时时间和执行时间30s scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), //注册表缓存定时调度,30s一次 registryFetchIntervalSeconds, TimeUnit.SECONDS); } //根据客户端配置是否需要注册到eureka,需要的话就开启心跳续约 if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); //心跳续约定时器,超时时间和执行时间30s // Heartbeat timer scheduler.schedule( 
new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), //心跳续约定时调度,30s执行一次 renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator //instanceInfo复制,实力信息复制定时器 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize //状态更新监听器 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; //这里是开启了在需要的时候更新状态变化的开关才会添加监听器,此处当开关开启时,状态发生变化,会立即收到通知,调用onDemandUpdate方法 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } //启动周期性实例信息复制到远程定时器,默认延迟40s执行 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } } //取消调度和任务 private void cancelScheduledTasks() { if (instanceInfoReplicator != null) { instanceInfoReplicator.stop(); } if (heartbeatExecutor != null) { heartbeatExecutor.shutdownNow(); } if (cacheRefreshExecutor != null) { cacheRefreshExecutor.shutdownNow(); } if (scheduler != null) { scheduler.shutdownNow(); } } /** * Register with the eureka service by making the appropriate REST call. 
* 通过发出rest调用向eureka服务注册 */ boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204; }
二、服务心跳续约
/** * The heartbeat task that renews the lease in the given intervals. */ private class HeartbeatThread implements Runnable { public void run() { //心跳续约租期 if (renew()) { //如果心跳正常,更新发起上次心跳的时间戳为当前时间 lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } } /**通过发送rest调用更新eureka服务 * Renew with the eureka service by making the appropriate REST call */ boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { //客户端请求eureka服务器,维持客户端与eureka服务器的心跳 httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); //如果eureka服务器未发现当前客户端,则进行注册 if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); //客户端发起注册eureka boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }
这篇关于springCloud-Eureka—服务注册与服务续约(二)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-06-26结对编程到底难不难?答案在这里
- 2024-06-19《2023版Java工程师》课程升级公告
- 2024-06-15matplotlib作图不显示3D图,怎么办?
- 2024-06-1503-Loki 日志监控
- 2024-06-1504-让LLM理解知识 -Prompt
- 2024-06-05做软件测试需要懂代码吗?
- 2024-06-0514-ShardingSphere的分布式主键实现
- 2024-06-03为什么以及如何要进行架构设计权衡?
- 2024-05-31全网首发第二弹!软考2024年5月《软件设计师》真题+解析+答案!(11-20题)
- 2024-05-31全网首发!软考2024年5月《软件设计师》真题+解析+答案!(21-30题)