Eureka

Model

Service providers register themselves with the registry; service consumers pull the service list into a local cache, then use client-side load balancing by service name to reach a concrete instance.

Service providers keep their registration alive by renewing their lease with the registry via heartbeats.

The registry is deployed as a cluster, with data synchronized between the nodes.

Together this provides load balancing and failover for mid-tier servers.
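As a minimal illustration of the consumer side (all names here are hypothetical; real Spring Cloud apps delegate this to Ribbon or Spring Cloud LoadBalancer), a round-robin pick over a locally cached service list might look like:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of client-side load balancing over a locally cached service list.
class LocalRegistryLoadBalancer {
    // service name -> instance addresses, refreshed periodically from the registry
    private final Map<String, List<String>> localRegistry = new ConcurrentHashMap<>();
    private final AtomicInteger counter = new AtomicInteger();

    void update(String serviceName, List<String> instances) {
        localRegistry.put(serviceName, instances);
    }

    // Round-robin pick by service name
    String choose(String serviceName) {
        List<String> instances = localRegistry.get(serviceName);
        if (instances == null || instances.isEmpty()) {
            throw new IllegalStateException("no instances for " + serviceName);
        }
        int idx = Math.floorMod(counter.getAndIncrement(), instances.size());
        return instances.get(idx);
    }
}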


Importing into Spring Boot

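In Spring Boot the server is usually switched on with @EnableEurekaServer; the annotation imports EurekaServerMarkerConfiguration, whose Marker bean is exactly what the @ConditionalOnBean on EurekaServerAutoConfiguration below checks. A minimal sketch:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer // imports EurekaServerMarkerConfiguration, whose Marker bean enables the auto-configuration below
public class EurekaServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}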

Eureka spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
      InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
  
  
  // Instantiates InstanceRegistry, which extends PeerAwareInstanceRegistryImpl
  @Bean
	public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
			ServerCodecs serverCodecs) {
		this.eurekaClient.getApplications(); // force initialization
		return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
				serverCodecs, this.eurekaClient,
				this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
				this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
	}
  
  // Manages the cluster of peer nodes
  @Bean
	@ConditionalOnMissingBean
	public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
			ServerCodecs serverCodecs) {
		return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
				this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
	}
}
@Configuration
public class EurekaServerInitializerConfiguration
      implements ServletContextAware, SmartLifecycle, Ordered {
     
    // org.springframework.web.context.support.ServletContextAwareProcessor#postProcessBeforeInitialization is the BeanPostProcessor that handles the ServletContextAware interface and injects the servletContext
  	public void setServletContext(ServletContext servletContext) {
			this.servletContext = servletContext;
		}
  
  	public void start() {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					//TODO: is this class even needed now?
					eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
					log.info("Started Eureka Server");

					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
					EurekaServerInitializerConfiguration.this.running = true;
					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
				}
				catch (Exception ex) {
					// Help!
					log.error("Could not initialize Eureka servlet context", ex);
				}
			}
		}).start();
	}
  
}

SmartLifecycle extends the Lifecycle interface. During Spring's finishRefresh phase, getLifecycleProcessor().onRefresh() calls the start method of all lifecycle beans, which is what starts the Eureka server.
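For illustration, a minimal SmartLifecycle bean showing the same hook (not Eureka code; Spring 5's default methods cover isAutoStartup(), getPhase(), and stop(Runnable)):

import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;

// Illustrative SmartLifecycle bean: start() is invoked by the LifecycleProcessor
// during the context's finishRefresh phase, stop() on shutdown.
@Component
class StartupProbe implements SmartLifecycle {
    private volatile boolean running;

    @Override
    public void start() {
        running = true; // runs at finishRefresh, like EurekaServerInitializerConfiguration#start
    }

    @Override
    public void stop() {
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}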

EurekaServer

image-20210623133417971

Controller

HTTP requests are handled with the Jersey framework.

com.netflix.eureka.resources.InstanceResource

Data structure

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
        = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
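A lookup is therefore two map reads: application name first, then instance id. A small fragment (types from com.netflix.eureka; the app name "ORDER-SERVICE" is a made-up example):

// Resolve all leases of one application from the two-level map
Map<String, Lease<InstanceInfo>> appLeases = registry.get("ORDER-SERVICE");
if (appLeases != null) {
    for (Map.Entry<String, Lease<InstanceInfo>> entry : appLeases.entrySet()) {
        // entry.getKey() is the instance id; the Lease wraps the InstanceInfo plus lease timestamps
        InstanceInfo info = entry.getValue().getHolder();
    }
}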


L2 cache

this.readWriteCacheMap =
        CacheBuilder.newBuilder()
                .initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                // Expire entries after write; 180s by default
                .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                .removalListener(new RemovalListener<Key, Value>() {
                    @Override
                    public void onRemoval(RemovalNotification<Key, Value> notification) {
                        Key removedKey = notification.getKey();
                        if (removedKey.hasRegions()) {
                            Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                            regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                        }
                    }
                })
                .build(new CacheLoader<Key, Value>() {
                    @Override
                    public Value load(Key key) throws Exception {
                        if (key.hasRegions()) {
                            Key cloneWithNoRegions = key.cloneWithoutRegions();
                            regionSpecificKeys.put(cloneWithNoRegions, key);
                        }
                        Value value = generatePayload(key);
                        return value;
                    }
                });

When a record for a given key is read from a LoadingCache and the record does not exist, the LoadingCache automatically loads the data into the cache.

Eureka's response cache has three levels: the first is the read-only cache (readOnlyCacheMap), the second the read-write cache (readWriteCacheMap), and the third the local registry itself. The read-only cache is refreshed from the read-write cache every 30s, and read-write entries expire 180s after being written; when a key has no value in the read-write cache, the data is loaded from the registry.

1. The read-only cache is refreshed by a timer that iterates over readOnlyCacheMap, which handles deletions; a newly added entry misses the read-only cache on first request and falls through to the second-level cache.

com.netflix.eureka.registry.ResponseCacheImpl#getCacheUpdateTask

private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                            key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                } finally {
                    CurrentRequestVersion.remove();
                }
            }
        }
    };
}

Cache-related settings

| Setting | Default | Description |
| --- | --- | --- |
| eureka.server.useReadOnlyResponseCache | true | Clients read from readOnlyCacheMap; false skips it and reads straight from readWriteCacheMap |
| eureka.server.responseCacheUpdateIntervalMs | 30000 | Interval for copying readWriteCacheMap into readOnlyCacheMap (30s by default) |
| eureka.server.evictionIntervalTimerInMs | 60000 | Interval of the eviction task that removes non-renewing instances (60s by default) |
| eureka.instance.leaseExpirationDurationInSeconds | 90 | Time without renewal before an instance is considered expired (90s by default) |


Service registration


com.netflix.eureka.resources.ApplicationResource#addInstance

org.springframework.cloud.netflix.eureka.server.InstanceRegistry#register(com.netflix.appinfo.InstanceInfo, boolean)

public void register(final InstanceInfo info, final boolean isReplication) {
   handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
   super.register(info, isReplication);
}
private void handleRegistration(InstanceInfo info, int leaseDuration,
      boolean isReplication) {
   log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
         + ", leaseDuration " + leaseDuration + ", isReplication "
         + isReplication);
   publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
         isReplication));
}

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register

public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        // Look up the lease map for this application
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    updateRenewsPerMinThreshold();
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        // Build the lease with the current time and leaseDuration (90s by default)
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // Update the registry
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        // Invalidate the second-level readWriteCacheMap entries for this app
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
/**
 * Replicates all eureka actions to peer eureka nodes except for replication
 * traffic to this node.
 *
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // If this request is itself a replication, do not forward it to peers again
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers

private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    // Batching task dispatcher
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}

Service renewal



com.netflix.eureka.resources.InstanceResource#renewLease

org.springframework.cloud.netflix.eureka.server.InstanceRegistry#renew

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew

public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) {
        // Replicate the heartbeat to peer nodes
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

com.netflix.eureka.registry.AbstractInstanceRegistry#renew

/**
 * Marks the given instance of the given app name as renewed, and also marks whether it originated from
 * replication.
 *
 * @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)
 */
public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id);
    }
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                instanceInfo.getOverriddenStatus().name(),
                                instanceInfo.getId());
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

            }
        }
        renewsLastMin.increment();
        leaseToRenew.renew();
        return true;
    }
}
public void renew() {
    // Renew: set lastUpdateTimestamp to the current time plus duration.
    // Since isExpired() adds duration again, the effective expiry window
    // is actually twice the configured duration (a known Eureka quirk).
    lastUpdateTimestamp = System.currentTimeMillis() + duration;
}

Service cancellation



com.netflix.eureka.resources.InstanceResource#cancelLease

org.springframework.cloud.netflix.eureka.server.InstanceRegistry#cancel

public boolean cancel(String appName, String serverId, boolean isReplication) {
   handleCancelation(appName, serverId, isReplication);
   return super.cancel(appName, serverId, isReplication);
}

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#cancel

public boolean cancel(final String appName, final String id,
                      final boolean isReplication) {
    if (super.cancel(appName, id, isReplication)) {
        replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
        synchronized (lock) {
            if (this.expectedNumberOfClientsSendingRenews > 0) {
                // Since the client wants to cancel it, reduce the number of clients to send renews
                this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
                updateRenewsPerMinThreshold();
            }
        }
        return true;
    }
    return false;
}

com.netflix.eureka.registry.AbstractInstanceRegistry#cancel

public boolean cancel(String appName, String id, boolean isReplication) {
    return internalCancel(appName, id, isReplication);
}

org.springframework.cloud.netflix.eureka.server.InstanceRegistry#internalCancel

protected boolean internalCancel(String appName, String id, boolean isReplication) {
   handleCancelation(appName, id, isReplication);
   return super.internalCancel(appName, id, isReplication);
}
/**
 * {@link #cancel(String, String, boolean)} method is overridden by {@link PeerAwareInstanceRegistry}, so each
 * cancel request is replicated to the peers. This is however not desired for expires which would be counted
 * in the remote peers as valid cancellations, so self preservation mode would not kick-in.
 */
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        read.lock();
        CANCEL.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            // Remove the lease from the map
            leaseToCancel = gMap.remove(id);
        }
        synchronized (recentCanceledQueue) {
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        }
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            // Invalidate the second-level readWriteCacheMap entries for this app
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            return true;
        }
    } finally {
        read.unlock();
    }
}


com.netflix.eureka.lease.Lease#cancel

    /**
     * Cancels the lease by updating the eviction time.
     */
    public void cancel() {
        if (evictionTimestamp <= 0) {
            evictionTimestamp = System.currentTimeMillis();
        }
    }

Fetching services



com.netflix.eureka.resources.ApplicationsResource#getContainers

if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
    response = Response.ok(responseCache.getGZIP(cacheKey))
            .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
            .header(HEADER_CONTENT_TYPE, returnMediaType)
            .build();
} else {
    response = Response.ok(responseCache.get(cacheKey))
            .build();
}
return response;

com.netflix.eureka.registry.ResponseCacheImpl#getGZIP

com.netflix.eureka.registry.ResponseCacheImpl#getValue

Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            // First try readOnlyCacheMap
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                // Fall back to readWriteCacheMap, which loads from the registry on a miss
                payload = readWriteCacheMap.get(key);
                // Populate readOnlyCacheMap
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key : {}", key, t);
    }
    return payload;
}

Evicting expired instances



Eviction runs periodically on the Eureka Server (every 60 seconds by default) to detect expired services; the criterion is that a service has gone too long without a Renew.

The default expiry time is 90 seconds: a service that has not sent a Renew request to the Eureka Server for more than 90 seconds is treated as expired and evicted.

The expiry time is configured with eureka.instance.leaseExpirationDurationInSeconds, and the scan interval with eureka.server.evictionIntervalTimerInMs.

com.netflix.eureka.EurekaBootStrap#contextInitialized

com.netflix.eureka.EurekaBootStrap#initEurekaServerContext

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic

com.netflix.eureka.registry.AbstractInstanceRegistry#postInit

protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}


   /* visible for testing */ class EvictionTask extends TimerTask {

        private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

        @Override
        public void run() {
            try {
                long compensationTimeMs = getCompensationTimeMs();
                logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                evict(compensationTimeMs);
            } catch (Throwable e) {
                logger.error("Could not run the evict task", e);
            }
        }
     
    }

com.netflix.eureka.registry.AbstractInstanceRegistry#evict

    @Override
    public void evict() {
        evict(0l);
    }

    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            // Iterate over the applications
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    // Iterate over the instances
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                      	// System.currentTimeMillis() > lastUpdateTimestamp + duration 
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        // Self-preservation guard
        int registrySize = (int) getLocalRegistrySize();
        // registry size * renewal percent threshold (85% by default)
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        // Eviction cap: registry size - registry size * renewal percent threshold
        int evictionLimit = registrySize - registrySizeThreshold;

        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                // Knuth shuffle
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                // Same cancellation logic as above
                internalCancel(appName, id, false);
            }
        }
    }
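A worked example with illustrative numbers: if registrySize = 100 and renewalPercentThreshold keeps its default of 0.85, then registrySizeThreshold = 85 and evictionLimit = 15. Even if 40 leases have expired, only 15 randomly chosen ones are evicted in this pass; the rest wait for later eviction cycles (or for self-preservation to take over).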

com.netflix.eureka.lease.Lease#isExpired(long)

public boolean isExpired(long additionalLeaseMs) {
    return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}

Peer replication

In the Register, Renew, and Cancel implementations above, each path ends with a replicateToPeers call; this is what keeps state in sync between peers.

The Eureka Server that receives a Service Provider's request forwards it to the other Eureka Servers, invoking the same endpoint with the same parameters, except that the request is marked with an isReplication=true header so it is not replicated a second time.
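On the receiving side the flag arrives as an HTTP header; an abridged sketch of com.netflix.eureka.resources.ApplicationResource#addInstance shows the binding (payload validation omitted):

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    // ... validation omitted ...
    // A replicated register is applied locally but, because isReplication is true,
    // replicateToPeers returns early and it is not forwarded again.
    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();
}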

Server startup



com.netflix.eureka.EurekaBootStrap#contextInitialized

com.netflix.eureka.EurekaBootStrap#initEurekaServerContext

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#syncUp

public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;

    // Retry loop
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                // Back off before the next retry
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // Fetch all applications from the peer nodes (via the embedded eureka client)
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            // Iterate over the applications
            for (InstanceInfo instance : app.getInstances()) {
                // Iterate over each application's instances
                try {
                    // Check that the instance belongs to a region this server serves
                    if (isRegisterable(instance)) {
                        // Register locally, same as the registration flow above;
                        // isReplication=true, so it is not replicated onward to peers
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}

Runtime replication (i.e., the register, renew, and cancel operations above that need to be replicated)


public void cancel(final String appName, final String id) throws Exception {
    long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
    batchingDispatcher.process(
            taskId("cancel", appName, id),
            new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
                @Override
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.cancel(appName, id);
                }

                @Override
                public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                    super.handleFailure(statusCode, responseEntity);
                    if (statusCode == 404) {
                        logger.warn("{}: missing entry.", getTaskName());
                    }
                }
            },
            expiryTime
    );
}
public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}

The task is first wrapped in a TaskHolder and then added to the acceptorQueue.

class AcceptorExecutor<ID, T> {
  private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
  
  void process(ID id, T task, long expiryTime) {
        acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
        acceptedTasks++;
  }
  
  class AcceptorRunner implements Runnable {
    @Override
    public void run() {
      long scheduleTime = 0;
      while (!isShutdown.get()) {
        try {
          drainInputQueues();

          int totalItems = processingOrder.size();

          long now = System.currentTimeMillis();
          if (scheduleTime < now) {
            scheduleTime = now + trafficShaper.transmissionDelay();
          }
          if (scheduleTime <= now) {
            // Move up to maxBatchingSize tasks from processingOrder into a
            // List<TaskHolder<ID, T>> and enqueue it on batchWorkQueue
            assignBatchWork();
            // Move the tasks remaining in processingOrder onto singleItemWorkQueue
            assignSingleItemWork();
          }

          // If no worker is requesting data or there is a delay injected by the traffic shaper,
          // sleep for some time to avoid tight loop.
          if (totalItems == processingOrder.size()) {
            Thread.sleep(10);
          }
        } catch (InterruptedException ex) {
          // Ignore
        } catch (Throwable e) {
          // Safe-guard, so we never exit this loop in an uncontrolled way.
          logger.warn("Discovery AcceptorThread error", e);
        }
      }
    }
  }
}

When a PeerEurekaNode is instantiated, it starts two dispatchers whose process method puts tasks onto the acceptorQueue. It also starts the AcceptorExecutor (single-threaded), which moves tasks from the acceptorQueue either batched into batchWorkQueue or unbatched into singleItemWorkQueue, and finally the task-executor thread pools (TaskExecutors.batchExecutors and TaskExecutors.singleItemExecutors), which consume batchWorkQueue and singleItemWorkQueue respectively.
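A stripped-down sketch of the same acceptor/batch pattern (a hypothetical MiniBatcher, far simpler than Eureka's AcceptorExecutor, which additionally handles reprocessing, expiry, and traffic shaping):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Minimal acceptor -> batch pipeline: producers add single tasks to acceptorQueue,
// a single acceptor thread groups them into batches of up to maxBatchSize, and
// worker threads take whole batches from batchWorkQueue (one replication HTTP
// call per batch, in Eureka's case).
class MiniBatcher<T> {
    private final BlockingQueue<T> acceptorQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<List<T>> batchWorkQueue = new LinkedBlockingQueue<>();
    private final int maxBatchSize;

    MiniBatcher(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // Producer side, like TaskDispatcher#process
    void process(T task) {
        acceptorQueue.add(task);
    }

    // One iteration of the acceptor thread, like AcceptorRunner
    void drainOnce() throws InterruptedException {
        T first = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
        if (first == null) {
            return;
        }
        List<T> batch = new ArrayList<>(maxBatchSize);
        batch.add(first);
        acceptorQueue.drainTo(batch, maxBatchSize - 1);
        batchWorkQueue.add(batch);
    }

    // Consumer side, like BatchWorkerRunnable#getWork
    List<T> takeBatch() throws InterruptedException {
        return batchWorkQueue.take();
    }
}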

this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
        batcherName,
        config.getMaxElementsInPeerReplicationPool(),
        batchSize,
        config.getMaxThreadsForPeerReplication(),
        maxBatchingDelayMs,
        serverUnavailableSleepTimeMs,
        retrySleepTimeMs,
        taskProcessor
);
this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
        targetHost,
        config.getMaxElementsInStatusReplicationPool(),
        config.getMaxThreadsForStatusReplication(),
        maxBatchingDelayMs,
        serverUnavailableSleepTimeMs,
        retrySleepTimeMs,
        taskProcessor
);

com.netflix.eureka.util.batcher.TaskDispatchers

public class TaskDispatchers {

    public static <ID, T> TaskDispatcher<ID, T> createNonBatchingTaskDispatcher(String id,
                                                                                int maxBufferSize,
                                                                                int workerCount,
                                                                                long maxBatchingDelay,
                                                                                long congestionRetryDelayMs,
                                                                                long networkFailureRetryMs,
                                                                                TaskProcessor<T> taskProcessor) {
        // Create the AcceptorExecutor (a single acceptor thread running AcceptorRunner)
        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                id, maxBufferSize, 1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
        );
        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
        return new TaskDispatcher<ID, T>() {
            @Override
            public void process(ID id, T task, long expiryTime) {
                acceptorExecutor.process(id, task, expiryTime);
            }

            @Override
            public void shutdown() {
                acceptorExecutor.shutdown();
                taskExecutor.shutdown();
            }
        };
    }

    public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                             int maxBufferSize,
                                                                             int workloadSize,
                                                                             int workerCount,
                                                                             long maxBatchingDelay,
                                                                             long congestionRetryDelayMs,
                                                                             long networkFailureRetryMs,
                                                                             TaskProcessor<T> taskProcessor) {
        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
        );
        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
        return new TaskDispatcher<ID, T>() {
            @Override
            public void process(ID id, T task, long expiryTime) {
                acceptorExecutor.process(id, task, expiryTime);
            }

            @Override
            public void shutdown() {
                acceptorExecutor.shutdown();
                taskExecutor.shutdown();
            }
        };
    }
}

Batch tasks

com.netflix.eureka.util.batcher.TaskExecutors.BatchWorkerRunnable#run

public void run() {
    try {
        while (!isShutdown.get()) {
            List<TaskHolder<ID, T>> holders = getWork();
            metrics.registerExpiryTimes(holders);

            List<T> tasks = getTasksOf(holders);
            ProcessingResult result = processor.process(tasks);
            switch (result) {
                case Success:
                    break;
                case Congestion:
                case TransientError:
                    taskDispatcher.reprocess(holders, result);
                    break;
                case PermanentError:
                    logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
            }
            metrics.registerTaskResult(result, tasks.size());
        }
    } catch (InterruptedException e) {
        // Ignore
    } catch (Throwable e) {
        // Safe-guard, so we never exit this loop in an uncontrolled way.
        logger.warn("Discovery WorkerThread error", e);
    }
}
private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
    // return batchWorkQueue;
    BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
    List<TaskHolder<ID, T>> result;
    do {
        result = workQueue.poll(1, TimeUnit.SECONDS);
    } while (!isShutdown.get() && result == null);
    return (result == null) ? new ArrayList<>() : result;
}

Single tasks

com.netflix.eureka.util.batcher.TaskExecutors.SingleTaskWorkerRunnable

    public void run() {
        try {
            while (!isShutdown.get()) {
                BlockingQueue<TaskHolder<ID, T>> workQueue = taskDispatcher.requestWorkItem();
                TaskHolder<ID, T> taskHolder;
                while ((taskHolder = workQueue.poll(1, TimeUnit.SECONDS)) == null) {
                    if (isShutdown.get()) {
                        return;
                    }
                }
                metrics.registerExpiryTime(taskHolder);
                if (taskHolder != null) {
                    ProcessingResult result = processor.process(taskHolder.getTask());
                    switch (result) {
                        case Success:
                            break;
                        case Congestion:
                        case TransientError:
                            taskDispatcher.reprocess(taskHolder, result);
                            break;
                        case PermanentError:
                            logger.warn("Discarding a task of {} due to permanent error", workerName);
                    }
                    metrics.registerTaskResult(result, 1);
                }
            }
        } catch (InterruptedException e) {
            // Ignore
        } catch (Throwable e) {
            // Safe-guard, so we never exit this loop in an uncontrolled way.
            logger.warn("Discovery WorkerThread error", e);
        }
    }
}

Eureka Server starts three scheduled tasks.

The first is peer-list synchronization across the cluster.

Every 10 minutes it refreshes the cluster peer list: updatePeerEurekaNodes(resolvePeerUrls()).

com.netflix.eureka.DefaultEurekaServerContext#initialize

com.netflix.eureka.cluster.PeerEurekaNodes#start

public void start() {
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        updatePeerEurekaNodes(resolvePeerUrls());
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }

            }
        };
        // Scheduled peer-list refresh task
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL:  {}", node.getServiceUrl());
    }
}

The second updates the renewal (heartbeat) threshold.

scheduleRenewalThresholdUpdateTask() runs every 15 minutes; resetting the renewal threshold is part of Eureka Server's self-preservation mechanism.

com.netflix.eureka.DefaultEurekaServerContext#initialize

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#init

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask

private void scheduleRenewalThresholdUpdateTask() {
    timer.schedule(new TimerTask() {
                       @Override
                       public void run() {
                           updateRenewalThreshold();
                       }
                   }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
            serverConfig.getRenewalThresholdUpdateIntervalMs());
}

The third periodically removes expired instances.

Same as the eviction logic described above.

Three-level cache

As described above: readOnlyCacheMap → readWriteCacheMap → registry.

Cluster peer initialization

PeerEurekaNodes#start is called from DefaultEurekaServerContext's initialization code. It parses the addresses of the other eureka servers from configuration, builds one PeerEurekaNode per URL, and stores the peer information locally. A timer then re-reads this configuration every 10 minutes (by default) to refresh the cluster information.

Cluster data synchronization

After the cluster peers are initialized, the other nodes' registries still need to be synced locally: that is registry.syncUp() in the EurekaBootstrap initialization code. Since this server is itself a eureka client, it has already pulled a registry from one of the other eureka server nodes during startup (on first startup a eureka client fetches the full registry from a server and caches it locally; every 30 seconds thereafter it fetches the delta and merges it with the local cache). syncUp() only needs to take those cached instances and register each of them locally.

for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
  if (i > 0) {
    try {
      // If the first pass found no registry in the local eureka client,
      // wait 30 seconds for DiscoveryClient.fetchRegistry to finish.
      Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
    } catch (InterruptedException e) {
      logger.warn("Interrupted during registry transfer..");
      break;
    }
  }
  // Reads the full-registry cache that the eureka client pulled during local initialization.
  Applications apps = eurekaClient.getApplications();

As the source shows, there is a retry mechanism with a sleep inside: this code may run before the eureka client's local cache has been populated, so it waits 30 seconds and retries, checking whether the cache has data by then.

Replicating register, cancel, and failure events across the cluster

The eureka server handles registration requests in ApplicationResource.addInstance, which calls PeerAwareInstanceRegistryImpl.register to register the instance; replicateToPeers then pushes the registration data to the other nodes. Here the isReplication parameter is false.

public void register(final InstanceInfo info, final boolean isReplication) {
  int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
  if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
    leaseDuration = info.getLeaseInfo().getDurationInSecs();
  }
  super.register(info, leaseDuration, isReplication);
  // Replicate the registration to the other cluster nodes
  replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

Stepping into the method: it loops over all configured cluster nodes, excluding itself, and calls each node's registration endpoint with the instance's registration data. This differs from a client call in that the underlying caller is JerseyReplicationClient, which always sends the request header webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true"). The receiving nodes therefore see isReplication as true, and when isReplication is true they do not replicate the registration onward to other nodes.

switch (action) {
  case Cancel:
    node.cancel(appName, id);
    break;
  case Heartbeat:
    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
    break;
  case Register:
    node.register(info);
    break;
  case StatusUpdate:
    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
    break;
  case DeleteStatusOverride:
    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
    node.deleteStatusOverride(appName, id, infoFromRegistry);
    break;
}
  1. Cluster synchronization mechanism: a client can send its request to any one node; that server then replicates the request to all the other nodes, which apply it locally only and never replicate it onward.
  2. Asynchronous batching of replication data: there are three queues. The first is write-only (acceptorQueue); the second splits work by time and size (processingOrder); the third holds the batched jobs (batchWorkQueue). This is the task batching mechanism sketched above.


Full fetch

On first startup, a eureka client fetches the full registry from the eureka server and caches it locally. Afterwards it fetches the delta from the server every 30 seconds and merges it with the local cache.

Delta fetch

When the eureka client initializes, it fetches the full registry once; initScheduledTasks then starts a timer that incrementally syncs the registry every 30 seconds. The task class is CacheRefreshThread.

  1. A scheduled task runs every 30 seconds.
  2. Because a cached Applications exists locally, the delta-fetch path is taken.
  3. It calls eurekaHttpClient's getDelta endpoint: GET apps/delta.
  4. ApplicationsResource.getContainerDifferential handles the delta request.
  5. It goes through the same multi-level cache, only with key ALL_APPS_DELTA. Everything else is identical; because the key differs, generatePayload executes a different branch: instead of registry.getApplications() for the full data, it calls registry.getApplicationDeltasFromMultipleRegions(key.getRegions()) for the delta.
  6. There is a recentlyChangedQueue that receives an entry on every state change, representing recently changed instances; deltas are served from it. When the Registry initializes, a timer checks every 30 seconds whether entries in the queue are older than 180 seconds and removes them, so the queue holds roughly the last 3 minutes of instance changes.
  7. When the eureka client fetches the delta every 30 seconds, it receives the instances that changed within the last 3 minutes.
  8. The fetched delta is merged with the locally cached registry, applying instance adds, updates, and removals: updateDelta(delta).
  9. A hash is computed over the merged registry; the delta response also carries a hash of the eureka server's full registry. If the two differ, the client fetches the full registry from the server (see the sketch below).
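A simplified sketch of the reconciliation in items 8-9, following DiscoveryClient#getAndUpdateDelta (generation checks and locking omitted):

private void getAndUpdateDelta(Applications applications) throws Throwable {
    // GET apps/delta
    Applications delta = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()).getEntity();
    if (delta == null) {
        // server has deltas disabled; fall back to a full fetch
        getAndStoreFullRegistry();
        return;
    }
    // apply ADDED / MODIFIED / DELETED ActionTypes to the local registry cache
    updateDelta(delta);
    String reconcileHashCode = getReconcileHashCode(applications);
    if (!reconcileHashCode.equals(delta.getAppsHashCode())) {
        // local and server registries diverged: fetch the full registry
        reconcileAndLogDifference(delta, reconcileHashCode);
    }
}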

eureka exposes the delta mechanism on both the client and the server. On the client it controls whether the registry refresh calls the /apps/delta endpoint and applies the returned entries to local data according to their ActionType. The server provides the /apps/delta endpoint: every registry mutation pushes a RecentlyChangedItem onto recentlyChangedQueue, a scheduled task removes items whose last update exceeds a threshold, and the query endpoint builds its response from recentlyChangedQueue.


EurekaClient


Initialization

com.netflix.discovery.DiscoveryClient#DiscoveryClient(com.netflix.appinfo.ApplicationInfoManager, com.netflix.discovery.EurekaClientConfig, com.netflix.discovery.AbstractDiscoveryClientOptionalArgs, javax.inject.Provider)

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    if (args != null) {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        this.eventListeners.addAll(args.getEventListeners());
        this.preRegistrationHandler = args.preRegistrationHandler;
    } else {
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
        this.preRegistrationHandler = null;
    }
    
    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();

    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    if (myInfo != null) {
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }

    this.backupRegistryProvider = backupRegistryProvider;

    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    localRegionApps.set(new Applications());

    fetchRegistryGeneration = new AtomicLong(0);

    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());

        return;  // no need to setup up an network tasks and we are done
    }

    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        // Heartbeat thread pool
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff
				
        // Cache-refresh thread pool
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        eurekaTransport = new EurekaTransport();
        // Set up the transport for talking to the eureka server endpoints (Jersey-based HTTP clients)
        scheduleServerEndpointTask(eurekaTransport, args);

        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {
            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {
            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }

    // call and execute the pre registration handler before all background tasks (inc registration) is started
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }

    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            if (!register() ) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    initScheduledTasks();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, this.getApplications().size());
}

Initializing all tasks: three scheduled tasks

/**
 * Initializes all scheduled tasks.
 */
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        // Scheduled task 1: a TimedSupervisorTask runs CacheRefreshThread on cacheRefreshExecutor,
        // fetching the registry every registryFetchIntervalSeconds seconds
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        // Heartbeat timer
        // Scheduled task 2: heartbeat
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        // Replicator that pushes local InstanceInfo changes to the server
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

       
        // Scheduled task 3: replicate local instance info to the server
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
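Note that initScheduledTasks calls scheduler.schedule(...) only once instead of scheduleAtFixedRate: the repetition comes from TimedSupervisorTask itself, which re-schedules itself after every run and, when the wrapped task times out, doubles the next delay up to expBackOffBound times the base interval. A condensed sketch of that class (metrics and logging stripped, details simplified):

import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

public class TimedSupervisorTask implements Runnable {

    private final ScheduledExecutorService scheduler; // re-schedules this task
    private final ThreadPoolExecutor executor;        // actually runs the work
    private final Runnable task;                      // CacheRefreshThread or HeartbeatThread
    private final long timeoutMillis;                 // base interval, also the per-run timeout
    private final long maxDelay;                      // timeoutMillis * expBackOffBound
    private final AtomicLong delay;                   // current delay, grows on timeout

    public TimedSupervisorTask(ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.task = task;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.maxDelay = this.timeoutMillis * expBackOffBound;
        this.delay = new AtomicLong(this.timeoutMillis);
    }

    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            // wait at most one base interval for the task to finish
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            delay.set(timeoutMillis); // success: reset the delay to the base interval
        } catch (TimeoutException e) {
            // the task overran: back off exponentially, capped at maxDelay
            long currentDelay = delay.get();
            delay.compareAndSet(currentDelay, Math.min(maxDelay, currentDelay * 2));
        } catch (Exception e) {
            // rejected, interrupted or failed: keep the current delay
        } finally {
            if (future != null) {
                future.cancel(true);
            }
            // re-schedule itself; this is why the caller only schedules it once
            scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
        }
    }
}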

Task 1: update the local service list

com.netflix.discovery.DiscoveryClient#refreshRegistry

void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {
                        logger.info("Remote regions to fetch modified concurrently," +
                                " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    }
                }
            } else {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }

        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        if (logger.isDebugEnabled()) {
            StringBuilder allAppsHashCodes = new StringBuilder();
            allAppsHashCodes.append("Local region apps hashcode: ");
            allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
            allAppsHashCodes.append(", is fetching remote regions? ");
            allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
            for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                allAppsHashCodes.append(", Remote region: ");
                allAppsHashCodes.append(entry.getKey());
                allAppsHashCodes.append(" , apps hashcode: ");
                allAppsHashCodes.append(entry.getValue().getAppsHashCode());
            }
            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                    allAppsHashCodes);
        }
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}
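The CacheRefreshThread handed to the supervisor task is only a thin Runnable wrapper around the method above, matching the Netflix source:

class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}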

Task 2: the heartbeat, i.e. lease renewal

leaseRenewalIntervalInSeconds (default 30): the client renews its lease every 30 seconds

leaseExpirationDurationInSeconds (default 90): if no renewal arrives within 90 seconds, the instance is judged to be offline
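In a Spring Cloud application both values are set through eureka.instance properties (they back the corresponding EurekaInstanceConfigBean fields). A reference sketch showing the defaults:

eureka:
  instance:
    lease-renewal-interval-in-seconds: 30     # heartbeat period
    lease-expiration-duration-in-seconds: 90  # evicted if no renewal within this window

The renewal itself is a single HTTP call; on a 404 (the server no longer knows this instance) the client simply re-registers: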

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient#execute

protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    List<EurekaEndpoint> candidateHosts = null;
    int endpointIdx = 0;
    for (int retry = 0; retry < numberOfRetries; retry++) {
        EurekaHttpClient currentHttpClient = delegate.get();
        EurekaEndpoint currentEndpoint = null;
        if (currentHttpClient == null) {
            if (candidateHosts == null) {
                candidateHosts = getHostCandidates();
                if (candidateHosts.isEmpty()) {
                    throw new TransportException("There is no known eureka server; cluster server list is empty");
                }
            }
            if (endpointIdx >= candidateHosts.size()) {
                throw new TransportException("Cannot execute request on any known server");
            }
            // take the next serviceUrl from the candidate list in order
            currentEndpoint = candidateHosts.get(endpointIdx++);
            currentHttpClient = clientFactory.newClient(currentEndpoint);
        }

        try {
            EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
            if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                delegate.set(currentHttpClient);
                if (retry > 0) {
                    logger.info("Request execution succeeded on retry #{}", retry);
                }
                return response;
            }
            logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
        } catch (Exception e) {
            logger.warn("Request execution failed with message: {}", e.getMessage());  // just log message as the underlying client should log the stacktrace
        }

        // Connection error or 5xx from the server that must be retried on another server
        delegate.compareAndSet(currentHttpClient, null);
        if (currentEndpoint != null) {
            quarantineSet.add(currentEndpoint);
        }
    }
    throw new TransportException("Retry limit reached; giving up on completing the request");
}
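The quarantineSet feeds back into getHostCandidates on the next call: quarantined endpoints are skipped unless so many are quarantined that the client has no choice but to start over. A rough sketch of that logic (simplified; the literal 0.66 stands in for the retryableClientQuarantineRefreshPercentage default):

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public final class QuarantineFilter {

    // Simplified model of RetryableEurekaHttpClient#getHostCandidates.
    static <T> List<T> eligibleHosts(List<T> clusterEndpoints, Set<T> quarantined) {
        quarantined.retainAll(clusterEndpoints); // forget hosts that left the cluster
        int threshold = (int) (clusterEndpoints.size() * 0.66);
        if (!quarantined.isEmpty() && quarantined.size() >= threshold) {
            quarantined.clear(); // most hosts are quarantined: reset and try them all again
        }
        List<T> eligible = new ArrayList<>(clusterEndpoints);
        eligible.removeAll(quarantined);
        return eligible;
    }
}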

Task 3: check the local instance info and re-register if it has changed

com.netflix.discovery.InstanceInfoReplicator#run

public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            // the local instance info changed, so re-register
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        // re-schedule itself to keep running periodically
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

com.netflix.discovery.DiscoveryClient#refreshInstanceInfo

void refreshInstanceInfo() {
    applicationInfoManager.refreshDataCenterInfoIfRequired();
    applicationInfoManager.refreshLeaseInfoIfRequired();

    InstanceStatus status;
    try {
        status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
    } catch (Exception e) {
        logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
        status = InstanceStatus.DOWN;
    }

    if (null != status) {
        applicationInfoManager.setInstanceStatus(status);
    }
}
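getHealthCheckHandler() consults a com.netflix.appinfo.HealthCheckHandler, so the status reported on the next replication can track real application health (in Spring Cloud, setting eureka.client.healthcheck.enabled=true installs a handler that bridges the actuator health endpoint). A minimal sketch of a custom handler; the class name and the database check are hypothetical:

import com.netflix.appinfo.HealthCheckHandler;
import com.netflix.appinfo.InstanceInfo.InstanceStatus;
import org.springframework.stereotype.Component;

@Component
public class DatabaseAwareHealthCheckHandler implements HealthCheckHandler {

    // DiscoveryClient calls this during refreshInstanceInfo; the returned status
    // is what the next register/heartbeat reports to the server.
    @Override
    public InstanceStatus getStatus(InstanceStatus currentStatus) {
        return pingDatabase() ? InstanceStatus.UP : InstanceStatus.DOWN;
    }

    private boolean pingDatabase() {
        // placeholder for a real connectivity check
        return true;
    }
}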

Service registration

com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#register

public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

image-20200104205539707

Service renewal

image-20200104210906996

Service cancellation

image-20200104210945441

Fetching the service list

When it starts, a service consumer fetches the full service list from the Eureka server and caches it locally. Note that this requires eureka.client.fetch-registry=true (the property behind shouldFetchRegistry(); it defaults to true).

image-20200104211239994

Updating the service list

Because the service list is cached locally, it has to be refreshed periodically; the refresh interval is configured with eureka.client.registryFetchIntervalSeconds (30 seconds by default).
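For reference, both fetch-related settings live under eureka.client (values shown are the defaults; a sketch, not required configuration):

eureka:
  client:
    fetch-registry: true                   # pull the service list and cache it locally
    registry-fetch-interval-seconds: 30    # period of the cache refresh task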

image-20200104211326747

  1. The Eureka client fetches the service list from the registry server and caches it locally;
  2. As a service consumer, it reads service-provider information out of this local cache;
  3. Delta (incremental) fetches run in a loop with a 30-second period;
  4. Delta data is only kept on the server for 3 minutes, so what the client fetches as a "delta" may be identical to data it already fetched 30 seconds earlier; the client has to handle duplicates itself;
  5. It follows from points 3 and 4 that a delta fetch returns the server's changes from roughly the last three minutes; if the client goes more than three minutes without a successful delta fetch (e.g. due to network problems), the changes the server made in that window can no longer be obtained from the delta endpoint, leaving client and server inconsistent, so a mechanism is needed to detect this promptly;
  6. Normally, after repeated delta updates the client's service list converges to the server's, but an error along the way can leave them out of sync. To expose this, every delta response from the server carries a consistency hash code; the client computes the same hash over its local service list (see the sketch after this list) and compares it with the server's. A mismatch proves the delta updates went wrong and the data has diverged, and a full fetch is then required;
  7. The server offers the service list in both JSON and XML formats; the client uses the Jersey SDK to download the JSON format.
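The consistency hash code mentioned in point 6 is cheap to compute: count instances per status and concatenate STATUS_count_ pairs in sorted order. A sketch modeled on com.netflix.discovery.shared.Applications#getReconcileHashCode (the real method aggregates the counts across all registered applications):

import java.util.List;
import java.util.TreeMap;

import com.netflix.appinfo.InstanceInfo;

public final class ReconcileHash {

    // e.g. one DOWN instance and 42 UP instances hash to "DOWN_1_UP_42_"
    static String of(List<InstanceInfo> instances) {
        TreeMap<String, Integer> countsByStatus = new TreeMap<>(); // sorted by status name
        for (InstanceInfo instance : instances) {
            countsByStatus.merge(instance.getStatus().name(), 1, Integer::sum);
        }
        StringBuilder hash = new StringBuilder();
        countsByStatus.forEach((status, count) ->
                hash.append(status).append('_').append(count).append('_'));
        return hash.toString();
    }
}

fetchRegistry itself is the entry point that chooses between the full and the delta path: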
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        // Use a Stopwatch to time the fetch
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // Get the previously fetched service list from the local cache
            Applications applications = getApplications();

            // Several conditions decide whether to trigger a full fetch; any one of them is enough:
            // 1. delta updates are disabled;
            // 2. a single VIP address is being watched (getRegistryRefreshSingleVipAddress);
            // 3. the caller explicitly requested a full fetch via the parameter;
            // 4. no valid service list is cached locally yet;
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                // These logs reveal exactly which condition triggered the full fetch
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                // full fetch
                getAndStoreFullRegistry();
            } else {
                // delta fetch
                getAndUpdateDelta(applications);
            }
            // Recompute and set the consistency hash code
            applications.setAppsHashCode(applications.getReconcileHashCode());
            // Log the total number of instances across all applications
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // Broadcast the cache-refreshed event to all registered listeners;
        // note this method is overridden in CloudEurekaClient
        onCacheRefreshed();

        // The freshly updated cache holds the service list from the Eureka server, which includes
        // this application's status as the server sees it. The member variable
        // lastRemoteInstanceStatus records the status from the last update; updateInstanceRemoteStatus
        // compares the two and, if they differ, updates lastRemoteInstanceStatus and publishes the event.
        updateInstanceRemoteStatus();

        return true;
    }

References

https://juejin.im/post/5cf5c3aef265da1b7a4b5f97

https://www.jianshu.com/p/3a3a1a5891ec

https://www.infoq.cn/article/jlDJQ*3wtN2PcqTDyokh

https://zhuanlan.zhihu.com/p/24829766

https://www.jianshu.com/p/e112c134f966

http://www.saily.top/2020/03/28/springcloud/eureka05/

https://blog.csdn.net/boling_cavalry/article/details/82813180