springCloud-Eureka—服务注册与服务续约(二)

2021/4/22 18:55:13

本文主要是介绍springCloud-Eureka—服务注册与服务续约(二),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

系列文章目录

springCloud实践
springCloud实践之浅谈Feign原理
springCloud-Eureka(一)
springCloud-Eureka—服务注册与服务续约(二)
springCloud-Eureka—服务注销与剔除(三)
springCloud-Eureka—服务获取(四)
springCloud-Eureka—服务同步(五)

文章目录

  • 系列文章目录
  • 前言
  • 一、服务注册机制
  • 二、服务心跳续约


前言

前一篇文章我们介绍了eureka的特点、适用场景以及数据的存储结构,本文继上文结合源码讲解eureka的服务注册、续约机制


一、服务注册机制

服务提供者、服务消费者以及注册中心自己,启动后都会向注册中心注册服务(配置了注册)
在这里插入图片描述
注册中心服务接收到register请求后:

  1. 保存服务信息,将服务信息保存到registry中;
  2. 更新队列,将此事件添加到更新队列中,供eureka Client增量同步服务信息使用;
  3. 清空二级缓存,即readWriteCacheMap,用于保证数据一致性;
  4. 更新阈值,供剔除服务使用;
  5. 同步服务信息,将此事件同步至其他的Eureka Server节点。
我应用的是netflix版spring-cloud-starter-netflix-eureka-server,它整合了spring-cloud-netflix-eureka-client

入口:包引入后,通过META-INF/spring.factories里配置的

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration

org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration完成自动注入+注册

@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
		CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
		"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
		"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration 

	// 构造实例信息并注入
	@Bean
	public EurekaServiceRegistry eurekaServiceRegistry() {
		return new EurekaServiceRegistry();
	}


	//通过上面这个注解的自动配置先构建实例信息,然后@AutoConfigureAfter进入下一步
	@Configuration
	@EnableConfigurationProperties
	@ConditionalOnClass(EurekaClientConfig.class)
	@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
	public class EurekaDiscoveryClientConfiguration{

	class Marker {}

	@Bean
	public Marker eurekaDiscoverClientMarker() {
		return new Marker();
	}

	@Configuration
	@ConditionalOnClass(RefreshScopeRefreshedEvent.class)
	protected static class EurekaClientConfigurationRefresher {

		@Autowired(required = false)
		private EurekaClient eurekaClient;

		@Autowired(required = false)
		private EurekaAutoServiceRegistration autoRegistration;

		//这个类完成了服务的真正注册
		@EventListener(RefreshScopeRefreshedEvent.class)
		public void onApplicationEvent(RefreshScopeRefreshedEvent event) {
			//This will force the creation of the EurkaClient bean if not already created
			//to make sure the client will be reregistered after a refresh event
			if(eurekaClient != null) {
				eurekaClient.getApplications();
			}
			if (autoRegistration != null) {
				// register in case meta data changed
				this.autoRegistration.stop();
				this.autoRegistration.start();
			}
		}
	}
	}

	//EurekaAutoServiceRegistration 类
	public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered {
	/**
	* 省略部分代码
	*/
	@Override
	public void start() {
		// only set the port if the nonSecurePort or securePort is 0 and this.port != 0
		if (this.port.get() != 0) {
			if (this.registration.getNonSecurePort() == 0) {
				this.registration.setNonSecurePort(this.port.get());
			}

			if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
				this.registration.setSecurePort(this.port.get());
			}
		}

		// only initialize if nonSecurePort is greater than 0 and it isn't already running
		// because of containerPortInitializer below
		if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
			//完成注册
			this.serviceRegistry.register(this.registration);

			this.context.publishEvent(
					new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
			this.running.set(true);
		}
	}
	}
	
	//EurekaRegistration类
	@Bean
	@org.springframework.cloud.context.config.annotation.RefreshScope
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	@ConditionalOnProperty(
			value = "spring.cloud.service-registry.auto-registration.enabled",
			matchIfMissing = true)
	public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
			CloudEurekaInstanceConfig instanceConfig,
			ApplicationInfoManager applicationInfoManager, @Autowired(
					required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
		// 注册方法跟进去看到的是代理类$Proxy,代理的目标对象是
	    // interface com.netflix.discovery.EurekaClient
		return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager)
				.with(eurekaClient).with(healthCheckHandler).build();
	}
	
	// 查看EurekaClient代码
	@ImplementedBy(DiscoveryClient.class)
	public interface EurekaClient extends LookupService {
	

这里可以看到引入的是DiscoveryClient类

@Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
            this.preRegistrationHandler = args.preRegistrationHandler;
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.preRegistrationHandler = null;
        }
        
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();

        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        //客户端路径唯一标识符:xx-service/120.0.0.1:8080
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }

        this.backupRegistryProvider = backupRegistryProvider;

        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        localRegionApps.set(new Applications());
		//获取远程eureka服务注册列表的次数
        fetchRegistryGeneration = new AtomicLong(0);

        remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
		//判断配置是否需要获取远程eureka服务注册列表
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
		//判断是否需要注册到远程eureka服务
        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
		//客户端配置为既不注册也不查询数据
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);

            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());

            return;  // no need to setup up an network tasks and we are done
        }
		//# 开始定义定时任务调度器 scheduler
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
			//通过定时调度每隔30s,处理心跳
			//线程池的核心线程数为1,最大线程数默认为2(HeartbeatExecutorThreadPoolSize)
			//使用同步队列:SynchronousQueue,每次提交都要阻塞等待处理
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
			//通过定时调度每隔30s,处理eureka提供的服务列表缓存
			//线程池的核心线程数为1,最大线程数默认为2(HeartbeatExecutorThreadPoolSize)
			//使用同步队列:SynchronousQueue,每次提交都要阻塞等待处理
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
			//初始化一个eureka请求传输器
            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);

            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }

        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }
		//如果配置需要注册为eureka的服务,并且初始化强制注册,则进行注册
        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }
		//最后,初始化调度任务(例如,集群解析器、心跳、instanceInfo复制器、fetch)
        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        initScheduledTasks();

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
    }

	
	/**重点,开始启动定时任务调度器 scheduler
     * Initializes all scheduled tasks.
     * (例如,集群解析器、心跳、instanceInfo复制器、fetch)
     */
    private void initScheduledTasks() {
    	//根据客户端配置是否需要获取eureka的服务列表,需要的话就开启注册表缓存刷新
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            //注册表缓存刷新定时器
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            //注册表缓存刷新任务,超时时间和执行时间30s
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    //注册表缓存定时调度,30s一次
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
		//根据客户端配置是否需要注册到eureka,需要的话就开启心跳续约
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
			//心跳续约定时器,超时时间和执行时间30s
            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    //心跳续约定时调度,30s执行一次
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            //instanceInfo复制,实力信息复制定时器
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
			//状态更新监听器
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
			//这里是开启了在需要的时候更新状态变化的开关才会添加监听器,此处当开关开启时,状态发生变化,会立即收到通知,调用onDemandUpdate方法
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
			//启动周期性实例信息复制到远程定时器,默认延迟40s执行	
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

 //取消调度和任务
 private void cancelScheduledTasks() {
        if (instanceInfoReplicator != null) {
            instanceInfoReplicator.stop();
        }
        if (heartbeatExecutor != null) {
            heartbeatExecutor.shutdownNow();
        }
        if (cacheRefreshExecutor != null) {
            cacheRefreshExecutor.shutdownNow();
        }
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }


	/**
     * Register with the eureka service by making the appropriate REST call.
     * 通过发出rest调用向eureka服务注册
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

二、服务心跳续约

 	/**
     * The heartbeat task that renews the lease in the given intervals.
     */
    private class HeartbeatThread implements Runnable {

        public void run() {
        	//心跳续约租期
            if (renew()) {
            	//如果心跳正常,更新发起上次心跳的时间戳为当前时间
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }


	/**通过发送rest调用更新eureka服务
     * Renew with the eureka service by making the appropriate REST call
     */
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
        	//客户端请求eureka服务器,维持客户端与eureka服务器的心跳
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            //如果eureka服务器未发现当前客户端,则进行注册
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                //客户端发起注册eureka
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }


这篇关于springCloud-Eureka—服务注册与服务续约(二)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程