原创

Spring Cloud源码解析1-Eureka Client

Eureka Client 启动类

首先找到spring-cloud-netflix-eureka-client-2.2.1.RELEASE.jar的META-INF/spring.factories,从里面可以找到Eureka Client对应的自动配置类:EurekaClientAutoConfiguration

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
        CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {
        "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
        "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
        "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })

从这里可以总结几点:

  1. @EnableConfigurationProperties注解不只是可以从value里面得到配置类对象,还可以从标准模式(@Bean注解的bean)中获取,只要对象中包含有@ConfigurationProperties注解即可。

  2. @Configuration(proxyBeanMethods=false),这里我们要先理解proxyBeanMethods=true(默认不写就是true),是Full模式;proxyBeanMethods=false是Lite模式。

    • Full模式指的是Configuration类被cglib代理增强,bean是单例的,@Bean方法调用生成实例时,如果已经存在这个bean,就从spring容器中直接取出来返回。

    • Lite模式下Configuration类就是没有被代理的,那么每次获取的对象都是新的实例化对象。

    • Full模式和Lite模式对比:
      1. 默认情况下(不显式指定),配置类会被CGLIB增强(生成代理对象),放进IoC容器内的是代理
      2. Full模式下,配置类内部可以通过方法调用来处理依赖,并且能够保证是同一个实例,都指向IoC内的那个单例
      3. Full模式下,因为被代理,所以@Bean注解的方法不能被private/final等进行修饰(CGLib是通过继承方式实现的)
      4. Full模式在配置类多的情况下会影响性能,所以如果在不牵扯上面几种情况问题的前提下(最好在bean依赖的时候不使用方法依赖),使用Lite模式。
/**
 * Registers an {@code EurekaClientConfigBean} when the current context does not
 * already contain an {@code EurekaClientConfig} bean.
 */
@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
    EurekaClientConfigBean client = new EurekaClientConfigBean();
    // NOTE(review): this reads the this.env field, not the env parameter — confirm against upstream.
    if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) {
        // Registration is switched off when spring.config.name is "bootstrap".
        client.setRegisterWithEureka(false);
    }
    return client;
}

这里可以看到SpringCloud使用的不是eureka默认的EurekaClientConfig实现类DefaultEurekaClientConfig,而是自己实现的EurekaClientConfigBean,点击进入可以看到它读取的是配置文件中以eureka.client为前缀的配置属性。

/**
 * Registers an {@code EurekaInstanceConfigBean} when the current context does not
 * already contain an {@code EurekaInstanceConfig} bean. The body is abridged in
 * this excerpt.
 */
@Bean
@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
        ManagementMetadataProvider managementMetadataProvider) {
    // Reads the configured hostname from the eureka.instance.hostname property.
    String hostname = getProperty("eureka.instance.hostname");
    // ... (remainder of the method elided)
}

这是eureka-client的instance配置,点击进入可以看到它读取的是配置文件中以eureka.instance为前缀的配置。

eureka client创建Spring bean实例

EurekaClient根据RefreshScope的不同有两种不同的创建Spring bean实例的方式,两种方式只能存在其一。

第一种方式:

/**
 * Non-refreshable variant of the Eureka client bean definitions; active only when
 * {@code @ConditionalOnMissingRefreshScope} matches.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingRefreshScope
protected static class EurekaClientConfiguration {

    // Passed to CloudEurekaClient as its fourth constructor argument.
    @Autowired
    private ApplicationContext context;

    // Optional arguments forwarded to the CloudEurekaClient constructor.
    @Autowired
    private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;

    /**
     * Creates the {@code EurekaClient} unless one already exists in the current
     * context; {@code shutdown} is declared as the bean's destroy method.
     */
    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(value = EurekaClient.class,
            search = SearchStrategy.CURRENT)
    public EurekaClient eurekaClient(ApplicationInfoManager manager,
            EurekaClientConfig config) {
        return new CloudEurekaClient(manager, config, this.optionalArgs,
                this.context);
    }

    /**
     * Builds the {@code ApplicationInfoManager} from an {@code InstanceInfo}
     * created out of the instance config.
     */
    @Bean
    @ConditionalOnMissingBean(value = ApplicationInfoManager.class,
            search = SearchStrategy.CURRENT)
    public ApplicationInfoManager eurekaApplicationInfoManager(
            EurekaInstanceConfig config) {
        InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
        return new ApplicationInfoManager(config, instanceInfo);
    }

    /**
     * Builds the {@code EurekaRegistration}; requires an
     * {@code AutoServiceRegistrationProperties} bean and is enabled unless
     * spring.cloud.service-registry.auto-registration.enabled says otherwise
     * (a missing property counts as a match).
     */
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(
            value = "spring.cloud.service-registry.auto-registration.enabled",
            matchIfMissing = true)
    public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
            CloudEurekaInstanceConfig instanceConfig,
            ApplicationInfoManager applicationInfoManager, @Autowired(
                    required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
        return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager)
                .with(eurekaClient).with(healthCheckHandler).build();
    }

}

这里我们重点关注:@ConditionalOnMissingRefreshScope

/**
 * Meta-annotation for types and methods whose matching is delegated to
 * {@code OnMissingRefreshScopeCondition}.
 */
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Conditional(OnMissingRefreshScopeCondition.class)
@interface ConditionalOnMissingRefreshScope {
}

可以看出此处有用的是OnMissingRefreshScopeCondition类:

/**
 * Composite condition built on {@code AnyNestedCondition}: each nested static
 * class below contributes one member condition, evaluated in the
 * {@code REGISTER_BEAN} configuration phase.
 */
private static class OnMissingRefreshScopeCondition extends AnyNestedCondition {

    OnMissingRefreshScopeCondition() {
        super(ConfigurationPhase.REGISTER_BEAN);
    }

    // Member condition: the RefreshScope class is not on the classpath.
    @ConditionalOnMissingClass("org.springframework.cloud.context.scope.refresh.RefreshScope")
    static class MissingClass {

    }

    // Member condition: no RefreshAutoConfiguration bean is present.
    @ConditionalOnMissingBean(RefreshAutoConfiguration.class)
    static class MissingScope {

    }

    // Member condition: eureka.client.refresh.enable is explicitly set to "false".
    @ConditionalOnProperty(value = "eureka.client.refresh.enable",
            havingValue = "false")
    static class OnPropertyDisabled {

    }

}

这里我们需要了解AnyNestedCondition,这个类代表的是该类中的所有内置Condition类只要有任意一个匹配,即可算匹配。那么我们可以得出结论,EurekaClientConfiguration生效的前提是满足以下三个条件中的任意一个:

  1. org.springframework.cloud.context.scope.refresh.RefreshScope类不存在
  2. 容器中不存在RefreshAutoConfiguration的bean实例
  3. eureka.client.refresh.enable配置为false

第二种方式:

/**
 * Creates the {@code EurekaClient} bean only when the current context (search
 * strategy {@code CURRENT}) holds no {@code EurekaClient} yet; {@code shutdown}
 * is declared as the destroy method.
 */
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
        search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
        EurekaClientConfig config) {
    return new CloudEurekaClient(manager, config, this.optionalArgs,
            this.context);
}

这里代表,在当前spring容器中没有EurekaClient的bean实例时会执行该方法,其中SearchStrategy有三种:

  1. CURRENT-当前spring context
  2. ANCESTORS-当前spring context的所有父容器
  3. ALL-包含前两者
/**
 * Refreshable variant of the Eureka client bean definitions; active only when
 * {@code @ConditionalOnRefreshScope} matches. All three beans are declared with
 * {@code @RefreshScope}.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnRefreshScope
protected static class RefreshableEurekaClientConfiguration {

    // Passed to CloudEurekaClient as its fourth constructor argument.
    @Autowired
    private ApplicationContext context;

    // Optional arguments forwarded to the CloudEurekaClient constructor.
    @Autowired
    private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;

    /**
     * Creates the refresh-scoped, lazily initialised {@code EurekaClient} and
     * registers the (optional) health check handler on it.
     */
    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(value = EurekaClient.class,
            search = SearchStrategy.CURRENT)
    @org.springframework.cloud.context.config.annotation.RefreshScope
    @Lazy
    public EurekaClient eurekaClient(ApplicationInfoManager manager,
            EurekaClientConfig config, EurekaInstanceConfig instance,
            @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
        // Unwrap the manager when it is an AOP proxy so the client receives the
        // target object rather than the proxy.
        ApplicationInfoManager appManager;
        if (AopUtils.isAopProxy(manager)) {
            appManager = ProxyUtils.getTargetObject(manager);
        }
        else {
            appManager = manager;
        }
        CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
                config, this.optionalArgs, this.context);
        cloudEurekaClient.registerHealthCheck(healthCheckHandler);
        return cloudEurekaClient;
    }

    /**
     * Builds the refresh-scoped, lazily initialised {@code ApplicationInfoManager}
     * from an {@code InstanceInfo} created out of the instance config.
     */
    @Bean
    @ConditionalOnMissingBean(value = ApplicationInfoManager.class,
            search = SearchStrategy.CURRENT)
    @org.springframework.cloud.context.config.annotation.RefreshScope
    @Lazy
    public ApplicationInfoManager eurekaApplicationInfoManager(
            EurekaInstanceConfig config) {
        InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
        return new ApplicationInfoManager(config, instanceInfo);
    }

    /**
     * Builds the refresh-scoped {@code EurekaRegistration}; requires an
     * {@code AutoServiceRegistrationProperties} bean, and a missing
     * auto-registration property counts as enabled.
     */
    @Bean
    @org.springframework.cloud.context.config.annotation.RefreshScope
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(
            value = "spring.cloud.service-registry.auto-registration.enabled",
            matchIfMissing = true)
    public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
            CloudEurekaInstanceConfig instanceConfig,
            ApplicationInfoManager applicationInfoManager, @Autowired(
                    required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
        return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager)
                .with(eurekaClient).with(healthCheckHandler).build();
    }
}

关注@ConditionalOnRefreshScope

/**
 * Meta-annotation combining three conditions: the {@code RefreshScope} class is
 * present, a {@code RefreshAutoConfiguration} bean exists, and
 * eureka.client.refresh.enable is "true" (an unset property also matches because
 * {@code matchIfMissing = true}).
 */
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@ConditionalOnClass(RefreshScope.class)
@ConditionalOnBean(RefreshAutoConfiguration.class)
@ConditionalOnProperty(value = "eureka.client.refresh.enable", havingValue = "true",
        matchIfMissing = true)
@interface ConditionalOnRefreshScope {

}

可以看到此处需要达成以下几点:

  1. 需要存在RefreshScope
  2. spring容器中需要存在RefreshAutoConfiguration的bean实例
  3. eureka.client.refresh.enable配置为true,或者没有配置该属性(因为matchIfMissing = true)

预备知识

InstanceInfo

  1. 两个时间戳:

    • lastDirtyTimestamp:记录 instanceInfo 中任何信息被修改的时间。记录 instanceInfo 在 Client 端被修改的时间。

    • lastUpdatedTimestamp:记录 instanceInfo 的状态被修改的时间。记录 instanceInfo 在 Server 端被修改的时间。

  2. 三个状态相关方法:

/**
 * Stores the overridden status; the field is written only when the new value
 * differs from the current one. No dirty flag is touched here.
 */
public synchronized void setOverriddenStatus(InstanceStatus status) {
     if (this.overriddenStatus != status) {
         this.overriddenStatus = status;
     }
 }

该方法仅会在 Eureka Server 端被调用。

/**
 * Updates the status without calling {@code setIsDirty()}, so this change is
 * not recorded as a modification (contrast with {@code setStatus}).
 */
public synchronized void setStatusWithoutDirty(InstanceStatus status) {
     if (this.status != status) {
         this.status = status;
     }
 }

该方法仅会在 Eureka Server 端被调用,在修改 status 时不记录修改时间戳。

/**
 * Updates the status and marks the instance dirty.
 *
 * @param status the new status
 * @return the previous status when the value actually changed, otherwise
 *         {@code null}
 */
public synchronized InstanceStatus setStatus(InstanceStatus status) {
     if (this.status != status) {
         InstanceStatus prev = this.status;
         this.status = status;
         // Flag the instance as modified.
         setIsDirty();
         return prev;
     }
     return null;
 }

该方法用于设置 instance 的服务状态。只有当 status 的状态为 UP 时,其它 Client 才能从 Eureka 中发现该 instance。该方法会在 Client 端被调用。

另外注意InstanceInfo的equals方法,此处对比的是InstanceInfo.id

Application和Applications

Application代表的是提供相同能力的服务的集合,其中维护了一个Set和一个Map:

@XStreamImplicit
private final Set<InstanceInfo> instances;
private final Map<String, InstanceInfo> instancesMap;

instancesMap的key是instance的id,value就是对应的InstanceInfo,而instances其实就是instancesMap的values。

Applications代表的是client从server中拉取到的注册信息表,里面包含了所有的Application:

@XStreamImplicit
private final AbstractQueue<Application> applications;
private final Map<String, Application> appNameApplicationMap;

Jersey

Eureka Client 与 Eureka Server 间的通信,及各个 Eureka Server 间的通信,使用的是 Jersey 框架完成的。
Jersey 框架是一个开源的 RESTful 框架。其功能与 SpringMVC 的相同。不同的是 SpringMVC 的处理器是 Controller,而 Jersey 的是 Resource。

EurekaClient

EurekaClient的实现类是CloudEurekaClient,根据前面说的我们进入到他的构造方法。

/**
 * Delegates the real initialisation to the superclass constructor, then keeps
 * the info manager and event publisher and prepares reflective access to the
 * {@code eurekaTransport} field declared on {@code DiscoveryClient}.
 */
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
        EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
        ApplicationEventPublisher publisher) {
    super(applicationInfoManager, config, args);
    this.applicationInfoManager = applicationInfoManager;
    this.publisher = publisher;
    // Look up DiscoveryClient's "eurekaTransport" field and make it accessible
    // for later reflective reads.
    this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
            "eurekaTransport");
    ReflectionUtils.makeAccessible(this.eurekaTransportField);
}

进入到super我们可以看到其父类是DiscoveryClient,跟代码到

/**
 * Abridged DiscoveryClient constructor. The visible parts create the executors
 * and transport, optionally fetch the registry, optionally register at init,
 * and start the scheduled tasks. Sections marked "..." are elided in this
 * excerpt.
 */
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    // ...handling of the args parameter (elided)
    // ...

    fetchRegistryGeneration = new AtomicLong(0);

    // Remote regions come from config as a comma-separated string; keep both the
    // raw string and its split form (null when nothing is configured).
    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

    // ...monitoring setup (elided)

    // ...early return when configured not to register with Eureka (elided)
    // NOTE(review): the exact early-return condition is not shown here — confirm against upstream.

    try {
        // Thread pool that runs the scheduled tasks
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        eurekaTransport = new EurekaTransport();
        // Sets up the eurekaJerseyClient used for client-to-server requests
        scheduleServerEndpointTask(eurekaTransport, args);

        // ... (elided)
    } 
    // ...

    // Key step 1: download the registry (service discovery); fall back to the
    // backup registry when fetching is enabled but the fetch fails.
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }

    // ...

    // Key step 2: register the service. This is conditional: it runs only when
    // shouldEnforceRegistrationAtInit is also set. The article notes this flag is
    // normally false, because a failed registration here aborts startup.
    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            if (!register() ) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // Key step 3: start the scheduled tasks. There are three:
    // 1. periodically refresh the registry cache
    // 2. periodically renew the lease (heartbeat); per the article, a "not
    //    registered" heartbeat response triggers a registration
    // 3. periodically check the local instance data/status and send it to the server
    initScheduledTasks();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, this.getApplications().size());
}

从以上代码我们可以知道初始化EurekaClient也就是初始化DiscoveryClient对象主要三步:

  1. 下载服务列表(服务发现)
  2. 服务注册,这里是有条件的,默认是不在启动时强制注册的
  3. 启动定时任务,定时任务有3个。

eureka client 主要步骤

下面我们来详细分析三个步骤。

下载服务列表并更新本地缓存(服务发现)

/**
 * Abridged: refreshes the local registry cache, choosing between a full fetch
 * and a delta fetch, then fires the cache-refreshed hooks.
 *
 * @param forceFullRegistryFetch forces the full-fetch branch when {@code true}
 * @return {@code true} on the visible success path (failure handling is elided)
 */
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    // ...

        // Registry currently held in the local cache
        // (Applications maintains a Map<String/* appName */, Application>)
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0) // on the first fetch the local applications is empty
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            // Full fetch
            getAndStoreFullRegistry();
        } else {
            // Delta (incremental) fetch
            getAndUpdateDelta(applications);
        }
        // Hash code handling: per the article, instances are grouped into a map of
        // status -> count, which is then concatenated into a string.
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    // ...

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}

从这里可以看到,程序启动时因为本地没有Applications信息,所以获取注册列表为空列表,触发全量获取,那么先来看全量获取:

/**
 * Abridged: performs a full registry fetch. Queries all applications, or only
 * the configured single VIP address when one is set.
 */
private void getAndStoreFullRegistry() throws Throwable {
    // ...read the local generation/version (elided)

    // No single-VIP address configured -> fetch everything; otherwise fetch that VIP only.
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        // NOTE(review): apps is not declared in this excerpt — presumably a local
        // declared in the elided part; confirm against upstream.
        apps = httpResponse.getEntity();
    }

    // ...record the updated version (elided)
}

这里我们从前面已知EurekaClient和EurekaServer之间的交互是通过jersey实现的,我们找到实现类AbstractJerseyEurekaHttpClient

/** Full registry query: delegates to getApplicationsInternal with the "apps/" path. */
public EurekaHttpResponse<Applications> getApplications(String... regions) {
    return getApplicationsInternal("apps/", regions);
}

/**
 * Issues a GET for the given path (optionally with a "regions" query parameter)
 * and wraps the status, headers and decoded body into an EurekaHttpResponse.
 *
 * @param urlPath path relative to serviceUrl
 * @param regions remote regions to request; skipped when null or empty
 * @return response whose entity is {@code null} unless the status is OK and a
 *         body is present
 */
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
    ClientResponse response = null;
    String regionsParamValue = null;
    try {
        // Build the request
        WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
        if (regions != null && regions.length > 0) {
            regionsParamValue = StringUtil.join(regions);
            webResource = webResource.queryParam("regions", regionsParamValue);
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        // The registry is retrieved with a GET request
        response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

        Applications applications = null;
        if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
            // Decode the response body into an Applications object
            applications = response.getEntity(Applications.class);
        }
        return anEurekaHttpResponse(response.getStatus(), Applications.class)
                .headers(headersOf(response))
                .entity(applications)
                .build();
    } finally {
        // ...log and close the response (elided)
    }
}

这个getApplicationsInternal在增量获取注册表时也会用到。

我们知道后面第三步我们会开启三个定时任务,其中一个就是定时更新注册信息,而从fetchRegistry方法的判断:

if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0) // 第一次获取到的本地applications是个空Map
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta

中我们可以得到,一般配置文件中如果配置eureka.client.disable-delta=false的话,后续的下载注册列表动作就是增量同步了,而这个配置默认为false。
那么我们再来看看增量同步是如何操作的:

/**
 * Abridged: fetches the registry delta and applies it to the local cache,
 * falling back to a full fetch when no delta is returned and reconciling when
 * the hash codes disagree.
 *
 * @param applications the locally cached registry used to compute the
 *                     reconcile hash code
 */
private void getAndUpdateDelta(Applications applications) throws Throwable {
    // Registry delta returned by the server
    Applications delta = null;
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        // Delta registry information was received
        delta = httpResponse.getEntity();
    }

    if (delta == null) {
        // No delta received -> do a full fetch. Per the article this happens when
        // the server has delta sync disabled.
        getAndStoreFullRegistry();
    // NOTE(review): currentUpdateGeneration is not declared in this excerpt —
    // presumably read from fetchRegistryGeneration in an elided line; confirm against upstream.
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                // Apply the delta (see updateDelta)
                updateDelta(delta);
                // Compute a fresh hash code from the updated registry; per the
                // article it is built from a status -> count map turned into a string.
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            // Lock not acquired: the delta is not applied and reconcileHashCode stays "".
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // Reconcile (full fetch) when the local hash code differs from the
        // server's, or whenever shouldLogDeltaDiff() is enabled.
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        // ...update ignored (elided)
    }
}

/**
 * Applies a registry delta to the locally cached applications, routing each
 * instance to the local-region or the matching remote-region registry, then
 * updates versions and shuffles instances.
 *
 * @param delta the delta received from the server
 */
private void updateDelta(Applications delta) {
    // NOTE: deltaCount is incremented below but never read in this excerpt.
    int deltaCount = 0;
    for (Application app : delta.getRegisteredApplications()) {
        // Iterate over the applications sent by the server
        for (InstanceInfo instance : app.getInstances()) {
            // Iterate over the instances sent by the server

            // Start from the local applications cache
            Applications applications = getApplications();
            // Resolve the region this instance belongs to
            String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);

            // If the instance is not in the local region, switch to that remote
            // region's registry (created on first use)
            if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                if (null == remoteApps) {
                    remoteApps = new Applications();
                    remoteRegionVsApps.put(instanceRegion, remoteApps);
                }
                applications = remoteApps;
            }

            ++deltaCount;
            // Dispatch on the action type. ADDED and MODIFIED do exactly the same
            // thing: ensure the application exists, then call addInstance.
            if (ActionType.ADDED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }

                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);

            } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }

                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);

            } else if (ActionType.DELETED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp != null) {
                    existingApp.removeInstance(instance);
                    // Drop the application once its last instance is gone.
                    if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                        applications.removeApplication(existingApp);
                    }
                }
            }
        }
    }

    getApplications().setVersion(delta.getVersion());
    // Shuffle instances (local-region registry)
    getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());

    for (Applications applications : remoteRegionVsApps.values()) {
        applications.setVersion(delta.getVersion());
        // Shuffle instances (remote-region registries)
        applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    }
}

/**
 * Abridged: called when the client-side hash code after a delta update does not
 * match the server's; re-fetches the registry in full (all applications, or the
 * configured single VIP address).
 */
private void reconcileAndLogDifference(Applications delta, String reconcileHashCode) throws Throwable {
    // ... (elided)

    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    Applications serverApps = httpResponse.getEntity();

    // ... (elided)
}
/** Delta registry query: same helper as the full fetch, but with the "apps/delta" path. */
public EurekaHttpResponse<Applications> getDelta(String... regions) {
    return getApplicationsInternal("apps/delta", regions);
}

这里可以看到增量同步和全量同步的过程中,从server端下载列表的区别仅仅是请求的url不同,而在接收返回处理时增量同步的处理更为复杂:

  1. 如果接收返回为空,代表server端禁止增量同步,那么client端需要再进行一次全量同步
  2. 执行增量更新后,会重新计算hashcode,而计算hashcode实际上就是将所有的instance根据状态的不同集合成一个Map,key是状态,value是当前状态的instance个数,然后遍历这个Map构造成一个字符串,所以hashcode能够反映出当前所有服务列表
  3. 根据计算的hashcode和server端的hashcode作对比,如果不相同说明更新丢失,那么会再去执行一次全量更新

这里的增量更新过程比较复杂,遍历从server端更新的instance,并根据该instance在本地缓存中查找,如果没有的话就在本地缓存中建立该instance和所对应的application,如果同步到其他region的信息,也会更新本地缓存中的远程region下的服务注册信息。在同步过程中,根据操作类型执行相应的操作,如果是删除操作,就执行Application.removeInstance删除本地缓存中相应的instance信息,删除instance之后如果instance所在的application空了,那么也会相应的删除;如果是新增或修改的话,就执行Application.addInstance方法添加相应instance,为什么新增或修改都会执行这个方法呢,这里我们要看他的实现:

/**
 * Adds or replaces an instance: the map entry is overwritten by id, and the set
 * entry is removed and re-added so the stored object is the new one.
 */
public void addInstance(InstanceInfo i) {
    instancesMap.put(i.getId(), i);
    synchronized (instances) {
        // Remove first, then add: the article explains that InstanceInfo.equals
        // compares only the id, so remove() drops the old object with that id.
        instances.remove(i);
        instances.add(i);
        isDirty = true;
    }
}

这里我们看到addInstance的操作是先删除后添加,这也是新增和修改时都可以调用这个方法的原因。那么为什么要这样写呢?原因是InstanceInfo类中重写了equals方法,只对比了id属性,而Application在维护Map的同时也维护了一个Set,这个Set保存了所有的Instance。对于Set来说,如果其中已经存在equals相等的元素,直接add并不会替换旧对象,也就没法更新,所以使用了先remove再add的方式。

服务注册

上面我们说了如果配置了eureka.client.should-enforce-registration-at-init=true,就会在程序启动时进行服务注册,默认情况下该配置为false,而且不建议修改为true,因为如果注册失败会导致程序启动失败。那么我们看看客户端是如何进行服务注册的:

/**
 * Registers the local instance through the registration client.
 *
 * @return {@code true} only when the server answers with NO_CONTENT
 * @throws Throwable any exception from the HTTP call is rethrown unchanged
 */
boolean register() throws Throwable {
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        // Rethrown as-is; this excerpt does no handling here.
        throw e;
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
/**
 * Sends the instance info to the server with a POST to "apps/{appName}" and
 * returns the status and headers of the reply; the response is always closed.
 *
 * @param info the instance to register
 */
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        // response is still null if the request threw before assignment.
        if (response != null) {
            response.close();
        }
    }
}

从这里我们可以看到服务注册实际上就是向server端发送了一个POST请求,将本地的instanceInfo发送给了server端。

启动定时任务

前面我们说了开启的定时任务有三个:

  1. 定时更新注册列表
  2. 定时续约(心跳),这里如果在心跳时返回服务未注册,那么会去执行服务注册
  3. 定时检测数据和续约信息(client状态)并发送给server

我们先来看看定时任务的结构:

/**
 * Starts the background work: the registry cache refresh (when fetching is
 * enabled) and, when registration is enabled, the heartbeat plus the
 * instance-info replicator with its status-change listener.
 */
private void initScheduledTasks() {
    // 1. Periodically refresh the registry cache
    if (clientConfig.shouldFetchRegistry()) {
        // Read the interval and back-off bound from config
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        // 2. Heartbeat: periodic lease renewal
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // 3. Watch the local instance state and periodically sync it to the server
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        // Listener that logs local status changes and triggers an on-demand
        // replication.
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        // The listener is only registered when on-demand updates are enabled.
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

这里我们可以看到更新注册列表和心跳续约这两个定时任务都是使用的ScheduledExecutorService.schedule(Runnable command, long delay, TimeUnit unit)(第三个任务则是通过instanceInfoReplicator.start启动的),而这个方法只是延迟执行,且只执行一次,那么是如何做到定时执行的呢?这就要看TimedSupervisorTask这个对象了:

/**
 * Abridged: wraps a task so that each run submits it to an executor, waits up to
 * a timeout, and then re-schedules itself on the scheduler. Timeouts double the
 * next delay up to a maximum; a successful run resets it.
 */
public class TimedSupervisorTask extends TimerTask {
    // ...various counters (elided)

    private final ScheduledExecutorService scheduler;
    private final ThreadPoolExecutor executor;
    private final long timeoutMillis;
    private final Runnable task;

    // Delay before the next run, in milliseconds; starts at timeoutMillis.
    private final AtomicLong delay;
    // Upper bound for delay: timeoutMillis * expBackOffBound.
    private final long maxDelay;

    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;

        // ...counter initialisation (elided)
    }

    @Override
    public void run() {
        Future<?> future = null;
        try {
            // Normal path: submit the task and block for at most timeoutMillis
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            // Success: reset the delay to the base interval.
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
            timeoutCounter.increment();

            // On timeout, double the current delay but cap it at maxDelay
            long currentDelay = delay.get();
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            delay.compareAndSet(currentDelay, newDelay);
        } catch (RejectedExecutionException e) {
            rejectedCounter.increment();
        } catch (Throwable e) {
            throwableCounter.increment();
        } finally {
            // Cancel the future, interrupting the task if it is still running
            if (future != null) {
                future.cancel(true);
            }

            if (!scheduler.isShutdown()) {
                // Schedule the next run; after a timeout the delay has been
                // doubled (bounded by maxDelay)
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
}

从这里我们可以知道EurekaClient中的定时任务是每次执行完一次之后再启动下一次的任务,这样来完成定时任务执行的。

这里我们先来看看定时更新注册表:

/** Runnable used by the "cacheRefresh" task; simply delegates to refreshRegistry(). */
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}

/**
 * Abridged: re-reads the remote-region configuration, updates the tracked
 * regions when it changed, then fetches the registry (forcing a full fetch when
 * the regions were modified). Any Throwable is logged and swallowed.
 */
@VisibleForTesting
void refreshRegistry() {
    try {
        // Whether registries are fetched from remote regions
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

        boolean remoteRegionsModified = false;
        // Read the remote regions to fetch from configuration
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        // Per the article, typical setups do not fetch remote-region registries,
        // so this is usually null
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) { // configured remote regions differ from the ones currently fetched (config changed)
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    // Switch to the latest set of remote regions
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    }
                }
            } else {
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }

        // remoteRegionsModified doubles as the forceFullRegistryFetch flag.
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        // ... (elided)
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}

更新注册表前面已经详细看过了,这里就不再赘述了。

再来看看定时续约的任务:

/**
 * Runnable that calls {@code renew()} and, when it returns true, records the
 * current time as the last successful heartbeat.
 */
private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

/**
 * Sends one heartbeat for this instance.
 *
 * <p>If the server answers 404 (NOT_FOUND), the instance is marked dirty and
 * {@code register()} is called; the dirty mark is cleared only when that
 * registration succeeds. Any throwable is logged and reported as failure.
 *
 * @return {@code true} when the heartbeat returned 200 (OK), or when a 404 was
 *         followed by a successful registration; {@code false} otherwise
 */
boolean renew() {
    try {
        // Send the heartbeat to the server.
        final EurekaHttpResponse<InstanceInfo> heartbeatResponse = eurekaTransport.registrationClient
                .sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        final int statusCode = heartbeatResponse.getStatusCode();

        if (statusCode != Status.NOT_FOUND.getStatusCode()) {
            return statusCode == Status.OK.getStatusCode();
        }

        // 404: the server does not know this instance, so register it now.
        final long dirtyTimestamp = instanceInfo.setIsDirtyWithTime();
        final boolean registered = register();
        if (registered) {
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
        return registered;
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}
/**
 * Issues a PUT to {@code apps/{appName}/{id}} carrying the instance's status and
 * last-dirty timestamp as query parameters, plus the overridden status when one
 * is supplied.
 *
 * @param appName          application name used in the request path
 * @param id               instance id used in the request path
 * @param info             instance whose status and last-dirty timestamp are sent
 * @param overriddenStatus optional overridden status; omitted from the request when null
 * @return the response status and headers, with the {@link InstanceInfo} entity
 *         attached when the response carries one
 */
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
    final String heartbeatPath = "apps/" + appName + '/' + id;
    ClientResponse httpResult = null;
    try {
        WebResource target = jerseyClient.resource(serviceUrl).path(heartbeatPath);
        target = target.queryParam("status", info.getStatus().toString());
        target = target.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
        if (overriddenStatus != null) {
            target = target.queryParam("overriddenstatus", overriddenStatus.name());
        }

        final Builder request = target.getRequestBuilder();
        addExtraHeaders(request);
        httpResult = request.put(ClientResponse.class);

        final EurekaHttpResponseBuilder<InstanceInfo> result =
                anEurekaHttpResponse(httpResult.getStatus(), InstanceInfo.class).headers(headersOf(httpResult));
        if (httpResult.hasEntity()) {
            result.entity(httpResult.getEntity(InstanceInfo.class));
        }
        return result.build();
    } finally {
        // Always release the underlying HTTP response.
        if (httpResult != null) {
            httpResult.close();
        }
    }
}

这里的续约比较简单,就是将当前服务的状态通过PUT请求发送给server端,这里需要注意的是,如果当前服务未注册(程序启动时不强制注册),这里server端会返回404,客户端接收到404返回后会进行注册,所以通常服务的注册发生在第一次续约过程中。

最后我们来看看监控本服务信息并同步给server的定时任务,该任务包含两个启动方法,我们先来看看任务的结构:

/**
 * Task that repeatedly refreshes the local {@link InstanceInfo} and re-registers
 * it through the {@link DiscoveryClient} whenever it is marked dirty.
 *
 * <p>Runs are chained: each {@link #run()} schedules the next one on a
 * single-threaded daemon scheduler. {@link #onDemandUpdate()} additionally allows
 * a rate-limited immediate run that replaces the pending periodic run.
 */
class InstanceInfoReplicator implements Runnable {
    private final DiscoveryClient discoveryClient;
    private final InstanceInfo instanceInfo;

    private final int replicationIntervalSeconds;
    private final ScheduledExecutorService scheduler;
    // Handle of the most recently scheduled run, so an on-demand update can cancel it.
    private final AtomicReference<Future<?>> scheduledPeriodicRef;

    private final AtomicBoolean started;
    private final RateLimiter rateLimiter;
    private final int burstSize;
    private final int allowedRatePerMinute;

    /**
     * @param discoveryClient            client used to refresh and register the instance
     * @param instanceInfo               the instance whose dirty state drives re-registration
     * @param replicationIntervalSeconds delay between chained runs, in seconds; also the
     *                                   divisor of the rate computation, so it must not be 0
     * @param burstSize                  burst size handed to the rate limiter
     */
    InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
        this.discoveryClient = discoveryClient;
        this.instanceInfo = instanceInfo;
        this.scheduler = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
                        .setDaemon(true)
                        .build());

        this.scheduledPeriodicRef = new AtomicReference<Future<?>>();

        this.started = new AtomicBoolean(false);
        this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
        this.replicationIntervalSeconds = replicationIntervalSeconds;
        this.burstSize = burstSize;

        this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
    }

    /**
     * Starts the replication chain; only the first call has an effect.
     *
     * @param initialDelayMs delay before the first run. NOTE: despite the "Ms" suffix
     *                       the value is scheduled with {@link TimeUnit#SECONDS}.
     */
    public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register
            Future<?> next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

    /** Shuts the scheduler down and marks the replicator as not started. */
    public void stop() {
        shutdownAndAwaitTermination(scheduler);
        started.set(false);
    }

    /**
     * Orderly shutdown: stop accepting work, wait up to 3 seconds, then force.
     * If the waiting thread is interrupted, the pool is force-stopped and the
     * interrupt status is restored so callers can still observe it.
     */
    private void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(3, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            logger.warn("InstanceInfoReplicator stop interrupted");
            // Do not leave tasks running, and do not swallow the interrupt.
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Requests an immediate replication run, subject to the rate limiter.
     *
     * @return {@code true} if a run was submitted; {@code false} if the rate limit
     *         was exceeded or the scheduler is already shut down
     */
    public boolean onDemandUpdate() {
        // Rate limit (described by the original author as a token bucket): burstSize
        // is the allowed burst, allowedRatePerMinute the sustained per-minute rate.
        if (!rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
            return false;
        }
        if (scheduler.isShutdown()) {
            return false;
        }

        scheduler.submit(new Runnable() {
            @Override
            public void run() {
                // Cancel the pending periodic run without interrupting it if it
                // is already executing; run() below schedules a fresh one.
                Future<?> latestPeriodic = scheduledPeriodicRef.get();
                if (latestPeriodic != null && !latestPeriodic.isDone()) {
                    latestPeriodic.cancel(false);
                }

                InstanceInfoReplicator.this.run();
            }
        });
        return true;
    }

    /**
     * Refreshes the instance info, registers it when it is dirty, and always
     * schedules the next run after {@code replicationIntervalSeconds}.
     */
    @Override
    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                // Clear the dirty mark for exactly the timestamp that was registered.
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future<?> next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
}

从前面的代码中我们可以看到,第一处启动是在instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds())处,其中eureka.client.initial-instance-info-replication-interval-seconds属性默认是40秒。
第二处启动是在applicationInfoManager.registerStatusChangeListener(statusChangeListener)处:这里向applicationInfoManager注册了监听器,当监听器发现当前client的状态发生了变化时,会触发InstanceInfoReplicator.onDemandUpdate方法。从代码中可以发现,该方法会将当前正在等待执行的任务取消(软取消,如果已经在执行,会等它执行完),然后重新执行。

具体执行刷新:

/**
 * Refreshes the locally held instance data and then re-evaluates the instance
 * status through the health check handler.
 *
 * <p>If the health check throws, the status is treated as {@code DOWN}. A
 * {@code null} status leaves the current status untouched.
 */
void refreshInstanceInfo() {
    // Refresh data-center info (per the original author: the client's address, IP, etc.).
    applicationInfoManager.refreshDataCenterInfoIfRequired();

    // Refresh lease info. Per the original author this reacts to changes of:
    //   1. eureka.instance.lease-expiration-duration-in-seconds
    //   2. eureka.instance.lease-renewal-interval-in-seconds
    applicationInfoManager.refreshLeaseInfoIfRequired();

    InstanceStatus resolvedStatus;
    try {
        resolvedStatus = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
    } catch (Exception e) {
        resolvedStatus = InstanceStatus.DOWN;
    }

    if (resolvedStatus == null) {
        return;
    }

    // Per the original author: this raises a StatusChangeEvent, which in turn
    // triggers InstanceInfoReplicator.onDemandUpdate.
    applicationInfoManager.setInstanceStatus(resolvedStatus);
}

只要以上任意一个发生变化,就会触发刷新,在instanceInfoReplicator.run方法中就会执行重新注册流程。

  1. 服务下架

在自动配置类EurekaClientAutoConfiguration中:

/**
 * Creates the {@link EurekaClient} bean; {@code shutdown} is declared as its
 * destroy method.
 *
 * <p>When the injected {@link ApplicationInfoManager} is an AOP proxy, the
 * proxied target object is handed to the client instead of the proxy.
 *
 * @param manager            application info manager, possibly an AOP proxy
 * @param config             client configuration passed to the client
 * @param instance           instance configuration (not used in this method body)
 * @param healthCheckHandler optional handler registered on the client; may be null
 * @return the configured client
 */
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
        search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
        EurekaClientConfig config, EurekaInstanceConfig instance,
        @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
    final ApplicationInfoManager targetManager;
    if (!AopUtils.isAopProxy(manager)) {
        targetManager = manager;
    }
    else {
        // Unwrap the proxy so the client works with the real manager.
        targetManager = ProxyUtils.getTargetObject(manager);
    }

    final CloudEurekaClient client = new CloudEurekaClient(targetManager,
            config, this.optionalArgs, this.context);
    client.registerHealthCheck(healthCheckHandler);
    return client;
}

可以看到,EurekaClient这个bean的销毁方法被指定为shutdown:

/**
 * Shuts the client down. Only the first invocation does any work; later calls
 * return immediately.
 *
 * <p>Order: unregister the status-change listener, cancel the scheduled tasks,
 * optionally mark the instance DOWN and unregister it, shut down the transport,
 * then shut down both staleness monitors.
 */
@PreDestroy
public synchronized void shutdown() {
    if (!isShutdown.compareAndSet(false, true)) {
        return;
    }

    // Release the status-change listener.
    if (statusChangeListener != null && applicationInfoManager != null) {
        applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
    }

    // Stop the scheduled background tasks.
    cancelScheduledTasks();

    final boolean mustUnregister = applicationInfoManager != null
            && clientConfig.shouldRegisterWithEureka()
            && clientConfig.shouldUnregisterOnShutdown();
    if (mustUnregister) {
        // Mark this instance DOWN first, then ask the server to remove it.
        applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
        unregister();
    }

    if (eurekaTransport != null) {
        eurekaTransport.shutdown();
    }

    heartbeatStalenessMonitor.shutdown();
    registryStalenessMonitor.shutdown();
}
  1. 平滑下架

上面我们看到,服务下架时虽然向eureka server端发起了下线请求,server端也会同步删除该instance的信息,但是因为eureka是注重AP的服务发现,每个client端会在本地缓存一段时间的注册列表,其他client在缓存刷新之前仍可能调用到已下线的实例,所以仅靠这种方式无法做到平滑下线。

EurekaClient配合actuator也可以做到平滑下线,EurekaClient实现了一个Endpoint监控,其bean定义是在ServiceRegistryAutoConfiguration完成的:

/**
 * Configuration that contributes a {@link ServiceRegistryEndpoint} bean.
 */
@Configuration(proxyBeanMethods = false)
public class ServiceRegistryAutoConfiguration {

    /**
     * Nested configuration guarded by two conditions: a {@link ServiceRegistry}
     * bean must exist and the {@code Endpoint} class must be on the classpath.
     */
    @ConditionalOnBean(ServiceRegistry.class)
    @ConditionalOnClass(Endpoint.class)
    protected class ServiceRegistryEndpointConfiguration {

        // Optional injection: stays null when no Registration bean is available.
        @Autowired(required = false)
        private Registration registration;

        /**
         * Builds the endpoint around the given registry and hands it the
         * (possibly null) registration.
         *
         * @param serviceRegistry registry the endpoint delegates to
         * @return the endpoint bean
         */
        @Bean
        @ConditionalOnEnabledEndpoint
        public ServiceRegistryEndpoint serviceRegistryEndpoint(
                ServiceRegistry serviceRegistry) {
            ServiceRegistryEndpoint endpoint = new ServiceRegistryEndpoint(
                    serviceRegistry);
            endpoint.setRegistration(this.registration);
            return endpoint;
        }
    }
}
@Endpoint(id = "service-registry")

// ...

/**
 * Endpoint write operation: forwards the requested status, together with the
 * stored registration, to the service registry and answers with 200 OK.
 *
 * @param status the status value to apply
 * @return an empty OK response
 */
@WriteOperation // write operations are invoked with an HTTP POST
public ResponseEntity<?> setStatus(String status) {
    // ... (elided in this excerpt)

    this.serviceRegistry.setStatus(this.registration, status);
    return ResponseEntity.ok().build();
}
/**
 * Applies a status to the registered instance.
 *
 * <p>The special value {@code CANCEL_OVERRIDE} (case-insensitive) cancels the
 * override status instead of setting one. Any other value is converted with
 * {@code InstanceStatus.toEnum} and set through the Eureka client.
 *
 * @param registration registration providing the instance info and the client
 * @param status       status name, or {@code CANCEL_OVERRIDE}
 */
public void setStatus(EurekaRegistration registration, String status) {
    final InstanceInfo instance = registration.getApplicationInfoManager().getInfo();

    final boolean cancelOverride = "CANCEL_OVERRIDE".equalsIgnoreCase(status);
    if (cancelOverride) {
        // Per the original author: this resets overriddenStatus to UNKNOWN.
        registration.getEurekaClient().cancelOverrideStatus(instance);
    } else {
        final InstanceInfo.InstanceStatus requestedStatus = InstanceInfo.InstanceStatus.toEnum(status);
        registration.getEurekaClient().setStatus(requestedStatus, instance);
    }
}
/**
 * Issues a DELETE to {@code apps/{appName}/{id}/status}, passing the instance's
 * last-dirty timestamp as a query parameter.
 *
 * @param appName application name used in the request path
 * @param id      instance id used in the request path
 * @param info    instance whose last-dirty timestamp is sent
 * @return the response status and headers (no entity)
 */
public EurekaHttpResponse<Void> deleteStatusOverride(String appName, String id, InstanceInfo info) {
    final String statusPath = "apps/" + appName + '/' + id + "/status";
    ClientResponse httpResult = null;
    try {
        final Builder request = jerseyClient.resource(serviceUrl)
                .path(statusPath)
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString())
                .getRequestBuilder();
        addExtraHeaders(request);

        httpResult = request.delete(ClientResponse.class);

        final int statusCode = httpResult.getStatus();
        final EurekaHttpResponse<Void> result =
                anEurekaHttpResponse(statusCode).headers(headersOf(httpResult)).build();
        return result;
    } finally {
        // Always release the underlying HTTP response.
        if (httpResult != null) {
            httpResult.close();
        }
    }
}
/**
 * Issues a PUT to {@code apps/{appName}/{id}/status}, passing the new status as
 * {@code value} and the instance's last-dirty timestamp as query parameters.
 *
 * @param appName   application name used in the request path
 * @param id        instance id used in the request path
 * @param newStatus status to set; its enum name is sent
 * @param info      instance whose last-dirty timestamp is sent
 * @return the response status and headers (no entity)
 */
public EurekaHttpResponse<Void> statusUpdate(String appName, String id, InstanceStatus newStatus, InstanceInfo info) {
    final String statusPath = "apps/" + appName + '/' + id + "/status";
    ClientResponse httpResult = null;
    try {
        final Builder request = jerseyClient.resource(serviceUrl)
                .path(statusPath)
                .queryParam("value", newStatus.name())
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString())
                .getRequestBuilder();
        addExtraHeaders(request);

        httpResult = request.put(ClientResponse.class);

        final int statusCode = httpResult.getStatus();
        final EurekaHttpResponse<Void> result =
                anEurekaHttpResponse(statusCode).headers(headersOf(httpResult)).build();
        return result;
    } finally {
        // Always release the underlying HTTP response.
        if (httpResult != null) {
            httpResult.close();
        }
    }
}
正文到此结束