原创

Spring Cloud源码解析2-Eureka Server

启动类

按照逻辑先从spring.factories文件入手,找到启动类EurekaServerAutoConfiguration,先来看该启动类的注解:

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
        InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")

从这里可以看到,要启动EurekaServer需要spring容器中包含EurekaServerMarkerConfiguration.Marker的bean实例,我们再来看EnableEurekaServer

@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
/**
 * Configuration whose only job is to register a {@link Marker} bean.
 * EurekaServerAutoConfiguration is annotated with
 * {@code @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)},
 * so the presence of this bean is what switches the Eureka server auto-configuration on.
 */
@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {
    // Registers the marker bean that the auto-configuration condition looks for.
    @Bean
    public Marker eurekaServerMarkerBean() {
        return new Marker();
    }
    // Empty marker type; it carries no state and is used only as a bean-presence flag.
    class Marker {
    }
}

从这里可以看出,想要启动EurekaServer必须在启动类中添加注解EnableEurekaServer

在该启动类中有一部分比较重要:

// Creates the server-side instance registry bean (the Spring Cloud InstanceRegistry implementation).
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
        ServerCodecs serverCodecs) {
    this.eurekaClient.getApplications(); // force initialization
    return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
            serverCodecs, this.eurekaClient,
      // Read from eureka.instance.registry.expected-number-of-clients-sending-renews
      // (the article states the default is 1; the default is not visible in this excerpt)
            this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
            this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}

@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
        ServerCodecs serverCodecs,
        ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
    return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
            this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
            replicationClientAdditionalFilters);
}

@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
        PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
    return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
            registry, peerEurekaNodes, this.applicationInfoManager);
}

这里我们不做详解,首先我们要明确后面要用到的类:

  1. 对于com.netflix.eureka.registry.InstanceRegistry接口实现类是org.springframework.cloud.netflix.eureka.server.InstanceRegistry

  2. PeerEurekaNodes的实现类是RefreshablePeerEurekaNodes

  3. EurekaServerContext的实现类是DefaultEurekaServerContext

  4. EurekaServerContext在初始化阶段

    @PostConstruct
    @Override
    public void initialize() {
      // ...
      registry.init(peerEurekaNodes);
      // ...
    }
    

    此处会执行注册表初始化:

    // Initializes the registry with the given peer nodes (excerpt; the tail of the method is elided).
    @Override
    public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
      this.numberOfReplicationsLastMin.start();
      this.peerEurekaNodes = peerEurekaNodes;
      // Key step: create the ResponseCache, which is built around two maps:
      // 1. readWriteCacheMap
      // 2. readOnlyCacheMap
      initializedResponseCache();
      // Schedule the periodic task that recomputes the renewal threshold; that threshold is
      // what isLeaseExpirationEnabled() compares against to decide whether self-preservation is in effect
      scheduleRenewalThresholdUpdateTask();
      // Per the article: if fetching registries from other regions is enabled and
      // eureka.server.remote-region-urls-with-name is configured, the remote regions'
      // registries are fetched here at server start-up (the callee body is not shown here)
      initRemoteRegionRegistry();
     // ...
    }
    

    其中,最主要的是initializedResponseCache

    // Builds the response cache: a loading read-write cache plus, optionally, a timer task
    // that keeps the read-only map in step with it (excerpt; parts are elided).
    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        // Whether the read-only map is used comes from server configuration
        // (the article states the default is true; not visible in this excerpt)
        this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
         // ...
         // Build the read-write map; it is a Guava LoadingCache created through CacheBuilder
        this.readWriteCacheMap =
                CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                        .removalListener(new RemovalListener<Key, Value>() {
                            @Override
                            public void onRemoval(RemovalNotification<Key, Value> notification) {
                                // When a region-specific key is evicted, drop its entry from regionSpecificKeys as well
                                Key removedKey = notification.getKey();
                                if (removedKey.hasRegions()) {
                                    Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                    regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                                }
                            }
                        })
                        .build(new CacheLoader<Key, Value>() {
                            @Override
                            public Value load(Key key) throws Exception {
                                // On a cache miss: record region-specific keys, then generate the payload for the key
                                if (key.hasRegions()) {
                                    Key cloneWithNoRegions = key.cloneWithoutRegions();
                                    regionSpecificKeys.put(cloneWithNoRegions, key);
                                }
                                Value value = generatePayload(key);
                                return value;
                            }
                        });
    
        if (shouldUseReadOnlyResponseCache) {
            // First run is aligned to the next multiple of responseCacheUpdateIntervalMs, then repeats at that interval
            timer.schedule(getCacheUpdateTask(),
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),
                    responseCacheUpdateIntervalMs);
        }
         // ...
    }
    // Note: this task does not populate readOnlyCacheMap. It only walks the keys that are
    // already in readOnlyCacheMap, compares each value with the one in readWriteCacheMap,
    // and overwrites the read-only entry when the two differ.
    // In other words, the timer task periodically keeps readOnlyCacheMap in sync with readWriteCacheMap.
    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                for (Key key : readOnlyCacheMap.keySet()) {
                    // ...
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        // Reference comparison: replace the read-only entry when it is not the same object
                        if (cacheValue != currentCacheValue) {
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    }
                     // ...catch exception
                }
            }
        };
    }
    

    需要注意的是,此处只保持了两个Map数据的一致性,而readOnlyCacheMap中的数据是怎样写入的后面我们会详细说。

三个Resource

从前面EurekaClient的源码解析中我们知道Client和Server之间的交互与Server和Server之间的交互都是通过Jersey实现的,而Jersey的控制器使用的是Resource,那么我们就来详细了解下Server端处理Client端发来的请求(Server到Server的请求也是通过这几个Resource实现),是如何处理的。

首先我们先了解下有几个处理请求的Resource,这里我们整理如下:

  • ApplicationsResource:处理注册表相关的请求,有两个:全量获取注册表和增量获取注册表,ApplicationResource就是根据ApplicationsResource+appId提供能力的。

  • ApplicationResource:主要处理的是应用相关的请求,有两个:根据应用Id(服务名称)获取某一个应用(一个实例的集群)的注册信息和服务注册(新增一个实例),InstanceResource就是根据ApplicationResource+instanceId提供能力的。

  • InstanceResource:主要处理的是实例状态相关的请求,比如获取实例信息、续约(心跳)、修改状态、服务下线等操作。

InstanceResource

实例状态修改

// Handles PUT .../status: overrides the status of an instance (excerpt; response handling is elided).
@PUT
@Path("status")
public Response statusUpdate(
        @QueryParam("value") String newStatus,
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    try {
        // Look the instance up in the registry by application name and instance id; 404 if absent
        if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
            return Response.status(Status.NOT_FOUND).build();
        }
        // Apply the status change
        boolean isSuccess = registry.statusUpdate(app.getName(), id,
                InstanceStatus.valueOf(newStatus), lastDirtyTimestamp,
                "true".equals(isReplication));

        // ...build the response
    } 
    // ...catch exception
}

registry.getInstanceByAppAndId这里会经常调用,这里我们需要详细分析,通过详解我们可以深入理解EurekaServer的自我保护机制。

// Looks up an instance by application name and instance id, first in the local registry
// and then (optionally) in the remote-region registries. Returns null when nothing is found.
public InstanceInfo getInstanceByAppAndId(String appName, String id, boolean includeRemoteRegions) {
    // The server-side registry has the shape
    // Map<String/* appName */, Map<String/* instanceId */, Lease<InstanceInfo>/* lease of the instance */>>

    // Fetch the leases of all instances of this application in the local region
    Map<String, Lease<InstanceInfo>> leaseMap = registry.get(appName);
    Lease<InstanceInfo> lease = null;
    if (leaseMap != null) {
        // Fetch the lease for this instance id
        lease = leaseMap.get(id);
    }
    if (lease != null && (!isLeaseExpirationEnabled() || !lease.isExpired())) {
        /*
          This condition is easy to misread. A lease was found, and one of the following holds:
            1. isLeaseExpirationEnabled() is false, i.e. self-preservation is enabled and has been
               triggered because renewals are not above the threshold; the instance is then
               returned even if its lease has already expired.
            2. isLeaseExpirationEnabled() is true, i.e. self-preservation is disabled, or enabled
               but not triggered; the instance is returned only if the lease has not expired
               (lease.isExpired() == false).
         */
        return decorateInstanceInfo(lease);
    } else if (includeRemoteRegions) {
        // Fall back to the remote regions. What is returned here is plain instance information
        // held by the remote registries, not a local lease, so it is not decorated.
        for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
            Application application = remoteRegistry.getApplication(appName);
            if (application != null) {
                // Returns from the first remote registry that knows the application,
                // even if getByInstanceId(id) yields null for this id
                return application.getByInstanceId(id);
            }
        }
    }
    return null;
}
// Builds the InstanceInfo to return: takes the lease holder and refreshes its LeaseInfo
// from the lease's timestamps.
private InstanceInfo decorateInstanceInfo(Lease<InstanceInfo> lease) {
    InstanceInfo info = lease.getHolder();
    // Start from the default interval and duration
    int renewalInterval = LeaseInfo.DEFAULT_LEASE_RENEWAL_INTERVAL;
    int leaseDuration = LeaseInfo.DEFAULT_LEASE_DURATION;

    // Prefer the instance's own settings when it already carries a LeaseInfo
    if (info.getLeaseInfo() != null) {
        renewalInterval = info.getLeaseInfo().getRenewalIntervalInSecs();
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }

    info.setLeaseInfo(LeaseInfo.Builder.newBuilder()
            .setRegistrationTimestamp(lease.getRegistrationTimestamp())
            .setRenewalTimestamp(lease.getLastRenewalTimestamp())
            .setServiceUpTimestamp(lease.getServiceUpTimestamp())
            .setRenewalIntervalInSecs(renewalInterval)
            .setDurationInSecs(leaseDuration)
            .setEvictionTimestamp(lease.getEvictionTimestamp()).build());

    info.setIsCoordinatingDiscoveryServer();
    return info;
}
// Tells whether expired leases may currently be treated as expired.
public boolean isLeaseExpirationEnabled() {
    // Per the article this reads eureka.server.enable-self-preservation (stated default: true)
    if (!isSelfPreservationModeEnabled()) {
        // Self-preservation is switched off, so lease expiration is always enabled
        return true;
    }
    // Self-preservation is switched on: expiration is enabled only when the threshold is positive
    // and the renewals in the last minute are strictly greater than it; otherwise this returns false
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}

这段代码有三点需要关注的:

  1. 首先要理解Server端注册表与Client端注册表不同,Client端注册表信息使用Applications,而在Server端使用的Map<String/* appName */, Map<String/* instanceId */, Lease<InstanceInfo>/* 实例续约信息 */>>形式组成,不是直接存储的InstanceInfo,而是存储它的续约信息Lease,可以通过Lease.getHolder得到相应的InstanceInfo
  2. if (lease != null && (!isLeaseExpirationEnabled() || !lease.isExpired())) {这里的判断因为内外层使用了多个非操作导致比较绕,这里我们分析什么时候返回结果:
    • 配置文件中配置启动了自我保护且因为续约数量不达标触发了自我保护(那么即便续约信息已经过期也会返回)
    • 配置文件中没有启动自我保护或者启动保护但续约数量达标没有触发,就需要判断续约信息是否过期,如果续约信息没过期(lease.isExpired()==false)才返回
  3. 如果允许从远程region中获取实例信息,这里也会从remoteRegistry中查找,在上面讲启动类的过程中我们看到Server端启动时,根据配置,从远程服务器上拉取到InstanceInfo并记录到remoteRegistry,因为这里从远程服务器上拉取到的信息就是Applications,所以不用像本地注册表这样需要装饰后返回。

前面我们知道registry的实现类是InstanceRegistry,然后我们看registry.statusUpdate可以找到其父类的实现方法:

// Peer-aware status update: apply the change locally, then replicate it to the other server nodes.
public boolean statusUpdate(final String appName, final String id,
                            final InstanceStatus newStatus, String lastDirtyTimestamp,
                            final boolean isReplication) {
    // Apply the status change to the local registry first
    if (super.statusUpdate(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
        // Then propagate it to the other server nodes
        replicateToPeers(Action.StatusUpdate, appName, id, null, newStatus, isReplication);
        return true;
    }
    return false;
}

这里留意这种写法,在server端所有的写操作其实都是这种写法,先做本地写操作,然后同步到其他的server节点;如果当前请求本身就是其他server节点同步过来的,isReplication即为true,此时不会再继续向其他节点同步。

我们先来看看是如何进行状态修改的:

// Local status update: returns false when no lease exists for (appName, id), true otherwise.
public boolean statusUpdate(String appName, String id,
                            InstanceStatus newStatus, String lastDirtyTimestamp,
                            boolean isReplication) {
    try {
        // Note the read lock taken here. The article comes back to it after covering all
        // request handlers, from two angles:
        // 1. when the lock is taken
        // 2. why a status modification takes the read lock
        read.lock();
        // Fetch the lease from the registry
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> lease = null;
        if (gMap != null) {
            lease = gMap.get(id);
        }
        if (lease == null) {
            return false;
        } else {
            // Any operation on the instance is treated as a form of renewal, so refresh the lease time
            lease.renew();
            InstanceInfo info = lease.getHolder();
            // ...
            if ((info != null) && !(info.getStatus().equals(newStatus))) {
                // The status is changing; if the new status is UP the service is back in service,
                // so record its service-up time
                if (InstanceStatus.UP.equals(newStatus)) {
                    lease.serviceUp();
                }
                // Record the override both in the map and on the instance
                overriddenInstanceStatusMap.put(id, newStatus);
                info.setOverriddenStatus(newStatus);

                long replicaDirtyTimestamp = 0;
                info.setStatusWithoutDirty(newStatus);
                if (lastDirtyTimestamp != null) {
                    replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
                }
                // If the lastDirtyTimestamp sent with the request is newer than the stored one, adopt it
                if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
                    info.setLastDirtyTimestamp(replicaDirtyTimestamp);
                }
                // Mark the InstanceInfo as MODIFIED; per the article, clients pulling updates
                // act on this ActionType
                info.setActionType(ActionType.MODIFIED);
                // recentlyChangedQueue is the "recently changed" queue; per the article, client
                // delta fetches are served from it (explained in detail later in the article)
                recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                info.setLastUpdatedTimestamp();
                invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
            }
            return true;
        }
    } finally {
        read.unlock();
    }
}

这里主要关注两点:

  1. 方法开始节点的读锁,两点疑问:

    • 为什么加锁
    • 写操作加读锁的原因

    这两点疑问后面我们在过完所有处理后总结来看

  2. 关于recentlyChangedQueue,这个队列字面意思是最近修改队列,在客户端发起增量更新请求时,实际上不是遍历server端本地注册表,而是将recentlyChangedQueue返回给Client,Server端是怎样进行维护的也是需要将整个过程完全解析后才能精确的讲解。

其他的就是根据上传的newStatus、lastDirtyTimestamp修改本地的overriddenStatus、status、lastDirtyTimestamp,最后更新lastUpdatedTimestamp

// Replicates an action to every peer server node except this node itself.
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        // Stop here when there are no peers, or when this request was itself replicated
        // from another server (a replicated request is not replicated again)
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // Skip this node's own URL
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Replicate to the peer
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        switch (action) {
            // ...
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            // ...
        }
    }
    // ...catch exception
}
// Queues a status-update replication task for the peer node through the batching dispatcher.
public void statusUpdate(final String appName, final String id,
                         final InstanceStatus newStatus, final InstanceInfo info) {
    // Compute the task's expiry time: now plus maxProcessingDelayMs
    long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
    batchingDispatcher.process(
            taskId("statusUpdate", appName, id),
            new InstanceReplicationTask(targetHost, Action.StatusUpdate, info, null, false) {
                @Override
                public EurekaHttpResponse<Void> execute() {
                    // The actual HTTP call is made when the dispatcher executes the task
                    return replicationClient.statusUpdate(appName, id, newStatus, info);
                }
            },
            expiryTime
    );
}
// Builds and sends the replication request to the other server node:
// PUT apps/{appName}/{id}/status with the "value" and "lastDirtyTimestamp" query parameters.
public EurekaHttpResponse<Void> statusUpdate(String appName, String id, InstanceStatus newStatus, InstanceInfo info) {
    String urlPath = "apps/" + appName + '/' + id + "/status";
    ClientResponse response = null;
    try {
        Builder requestBuilder = jerseyClient.resource(serviceUrl)
                .path(urlPath)
                .queryParam("value", newStatus.name())
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString())
                .getRequestBuilder();
        addExtraHeaders(requestBuilder);
        response = requestBuilder.put(ClientResponse.class);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        // ...close response
    }
}

删除overridden状态(重置节点状态)

这里首先要清楚一个概念,在服务注册完成后,server端中的overriddenStatus状态是UNKNOWN,status状态是UP、DOWN...,而这时,server端的overriddenStatusMap中是没有这个节点记录的。所以什么时候才需要删除overriddenStatus呢?答案就是,只有修改过状态的节点才会产生overriddenStatus!=UNKNOWN的情况,也就是说删除overridden状态实际就是删除server端的overriddenStatusMap中的节点状态,并将该节点的overriddenStatus状态重置为UNKNOWN,而该节点的实际状态status字段则由参数中的value属性而定。

// Handles DELETE .../status: removes the status override of an instance (excerpt; response handling is elided).
@DELETE
@Path("status")
public Response deleteStatusUpdate(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("value") String newStatusValue,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    try {
        // Look the instance up in the registry by application name and instance id; 404 if absent
        if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
            return Response.status(Status.NOT_FOUND).build();
        }

        // When no "value" query parameter is supplied, the new status falls back to UNKNOWN
        InstanceStatus newStatus = newStatusValue == null ? InstanceStatus.UNKNOWN : InstanceStatus.valueOf(newStatusValue);
        // Remove the overridden status
        boolean isSuccess = registry.deleteStatusOverride(app.getName(), id,
                newStatus, lastDirtyTimestamp, "true".equals(isReplication));

        // ...build response
    }
    // ...catch exception
}

这里删除overriddenStatus有两个重要的步骤:

  1. 根据appName、instanceId查找instanceInfo
  2. 删除实例的overriddenStatus

其中第一步查找instanceInfo前面我们已经说过了,这里就不再赘述,继续向下跟踪,这里注意InstanceStatus newStatus = newStatusValue == null ? InstanceStatus.UNKNOWN : InstanceStatus.valueOf(newStatusValue);也就是说,如果请求中没有携带value参数,删除overriddenStatus时就会把status置为InstanceStatus.UNKNOWN,否则置为value指定的状态

// Peer-aware override removal: remove the override locally, then replicate to the other server nodes.
public boolean deleteStatusOverride(String appName, String id,
                                    InstanceStatus newStatus,
                                    String lastDirtyTimestamp,
                                    boolean isReplication) {
    // Remove the overridden status from the local registry
    if (super.deleteStatusOverride(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
        // Propagate the removal to the other Eureka server nodes
        replicateToPeers(Action.DeleteStatusOverride, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}
// Local override removal: returns false when no lease exists for (appName, id), true otherwise.
public boolean deleteStatusOverride(String appName, String id,
                                    InstanceStatus newStatus,
                                    String lastDirtyTimestamp,
                                    boolean isReplication) {
    try {
        read.lock();
        // Fetch the lease from the registry
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> lease = null;
        if (gMap != null) {
            lease = gMap.get(id);
        }
        if (lease == null) {
            return false;
        } else {
            // Any operation on the instance is treated as a form of renewal, so refresh the lease time
            lease.renew();
            InstanceInfo info = lease.getHolder();

            // ...

            // Remove this instance's entry from overriddenInstanceStatusMap
            InstanceStatus currentOverride = overriddenInstanceStatusMap.remove(id);
            if (currentOverride != null && info != null) { // an override existed, i.e. the status had been modified before
                // Reset overriddenStatus to UNKNOWN and set status to the requested newStatus
                // (newStatus is UNKNOWN only when the caller passed UNKNOWN)
                info.setOverriddenStatus(InstanceStatus.UNKNOWN);
                info.setStatusWithoutDirty(newStatus);
                // Update lastDirtyTimestamp when the one sent with the request is newer
                long replicaDirtyTimestamp = 0;
                if (lastDirtyTimestamp != null) {
                    replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
                }
                if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
                    info.setLastDirtyTimestamp(replicaDirtyTimestamp);
                }
                // Mark the InstanceInfo as MODIFIED; per the article, clients pulling updates
                // act on this ActionType
                info.setActionType(ActionType.MODIFIED);
                // recentlyChangedQueue is the "recently changed" queue; per the article, client
                // delta fetches are served from it (explained in detail later in the article)
                recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                // Refresh lastUpdatedTimestamp
                info.setLastUpdatedTimestamp();
                invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
            }
            return true;
        }
    } finally {
        read.unlock();
    }
}

删除overriddenStatus的操作和修改状态的过程类似,只不过这里是删除overriddenInstanceStatusMap的值,然后将instanceInfo中的overriddenStatus还原为UNKNOWN、status置为传入的newStatus(请求未携带value时即为UNKNOWN),最后更新时间戳,然后将该Lease<InstanceInfo>添加到recentlyChangedQueue

在同步到其他Server节点的过程中,根据操作类型的不同区分:

// Dispatches one replication action to one peer node (excerpt showing only the DeleteStatusOverride case).
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        // ...
        switch (action) {
            // ...
            case DeleteStatusOverride:
                    // Fetch the InstanceInfo from the local registry (remote regions excluded)
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    // Ask the peer server node to delete its overriddenStatus as well
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    }
    // ...catch exception
}
public EurekaHttpResponse<Void> deleteStatusOverride(String appName, String id, InstanceInfo info) {
    String urlPath = "apps/" + appName + '/' + id + "/status";
    ClientResponse response = null;
    try {
        Builder requestBuilder = jerseyClient.resource(serviceUrl)
                .path(urlPath)
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString())
                .getRequestBuilder();
        addExtraHeaders(requestBuilder);
        response = requestBuilder.delete(ClientResponse.class);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        // ...close response
    }
}

实例续约

// Handles PUT on the instance resource: lease renewal (heartbeat).
@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    // Renew the lease
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

    if (!isSuccess) {
        // The renewal was rejected (for example the instance was not found): answer 404.
        // As described earlier in the article, a client that receives 404 on renewal re-registers.
        return Response.status(Status.NOT_FOUND).build();
    }

    Response response;
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
        // Read this together with the renew flow above; the case of interest here is a
        // call between two Eureka servers
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    // Used for server-to-server replication: overriddenStatus is the override
                    // held by the server that sent the replication request
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {
            // Store the sender's overriddenStatus on the local instance
            registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    } else {
        response = Response.ok().build();
    }
    return response;
}

先来看第一个步骤:续约

// Spring Cloud wrapper around renew: publishes an EurekaInstanceRenewedEvent, then delegates to the parent.
public boolean renew(final String appName, final String serverId, boolean isReplication) {
    // Take the local registry as a sorted list of Applications, find the application named
    // appName, and publish an EurekaInstanceRenewedEvent for it. The event is published once
    // the application is found, even when no instance matches serverId (instance is then null).
    List<Application> applications = getSortedApplications();
    for (Application input : applications) {
        if (input.getName().equals(appName)) {
            InstanceInfo instance = null;
            for (InstanceInfo info : input.getInstances()) {
                if (info.getId().equals(serverId)) {
                    instance = info;
                    break;
                }
            }
            publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
                    instance, isReplication));
            break;
        }
    }
    return super.renew(appName, serverId, isReplication);
}
// Peer-aware renewal: renew locally, then replicate the heartbeat to the other server nodes.
public boolean renew(final String appName, final String id, final boolean isReplication) {
    // Renew in the local registry
    if (super.renew(appName, id, isReplication)) {
        // Replicate the heartbeat to the other server nodes
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}
// Local renewal: returns false when there is no lease or the computed status is UNKNOWN, true otherwise.
public boolean renew(String appName, String id, boolean isReplication) {
    // Fetch the lease of the instance asking for renewal
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id);
    }
    if (leaseToRenew == null) {
        // No lease found: report failure
        return false;
    } else {
        // Get the instance held by the lease
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // Compute the instance status through the override rules; note that both the
            // instance and the lease passed in come from the local registry
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                // A computed status of UNKNOWN rejects the renewal. Per the article this means the
                // instance's override was deleted earlier, so the instance has to register again.
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                // The computed status differs from the stored one: the computed status wins
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }
        // Refresh the lease's renewal time
        leaseToRenew.renew();
        return true;
    }
}
// Computes the effective instance status by applying the configured override rule.
protected InstanceInfo.InstanceStatus getOverriddenInstanceStatus(InstanceInfo r,
                                                                Lease<InstanceInfo> existingLease,
                                                                boolean isReplication) {
    // Obtain the rule and apply it; the result's status is returned
    InstanceStatusOverrideRule rule = getInstanceInfoOverrideRule();
    return rule.apply(r, existingLease, isReplication).status();
}
protected InstanceStatusOverrideRule getInstanceInfoOverrideRule() {
    return this.instanceStatusOverrideRule;
}

从这里可以看到这个规则使用的是PeerAwareInstanceRegistryImpl.instanceStatusOverrideRule属性,那么这个属性是什么时候赋值的呢?

这里就要回顾在启动类时我们有这段代码:

@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
        ServerCodecs serverCodecs) {
    this.eurekaClient.getApplications(); // force initialization
    return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
            serverCodecs, this.eurekaClient,
            this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
            this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}

可以看到在创建InstanceRegistry实例过程中,执行构造方法:

public InstanceRegistry(EurekaServerConfig serverConfig,
        EurekaClientConfig clientConfig, ServerCodecs serverCodecs,
        EurekaClient eurekaClient, int expectedNumberOfClientsSendingRenews,
        int defaultOpenForTrafficCount) {
    super(serverConfig, clientConfig, serverCodecs, eurekaClient);

    this.expectedNumberOfClientsSendingRenews = expectedNumberOfClientsSendingRenews;
    this.defaultOpenForTrafficCount = defaultOpenForTrafficCount;
}

继续执行父类构造方法:

// Constructor: wires the Eureka client, the replication rate meter, and the status override rule.
public PeerAwareInstanceRegistryImpl(
        EurekaServerConfig serverConfig,
        EurekaClientConfig clientConfig,
        ServerCodecs serverCodecs,
        EurekaClient eurekaClient) {
    super(serverConfig, clientConfig, serverCodecs);
    this.eurekaClient = eurekaClient;
    this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
    // Initialize the status computation rule: a composite of DownOrStartingRule,
    // OverrideExistsRule (backed by overriddenInstanceStatusMap) and LeaseExistsRule
    this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
            new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
}

从这里我们可以看到使用的是FirstMatchWinsCompositeRule,而这是一个复合规则,里面包装了DownOrStartingRule、OverrideExistsRule、LeaseExistsRule

回到AbstractInstanceRegistry.getOverriddenInstanceStatus方法我们看到,就是执行了FirstMatchWinsCompositeRule.apply方法:

// Composite rule: returns the result of the first wrapped rule that matches, else the default rule's result.
public StatusOverrideResult apply(InstanceInfo instanceInfo,
                                  Lease<InstanceInfo> existingLease,
                                  boolean isReplication) {
    // Apply the wrapped rules in order (here DownOrStartingRule, OverrideExistsRule,
    // LeaseExistsRule) and stop at the first one that matches
    for (int i = 0; i < this.rules.length; ++i) {
        StatusOverrideResult result = this.rules[i].apply(instanceInfo, existingLease, isReplication);
        if (result.matches()) {
            return result;
        }
    }
    // Per the article, defaultRule is AlwaysMatchInstanceStatusRule.
    // When none of the rules above matches, its apply method decides the result.
    return defaultRule.apply(instanceInfo, existingLease, isReplication);
}
DownOrStartingRule

// DownOrStartingRule: matches whenever the instance status is neither UP nor OUT_OF_SERVICE.
public StatusOverrideResult apply(InstanceInfo instanceInfo,
                                  Lease<InstanceInfo> existingLease,
                                  boolean isReplication) {
    // Any status other than UP and OUT_OF_SERVICE (DOWN and STARTING, and by the same test
    // UNKNOWN) is a match and is returned as-is; UP and OUT_OF_SERVICE give NO_MATCH
    if ((!InstanceInfo.InstanceStatus.UP.equals(instanceInfo.getStatus()))
            && (!InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(instanceInfo.getStatus()))) {
        return StatusOverrideResult.matchingStatus(instanceInfo.getStatus());
    }
    return StatusOverrideResult.NO_MATCH;
}
OverrideExistsRule

// OverrideExistsRule: matches when an override is recorded for the instance id.
public StatusOverrideResult apply(InstanceInfo instanceInfo, Lease<InstanceInfo> existingLease, boolean isReplication) {
    // statusOverrides is the registry's overridden-status map (it is passed into this rule's constructor).
    // If the map contains this instance id, i.e. the instance's status was overridden before,
    // the overridden status is returned; otherwise NO_MATCH.
    InstanceInfo.InstanceStatus overridden = statusOverrides.get(instanceInfo.getId());
    if (overridden != null) {
        return StatusOverrideResult.matchingStatus(overridden);
    }
    return StatusOverrideResult.NO_MATCH;
}
LeaseExistsRule

// LeaseExistsRule: for non-replication requests, matches when the existing lease's holder is UP or OUT_OF_SERVICE.
public StatusOverrideResult apply(InstanceInfo instanceInfo,
                                  Lease<InstanceInfo> existingLease,
                                  boolean isReplication) {
    // Requests replicated from another server skip the check and yield NO_MATCH
    if (!isReplication) { // request that did not come from a peer server
        // Read the status of the instance held by the existing lease; if it is UP or
        // OUT_OF_SERVICE return it, otherwise NO_MATCH
        InstanceInfo.InstanceStatus existingStatus = null;
        if (existingLease != null) {
            existingStatus = existingLease.getHolder().getStatus();
        }
        if ((existingStatus != null)
                && (InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(existingStatus)
                || InstanceInfo.InstanceStatus.UP.equals(existingStatus))) {
            return StatusOverrideResult.matchingStatus(existingLease.getHolder().getStatus());
        }
    }
    return StatusOverrideResult.NO_MATCH;
}

所以这里可以总结服务续约时计算状态的规则:

  1. 如果本地注册表中的服务状态既不是UP也不是OUT_OF_SERVICE(即DOWN、STARTING,也包括被重置后的UNKNOWN)时,直接返回该状态;其中返回UNKNOWN就意味着该实例的状态被删除重置过,续约会失败,需要重新注册。
  2. 如果本地注册表中的服务状态是UP或者OUT_OF_SERVICE时,需要看overriddenStatusMap中是否包含该实例,也就是说看是否修改过该实例的状态,如果有的话,以overriddenStatusMap中的状态为准。
  3. 如果本地注册表中的服务状态是UP或者OUT_OF_SERVICE,且overriddenStatusMap中不包含该实例,并且请求来自Client端(isReplication为false),就看本地注册表中该实例的状态,如果本地注册表中该实例状态为UP或者OUT_OF_SERVICE的话就返回。
  4. 如果以上都不匹配(例如请求是从其他server节点同步而来),就执行默认规则AlwaysMatchInstanceStatusRule,由它给出最终状态(即直接采用实例当前的status)。

下面我们再来看第二步:同步renew到其他server节点,续约过程的同步请求处理比较复杂,需要完整的看整个流程:

// Replicates one instance action to a single peer node.
// Excerpt: only the Heartbeat case is shown; the other cases and the catch clause are elided.
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            // ... (other cases elided)
            case Heartbeat:
                // Look up this instance's entry in overriddenInstanceStatusMap (null when absent).
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                // The heartbeat carries the registry's copy of the instance, not the `info` argument.
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            // ... (other cases elided)
        }
    }
    // ...catch exception
}
/**
 * Replicates a heartbeat (lease renewal) for one instance to this peer node.
 *
 * @param appName          application the instance belongs to
 * @param id               instance id
 * @param info             instance data sent with the heartbeat, may be {@code null}
 * @param overriddenStatus overridden status to forward, may be {@code null}
 * @param primeConnection  when {@code true} the heartbeat is sent directly and nothing is queued
 * @throws Throwable whatever the direct send or the dispatcher submission throws
 */
public void heartbeat(final String appName, final String id,
                      final InstanceInfo info, final InstanceStatus overriddenStatus,
                      boolean primeConnection) throws Throwable {
    if (primeConnection) {
        // Priming call: send straight away, bypassing the batching dispatcher and failure handling.
        replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        return;
    }

    final ReplicationTask heartbeatTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
        @Override
        public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
            return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        }

        @Override
        public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
            // Read together with the receiving side (InstanceResource) and sendHeartBeat,
            // which decide what status code and entity arrive here.
            super.handleFailure(statusCode, responseEntity);

            if (statusCode == 404) {
                // The peer does not know the instance: register it there, as a client would.
                if (info != null) {
                    register(info);
                }
                return;
            }
            if (!config.shouldSyncWhenTimestampDiffers()) {
                return;
            }
            // Per the article: with eureka.server.sync-when-timestamp-differs enabled (the default),
            // a 409 response carries the peer's copy of the instance, which is used to
            // bring the local registry up to date.
            final InstanceInfo peerCopy = (InstanceInfo) responseEntity;
            if (peerCopy != null) {
                syncInstancesIfTimestampDiffers(appName, id, info, peerCopy);
            }
        }
    };

    final long deadline = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(taskId("heartbeat", info), heartbeatTask, deadline);
}
// Sends a replicated heartbeat as an HTTP PUT to apps/{appName}/{id} on the peer.
// Returns the peer's status code; the response entity is populated only on 409 (CONFLICT).
// Excerpt: the body of the finally clause is elided.
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        WebResource webResource = jerseyClient.getClient().resource(serviceUrl)
                .path(urlPath)
                .queryParam("status", info.getStatus().toString())
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
        // Forward the overridden status that the caller looked up, when there is one.
        // Per the article, the receiving InstanceResource uses overriddenstatus and
        // lastDirtyTimestamp to decide whether to update its own copy of the instance.
        if (overriddenStatus != null) {
            webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class);
        InstanceInfo infoFromPeer = null;
        // On 409 (CONFLICT) with a body, hand the peer's InstanceInfo back to the caller;
        // for any other status the entity stays null.
        if (response.getStatus() == Status.CONFLICT.getStatusCode() && response.hasEntity()) {
            infoFromPeer = response.getEntity(InstanceInfo.class);
        }
        return anEurekaHttpResponse(response.getStatus(), infoFromPeer).type(MediaType.APPLICATION_JSON_TYPE).build();
    } finally {
        // ...close response
    }
}
// Applies the instance copy returned by a peer to the local registry.
// Excerpt: the catch clause is elided. Note that `info` is not read in the visible code.
private void syncInstancesIfTimestampDiffers(String appName, String id, InstanceInfo info, InstanceInfo infoFromPeer) {
    try {
        if (infoFromPeer != null) {
            if (infoFromPeer.getOverriddenStatus() != null && !InstanceStatus.UNKNOWN.equals(infoFromPeer.getOverriddenStatus())) {
                // The peer reports a real (non-UNKNOWN) overridden status: hand it to the registry.
                registry.storeOverriddenStatusIfRequired(appName, id, infoFromPeer.getOverriddenStatus());
            }
            // Register the peer's copy locally; `true` marks the call as a replication.
            registry.register(infoFromPeer, true);
        }
    }
    // ...catch exception
}

最后回到InstanceResource中看看续约完成之后的操作:

// Fragment: builds the renewal response after the lease itself has been renewed.
Response response;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
    // Read together with the renew flow above; the case of interest is a call between two Eureka servers.
    response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
    if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
            // For server-to-server replication, overriddenStatus is the value sent by the
            // server that issued the replication request.
            && (overriddenStatus != null)
            && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
            && isFromReplicaNode) {
        // Store the sender's overridden status for the local instance.
        registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
    }
} else {
    // No timestamp supplied, or timestamp syncing is disabled: plain 200.
    response = Response.ok().build();
}
/**
 * Compares the caller's lastDirtyTimestamp with the one stored in the local registry.
 *
 * @param lastDirtyTimestamp timestamp sent by the caller, may be {@code null}
 * @param isReplication      {@code true} when the caller is another Eureka server
 * @return 404 when the caller's timestamp is newer than the local one; 409 carrying the local
 *         instance when the local one is newer and the caller is a server; 200 otherwise
 */
private Response validateDirtyTimestamp(Long lastDirtyTimestamp, boolean isReplication) {
    final InstanceInfo localInfo = registry.getInstanceByAppAndId(app.getName(), id, false);

    // Nothing to compare, or both sides agree: the renewal stands.
    if (localInfo == null
            || lastDirtyTimestamp == null
            || lastDirtyTimestamp.equals(localInfo.getLastDirtyTimestamp())) {
        return Response.ok().build();
    }

    // Kept from the original; not read anywhere in this excerpt.
    Object[] args = {id, localInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};

    if (lastDirtyTimestamp > localInfo.getLastDirtyTimestamp()) {
        // The local copy is stale: answer 404 so that the caller re-registers the instance here.
        return Response.status(Status.NOT_FOUND).build();
    }
    if (localInfo.getLastDirtyTimestamp() > lastDirtyTimestamp && isReplication) {
        // The calling server is stale: answer 409 and ship the local copy so it can catch up.
        return Response.status(Status.CONFLICT).entity(localInfo).build();
    }
    // A stale timestamp from a client is ignored; per the article, the client is assumed
    // to hold the latest instance data.
    return Response.ok().build();
}
总结

EurekaServer处理续约过程比较复杂,这里我们来做下总结:

  1. A Server端接收Client端发起的续约请求,A Server端处理续约请求,如果不存在该实例信息,返回404后,Client端发起注册流程。
  2. 当A Server端存在该实例,就根据规则计算本地注册表中的instance状态,如果计算后的状态与本地状态不一致时,更新本地注册表中的实例状态。
  3. 处理续约后,将该续约请求同步给B Server端,在发起请求的过程中,附增上A Server端的overriddenStatus、status和lastDirtyTimestamp。
  4. 当B Server端接收到请求后,同样判断本地注册表中是否包含该实例信息,如果不包含返回404,A Server端接收到404后会和Client端相同发起注册流程。
  5. 如果B Server端中包含该实例信息,与A Server端处理相同,此处在完成续约处理后不需同步给其他Server。
  6. 当B Server端完成续约后,会针对请求参数和返回结果进行以下处理。
    1. 如果B Server端接收到A Server端的lastDirtyTimestamp,会和本地的lastDirtyTimestamp作对比,如果A Server端传递的lastDirtyTimestamp > 本地的lastDirtyTimestamp,说明本地的续约信息延迟,那么修改返回为404,并且修改本地InstanceInfo的状态,这样再通知A Server端向B Server端发起注册请求的同时更加确保本地注册表的实时性。
    2. 相反如果A Server端传递的lastDirtyTimestamp < 本地的lastDirtyTimestamp,说明A Server端的续约信息延迟,那么B Server端会返回409并且将本地的InstanceInfo返回给A Server端,A Server端收到409返回时,会用B Server端返回的InstanceInfo更新A Server端本地注册表。
    3. 如果两个Server的时间戳相同代表信息一致,那么B Server端直接返回200,A Server端不作处理。
  7. A Server端处理完同步之后返回Client。

服务下线(删除实例)

// DELETE handler: cancels the lease of this instance.
// Responds 200 when the registry reports success and 404 otherwise.
// Excerpt: the catch clause is elided.
@DELETE
public Response cancelLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    try {
        // The replication header is a string; only the literal "true" marks a replicated call.
        boolean isSuccess = registry.cancel(app.getName(), id, "true".equals(isReplication));

        if (isSuccess) {
            return Response.ok().build();
        } else {
            return Response.status(Status.NOT_FOUND).build();
        }
    }
    // ...catch exception
}
/**
 * Publishes an {@code EurekaInstanceCanceledEvent} and then delegates the cancellation
 * to the superclass.
 *
 * @param appName       application the instance belongs to
 * @param serverId      id of the instance being cancelled
 * @param isReplication {@code true} when the request was replicated from another server
 * @return the result of the superclass cancellation
 */
public boolean cancel(String appName, String serverId, boolean isReplication) {
    // The event must carry this method's own parameter: the excerpt referenced an undefined `id`.
    publishEvent(new EurekaInstanceCanceledEvent(this, appName, serverId, isReplication));
    return super.cancel(appName, serverId, isReplication);
}
/**
 * Cancels an instance locally, replicates the cancellation to the peers and lowers the
 * expected number of renewing clients.
 *
 * @param appName       application the instance belongs to
 * @param id            instance id
 * @param isReplication {@code true} when the request was replicated from another server
 * @return {@code true} when the local cancellation succeeded, {@code false} otherwise
 */
public boolean cancel(final String appName, final String id,
                      final boolean isReplication) {
    // Local cancellation first; nothing else happens when it fails.
    if (!super.cancel(appName, id, isReplication)) {
        return false;
    }

    // Propagate the cancellation to the other servers.
    replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);

    synchronized (lock) {
        // One client fewer is expected to renew, so recompute the self-preservation threshold.
        // The same counter is adjusted on registration.
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            this.expectedNumberOfClientsSendingRenews -= 1;
            updateRenewsPerMinThreshold();
        }
    }
    return true;
}

此处有三个步骤:

  1. 执行本地注册表的服务下线。
  2. 同步服务下线请求到其他Server。
  3. 更新EurekaServer自我保护触发的阈值。

先来看本地注册表服务下架:

/**
 * Removes an instance's lease from the local registry.
 *
 * @param appName       application the instance belongs to
 * @param id            instance id
 * @param isReplication {@code true} when the request was replicated from another server
 * @return {@code true} when a lease was found and cancelled, {@code false} when none existed
 */
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    // Acquire the lock before entering try: if acquisition fails, finally must not
    // attempt to release a lock that was never obtained.
    read.lock();
    try {
        CANCEL.increment(isReplication);
        // Remove the lease from the application's map, if the application is known.
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        // Record the cancellation attempt, whether or not a lease existed.
        synchronized (recentCanceledQueue) {
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        }
        // Drop any stored status override for the instance.
        overriddenInstanceStatusMap.remove(id);
        if (leaseToCancel == null) {
            return false;
        }

        // Per the article, this records the eviction time on the lease.
        leaseToCancel.cancel();
        InstanceInfo instanceInfo = leaseToCancel.getHolder();
        String vip = null;
        String svip = null;
        if (instanceInfo != null) {
            instanceInfo.setActionType(ActionType.DELETED);
            // Queue the lease in recentlyChangedQueue.
            recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
            // Refresh the last-updated timestamp.
            instanceInfo.setLastUpdatedTimestamp();
            vip = instanceInfo.getVIPAddress();
            svip = instanceInfo.getSecureVipAddress();
        }
        invalidateCache(appName, vip, svip);
        return true;
    } finally {
        read.unlock();
    }
}

下架操作相对简单,再来看看同步操作:

// Replicates one instance action to a single peer node.
// Excerpt: only the Cancel case is shown; the other cases and the catch clause are elided.
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        switch (action) {
            case Cancel:
                // A cancellation only needs the application name and the instance id.
                node.cancel(appName, id);
                break;
            // ... (other cases elided)
        }
    }
    // ...catch exception
}
/**
 * Queues a cancellation of the given instance for replication to this peer node.
 *
 * @param appName application the instance belongs to
 * @param id      instance id
 * @throws Exception whatever the dispatcher submission throws
 */
public void cancel(final String appName, final String id) throws Exception {
    // The task is only worth processing until now + maxProcessingDelayMs.
    final long deadline = System.currentTimeMillis() + maxProcessingDelayMs;

    final ReplicationTask cancelTask = new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
        @Override
        public EurekaHttpResponse<Void> execute() {
            return replicationClient.cancel(appName, id);
        }

        @Override
        public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
            super.handleFailure(statusCode, responseEntity);
            // A 404 is only logged: no corrective action is taken for a missing entry.
            if (statusCode == 404) {
                logger.warn("{}: missing entry.", getTaskName());
            }
        }
    };

    batchingDispatcher.process(taskId("cancel", appName, id), cancelTask, deadline);
}
// Builds and sends the replicated cancellation: an HTTP DELETE to apps/{appName}/{id}.
// Returns the peer's status code and response headers; there is no entity.
// Excerpt: the body of the finally clause is elided.
public EurekaHttpResponse<Void> cancel(String appName, String id) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder.delete(ClientResponse.class);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        // ...close response
    }
}

最后来看看是如何更新自我保护触发阈值的:

// Fragment from the cancel path: adjusts the self-preservation threshold under `lock`.
synchronized (lock) {
    // The threshold is recomputed on registration and on cancellation. Per the article, it is also
    // recomputed by a scheduled task started at server startup.
    if (this.expectedNumberOfClientsSendingRenews > 0) {
        // A cancelled instance no longer renews, so one client fewer is expected.
        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
        updateRenewsPerMinThreshold();
    }
}
/**
 * Recomputes {@code numberOfRenewsPerMinThreshold} as
 * expected clients * (60 / expected renewal interval in seconds) * renewal percent threshold,
 * truncated to an int.
 */
protected void updateRenewsPerMinThreshold() {
    // Renewals per minute expected from all clients together.
    final double expectedRenewsPerMinute = this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds());
    this.numberOfRenewsPerMinThreshold =
            (int) (expectedRenewsPerMinute * serverConfig.getRenewalPercentThreshold());
}

这里的expectedNumberOfClientsSendingRenews属性,是在启动类生成PeerAwareInstanceRegistry的bean时,通过读取配置项eureka.instance.registry.expected-number-of-clients-sending-renews得到的,默认值是1。

这里server端判断启动自我保护阈值公式:

阈值 = expectedNumberOfClientsSendingRenews * 每分钟服务应该上报的次数 * 阈值系数(默认0.85)

ApplicationResource

服务注册(某一个应用新增实例)

/**
 * POST handler: validates and registers a new instance of this application.
 *
 * @param info          instance data from the request body
 * @param isReplication replication header; only the literal "true" marks a replicated call
 * @return 400 with a reason when validation fails, otherwise 204
 */
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    // Mandatory attributes, checked in a fixed order; the first failure decides the response.
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    }
    if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    }
    if (isBlank(info.getIPAddr())) {
        return Response.status(400).entity("Missing ip address").build();
    }
    if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    }
    if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    }
    if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    }
    if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }

    // Data centers that expose an id but report a blank one need special handling.
    final DataCenterInfo dataCenter = info.getDataCenterInfo();
    if (dataCenter instanceof UniqueIdentifier
            && isBlank(((UniqueIdentifier) dataCenter).getId())) {
        final boolean strictIdValidation =
                "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
        if (strictIdValidation) {
            String entity = "DataCenterInfo of type " + dataCenter.getClass() + " must contain a valid id";
            return Response.status(400).entity(entity).build();
        }
        if (dataCenter instanceof AmazonInfo) {
            // Fall back to the instance id when the Amazon metadata has no instanceId.
            final AmazonInfo amazon = (AmazonInfo) dataCenter;
            if (amazon.get(AmazonInfo.MetaDataKey.instanceId) == null) {
                amazon.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
            }
        }
    }

    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}

重点在于注册流程:

/**
 * Publishes an {@code EurekaInstanceRegisteredEvent} and then delegates the registration
 * to the superclass.
 *
 * @param info          instance being registered
 * @param isReplication {@code true} when the request was replicated from another server
 */
public void register(final InstanceInfo info, final boolean isReplication) {
    // The excerpt used `leaseDuration` without defining it. Resolve it the same way the
    // superclass register does: the instance's own positive duration, else the default.
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication));
    super.register(info, isReplication);
}
/**
 * Registers the instance locally and then replicates the registration to the peers.
 *
 * @param info          instance being registered
 * @param isReplication {@code true} when the request was replicated from another server
 */
public void register(final InstanceInfo info, final boolean isReplication) {
    // Use the instance's own lease duration when it is positive; otherwise the default
    // (90 seconds, per the article).
    final boolean hasOwnDuration =
            info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0;
    final int leaseDuration = hasOwnDuration
            ? info.getLeaseInfo().getDurationInSecs()
            : Lease.DEFAULT_DURATION_IN_SECS;

    // Local registry first, then the other servers.
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

先来看注册流程:

/**
 * Registers (or re-registers) an instance in the local registry.
 *
 * @param registrant    instance data from the request
 * @param leaseDuration lease duration given to the new lease
 * @param isReplication {@code true} when the request was replicated from another server
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    // Acquire the lock before entering try: if acquisition fails, finally must not
    // attempt to release a lock that was never obtained.
    read.lock();
    try {
        // Find the application's lease map, creating it when the application is new.
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            // putIfAbsent returns the existing map when another thread created it first.
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // Existing lease of this instance, if any.
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());

        if (existingLease != null && (existingLease.getHolder() != null)) { // update of a known instance
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();

            // The stored copy is newer than the request (for example a delayed request):
            // keep the stored instance data instead of the incoming one.
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                registrant = existingLease.getHolder();
            }
        } else {
            // A new instance: one more client is expected to renew, so recompute the
            // self-preservation threshold.
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    updateRenewsPerMinThreshold();
                }
            }
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            // Carry the service-up timestamp over from the previous lease.
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // Seed overriddenInstanceStatusMap from the registrant's overridden status, but only
        // when that status is not UNKNOWN and the map has no entry for the instance yet.
        // Per the article, a freshly registered instance starts with UNKNOWN and skips this.
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        // The map entry, when present, wins over whatever the registrant carried.
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Compute the effective status through the override rules. Unlike renewal, the instance
        // passed in comes from the request, unless it was replaced by the stored copy above.
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // An instance that ends up UP gets its service-up time recorded on the lease.
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        // Queue the lease in recentlyChangedQueue.
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        // Refresh the last-updated timestamp.
        registrant.setLastUpdatedTimestamp();
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
    } finally {
        read.unlock();
    }
}

这里重点关注几个点:

  1. 当请求中的InstanceInfo是新注册的实例时,需要更新Server自我保护的阈值,前面我们分析了计算公式是:阈值 = expectedNumberOfClientsSendingRenews * 每分钟服务应该上报的次数 * 阈值系数(默认0.85),而且expectedNumberOfClientsSendingRenews参数默认值是1,按照此处代码我们发现这个值比实际client的个数大1。我们的理解是:每分钟服务应该上报的次数是用60/每个服务每次心跳时间间隔计算得出的,而这种计算是不精准且偏小的,所以这里是为了平衡此处计算数据问题所做的调整。
  2. 计算实例状态,前面续约时已经完整的了解了计算规则,这里有所不同是因为,参数中的instanceInfo不再是注册表中的实例信息了,而是从client端请求而来,那么来总结此处的计算规则如下:
    1. 如果请求中的实例服务状态是DOWN或者STARTING,直接返回。
    2. 如果请求中的实例服务状态是UP、OUT_OF_SERVICE或者UNKNOWN时,需要看overriddenStatusMap中是否包含该实例,也就是说看是否修改过该实例的状态,如果有的话,以overriddenStatusMap中的状态为准。
    3. 如果请求中实例的服务状态是UP、OUT_OF_SERVICE或者UNKNOWN时,且overriddenStatusMap中不包含该实例,就看该实例状态,如果该实例状态为UP或者OUT_OF_SERVICE的话就返回。
    4. 如果以上都不是,那就指的是该实例状态被删除,重置为UNKNOWN,那就直接返回UNKNOWN。

再来看同步流程:

// Replicates one instance action to a single peer node.
// Excerpt: only the Register case is shown; the other cases and the catch clause are elided.
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            // ... (other cases elided)
            case Register:
                // A registration forwards the instance data unchanged.
                node.register(info);
                break;
            // ... (other cases elided)
        }
    }
    // ...catch exception
}
/**
 * Queues a registration of the given instance for replication to this peer node.
 *
 * @param info instance to register on the peer
 * @throws Exception whatever the dispatcher submission throws
 */
public void register(final InstanceInfo info) throws Exception {
    // The task is only worth processing until now + getLeaseRenewalOf(info).
    final long deadline = System.currentTimeMillis() + getLeaseRenewalOf(info);

    final ReplicationTask registerTask = new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
        public EurekaHttpResponse<Void> execute() {
            return replicationClient.register(info);
        }
    };

    batchingDispatcher.process(taskId("register", info), registerTask, deadline);
}
// Sends the replicated registration: an HTTP POST of the instance as JSON to apps/{appName}.
// Returns the peer's status code and response headers; there is no entity.
// Excerpt: the body of the finally clause is elided.
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        // ... (body elided in this excerpt; presumably closes the response like the sibling methods, TODO confirm)
    }
}

ApplicationsResource

全量获取注册表

/**
 * GET handler: returns the full registry, optionally including remote regions.
 *
 * @param version        API version from the path
 * @param acceptHeader   Accept header; JSON is used only when it contains the JSON value
 * @param acceptEncoding Accept-Encoding header; gzip is used only when it contains the gzip value
 * @param eurekaAccept   Eureka-specific accept header, made part of the cache key
 * @param uriInfo        request URI information (not read in this method)
 * @param regionsStr     comma-separated remote regions, may be {@code null} or empty
 * @return 403 when access is not allowed yet, otherwise 200 with the cached payload
 */
@GET
public Response getContainers(@PathParam("version") String version,
                              @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                              @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                              @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                              @Context UriInfo uriInfo,
                              @Nullable @QueryParam("regions") String regionsStr) {
    // Remote regions are requested only when the query parameter is present and non-empty.
    final boolean remoteRegionsRequested = regionsStr != null && !regionsStr.isEmpty();
    String[] regions = null;
    if (remoteRegionsRequested) {
        regions = regionsStr.toLowerCase().split(",");
        Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
        EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
    } else {
        EurekaMonitors.GET_ALL.increment();
    }

    // Refuse with 403 while the registry does not allow access.
    if (!registry.shouldAllowAccess(remoteRegionsRequested)) {
        return Response.status(Status.FORBIDDEN).build();
    }

    CurrentRequestVersion.set(Version.toEnum(version));

    // JSON only when the Accept header asks for it; XML in every other case.
    final boolean jsonRequested = acceptHeader != null && acceptHeader.contains(HEADER_JSON_VALUE);
    final KeyType keyType = jsonRequested ? Key.KeyType.JSON : Key.KeyType.XML;
    final String returnMediaType = jsonRequested ? MediaType.APPLICATION_JSON : MediaType.APPLICATION_XML;

    // Cache key for the full registry (ResponseCacheImpl.ALL_APPS).
    final Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    // Uncompressed unless the client explicitly accepts gzip.
    if (acceptEncoding == null || !acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        return Response.ok(responseCache.get(cacheKey)).build();
    }
    return Response.ok(responseCache.getGZIP(cacheKey))
            .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
            .header(HEADER_CONTENT_TYPE, returnMediaType)
            .build();
}

这里需要重点关注两点:

  1. 如何判断是否允许获取远程region:

    /**
     * Decides whether registry data may be served right now.
     *
     * @param remoteRegionRequired {@code true} when the request asks for remote-region data
     * @return {@code false} during the startup wait window, or when remote data is required and
     *         some remote region registry is not ready; {@code true} otherwise
     */
    public boolean shouldAllowAccess(boolean remoteRegionRequired) {
        // Per the article: the flag records that the startup sync from the peers came back
        // empty, in which case clients are refused until the configured wait time has elapsed.
        if (this.peerInstancesTransferEmptyOnStartup) {
            if (System.currentTimeMillis() <= this.startupTime + serverConfig.getWaitTimeInMsWhenSyncEmpty()) {
                return false;
            }
        }
        if (!remoteRegionRequired) {
            return true;
        }
        // regionNameVSRemoteRegistry holds this server's registries for the other regions.
        // Per the article, a registry reports ready only after its periodic sync has run at
        // least once. A single registry that is not ready blocks the request.
        for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
            if (!remoteRegistry.isReadyForServingData()) {
                return false;
            }
        }
        return true;
    }
    
  2. 如何获取注册表,这里无论是压缩还是不压缩都是调用的ResponseCacheImpl.getValue方法:

    // Looks up the payload for a cache key.
    // Per the article, useReadOnlyCache comes from eureka.server.use-read-only-response-cache
    // (default true). Excerpt: the catch clause is elided.
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            if (useReadOnlyCache) {
                // Try readOnlyCacheMap first; on a miss, read readWriteCacheMap and copy
                // the result into readOnlyCacheMap.

                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
                // Read-only cache disabled: go straight to readWriteCacheMap.
                payload = readWriteCacheMap.get(key);
            }
        }
        // ...catch exception
        return payload;
    }
    

    启动类我们说到,在初始化ResponseCacheImpl时,开启了定时任务定时从readOnlyCacheMap取出数据并根据readWriteCacheMap中的数据更新。但是并没有写入readOnlyCacheMap的操作,从这里我们可以看出,getValue方法是数据第一次写入readOnlyCacheMap。这种做法就是为了队列稳定性。

这里我们看到数据从ResponseCacheImpl.readWriteCacheMap中获取,那么注册表中的数据是如何写入ResponseCacheImpl.readWriteCacheMap的呢?

// Constructor excerpt: shows only how readWriteCacheMap is built; the rest is elided.
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    // ...
    // Loading cache: initial capacity and expire-after-write time both come from the server config.
    this.readWriteCacheMap =
            CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                    .removalListener(new RemovalListener<Key, Value>() {
                        @Override
                        public void onRemoval(RemovalNotification<Key, Value> notification) {
                            // When a region-specific key is removed, drop its mapping from
                            // regionSpecificKeys as well.
                            Key removedKey = notification.getKey();
                            if (removedKey.hasRegions()) {
                                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                            }
                        }
                    })
                    .build(new CacheLoader<Key, Value>() {
                        @Override
                        public Value load(Key key) throws Exception {
                            // Record region-specific keys under their region-less clone.
                            if (key.hasRegions()) {
                                Key cloneWithNoRegions = key.cloneWithoutRegions();
                                regionSpecificKeys.put(cloneWithNoRegions, key);
                            }
                            // Values for readWriteCacheMap are produced here, on load.
                            Value value = generatePayload(key);
                            return value;
                        }
                    });
    // ...
}
// Builds the payload for a cache key.
// Excerpt: the delta case, the other entity types and the finally body are elided.
private Value generatePayload(Key key) {
    Stopwatch tracer = null;
    try {
        String payload;
        switch (key.getEntityType()) {
            case Application:
                boolean isRemoteRegionRequested = key.hasRegions();

                if (ALL_APPS.equals(key.getName())) {
                    // Full registry. Per the article, the local registry is always included;
                    // which remote registries are added depends on the key's regions and,
                    // when no regions are given, on the server configuration.
                    if (isRemoteRegionRequested) {
                        tracer = serializeAllAppsWithRemoteRegionTimer.start();
                        // Regions were requested: include the registries of exactly those regions.
                        payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeAllAppsTimer.start();
                        // No regions requested. Per the article, with
                        // eureka.server.disable-transparent-fallback-to-other-region=true only the
                        // local registry is returned; with the default (false) the local and all
                        // remote-region registries are returned.
                        payload = getPayLoad(key, registry.getApplications());
                    }
                } else if (ALL_APPS_DELTA.equals(key.getName())) { // delta (incremental) registry
                    // ...
                } else {
                    // Any other name is treated as a single application's name.
                    tracer = serializeOneApptimer.start();
                    payload = getPayLoad(key, registry.getApplication(key.getName()));
                }
                break;
            // ...
        }
        return new Value(payload);
    } finally {
        // ...stop tracer
    }
}

这里无论请求参数中是否有regions参数,都会执行AbstractInstanceRegistry.getApplicationsFromMultipleRegions方法。

// Assembles the full registry: every lease in the local registry plus, when remoteRegions is
// non-empty, the cached registries of those remote regions (filtered by the app whitelist).
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {

    // A null or empty array means "local region only".
    boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;

    Applications apps = new Applications();
    apps.setVersion(1L);
    // Build the Applications view from the leases held in the local registry.
    for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
        Application app = null;

        if (entry.getValue() != null) {
            for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
                Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
                if (app == null) {
                    // Created lazily, so an entry without any lease never produces an Application.
                    app = new Application(lease.getHolder().getAppName());
                }
                // Convert the lease into the InstanceInfo that is added to the result.
                app.addInstance(decorateInstanceInfo(lease));
            }
        }
        if (app != null) {
            apps.addApplication(app);
        }
    }
    // Remote regions were requested.
    if (includeRemoteRegion) {
        // Walk the regions named in the request.
        for (String remoteRegion : remoteRegions) {
            RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
            if (null != remoteRegistry) { // regions this server keeps no registry for are skipped
                // Take that region's cached registry and merge it into the result.
                Applications remoteApps = remoteRegistry.getApplications();
                for (Application application : remoteApps.getRegisteredApplications()) {
                    // Only applications allowed by the whitelist are taken from this region.
                    if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                        Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                        if (appInstanceTillNow == null) {
                            appInstanceTillNow = new Application(application.getName());
                            apps.addApplication(appInstanceTillNow);
                        }
                        // Remote instances are appended to the (possibly local) application of the same name.
                        for (InstanceInfo instanceInfo : application.getInstances()) {
                            appInstanceTillNow.addInstance(instanceInfo);
                        }
                    }
                }
            }
        }
    }
    apps.setAppsHashCode(apps.getReconcileHashCode());
    return apps;
}

这里从远程region中获取注册表需要在server端进行相应的配置:

// Decides whether application appName may be served from remoteRegion.
// Returns true when no whitelist (eureka.server.remote-region-app-whitelist) applies, i.e. neither a
// region-specific nor a global one is configured, or when the applicable whitelist contains appName.
// Note that the whitelist holds application names, not region names.
private boolean shouldFetchFromRemoteRegistry(String appName, String remoteRegion) {
    Set<String> whiteList = serverConfig.getRemoteRegionAppWhitelist(remoteRegion);
    if (null == whiteList) {
        whiteList = serverConfig.getRemoteRegionAppWhitelist(null); // see global whitelist.
    }
    return null == whiteList || whiteList.contains(appName);
}

增量获取注册表

// GET .../delta: returns the delta registry, optionally including the remote regions named in
// the "regions" query parameter. The payload is served from the response cache.
@Path("delta")
@GET
public Response getContainerDifferential(
        @PathParam("version") String version,
        @HeaderParam(HEADER_ACCEPT) String acceptHeader,
        @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
        @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
        @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {

    boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();

    // Answer 403 when delta is disabled (eureka.server.disable-delta=true) or when the registry does
    // not allow access yet (per the article: the configured remote regions have not finished syncing).
    if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
        return Response.status(Status.FORBIDDEN).build();
    }

    String[] regions = null;
    if (!isRemoteRegionRequested) {
        EurekaMonitors.GET_ALL_DELTA.increment();
    } else {
        // Lower-case and split the comma-separated region list.
        regions = regionsStr.toLowerCase().split(",");
        Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
        EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
    }

    CurrentRequestVersion.set(Version.toEnum(version));
    // JSON is used only when the Accept header asks for it; otherwise fall back to XML.
    KeyType keyType = Key.KeyType.JSON;
    String returnMediaType = MediaType.APPLICATION_JSON;
    if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
        keyType = Key.KeyType.XML;
        returnMediaType = MediaType.APPLICATION_XML;
    }
    // Build the cache key.
    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS_DELTA,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    // Serve the gzip variant when the client accepts it; otherwise the plain payload.
    if (acceptEncoding != null
            && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        return Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        return Response.ok(responseCache.get(cacheKey))
                .build();
    }
}
// Looks up the cached Value for key.
// useReadOnlyCache is read from eureka.server.use-read-only-response-cache (default true).
Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            // Try readOnlyCacheMap first; on a miss, read from readWriteCacheMap
            // and copy the result into readOnlyCacheMap.

            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            // Read-only cache disabled: read straight from readWriteCacheMap.
            payload = readWriteCacheMap.get(key);
        }
    }
    // ...catch exception
    return payload;
}

增量下载注册表在ApplicationsResource中与全量下载大致相同,新增了一个判断是否允许增量下载注册表,剩下的流程都是:

  1. 构造缓存Key
  2. 通过Key从ResponseCacheImpl.readOnlyCacheMap或ResponseCacheImpl.readWriteCacheMap中获取。

其主要区别在于写入ResponseCacheImpl.readWriteCacheMap中的数据:

// Builds the Value cached for a key. This excerpt shows the delta (ALL_APPS_DELTA) path;
// the full-registry branch and the other entity types are elided.
private Value generatePayload(Key key) {
    Stopwatch tracer = null;
    try {
        String payload;
        switch (key.getEntityType()) {
            case Application:
                boolean isRemoteRegionRequested = key.hasRegions();

                if (ALL_APPS.equals(key.getName())) {
                    // ...
                } else if (ALL_APPS_DELTA.equals(key.getName())) {
                    // Delta registry download.

                    if (isRemoteRegionRequested) { // the request names remote regions
                        tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                        // Both delta-with-regions version counters are bumped before the payload is built.
                        versionDeltaWithRegions.incrementAndGet();
                        versionDeltaWithRegionsLegacy.incrementAndGet();
                        payload = getPayLoad(key,
                                registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                    } else { // delta from the local recentlyChangedQueue; remote-region deltas depend on configuration
                        tracer = serializeDeltaAppsTimer.start();
                        versionDelta.incrementAndGet();
                        versionDeltaLegacy.incrementAndGet();
                        payload = getPayLoad(key, registry.getApplicationDeltas());
                    }
                } else {
                    // Any other name is treated as a single application name.
                    tracer = serializeOneApptimer.start();
                    payload = getPayLoad(key, registry.getApplication(key.getName()));
                }
                break;
            // ...
        }
        return new Value(payload);
    } finally {
        // ...close tracer
    }
}

这里可以看到,根据参数中regions是否为空各有一种实现,我们先来看如果参数中regions不为空的情况:

// Builds the delta registry: recently changed local instances plus the cached deltas of the
// given remote regions. The hash code set on the result is that of the FULL registry.
public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
    // A null argument falls back to every remote region known to this server.
    if (null == remoteRegions) {
        remoteRegions = allKnownRemoteRegions;
    }

    boolean includeRemoteRegion = remoteRegions.length != 0;

    Applications apps = new Applications();
    apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
    Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
    try {
        // Note: this read path takes the WRITE lock.
        // Question 1: why does a read take the write lock while the write operations take the read lock?
        // Question 2: why is the full download lock-free while the delta download is locked?
        // NOTE(review): lock() is called inside the try, so if it ever threw, the finally block
        // would still call unlock() on a lock that is not held.
        write.lock();
        // The full download reads the local registry; the delta download iterates recentlyChangedQueue.
        Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
        while (iter.hasNext()) {
            Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
            InstanceInfo instanceInfo = lease.getHolder();
            /*
                Example from the article: after a client instance is taken offline, a full download
                no longer contains it, whereas a delta download still returns it so that the client
                can remove it based on the instance's actionType. On the client side a full download
                replaces the local registry wholesale, while a delta result is applied entry by
                entry to the registry the client maintains locally.
             */
            Application app = applicationInstancesMap.get(instanceInfo.getAppName());
            if (app == null) {
                app = new Application(instanceInfo.getAppName());
                applicationInstancesMap.put(instanceInfo.getAppName(), app);
                apps.addApplication(app);
            }
            // A copy of the decorated InstanceInfo is added, not the instance held by the lease.
            app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
        }

        if (includeRemoteRegion) {
            // For each requested region, look up its registry in the locally kept regionNameVSRemoteRegistry.
            for (String remoteRegion : remoteRegions) {
                RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                if (null != remoteRegistry) {
                    // The server also fetches remote-region registries as full or delta downloads, using the
                    // same logic a client uses against a server. Each delta fetch is cached in
                    // remoteRegistry.applicationsDelta as an Applications object; that cached delta is read here.
                    Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
                    if (null != remoteAppsDelta) {
                        for (Application application : remoteAppsDelta.getRegisteredApplications()) {
                            // Same whitelist rule as the full download: the application is taken from this
                            // region unless a whitelist is configured that does not contain its name.
                            if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                                // Merge the remote delta into the result alongside the local entries.
                                Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                                if (appInstanceTillNow == null) {
                                    appInstanceTillNow = new Application(application.getName());
                                    apps.addApplication(appInstanceTillNow);
                                }
                                for (InstanceInfo instanceInfo : application.getInstances()) {
                                    appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo));
                                }
                            }
                        }
                    }
                }
            }
        }
        // The full registry is fetched only to compute the hash code; it cannot be derived from
        // recentlyChangedQueue and the remote applicationsDelta alone. The client compares this code with
        // its own after applying the delta; on a mismatch its registry is wrong and it does a full fetch.
        Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
        apps.setAppsHashCode(allApps.getReconcileHashCode());
        return apps;
    } finally {
        write.unlock();
    }
}

这里我们需要注意两点:

  1. 代码第一行的写锁,注意注释中的思考,后面我们会对AbstractInstanceRegistry中的读锁和写锁进行详细的分析和总结。

  2. Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();此处的applicationDelta信息是怎么来的?

    这个也要从启动类说起,在初始化instanceRegistry过程中执行了initRemoteRegionRegistry()

    // Reads the remote-region configuration and fills regionNameVSRemoteRegistry and
    // allKnownRemoteRegions with one entry per configured region.
    protected void initRemoteRegionRegistry() throws MalformedURLException {
        // Read eureka.server.remote-region-urls-with-name.
        Map<String, String> remoteRegionUrlsWithName = serverConfig.getRemoteRegionUrlsWithName();
        if (!remoteRegionUrlsWithName.isEmpty()) {
            allKnownRemoteRegions = new String[remoteRegionUrlsWithName.size()];
            int remoteRegionArrayIndex = 0;
            for (Map.Entry<String, String> remoteRegionUrlWithName : remoteRegionUrlsWithName.entrySet()) {
                // Create a registry per remote region; its constructor performs the first fetch
                // and sets up the periodic sync task.
                RemoteRegionRegistry remoteRegionRegistry = new RemoteRegionRegistry(
                        serverConfig,
                        clientConfig,
                        serverCodecs,
                        remoteRegionUrlWithName.getKey(),
                        new URL(remoteRegionUrlWithName.getValue()));
                // Key is the region name, value is the registry that mirrors that region.
                regionNameVSRemoteRegistry.put(remoteRegionUrlWithName.getKey(), remoteRegionRegistry);
                allKnownRemoteRegions[remoteRegionArrayIndex++] = remoteRegionUrlWithName.getKey();
            }
        }
    }
    
    // Creates the local mirror of one remote region: fetches its registry once during
    // construction and defines the task that repeats the fetch periodically.
    public RemoteRegionRegistry(EurekaServerConfig serverConfig,
                                EurekaClientConfig clientConfig,
                                ServerCodecs serverCodecs,
                                String regionName,
                                URL remoteRegionURL) {
        // ...build the client used to sync with the remote region
    
        try {
            // Initial fetch; the registry is marked ready only when it succeeds.
            if (fetchRegistry()) {
                this.readyForServingData = true;
            } else {
                logger.warn("Failed to fetch remote registry. This means this eureka server is not ready for serving "
                        + "traffic.");
            }
        }
        // ...catch exception
    
        Runnable remoteRegionFetchTask = new Runnable() {
            @Override
            public void run() {
                try {
                    // Download the remote region's registry.
                    if (fetchRegistry()) {
                        readyForServingData = true;
                    } else {
                        logger.warn("Failed to fetch remote registry. This means this eureka server is not "
                                + "ready for serving traffic.");
                    }
                } catch (Throwable e) {
                    // Failures are logged and do not propagate out of the task.
                    logger.error(
                            "Error getting from remote registry :", e);
                }
            }
        };
    
        // ...schedule the task (elided). As with the client-to-server fetch, it runs as a one-shot task;
        // when a run times out the timeout is doubled, up to the maximum delay.
    }
    
    // Fetches the remote region's registry, choosing between a full and a delta download.
    private boolean fetchRegistry() {
        // ...
        try {
            // Full download when delta is disabled for remote regions, or when nothing is cached yet
            // (no Applications object, or one without registered applications).
            if (serverConfig.shouldDisableDeltaForRemoteRegions()
                    || (getApplications() == null)
                    || (getApplications().getRegisteredApplications().size() == 0)) {
                // Full download of the remote region's registry.
                success = storeFullRegistry();
            } else {
                // Delta download of the remote region's registry.
                success = fetchAndStoreDelta();
            }
            logTotalInstances();
        }
        // ...
        return success;
    }
    

    先来看下载远程region的全量注册表:

    // Downloads the remote region's full registry and stores it in both local caches.
    // Returns true only when the download succeeded and the generation counter was advanced.
    public boolean storeFullRegistry() {
        long currentGeneration = fetchRegistryGeneration.get();
        // Request the remote region's registry; the argument selects delta (true) or full (false).
        Applications apps = fetchRemoteRegistry(false);
        if (apps == null) {
            // ...empty response, log it
        } else if (fetchRegistryGeneration.compareAndSet(currentGeneration, currentGeneration + 1)) {
            // Update both the full-registry cache and the delta cache kept for this remote region.
            applications.set(apps);
            applicationsDelta.set(apps);
            return true;
        } else {
            // ...the generation changed while fetching (compareAndSet failed); log it
        }
        return false;
    }
    

    再来看看远程region注册表的增量下载:

    // Downloads the remote region's delta, applies it to the cached registry and verifies the
    // result against the hash code sent with the delta.
    private boolean fetchAndStoreDelta() throws Throwable {
        long currGeneration = fetchRegistryGeneration.get();
        // Request the remote region's registry; the argument selects delta (true) or full (false).
        Applications delta = fetchRemoteRegistry(true);
    
        if (delta == null) {
            // ...empty response, log it
        } else if (fetchRegistryGeneration.compareAndSet(currGeneration, currGeneration + 1)) {
            // Replace the delta cache kept for this remote region.
            this.applicationsDelta.set(delta);
        } else {
            // ...the generation changed while fetching (compareAndSet failed); log it
        }
    
        if (delta == null) {
            // No delta was returned: fall back to a full download.
            return storeFullRegistry();
        } else {
            String reconcileHashCode = "";
            // Apply the delta to the locally cached registry of the remote region.
            // If tryLock() fails the delta is not applied and reconcileHashCode stays empty,
            // so the comparison below will most likely trigger reconciliation.
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    updateDelta(delta); // like the client: act on each returned InstanceInfo's actionType
                    reconcileHashCode = getApplications().getReconcileHashCode(); // hash of the updated cache
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            }
            // A mismatch between the local hash and the one sent with the delta means the cached copy of the
            // remote region is wrong; per the article, reconciliation re-downloads the full registry.
            if ((!reconcileHashCode.equals(delta.getAppsHashCode()))) {
                return reconcileAndLogDifference(delta, reconcileHashCode);
            }
        }
    
        // delta is non-null on every path that reaches this line.
        return delta != null;
    }
    

上面我们看完了在请求参数regions不为空的情况,现在再来看看如果请求参数regions为空时:

// Builds the delta registry when the request names no regions: recently changed local instances
// plus, unless transparent fallback is disabled, remote-region deltas for applications that do
// not exist locally. The hash code set on the result is that of the FULL registry.
public Applications getApplicationDeltas() {
    Applications apps = new Applications();
    apps.setVersion(responseCache.getVersionDelta().get());
    Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
    try {
        // The write lock is taken here as well; the locking scheme is summarized later in the article.
        write.lock();
        // Walk the local recentlyChangedQueue and group the changed instances by application name.
        Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
        while (iter.hasNext()) {
            Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
            InstanceInfo instanceInfo = lease.getHolder();
            Application app = applicationInstancesMap.get(instanceInfo.getAppName());
            if (app == null) {
                app = new Application(instanceInfo.getAppName());
                applicationInstancesMap.put(instanceInfo.getAppName(), app);
                apps.addApplication(app);
            }
            app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
        }
        // eureka.server.disable-transparent-fallback-to-other-region: whether fetching data from
        // other regions is switched off (default false).
        boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();

        if (!disableTransparentFallback) {
            // Full local registry; the argument false means "local region only".
            Applications allAppsInLocalRegion = getApplications(false);
            // Add remote-region deltas, but only for applications that are absent from the local registry.
            for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
                // NOTE(review): unlike getApplicationDeltasFromMultipleRegions, the result is not
                // null-checked before use here.
                Applications applications = remoteRegistry.getApplicationDeltas();
                for (Application application : applications.getRegisteredApplications()) {
                    Application appInLocalRegistry = allAppsInLocalRegion.getRegisteredApplications(application.getName());
                    if (appInLocalRegistry == null) {
                        apps.addApplication(application);
                    }
                }
            }
        }
        // The hash code is computed over the full registry (including remote regions unless fallback
        // is disabled), not over the delta assembled above.
        Applications allApps = getApplications(!disableTransparentFallback);
        apps.setAppsHashCode(allApps.getReconcileHashCode());
        return apps;
    } finally {
        write.unlock();
    }
}
// Returns the full local registry and, when includeRemoteRegion is true, the applications of
// every known remote region that do not already exist locally.
public Applications getApplications(boolean includeRemoteRegion) {
    Applications apps = new Applications();
    apps.setVersion(1L);
    // Walk the local registry and collect every lease into the result.
    for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
        Application app = null;

        if (entry.getValue() != null) {
            for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {

                Lease<InstanceInfo> lease = stringLeaseEntry.getValue();

                if (app == null) {
                    // Created lazily, so an entry without any lease never produces an Application.
                    app = new Application(lease.getHolder().getAppName());
                }

                app.addInstance(decorateInstanceInfo(lease));
            }
        }
        if (app != null) {
            apps.addApplication(app);
        }
    }
    // When remote regions are included, go through every remote-region registry kept locally
    // and add the applications that the result does not contain yet.
    if (includeRemoteRegion) {
        for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
            Applications applications = remoteRegistry.getApplications();
            for (Application application : applications.getRegisteredApplications()) {
                Application appInLocalRegistry = apps.getRegisteredApplications(application.getName());
                if (appInLocalRegistry == null) {
                    apps.addApplication(application);
                }
            }
        }
    }
    // Hash code over everything collected in the result.
    apps.setAppsHashCode(apps.getReconcileHashCode());
    return apps;
}

全量及增量总结

从上面代码中可以看到,增量获取注册表与全量获取注册表相比

  1. 取数据的source不同,全量下载注册表是直接遍历的本地注册表;而增量获取注册表是从recentlyChangedQueue中取数据。
  2. 两者都使用了readOnlyCacheMap和readWriteCacheMap两层缓存的设计结构,这种设计是为了保证集合读写的一致性。
  3. 获取增量注册表加了写锁,而获取全量注册表没有加锁,后面总结处会详细说明锁的使用。
  4. 两者都可以获取到远程region的注册表,而这里的远程region注册表实际上是server启动时,根据配置文件中的配置,开启了定时任务,定时请求配置中的远程region的注册表信息,而这里的请求也分为全量下载和增量下载,逻辑和客户端的类似,在server端的remoteRegionRegistry中缓存了配置文件中的各个region的全量注册表和增量注册表,根据是下载全量注册表或者增量注册表从相应的缓存中获取数据,添加到结果集中返回,而这里可以按region配置应用白名单,过滤掉不想从该region输出的应用(白名单中配置的是应用名)。

锁的总结

从Server端接收请求处理来看,有些处理加了锁,有些没加锁,有些加了写锁,有些加了读锁。在读完所有处理之后我们可以来做下总结:

不加锁的处理有:

  • 实例续约(心跳)
  • 获取全量注册表

加读锁的处理有:

  • 修改实例状态
  • 删除overriddenStatus
  • 服务下线
  • 服务注册

加写锁的处理有:

  • 获取增量注册表

那么这里我们可能会有疑惑,为什么会有这种区分,还有就是为什么写操作加读锁,而读操作加写锁,这里我们整体来分析。

这里我们看到了有两个操作是不加锁的--续约和下载全量注册表,它们有个共同的特点,就是不操作recentlyChangedQueue,而其他操作都会读或写这个队列,而这两个操作只需要读或写本地注册表即可。而且我们知道在client多的情况下,续约操作会极其频繁,这里如果上锁会严重影响server的执行效率,所以eureka server在设计时考虑到这个问题,将续约操作设计为不操作recentlyChangedQueue而是只修改本地全量注册表。也就是说只有操作recentlyChangedQueue才需要加锁,那么为什么要加锁呢?

这里加锁的原因其实也好理解,这里是要尽可能的保证下载注册表的完整性,因为client端在稳定运行的过程中,基本上获取注册表行为都是增量下载,而在下载的过程中刚好有一些实例修改(比如服务下线),如果这里没有加锁的话,有可能增量下载的结果中就会有脏数据,这是我们不想看到的,所以这里引入了锁的概念。

如果是单纯的加锁,会严重影响这些处理的执行效率,所以eureka server在设计上引入的是读写锁,而在处理的时候其读写锁的使用也有不同,在读操作的时候加写锁,而在写操作的时候加读锁。为什么这样设计呢?这里其实也是考虑到写操作的执行效率,如果eureka server的设计是写操作加写锁的话,那对于所有的修改实例信息(注册、下线、修改状态等)都要阻塞排队执行,在设置了请求超时的前提下还有可能丢失,这对于注册中心来说是个比较大的问题,其他client端难以发现有哪些client出现问题。所以在设计上将写操作加读锁,而在读操作加写锁(读锁是共享锁,多个写操作之间可以并发执行;写锁是排他锁,只有在增量下载读取recentlyChangedQueue时才会短暂阻塞这些写操作)。

节点信息剔除

recentlyChangedQueue中数据过期

前面我们说了,在实例注册、修改状态、服务下线时都会将实例信息写入recentlyChangedQueue中,它维护了最近修改过的实例队列,那最近是指多久呢?

这里又要回顾启动类时,初始化InstanceRegistry时调用了

// Constructor excerpt: shows only the scheduling of the delta-retention task.
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
    // ...
    // Start the timer task that periodically removes expired entries from recentlyChangedQueue.
    this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
            // eureka.server.delta-retention-timer-interval-in-ms (default 30 seconds);
            // the same value is used as the initial delay and as the period.
            serverConfig.getDeltaRetentionTimerIntervalInMs(),
            serverConfig.getDeltaRetentionTimerIntervalInMs());
}
// Creates the task that trims recentlyChangedQueue.
private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {
        @Override
        public void run() {
            // Walk recentlyChangedQueue from its head.
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                // Remove the item when its getLastUpdateTime() is older than the retention time
                // eureka.server.retention-time-in-m-s-in-delta-queue (default 3 minutes).
                if (it.next().getLastUpdateTime() <
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    // Stop at the first item that is still fresh.
                    // presumably the queue is ordered oldest-first - TODO confirm
                    break;
                }
            }
        }
    };
}

registry中数据过期

同样在EurekaServerAutoConfiguration中,通过@Import方式引入了EurekaServerInitializerConfiguration配置类,该类实现了SmartLifecycle接口,我们来看start方法:

// Lifecycle start: runs the Eureka server initialization on a newly created thread.
public void start() {
    new Thread(() -> {
        try {
            // Initialize the Eureka server context.
            eurekaServerBootstrap.contextInitialized(
                    EurekaServerInitializerConfiguration.this.servletContext);

            // ...
        }
    }).start();
}
// Initializes the Eureka environment and server context, then publishes the server context
// as a servlet-context attribute keyed by the EurekaServerContext class name.
public void contextInitialized(ServletContext context) {
    try {
        initEurekaEnvironment();
        // The server context is initialized here.
        initEurekaServerContext();
        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    }
    // ...catch exception
}
// Excerpt: shows only the final steps of server-context initialization.
protected void initEurekaServerContext() throws Exception {
    // ...
    // The count returned by syncUp() is handed to openForTraffic().
    int registryCount = this.registry.syncUp();
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    EurekaMonitors.registerAllStats();
}
// Excerpt: the part relevant here is the call to postInit(), which starts the eviction task.
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // ...
    super.postInit();
}
// Starts the renew counter and (re)schedules the periodic eviction task.
protected void postInit() {
    renewsLastMin.start();
    // evictionTaskRef is an AtomicReference<EvictionTask>.
    // Cancel the previously scheduled task, if any, before scheduling a fresh one.
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            // eureka.server.eviction-interval-timer-in-ms (default 60 seconds);
            // the same value is used as the initial delay and as the period.
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}

这里我们找到了剔除任务的启动,我们再来看看是如何执行的:

// Timer task that evicts expired leases; it also measures how late each run is so that
// the delay can be credited to the leases as compensation time.
class EvictionTask extends TimerTask {

    // System.nanoTime() of the previous run; 0 means the task has not run yet.
    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

    @Override
    public void run() {
        try {
            long compensationTimeMs = getCompensationTimeMs();
            // Perform the eviction.
            evict(compensationTimeMs);
        }
        // ...catch exception
    }

    // Computes the compensation time in milliseconds (never negative).
    long getCompensationTimeMs() {
        // Record the current time and fetch the time of the previous run in one step.
        long currNanos = getCurrentTimeNano();
        long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
        // First run: there is nothing to compensate for.
        if (lastNanos == 0l) {
            return 0l;
        }
        // Time elapsed between the previous run and this one.
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
        // If the elapsed time exceeds the configured interval, this run was delayed (for example because
        // the previous run overran). The period covered by this run is then the configured interval
        // plus the compensation time.
        long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
        return compensationTime <= 0l ? 0l : compensationTime;
    }

    long getCurrentTimeNano() {  // for testing
        return System.nanoTime();
    }

}

这里补偿时间的概念如下图:

实例过期计算中的补偿时间

接下来我们来看看是如何剔除过期实例的:

// Evicts expired leases from the local registry, limited so that a single run never removes
// more than (1 - renewalPercentThreshold) of the registry.
// additionalLeaseMs is the compensation time added to every lease when checking expiry.
public void evict(long additionalLeaseMs) {
    // Per the article: this checks whether self-preservation is enabled and, if so, whether it
    // has been triggered.
    if (!isLeaseExpirationEnabled()) {
        // Self-preservation is enabled and triggered: no eviction is performed.
        return;
    }

    // From here on, self-preservation is either disabled or enabled but not triggered.

    // Walk every lease in the registry and collect the expired ones in expiredLeases.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                // Expired leases are collected only when they still have a holder.
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }

    // Total number of instances in the local registry.
    int registrySize = (int) getLocalRegistrySize();
    // Number of instances at which self-preservation would be triggered.
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    // The difference is the upper bound on evictions for this run: the server keeps itself from
    // dropping below the self-preservation threshold, since triggering it would stop eviction anyway.
    int evictionLimit = registrySize - registrySizeThreshold;

    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Each round picks a random lease among those not chosen yet (indices i..size-1)
            // and swaps it into position i.
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            // Cancel the instance. Per the article this is the same path as a client-requested
            // cancel: the lease is removed locally and the change is then synced to the peer servers.
            internalCancel(appName, id, false);
        }
    }
}
// Tells whether this lease has expired, granting additionalLeaseMs of extra time.
public boolean isExpired(long additionalLeaseMs) {
    // evictionTimestamp > 0: per the article, the instance was already cancelled or an earlier run
    // already judged it expired.
    // Otherwise the lease is expired once the current time is later than
    // lastUpdateTimestamp + duration + additionalLeaseMs (the compensation time).
    return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
正文到此结束