Service providers register themselves with the registry; service consumers pull the registry into a local cache and then use client-side load balancing by service name to reach the right provider instance.
Service providers keep their registration alive by renewing their lease with the registry via heartbeats.
The registry itself is deployed as a cluster, and data is replicated between the peer nodes.
Together this provides load balancing and failover for mid-tier servers.
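For orientation, a minimal Spring Cloud Netflix sketch of the registry role (illustrative only; the registry node just needs the eureka-server starter plus @EnableEurekaServer, which contributes the EurekaServerMarkerConfiguration.Marker bean that the auto-configuration below is conditional on):
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
// Registry node: the Marker bean from @EnableEurekaServer switches on EurekaServerAutoConfiguration.
@SpringBootApplication
@EnableEurekaServer
public class RegistryApplication {
    public static void main(String[] args) {
        SpringApplication.run(RegistryApplication.class, args);
    }
}
A provider or consumer only needs the eureka-client starter on the classpath; it registers itself and fetches the registry automatically through the DiscoveryClient analyzed later.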
Eureka server auto-configuration is declared in spring.factories:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
// instantiates InstanceRegistry, which extends PeerAwareInstanceRegistryImpl
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications(); // force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
// manages the peer node cluster
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
ServerCodecs serverCodecs) {
return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
}
}
@Configuration
public class EurekaServerInitializerConfiguration
implements ServletContextAware, SmartLifecycle, Ordered {
// the ServletContextAwareProcessor#postProcessBeforeInitialization post-processor handles the ServletContextAware interface and injects the servletContext
public void setServletContext(ServletContext servletContext) {
this.servletContext = servletContext;
}
public void start() {
new Thread(new Runnable() {
@Override
public void run() {
try {
//TODO: is this class even needed now?
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}
}).start();
}
}
SmartLifecycle extends the Lifecycle interface. During Spring's finishRefresh phase, getLifecycleProcessor().onRefresh() calls start() on lifecycle beans, which is what boots the Eureka server.
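For illustration, a minimal SmartLifecycle bean of the same shape (hypothetical class, not part of Eureka) gets its start() invoked automatically at finishRefresh, which is exactly how EurekaServerInitializerConfiguration is bootstrapped without any explicit call:
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
// Hypothetical example: Spring calls start() during finishRefresh (LifecycleProcessor.onRefresh)
// because isAutoStartup() returns true.
@Component
public class BackgroundBootstrap implements SmartLifecycle {
    private volatile boolean running;
    @Override
    public void start() {
        // kick off background initialization, as EurekaServerInitializerConfiguration does
        running = true;
    }
    @Override
    public void stop() {
        running = false;
    }
    @Override
    public void stop(Runnable callback) {
        stop();
        callback.run();
    }
    @Override
    public boolean isRunning() {
        return running;
    }
    @Override
    public boolean isAutoStartup() {
        return true;   // eligible for automatic startup at finishRefresh
    }
    @Override
    public int getPhase() {
        return 0;      // relative start order among lifecycle beans
    }
}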
HTTP requests are handled through the Jersey framework.
com.netflix.eureka.resources.InstanceResource
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
Second-level (read-write) cache, built with Guava's CacheBuilder:
this.readWriteCacheMap =
CacheBuilder
.newBuilder()
.initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
// expire entries after write, default 180s
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
When a key is read from the LoadingCache and no entry exists, the LoadingCache automatically loads the data into the cache via the CacheLoader above.
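A tiny standalone Guava example of that load-on-miss behaviour (hypothetical key/value types, same expireAfterWrite idea as the readWriteCacheMap above):
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.concurrent.TimeUnit;
public class LoadingCacheDemo {
    public static void main(String[] args) {
        LoadingCache<String, String> cache = CacheBuilder.newBuilder()
                .expireAfterWrite(180, TimeUnit.SECONDS)   // same idea as responseCacheAutoExpirationInSeconds
                .build(new CacheLoader<String, String>() {
                    @Override
                    public String load(String key) {
                        // invoked on a cache miss, analogous to generatePayload(key) above
                        return "payload-for-" + key;
                    }
                });
        // the first get() misses and calls load(); later gets within 180s are served from the cache
        System.out.println(cache.getUnchecked("ALL_APPS"));
    }
}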
Eureka server's response cache has three levels: the first is the read-only cache (readOnlyCacheMap), the second the read-write cache (readWriteCacheMap), and the third the registry itself. The read-only cache copies values from the read-write cache every 30s, entries in the read-write cache expire 180s after being written, and a key with no cached value is loaded from the registry.
1. The read-only cache is refreshed by a timer that iterates over readOnlyCacheMap, which takes care of removals; a newly added key is not yet in readOnlyCacheMap, so the miss falls through to the second-level cache.
com.netflix.eureka.registry.ResponseCacheImpl#getCacheUpdateTask
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
} finally {
CurrentRequestVersion.remove();
}
}
}
};
}
Property | Default | Description |
---|---|---|
eureka.server.useReadOnlyResponseCache | true | Clients are served from readOnlyCacheMap; set to false to skip readOnlyCacheMap and read directly from readWriteCacheMap |
eureka.server.responseCacheUpdateIntervalMs | 30000 | Interval at which readWriteCacheMap is copied into readOnlyCacheMap, default 30s |
eureka.server.evictionIntervalTimerInMs | 60000 | Interval of the eviction task that removes instances that failed to renew, default 60s |
eureka.instance.leaseExpirationDurationInSeconds | 90 | Time without a renew after which an instance is evicted, default 90s |
com.netflix.eureka.resources.ApplicationResource#addInstance
org.springframework.cloud.netflix.eureka.server.InstanceRegistry#register(com.netflix.appinfo.InstanceInfo, boolean)
public void register(final InstanceInfo info, final boolean isReplication) {
handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
super.register(info, isReplication);
}
private void handleRegistration(InstanceInfo info, int leaseDuration,
boolean isReplication) {
log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
+ ", leaseDuration " + leaseDuration + ", isReplication "
+ isReplication);
publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
isReplication));
}
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
// look up the lease map for this application
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
// new lease with the current time and leaseDuration (default 90s)
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// update the registry
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// invalidate the affected entries in the readWriteCacheMap (second-level cache)
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*
*/
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
// if the current request is itself a replication, do not replicate it to peers again
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
// hand the task to the batching dispatcher
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
com.netflix.eureka.resources.InstanceResource#renewLease
org.springframework.cloud.netflix.eureka.server.InstanceRegistry#renew
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) {
// replicate the heartbeat to peer nodes
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
com.netflix.eureka.registry.AbstractInstanceRegistry#renew
/**
* Marks the given instance of the given app name as renewed, and also marks whether it originated from
* replication.
*
* @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)
*/
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
leaseToRenew.renew();
return true;
}
}
public void renew() {
// renew: set lastUpdateTimestamp to the current time plus duration
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
com.netflix.eureka.resources.InstanceResource#cancelLease
org.springframework.cloud.netflix.eureka.server.InstanceRegistry#cancel
public boolean cancel(String appName, String serverId, boolean isReplication) {
handleCancelation(appName, serverId, isReplication);
return super.cancel(appName, serverId, isReplication);
}
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#cancel
public boolean cancel(final String appName, final String id,
final boolean isReplication) {
if (super.cancel(appName, id, isReplication)) {
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to cancel it, reduce the number of clients to send renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
updateRenewsPerMinThreshold();
}
}
return true;
}
return false;
}
com.netflix.eureka.registry.AbstractInstanceRegistry#cancel
public boolean cancel(String appName, String id, boolean isReplication) {
return internalCancel(appName, id, isReplication);
}
org.springframework.cloud.netflix.eureka.server.InstanceRegistry#internalCancel
protected boolean internalCancel(String appName, String id, boolean isReplication) {
handleCancelation(appName, id, isReplication);
return super.internalCancel(appName, id, isReplication);
}
/**
* {@link #cancel(String, String, boolean)} method is overridden by {@link PeerAwareInstanceRegistry}, so each
* cancel request is replicated to the peers. This is however not desired for expires which would be counted
* in the remote peers as valid cancellations, so self preservation mode would not kick-in.
*/
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
read.lock();
CANCEL.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
// remove the lease from the map
leaseToCancel = gMap.remove(id);
}
synchronized (recentCanceledQueue) {
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
}
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
// invalidate the affected entries in the readWriteCacheMap (second-level cache)
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
} finally {
read.unlock();
}
}
com.netflix.eureka.lease.Lease#cancel
/**
* Cancels the lease by updating the eviction time.
*/
public void cancel() {
if (evictionTimestamp <= 0) {
evictionTimestamp = System.currentTimeMillis();
}
}
com.netflix.eureka.resources.ApplicationsResource#getContainers
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey))
.build();
}
return response;
com.netflix.eureka.registry.ResponseCacheImpl#getGZIP
com.netflix.eureka.registry.ResponseCacheImpl#getValue
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
// read from readOnlyCacheMap first
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
// fall back to readWriteCacheMap; on a miss its CacheLoader generates the payload from the registry
payload = readWriteCacheMap.get(key);
// populate readOnlyCacheMap
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
Eviction runs periodically on the Eureka server (every 60 seconds by default) to detect failed instances, i.e. instances that have not renewed within a certain time.
The default expiration is 90 seconds: an instance that has not sent a renew to the Eureka server for more than 90 seconds is treated as failed and evicted.
The expiration time is configured with eureka.instance.leaseExpirationDurationInSeconds, and the scan interval with eureka.server.evictionIntervalTimerInMs.
com.netflix.eureka.EurekaBootStrap#contextInitialized
com.netflix.eureka.EurekaBootStrap#initEurekaServerContext
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic
com.netflix.eureka.registry.AbstractInstanceRegistry#postInit
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
/* visible for testing */ class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
@Override
public void evict() {
evict(0l);
}
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
// iterate over each application
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
// iterate over each instance of the application
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
// System.currentTimeMillis() > lastUpdateTimestamp + duration
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
// self-preservation: cap how many instances may be evicted in one pass
int registrySize = (int) getLocalRegistrySize();
// registry size * renewal percent threshold (default 0.85)
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
// eviction limit = registry size - registry size * renewal percent threshold
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
// swap a random remaining lease into position i
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
// same de-registration logic as cancel above
internalCancel(appName, id, false);
}
}
}
com.netflix.eureka.lease.Lease#isExpired(long)
public boolean isExpired(long additionalLeaseMs) {
    return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
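Worth noting about the arithmetic above: renew() already stores now + duration in lastUpdateTimestamp, and isExpired() adds duration again, so with the default 90s lease an instance only becomes evictable roughly 180s after its last heartbeat. A small illustration with assumed values:
public class LeaseMath {
    public static void main(String[] args) {
        long duration = 90_000L;                              // default leaseExpirationDurationInSeconds = 90
        long lastHeartbeat = System.currentTimeMillis();
        long lastUpdateTimestamp = lastHeartbeat + duration;  // what renew() stores
        long evictableAfter = lastUpdateTimestamp + duration; // what isExpired() compares against
        // duration is counted twice: the instance only becomes evictable ~180s after its last heartbeat
        System.out.println("evictable " + (evictableAfter - lastHeartbeat) / 1000 + "s after the last heartbeat");
    }
}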
In the Register, Renew and Cancel implementations above we saw the replicateToPeers call; this is what keeps state in sync between peers.
The Eureka server that receives the request from the service provider forwards it to the other Eureka servers, calling the same endpoint with the same parameters, except that the request is marked as a replication (isReplication=true via a header), which prevents it from being replicated again.
com.netflix.eureka.EurekaBootStrap#contextInitialized
com.netflix.eureka.EurekaBootStrap#initEurekaServerContext
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#syncUp
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
// retry loop
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
// wait before retrying
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
// get all applications from the local eureka client cache (fetched from peer nodes)
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
// iterate over applications
for (InstanceInfo instance : app.getInstances()) {
// iterate over the application's instances
try {
// check whether the instance belongs to a region this server can register
if (isRegisterable(instance)) {
// register the instance locally (same register flow as above); isReplication=true, so it is not replicated back to peers
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
public void cancel(final String appName, final String id) throws Exception {
long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
batchingDispatcher.process(
taskId("cancel", appName, id),
new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
@Override
public EurekaHttpResponse<Void> execute() {
return replicationClient.cancel(appName, id);
}
@Override
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
super.handleFailure(statusCode, responseEntity);
if (statusCode == 404) {
logger.warn("{}: missing entry.", getTaskName());
}
}
},
expiryTime
);
}
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
The task is first wrapped in a TaskHolder and then added to the acceptorQueue.
class AcceptorExecutor<ID, T> {
private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
void process(ID id, T task, long expiryTime) {
acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
acceptedTasks++;
}
class AcceptorRunner implements Runnable {
@Override
public void run() {
long scheduleTime = 0;
while (!isShutdown.get()) {
try {
drainInputQueues();
int totalItems = processingOrder.size();
long now = System.currentTimeMillis();
if (scheduleTime < now) {
scheduleTime = now + trafficShaper.transmissionDelay();
}
if (scheduleTime <= now) {
// move up to maxBatchingSize tasks from processingOrder into a List<TaskHolder<ID, T>> and enqueue it on batchWorkQueue
assignBatchWork();
// move the remaining tasks from processingOrder into singleItemWorkQueue
assignSingleItemWork();
}
// If no worker is requesting data or there is a delay injected by the traffic shaper,
// sleep for some time to avoid tight loop.
if (totalItems == processingOrder.size()) {
Thread.sleep(10);
}
} catch (InterruptedException ex) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery AcceptorThread error", e);
}
}
}
}
When a PeerEurekaNode is instantiated it creates two dispatchers whose job is to push tasks onto the acceptorQueue and start the AcceptorExecutor (a single thread that moves tasks from the acceptorQueue either batched into batchWorkQueue or unbatched into singleItemWorkQueue). Finally the task executor thread pools are started: TaskExecutors.batchExecutors and TaskExecutors.singleItemExecutors, which drain batchWorkQueue and singleItemWorkQueue respectively.
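To make the flow concrete, here is a much-simplified standalone sketch of the same pattern (all names are invented, not Eureka's): callers drop tasks onto an acceptor queue, a single acceptor thread drains and batches them, and a small worker pool processes one batch per request.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class MiniBatchingDispatcher {
    private static final int MAX_BATCH_SIZE = 250;
    private final BlockingQueue<String> acceptorQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<List<String>> batchWorkQueue = new LinkedBlockingQueue<>();
    // Equivalent of TaskDispatcher.process(): callers only touch the acceptor queue.
    public void process(String task) {
        acceptorQueue.add(task);
    }
    // Equivalent of AcceptorRunner: a single thread that drains and batches.
    private void startAcceptor() {
        Thread acceptor = new Thread(() -> {
            List<String> batch = new ArrayList<>();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String first = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                    if (first == null) {
                        continue;
                    }
                    batch.add(first);
                    acceptorQueue.drainTo(batch, MAX_BATCH_SIZE - 1);
                    batchWorkQueue.add(new ArrayList<>(batch));
                    batch.clear();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "mini-acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }
    // Equivalent of BatchWorkerRunnable: workers pull one batch at a time.
    private void startWorkers(int workerCount) {
        ExecutorService workers = Executors.newFixedThreadPool(workerCount, r -> {
            Thread t = new Thread(r, "mini-batch-worker");
            t.setDaemon(true);
            return t;
        });
        for (int i = 0; i < workerCount; i++) {
            workers.submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    List<String> work = batchWorkQueue.poll(1, TimeUnit.SECONDS);
                    if (work != null) {
                        System.out.println("replicating batch of " + work.size() + " tasks");
                    }
                }
                return null;
            });
        }
    }
    public static void main(String[] args) throws Exception {
        MiniBatchingDispatcher dispatcher = new MiniBatchingDispatcher();
        dispatcher.startAcceptor();
        dispatcher.startWorkers(2);
        for (int i = 0; i < 1000; i++) {
            dispatcher.process("register-instance-" + i);
        }
        Thread.sleep(500); // let the daemon workers drain before the demo exits
    }
}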
this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
batcherName,
config.getMaxElementsInPeerReplicationPool(),
batchSize,
config.getMaxThreadsForPeerReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
targetHost,
config.getMaxElementsInStatusReplicationPool(),
config.getMaxThreadsForStatusReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
com.netflix.eureka.util.batcher.TaskDispatchers
public class TaskDispatchers {
public static <ID, T> TaskDispatcher<ID, T> createNonBatchingTaskDispatcher(String id,
int maxBufferSize,
int workerCount,
long maxBatchingDelay,
long congestionRetryDelayMs,
long networkFailureRetryMs,
TaskProcessor<T> taskProcessor) {
// create the AcceptorExecutor, a single acceptor thread running the AcceptorRunner task
final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
id, maxBufferSize, 1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
);
final TaskExecutors<ID, T> taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
return new TaskDispatcher<ID, T>() {
@Override
public void process(ID id, T task, long expiryTime) {
acceptorExecutor.process(id, task, expiryTime);
}
@Override
public void shutdown() {
acceptorExecutor.shutdown();
taskExecutor.shutdown();
}
};
}
public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
int maxBufferSize,
int workloadSize,
int workerCount,
long maxBatchingDelay,
long congestionRetryDelayMs,
long networkFailureRetryMs,
TaskProcessor<T> taskProcessor) {
final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
);
final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
return new TaskDispatcher<ID, T>() {
@Override
public void process(ID id, T task, long expiryTime) {
acceptorExecutor.process(id, task, expiryTime);
}
@Override
public void shutdown() {
acceptorExecutor.shutdown();
taskExecutor.shutdown();
}
};
}
}
Batched tasks:
com.netflix.eureka.util.batcher.TaskExecutors.BatchWorkerRunnable#run
public void run() {
try {
while (!isShutdown.get()) {
List<TaskHolder<ID, T>> holders = getWork();
metrics.registerExpiryTimes(holders);
List<T> tasks = getTasksOf(holders);
ProcessingResult result = processor.process(tasks);
switch (result) {
case Success:
break;
case Congestion:
case TransientError:
taskDispatcher.reprocess(holders, result);
break;
case PermanentError:
logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
}
metrics.registerTaskResult(result, tasks.size());
}
} catch (InterruptedException e) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery WorkerThread error", e);
}
}
private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
// return batchWorkQueue;
BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
List<TaskHolder<ID, T>> result;
do {
result = workQueue.poll(1, TimeUnit.SECONDS);
} while (!isShutdown.get() && result == null);
return (result == null) ? new ArrayList<>() : result;
}
Single tasks:
com.netflix.eureka.util.batcher.TaskExecutors.SingleTaskWorkerRunnable
public void run() {
try {
while (!isShutdown.get()) {
BlockingQueue<TaskHolder<ID, T>> workQueue = taskDispatcher.requestWorkItem();
TaskHolder<ID, T> taskHolder;
while ((taskHolder = workQueue.poll(1, TimeUnit.SECONDS)) == null) {
if (isShutdown.get()) {
return;
}
}
metrics.registerExpiryTime(taskHolder);
if (taskHolder != null) {
ProcessingResult result = processor.process(taskHolder.getTask());
switch (result) {
case Success:
break;
case Congestion:
case TransientError:
taskDispatcher.reprocess(taskHolder, result);
break;
case PermanentError:
logger.warn("Discarding a task of {} due to permanent error", workerName);
}
metrics.registerTaskResult(result, 1);
}
}
} catch (InterruptedException e) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery WorkerThread error", e);
}
}
}
The peer node list is refreshed every 10 minutes by default via updatePeerEurekaNodes(resolvePeerUrls()).
com.netflix.eureka.DefaultEurekaServerContext#initialize
com.netflix.eureka.cluster.PeerEurekaNodes#start
public void start() {
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
// scheduled peer-list refresh task
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
scheduleRenewalThresholdUpdateTask() runs every 15 minutes by default; recalculating the renewal threshold is part of the Eureka server's self-preservation mechanism.
com.netflix.eureka.DefaultEurekaServerContext#initialize
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#init
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask
private void scheduleRenewalThresholdUpdateTask() {
timer.schedule(new TimerTask() {
@Override
public void run() {
updateRenewalThreshold();
}
}, serverConfig.getRenewalThresholdUpdateIntervalMs(),
serverConfig.getRenewalThresholdUpdateIntervalMs());
}
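A simplified sketch of the threshold arithmetic that self-preservation relies on (values and field names here are assumed for illustration; the real calculation lives in updateRenewsPerMinThreshold and isLeaseExpirationEnabled and differs slightly between Eureka versions):
public class SelfPreservationMath {
    public static void main(String[] args) {
        int expectedClients = 20;                 // clients expected to send renews
        int renewalIntervalSeconds = 30;          // eureka.instance.leaseRenewalIntervalInSeconds
        double renewalPercentThreshold = 0.85;    // eureka.server.renewalPercentThreshold
        // expected renews per minute, scaled by the threshold: 20 * (60 / 30) * 0.85 = 34
        int renewsPerMinThreshold = (int) (expectedClients
                * (60.0 / renewalIntervalSeconds)
                * renewalPercentThreshold);
        // if actual renews in the last minute fall below the threshold,
        // isLeaseExpirationEnabled() returns false and the eviction task stops removing instances
        int renewsLastMinute = 20;                // e.g. many clients stopped heartbeating
        boolean leaseExpirationEnabled = renewsLastMinute > renewsPerMinThreshold;
        System.out.println("threshold=" + renewsPerMinThreshold
                + ", expirationEnabled=" + leaseExpirationEnabled);
    }
}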
Eviction of expired instances works as described above.
The PeerEurekaNodes start method is called from the initialization code of DefaultEurekaServerContext. It resolves the addresses of the other eureka servers from configuration, builds a PeerEurekaNode for each URL and keeps this peer information locally. By default the peer list is refreshed from configuration every 10 minutes.
After the peer node list is initialized, the registries of the other nodes still have to be synchronized locally; that is the registry.syncUp() call in the EurekaBootstrap initialization code. Because the server is itself a eureka client, it has already pulled the registry from one of the other eureka server nodes at startup (on its first start a eureka client fetches the full registry from the server and caches it locally; afterwards it fetches delta updates every 30 seconds and merges them into the local cache), so syncUp() only has to take the cached instances and register each one locally.
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
// if the first pass found no registrations in the local eureka client cache,
// wait (default 30s) and check whether DiscoveryClient.fetchRegistry has completed by then
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
// this reads the full registry cache that the local eureka client pulled during its own initialization
Applications apps = eurekaClient.getApplications();
As the source shows, there is a retry mechanism with a sleep: the eureka client's local cache may not have been populated yet when this code runs, so the server waits 30 seconds and retries to see whether the cache has data by then.
The eureka server handles registration requests in ApplicationResource.addInstance, which calls PeerAwareInstanceRegistryImpl.register to register the instance; replicateToPeers then pushes the registration to the other nodes. At this point the isReplication parameter is false.
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
// replicate the registration to the other cluster nodes
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
Stepping into that method, it loops over all configured peer nodes, skips itself, and calls the same registration endpoint on every other node with the instance's registration data. Unlike a normal client call, the underlying caller is JerseyReplicationClient, which always adds the header webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true") to the HTTP request, so on the receiving node isReplication is true. When isReplication is true, the node does not replicate the registration onwards to other nodes.
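On the receiving side, that header is what turns into the isReplication flag. Abridged from ApplicationResource.addInstance (the exact signature and validation may vary slightly across Eureka versions):
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    // validation omitted; "x-netflix-discovery-replication: true" was set by JerseyReplicationClient,
    // so a replicated registration does not get replicated onwards again
    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}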
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
On its first start the eureka client fetches the full registry from the eureka server and caches it locally; after that it fetches delta updates every 30 seconds and merges them into the local cache.
During client initialization the registry is fetched once in full, and initScheduledTasks then starts a timer that refreshes the registry incrementally every 30 seconds; the task class is CacheRefreshThread.
GET apps/delta
Eureka's delta support exists on both the client and the server. On the client it controls whether the registry refresh calls the /apps/delta endpoint and applies the returned data to the local registry according to each entry's ActionType. The server exposes the /apps/delta endpoint; every modification to the registry puts a RecentlyChangedItem into recentlyChangedQueue, a scheduled task removes items whose last update is older than a configured threshold, and the delta endpoint builds its response from recentlyChangedQueue.
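A simplified sketch of how delta entries can be applied on the client according to their ActionType (this condenses what DiscoveryClient.updateDelta does; the plain Map used as the local view is illustrative only):
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.shared.Application;
import com.netflix.discovery.shared.Applications;
public class DeltaApplier {
    // local view of the registry, keyed by instance id (illustrative only)
    private final Map<String, InstanceInfo> localInstances = new ConcurrentHashMap<>();
    // apply a delta response: each instance carries the ActionType the server recorded
    // in recentlyChangedQueue (ADDED / MODIFIED / DELETED)
    public void applyDelta(Applications delta) {
        for (Application app : delta.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                if (instance.getActionType() == null) {
                    continue;
                }
                switch (instance.getActionType()) {
                    case ADDED:
                    case MODIFIED:
                        localInstances.put(instance.getId(), instance);
                        break;
                    case DELETED:
                        localInstances.remove(instance.getId());
                        break;
                    default:
                        break;
                }
            }
        }
    }
}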
com.netflix.discovery.DiscoveryClient#DiscoveryClient(com.netflix.appinfo.ApplicationInfoManager, com.netflix.discovery.EurekaClientConfig, com.netflix.discovery.AbstractDiscoveryClientOptionalArgs, javax.inject.Provider<BackupRegistry>)
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return; // no need to setup up an network tasks and we are done
}
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// heartbeat executor
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// cache refresh executor
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
eurekaTransport = new EurekaTransport();
// set up the transport endpoints (Jersey-based HTTP clients for talking to the servers)
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
Initialize all tasks; there are three scheduled tasks:
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// scheduled task 1: TimedSupervisorTask runs CacheRefreshThread on the executor; fetches the registry every registryFetchIntervalSeconds seconds
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
// scheduled task 2: heartbeat
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
// replicates local InstanceInfo changes back to the server
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// scheduled task 3: InstanceInfo replication
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
com.netflix.discovery.DiscoveryClient#refreshRegistry
void refreshRegistry() {
try {
boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
boolean remoteRegionsModified = false;
// This makes sure that a dynamic change to remote regions to fetch is honored.
String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
if (null != latestRemoteRegions) {
String currentRemoteRegions = remoteRegionsToFetch.get();
if (!latestRemoteRegions.equals(currentRemoteRegions)) {
// Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
synchronized (instanceRegionChecker.getAzToRegionMapper()) {
if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
String[] remoteRegions = latestRemoteRegions.split(",");
remoteRegionsRef.set(remoteRegions);
instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
remoteRegionsModified = true;
} else {
logger.info("Remote regions to fetch modified concurrently," +
" ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
}
}
} else {
// Just refresh mapping to reflect any DNS/Property change
instanceRegionChecker.getAzToRegionMapper().refreshMapping();
}
}
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
if (logger.isDebugEnabled()) {
StringBuilder allAppsHashCodes = new StringBuilder();
allAppsHashCodes.append("Local region apps hashcode: ");
allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
allAppsHashCodes.append(", is fetching remote regions? ");
allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
allAppsHashCodes.append(", Remote region: ");
allAppsHashCodes.append(entry.getKey());
allAppsHashCodes.append(" , apps hashcode: ");
allAppsHashCodes.append(entry.getValue().getAppsHashCode());
}
logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
allAppsHashCodes);
}
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
leaseRenewalIntervalInSeconds 30: the client renews (heartbeats) every 30 seconds
leaseExpirationDurationInSeconds 90: if no renew arrives within 90 seconds, the instance is considered down
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient#execute
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
List<EurekaEndpoint> candidateHosts = null;
int endpointIdx = 0;
for (int retry = 0; retry < numberOfRetries; retry++) {
EurekaHttpClient currentHttpClient = delegate.get();
EurekaEndpoint currentEndpoint = null;
if (currentHttpClient == null) {
if (candidateHosts == null) {
candidateHosts = getHostCandidates();
if (candidateHosts.isEmpty()) {
throw new TransportException("There is no known eureka server; cluster server list is empty");
}
}
if (endpointIdx >= candidateHosts.size()) {
throw new TransportException("Cannot execute request on any known server");
}
// take the candidate serviceUrls in order
currentEndpoint = candidateHosts.get(endpointIdx++);
currentHttpClient = clientFactory.newClient(currentEndpoint);
}
try {
EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
delegate.set(currentHttpClient);
if (retry > 0) {
logger.info("Request execution succeeded on retry #{}", retry);
}
return response;
}
logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
} catch (Exception e) {
logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace
}
// Connection error or 5xx from the server that must be retried on another server
delegate.compareAndSet(currentHttpClient, null);
if (currentEndpoint != null) {
quarantineSet.add(currentEndpoint);
}
}
throw new TransportException("Retry limit reached; giving up on completing the request");
}
com.netflix.discovery.InstanceInfoReplicator#run
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// the local instance info changed, so re-register
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
// reschedule the next run
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
void refreshInstanceInfo() {
applicationInfoManager.refreshDataCenterInfoIfRequired();
applicationInfoManager.refreshLeaseInfoIfRequired();
InstanceStatus status;
try {
status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
} catch (Exception e) {
logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
status = InstanceStatus.DOWN;
}
if (null != status) {
applicationInfoManager.setInstanceStatus(status);
}
}
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#register
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
When a service consumer starts it fetches the full service list from the Eureka server and caches it locally; make sure eureka.client.shouldFetchRegistry=true.
Because a local copy is cached, it has to be refreshed periodically; the refresh interval is configured with eureka.client.registryFetchIntervalSeconds.
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
// time the fetch with a Stopwatch
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// get the previously fetched applications from the local cache
Applications applications = getApplications();
// several conditions decide whether a full fetch is triggered; any one of them forces a full fetch:
// 1. delta fetching is disabled;
// 2. a single VIP address refresh is configured;
// 3. the caller explicitly requested a full fetch;
// 4. no valid application list is cached locally yet;
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
// these logs show which condition triggered the full fetch
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
// full fetch
getAndStoreFullRegistry();
} else {
// delta fetch
getAndUpdateDelta(applications);
}
// recompute and set the consistency hash code
applications.setAppsHashCode(applications.getReconcileHashCode());
// log the total number of instances across all applications
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// broadcast the cache-refreshed event to all registered listeners (note this method is overridden in CloudEurekaClient)
onCacheRefreshed();
// the freshly updated cache contains the service list from the Eureka server, including this application's status;
// the member lastRemoteInstanceStatus records the status from the last update;
// updateInstanceRemoteStatus compares the two and, if they differ, updates lastRemoteInstanceStatus and publishes the corresponding event
updateInstanceRemoteStatus();
return true;
}