diywap手机微网站内容管理系统,服装公司简介模板,免费发布信息平台大全,wordpress页脚改颜色题主的问题描述太绕了#xff0c;我们先把集群中的角色定义下#xff1a;Eureka架构比较细节的架构图如下所示#xff1a;在配置多个EurekaServer的Service Provider#xff0c;每次Service Provider启动的时候会选择一个Eureka Server#xff0c;之后如果这个Eureka Serv…题主的问题描述太绕了我们先把集群中的角色定义下Eureka架构比较细节的架构图如下所示在配置多个EurekaServer的Service Provider每次Service Provider启动的时候会选择一个Eureka Server之后如果这个Eureka Server挂了才会切换Eureka Server在当前使用的Eureka Server挂掉之前不会切换。被Service Provider选择用来发送请求Eureka Server其实比其他Server多了一项工作就是发客户端发来的请求转发到集群中其他的Eureka Server。其实这个压力并没有太大但是如果集群中实例个数比较多或者心跳间隔比较短的情况下的确有不小的压力。可以考虑每个服务配置的Eureka Server顺序不一样。但是其实仔细想想只是个请求转发能有多大压力啊。。。。最后我们详细分析下服务注册与取消的源代码(可以直接参考下我的博客关于Eureka的系列分析张哈希的博客 - CSDN博客blog.csdn.net)关于服务注册开启/关闭服务注册配置eureka.client.register-with-eureka true (默认)什么时候注册应用第一次启动时初始化EurekaClient时应用状态改变从STARTING变为UP会触发这个Listener调用instanceInfoReplicator.onDemandUpdate(); 可以推测出实例状态改变时也会通过注册接口更新实例状态信息statusChangeListener new ApplicationInfoManager.StatusChangeListener() {Overridepublic String getId() {return statusChangeListener;}Overridepublic void notify(StatusChangeEvent statusChangeEvent) {if (InstanceStatus.DOWN statusChangeEvent.getStatus() ||InstanceStatus.DOWN statusChangeEvent.getPreviousStatus()) {// log at warn level if DOWN was involvedlogger.warn(Saw local status change event {}, statusChangeEvent);} else {logger.info(Saw local status change event {}, statusChangeEvent);}instanceInfoReplicator.onDemandUpdate();}};定时任务如果InstanceInfo发生改变也会通过注册接口更新信息public void run() {try {discoveryClient.refreshInstanceInfo();//如果实例信息发生改变则需要调用register更新InstanceInfoLong dirtyTimestamp instanceInfo.isDirtyWithTime();if (dirtyTimestamp ! null) {discoveryClient.register();instanceInfo.unsetIsDirty(dirtyTimestamp);}} catch (Throwable t) {logger.warn(There was a problem with the instance info replicator, t);} finally {Future next scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);scheduledPeriodicRef.set(next);}}在定时renew时如果renew接口返回404(代表这个实例在EurekaServer上面找不到)可能是之前注册失败或者注册过期导致的。这时需要调用register重新注册boolean renew() {EurekaHttpResponse httpResponse;try {httpResponse eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug({} - Heartbeat status: {}, PREFIX appPathIdentifier, httpResponse.getStatusCode());//如果renew接口返回404(代表这个实例在EurekaServer上面找不到)可能是之前注册失败或者注册过期导致的if (httpResponse.getStatusCode() 404) {REREGISTER_COUNTER.increment();logger.info({} - Re-registering apps/{}, PREFIX appPathIdentifier, instanceInfo.getAppName());long timestamp instanceInfo.setIsDirtyWithTime();boolean success register();if (success) {instanceInfo.unsetIsDirty(timestamp);}return success;}return httpResponse.getStatusCode() 200;} catch (Throwable e) {logger.error({} - was unable to send heartbeat!, PREFIX appPathIdentifier, e);return false;}}向Eureka发送注册请求EurekaServer发生了什么主要有两个存储一个是之前提到过的registry还有一个最近变化队列后面我们会知道这个最近变化队列里面就是客户端获取增量实例信息的内容# 整体注册信息缓存private final ConcurrentHashMap registry new ConcurrentHashMap();# 最近变化队列private ConcurrentLinkedQueue recentlyChangedQueue new ConcurrentLinkedQueue();EurekaServer收到实例注册主要分两步调用父类方法注册同步到其他EurekaServer实例public void register(InstanceInfo info, boolean isReplication) {int leaseDuration 90;if (info.getLeaseInfo() ! null info.getLeaseInfo().getDurationInSecs() 0) {leaseDuration info.getLeaseInfo().getDurationInSecs();}//调用父类方法注册super.register(info, leaseDuration, isReplication);//同步到其他EurekaServer实例this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication);}我们先看同步到其他EurekaServer实例其实就是注册到的EurekaServer再依次调用其他集群内的EurekaServer的Register方法将实例信息同步过去private void replicateToPeers(Action action, String appName, String id,InstanceInfo info /* optional */,InstanceStatus newStatus /* optional */, boolean isReplication) {Stopwatch tracer action.getTimer().start();try {if (isReplication) {numberOfReplicationsLastMin.increment();}// If it is a replication already, do not replicate again as this will create a poison replicationif (peerEurekaNodes Collections.EMPTY_LIST || isReplication) {return;}for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {// If the url represents this host, do not replicate to yourself.if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {continue;}replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);}} finally {tracer.stop();}}private void replicateInstanceActionsToPeers(Action action, String appName,String id, InstanceInfo info, InstanceStatus newStatus,PeerEurekaNode node) {try {InstanceInfo infoFromRegistry null;CurrentRequestVersion.set(Version.V2);switch (action) {case Cancel:node.cancel(appName, id);break;case Heartbeat:InstanceStatus overriddenStatus overriddenInstanceStatusMap.get(id);infoFromRegistry getInstanceByAppAndId(appName, id, false);node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);break;case Register:node.register(info);break;case StatusUpdate:infoFromRegistry getInstanceByAppAndId(appName, id, false);node.statusUpdate(appName, id, newStatus, infoFromRegistry);break;case DeleteStatusOverride:infoFromRegistry getInstanceByAppAndId(appName, id, false);node.deleteStatusOverride(appName, id, infoFromRegistry);break;}} catch (Throwable t) {logger.error(Cannot replicate information to {} for action {}, node.getServiceUrl(), action.name(), t);}}然后看看调用父类方法注册public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {try {//register虽然看上去好像是修改但是这里用的是读锁后面会解释read.lock();//从registry中查看这个app是否存在Map gMap registry.get(registrant.getAppName());//不存在就创建if (gMap null) {final ConcurrentHashMap gNewMap new ConcurrentHashMap();gMap registry.putIfAbsent(registrant.getAppName(), gNewMap);if (gMap null) {gMap gNewMap;}}//查看这个app的这个实例是否已存在Lease existingLease gMap.get(registrant.getId());if (existingLease ! null (existingLease.getHolder() ! null)) {//如果已存在对比时间戳保留比较新的实例信息......} else {// 如果不存在证明是一个新的实例//更新自我保护监控变量的值的代码.....}Lease lease new Lease(registrant, leaseDuration);if (existingLease ! null) {lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());}//放入registrygMap.put(registrant.getId(), lease);//加入最近修改的记录队列recentlyChangedQueue.add(new RecentlyChangedItem(lease));//初始化状态记录时间等相关代码......//主动让Response缓存失效invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());} finally {read.unlock();}}总结起来就是主要三件事1.将实例注册信息放入或者更新registry2.将实例注册信息加入最近修改的记录队列3.主动让Response缓存失效我们来类比下服务取消服务取消CANCELprotected boolean internalCancel(String appName, String id, boolean isReplication) {try {//cancel虽然看上去好像是修改但是这里用的是读锁后面会解释read.lock();//从registry中剔除这个实例Map gMap registry.get(appName);Lease leaseToCancel null;if (gMap ! null) {leaseToCancel gMap.remove(id);}if (leaseToCancel null) {logger.warn(DS: Registry: cancel failed because Lease is not registered for: {}/{}, appName, id);return false;} else {//改变状态记录状态修改时间等相关代码......if (instanceInfo ! null) {instanceInfo.setActionType(ActionType.DELETED);//加入最近修改的记录队列recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));}//主动让Response缓存失效invalidateCache(appName, vip, svip);logger.info(Cancelled instance {}/{} (replication{}), appName, id, isReplication);return true;}} finally {read.unlock();}}总结起来也是主要三件事1.从registry中剔除这个实例2.将实例注册信息加入最近修改的记录队列3.主动让Response缓存失效这里我们注意到了这个最近修改队列我们来详细看看最近修改队列这个最近修改队列和消费者定时获取服务实例列表有着密切的关系private TimerTask getDeltaRetentionTask() {return new TimerTask() {Overridepublic void run() {Iterator it recentlyChangedQueue.iterator();while (it.hasNext()) {if (it.next().getLastUpdateTime() System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {it.remove();} else {break;}}}};}这个RetentionTimeInMSInDeltaQueue默认是180s(配置是eureka.server.retention-time-in-m-s-in-delta-queue,默认是180s官网写错了)可以看出这个队列是一个长度为180s的滑动窗口保存最近180s以内的应用实例信息修改后面我们会看到客户端调用获取增量信息实际上就是从这个queue中读取所以可能一段时间内读取到的信息都是一样的。关于服务与实例列表获取EurekaClient端我们从Ribbon说起EurekaClient也存在缓存应用服务实例列表信息在每个EurekaClient服务消费端都有缓存。一般的Ribbon的LoadBalancer会读取这个缓存来知道当前有哪些实例可以调用从而进行负载均衡。这个loadbalancer同样也有缓存。首先看这个LoadBalancer的缓存更新机制相关类是PollingServerListUpdaterfinal Runnable wrapperRunnable new Runnable() {Overridepublic void run() {if (!isActive.get()) {if (scheduledFuture ! null) {scheduledFuture.cancel(true);}return;}try {//从EurekaClient缓存中获取服务实例列表保存在本地缓存updateAction.doUpdate();lastUpdated System.currentTimeMillis();} catch (Exception e) {logger.warn(Failed one update cycle, e);}}};//定时调度scheduledFuture getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable,initialDelayMs,refreshIntervalMs,TimeUnit.MILLISECONDS);这个updateAction.doUpdate();就是从EurekaClient缓存中获取服务实例列表保存在BaseLoadBalancer的本地缓存protected volatile List allServerList Collections.synchronizedList(new ArrayList());public void setServersList(List lsrv) {//写入allServerList的代码这里略}Overridepublic List getAllServers() {return Collections.unmodifiableList(allServerList);}这里的getAllServers会在每个负载均衡规则中被调用例如RoundRobinRulepublic Server choose(ILoadBalancer lb, Object key) {if (lb null) {log.warn(no load balancer);return null;}Server server null;int count 0;while (server null count 10) {List reachableServers lb.getReachableServers();//获取服务实例列表调用的就是刚刚提到的getAllServersList allServers lb.getAllServers();int upCount reachableServers.size();int serverCount allServers.size();if ((upCount 0) || (serverCount 0)) {log.warn(No up servers available from load balancer: lb);return null;}int nextServerIndex incrementAndGetModulo(serverCount);server allServers.get(nextServerIndex);if (server null) {/* Transient. */Thread.yield();continue;}if (server.isAlive() (server.isReadyToServe())) {return (server);}// Next.server null;}if (count 10) {log.warn(No available alive servers after 10 tries from load balancer: lb);}return server;}这个缓存需要注意下有时候我们只修改了EurekaClient缓存的更新时间但是没有修改这个LoadBalancer的刷新本地缓存时间就是ribbon.ServerListRefreshInterval,这个参数可以设置的很小因为没有从网络读取就是从一个本地缓存刷到另一个本地缓存(如何配置缓存配置来实现服务实例快速下线快速感知快速刷新可以参考我的另一篇文章)。然后我们来看一下EurekaClient本身的缓存直接看关键类DiscoveryClient的相关源码我们这里只关心本地Region的多Region配置我们先忽略//本地缓存可以理解为是一个软链接private final AtomicReference localRegionApps new AtomicReference();private void initScheduledTasks() {//如果配置为需要拉取服务列表则设置定时拉取任务这个配置默认是需要拉取服务列表if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound clientConfig.getCacheRefreshExecutorExponentialBackOffBound();scheduler.schedule(new TimedSupervisorTask(cacheRefresh,scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread()),registryFetchIntervalSeconds, TimeUnit.SECONDS);}//其他定时任务初始化的代码忽略}//定时从EurekaServer拉取服务列表的任务class CacheRefreshThread implements Runnable {public void run() {refreshRegistry();}}void refreshRegistry() {try {//多Region配置处理代码忽略boolean success fetchRegistry(remoteRegionsModified);if (success) {registrySize localRegionApps.get().size();lastSuccessfulRegistryFetchTimestamp System.currentTimeMillis();}//日志代码忽略} catch (Throwable e) {logger.error(Cannot fetch registry from server, e);}}//定时从EurekaServer拉取服务列表的核心方法private boolean fetchRegistry(boolean forceFullRegistryFetch) {Stopwatch tracer FETCH_REGISTRY_TIMER.start();try {Applications applications getApplications();//判断如果是第一次拉取或者app列表为空就进行全量拉取否则就会进行增量拉取if (clientConfig.shouldDisableDelta()|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))|| forceFullRegistryFetch|| (applications null)|| (applications.getRegisteredApplications().size() 0)|| (applications.getVersion() -1)) //Client application does not have latest library supporting delta{getAndStoreFullRegistry();} else {getAndUpdateDelta(applications);}applications.setAppsHashCode(applications.getReconcileHashCode());logTotalInstances();} catch (Throwable e) {logger.error(PREFIX appPathIdentifier - was unable to refresh its cache! status e.getMessage(), e);return false;} finally {if (tracer ! null) {tracer.stop();}}//缓存更新完成发送个event给观察者目前没啥用onCacheRefreshed();// 检查下远端的服务实例列表里面包括自己并且状态是否对这里我们不关心updateInstanceRemoteStatus();// registry was fetched successfully, so return truereturn true;}//全量拉取代码private void getAndStoreFullRegistry() throws Throwable {long currentUpdateGeneration fetchRegistryGeneration.get();Applications apps null;//访问/eureka/apps接口拉取所有服务实例信息EurekaHttpResponse httpResponse clientConfig.getRegistryRefreshSingleVipAddress() null? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()): eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());if (httpResponse.getStatusCode() Status.OK.getStatusCode()) {apps httpResponse.getEntity();}logger.info(The response status is {}, httpResponse.getStatusCode());if (apps null) {logger.error(The application is null for some reason. Not storing this information);} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration 1)) {localRegionApps.set(this.filterAndShuffle(apps));logger.debug(Got full registry with apps hashcode {}, apps.getAppsHashCode());} else {logger.warn(Not updating applications as another thread is updating it already);}}//增量拉取代码private void getAndUpdateDelta(Applications applications) throws Throwable {long currentUpdateGeneration fetchRegistryGeneration.get();Applications delta null;//访问/eureka/delta接口拉取所有服务实例增量信息EurekaHttpResponse httpResponse eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());if (httpResponse.getStatusCode() Status.OK.getStatusCode()) {delta httpResponse.getEntity();}if (delta null) {//如果delta为空拉取增量失败就全量拉取logger.warn(The server does not allow the delta revision to be applied because it is not safe. Hence got the full registry.);getAndStoreFullRegistry();} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration 1)) {//这里设置原子锁的原因是怕某次调度网络请求时间过长导致同一时间有多线程拉取到增量信息并发修改//拉取增量成功检查hashcode是否一样不一样的话也会全量拉取logger.debug(Got delta update with apps hashcode {}, delta.getAppsHashCode());String reconcileHashCode ;if (fetchRegistryUpdateLock.tryLock()) {try {updateDelta(delta);reconcileHashCode getReconcileHashCode(applications);} finally {fetchRegistryUpdateLock.unlock();}} else {logger.warn(Cannot acquire update lock, aborting getAndUpdateDelta);}// There is a diff in number of instances for some reasonif (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall}} else {logger.warn(Not updating application delta as another thread is updating it already);logger.debug(Ignoring delta update with apps hashcode {}, as another thread is updating it already, delta.getAppsHashCode());}}以上就是对于EurekaClient拉取服务实例信息的源代码分析总结EurekaClient 重要缓存如下EurekaClient第一次全量拉取定时增量拉取应用服务实例信息保存在缓存中。EurekaClient增量拉取失败或者增量拉取之后对比hashcode发现不一致就会执行全量拉取这样避免了网络某时段分片带来的问题。同时对于服务调用如果涉及到ribbon负载均衡那么ribbon对于这个实例列表也有自己的缓存这个缓存定时从EurekaClient的缓存更新EurekaServer端在EurekaServer端所有的读取请求都是读的ReadOnlyMap(这个可以配置) 有定时任务会定时从ReadWriteMap同步到ReadOnlyMap这个时间配置是#eureka server刷新readCacheMap的时间注意client读取的是readCacheMap这个时间决定了多久会把readWriteCacheMap的缓存更新到readCacheMap上#默认30seureka.server.responseCacheUpdateInvervalMs3000相关代码if (shouldUseReadOnlyResponseCache) {timer.schedule(getCacheUpdateTask(),new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) responseCacheUpdateIntervalMs),responseCacheUpdateIntervalMs);}private TimerTask getCacheUpdateTask() {return new TimerTask() {Overridepublic void run() {logger.debug(Updating the client cache from response cache);for (Key key : readOnlyCacheMap.keySet()) {if (logger.isDebugEnabled()) {Object[] args {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};logger.debug(Updating the client cache from response cache for key : {} {} {} {}, args);}try {CurrentRequestVersion.set(key.getVersion());Value cacheValue readWriteCacheMap.get(key);Value currentCacheValue readOnlyCacheMap.get(key);if (cacheValue ! currentCacheValue) {readOnlyCacheMap.put(key, cacheValue);}} catch (Throwable th) {logger.error(Error while updating the client cache from response cache, th);}}}};}ReadWriteMap是一个LoadingCache将Registry中的服务实例信息封装成要返回的http响应(分别是经过gzip压缩和非压缩的)同时还有两个特殊keyALL_APPS和ALL_APPS_DELTA ALL_APPS就是所有服务实例信息 ALL_APPS_DELTA就是之前讲注册说的RecentlyChangedQueue里面的实例列表封装的http响应信息