2021年7月9日星期五

Eureka功能和可用性解读

元数据

除了普通的基础设置之外,eureka支持自定义元数据。配置方式如下

eureka:	instance:			 metadata-map:							cluster: cl1							name: zhaozhen

获取元数据代码

  List<ServiceInstance> list = discoveryClient.getInstances("zhao-service-resume");  ServiceInstance serviceInstance = list.get(0);  list.stream().forEach(s->{   System.out.println(s.getMetadata());  });

在调用时通过断点可以知道具体的元数据。在实际使用过程中,我们可以针对配置的不同元数据采取不同的执行
file

可用性

从技术网站上搜到的一个面试题就有这样的问题:eureka怎么保证可用性.
众所周知,eureka采用的是AP模式,实现高可用最好的方式就是利用最少三台eureke server实例,实现两两之间的服务注册。从而达到同步数据的目的
那么这就涉及到如下的方面

  • eureka client和eureka server之间如何进行通信
  • eureka注册在客户端和服务端分别怎么操作实现可用性的
  • eureka续约/心跳在客户端和服务端分别怎么操作实现可用性的
  • eureka下线是怎么操作的

eureka client和eureka server之间如何进行通信

通过查询各种资料并追踪自动配置类发现,eureka和eureka之间的通信是采用类似springmvc的Jersey框架暴露接口进行通信的。通信的形式基本类似于我们使用http进行请求的方式。在EurekaServerAutoConfiguration中通过注入FilterRegistrationBean实现了在filter中加入包含了指定包名下的所有的Jersey的外部接口

/**	 * Register the Jersey filter	 */	@Bean	public FilterRegistrationBean jerseyFilterRegistration(			javax.ws.rs.core.Application eurekaJerseyApp) {		FilterRegistrationBean bean = new FilterRegistrationBean();		bean.setFilter(new ServletContainer(eurekaJerseyApp));		bean.setOrder(Ordered.LOWEST_PRECEDENCE);		bean.setUrlPatterns(				Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));		return bean;	}	/**	 * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources	 * required by the Eureka server.	 */	@Bean	public javax.ws.rs.core.Application jerseyApplication(Environment environment,			ResourceLoader resourceLoader) {		ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(				false, environment);		// Filter to include only classes that have a particular annotation.		//		provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));		provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));		// Find classes in Eureka packages (or subpackages)		//		Set<Class<?>> classes = new HashSet<>();		for (String basePackage : EUREKA_PACKAGES) {			Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);			for (BeanDefinition bd : beans) {				Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),						resourceLoader.getClassLoader());				classes.add(cls);			}		}		// Construct the Jersey ResourceConfig		//		Map<String, Object> propsAndFeatures = new HashMap<>();		propsAndFeatures.put(				// Skip static content used by the webapp				ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,				EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");		DefaultResourceConfig rc = new DefaultResourceConfig(classes);		rc.setPropertiesAndFeatures(propsAndFeatures);		return rc;	}

代码中扫描的EUREKA_PACKAGES(private static final String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery",
"com.netflix.eureka" };)即是Jersey框架的具体的接口类
file

另外可以提一点的就是,eureka对外暴露的dashboard依然采用的是springmvc的controller形式。具体的可以看到在EurekaServerAutoConfiguration中注入的EurekaController

	@Bean	@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)	public EurekaController eurekaController() {		return new EurekaController(this.applicationInfoManager);	}

感兴趣的可以再研究下后续EurekaController的内部实现

eureka注册在客户端和服务端分别怎么操作实现可用性的

服务每隔30秒会向注册中⼼续约(⼼跳)⼀次(也称为报活),如果没有续约,租约在90秒后到期,然后服务会被失效。每隔30秒的续约操作我们称之为⼼跳检测
首先在服务端,通过上述的Jersey框架暴露的接口进行注册,在ApplicationResource中通过addInstance进行注册,在这个过程中另一个eureka server也相当于是一个eureka client,同样会进行注册
file
通过addInstance中的register方法,一直向下调试到PeerAwareInstanceRegistryImpl的replicateInstanceActionsToPeers相互注册方法

 /**  * Replicates all instance changes to peer eureka nodes except for  * replication traffic to this node.  *  */ private void replicateInstanceActionsToPeers(Action action, String appName,             String id, InstanceInfo info, InstanceStatus newStatus,             PeerEurekaNode node) {  try {   InstanceInfo infoFromRegistry = null;   CurrentRequestVersion.set(Version.V2);   switch (action) {    case Cancel:     node.cancel(appName, id);     break;    case Heartbeat:     InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);     infoFromRegistry = getInstanceByAppAndId(appName, id, false);     node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);     break;    case Register:     node.register(info);     break;    case StatusUpdate:     infoFromRegistry = getInstanceByAppAndId(appName, id, false);     node.statusUpdate(appName, id, newStatus, infoFromRegistry);     break;    case DeleteStatusOverride:     infoFromRegistry = getInstanceByAppAndId(appName, id, false);     node.deleteStatusOverride(appName, id, infoFromRegistry);     break;   }  } catch (Throwable t) {   logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);  } }

此时,注册时,进入的是Register

 public void register(final InstanceInfo info) throws Exception {  long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);  batchingDispatcher.process(    taskId("register", info),    new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {     public EurekaHttpResponse<Void> execute() {      return replicationClient.register(info);     }    },    expiryTime  ); }

查阅源码可知此处的getLeaseRenewalOf(info)的默认值为90秒,这就印证了90秒到期的说法

 private static int getLeaseRenewalOf(InstanceInfo info) {  return (info.getLeaseInfo() == null ? Lease.DEFAULT_DURATION_IN_SECS : info.getLeaseInfo().getRenewalIntervalInSecs()) * 1000; }

发起请求

 @Override public EurekaHttpResponse<Void> register(InstanceInfo info) {  String urlPath = "apps/" + info.getAppName();  ClientResponse response = null;  try {   Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();   addExtraHeaders(resourceBuilder);   response = resourceBuilder     .header("Accept-Encoding", "gzip")     .type(MediaType.APPLICATION_JSON_TYPE)     .accept(MediaType.APPLICATION_JSON)     .post(ClientResponse.class, info);   return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();  } finally {   if (logger.isDebugEnabled()) {    logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),      response == null ? "N/A" : response.getStatus());   }   if (response != null) {    response.close();   }  } }

发起请求的地址可以追踪到的是ApplicationsResource中的

 @Path("{appId}") public ApplicationResource getApplicationResource(   @PathParam("version") String version,   @PathParam("appId") String appId) {  CurrentRequestVersion.set(Version.toEnum(version));  return new ApplicationResource(appId, serverConfig, registry); }

此处重新构建了一个ApplicationResource对象。并将服务的信息配置等传递到application中,等待后续使用
分析完这一段之后,我对addInstance如何接收请求的还是有疑问,经过断点调试发现,这个过程实际上是通过EurekaServerAutoConfiguration引入的 EurekaServerInitializerConfiguration来完成的,

@Configurationpublic class EurekaServerInitializerConfiguration		implements ServletContextAware, SmartLifecycle, Ordered {}

EurekaServerInitializerConfiguration实现了SmartLifecycle方法,start方法会再容器初始化时执行。而start方法的内容

@Override	public void start() {		new Thread(new Runnable() {			@Override			public void run() {				try {					//TODO: is this class even needed now?					eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);					log.info("Started Eureka Server");					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));					EurekaServerInitializerConfiguration.this.running = true;					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));				}				catch (Exception ex) {					// Help!					log.error("Could not initialize Eureka servlet context", ex);				}			}		}).start();	}

具体的业务内容在

	public void contextInitialized(ServletContext context) {		try {			initEurekaEnvironment();			initEurekaServerContext();			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);		}		catch (Throwable e) {			log.error("Cannot bootstrap eureka server :", e);			throw new RuntimeException("Cannot bootstrap eureka server :", e);		}	}

第一步initEurekaEnvironment为初始化环境,第二步initEurekaServerContext为业务操作
而随后的操作中最主要的是

		int registryCount = this.registry.syncUp();		this.registry.openForTraffic(this.applicationInfoManager, registryCount);		// Register all monitoring statistics.		EurekaMonitors.registerAllStats();

openForTraffic中主要是为开启服务通信做准备

 @Override public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {  // Renewals happen every 30 seconds and for a minute it should be a factor of 2.  this.expectedNumberOfClientsSendingRenews = count;  updateRenewsPerMinThreshold();  logger.info("Got {} instances from neighboring DS node", count);  logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);  this.startupTime = System.currentTimeMillis();  if (count > 0) {   this.peerInstancesTransferEmptyOnStartup = false;  }  DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();  boolean isAws = Name.Amazon == selfName;  if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {   logger.info("Priming AWS connections for all replicas..");   primeAwsReplicas(applicationInfoManager);  }  logger.info("Changing status to UP");  applicationInfoManager.setInstanceStatus(InstanceStatus.UP);  super.postInit(); }

引发向addIntsance发起请求的就是 applicationInfoManager.setInstanceStatus(InstanceStatus.UP);这个方法内部执行一串事件
其中就有向addInstance发起请求的

public synchronized void setInstanceStatus(InstanceStatus status) {  InstanceStatus next = instanceStatusMapper.map(status);  if (next == null) {   return;  }  InstanceStatus prev = instanceInfo.setStatus(next);  if (prev != null) {   for (StatusChangeListener listener : listeners.values()) {    try {     listener.notify(new StatusChangeEvent(prev, next));    } catch (Exception e) {     logger.warn("failed to notify listener: {}", listener.getId(), e);    }   }  } }

DiscoveryClient类内部

   statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {    @Override    public String getId() {     return "statusChangeListener";    }    @Override    public void notify(StatusChangeEvent statusChangeEvent) {     if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||       InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {      // log at warn level if DOWN was involved      logger.warn("Saw local status change event {}", statusChangeEvent);     } else {      logger.info("Saw local status change event {}", statusChangeEvent);     }     instanceInfoReplicator.onDemandUpdate();    }   };

指向

 public void run() {  try {   discoveryClient.refreshInstanceInfo();   Long dirtyTimestamp = instanceInfo.isDirtyWithTime();   if (dirtyTimestamp != null) {    discoveryClient.register();    instanceInfo.unsetIsDirty(dirtyTimestamp);   }  } catch (Throwable t) {   logger.warn("There was a problem with the instance info replicator", t);  } finally {   Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);   scheduledPeriodicRef.set(next);  } }
<......

原文转载:http://www.shaoqun.com/a/859330.html

跨境电商:https://www.ikjzd.com/

汇通天下物流:https://www.ikjzd.com/w/2055

dhl:https://www.ikjzd.com/w/516

黑石集团:https://www.ikjzd.com/w/1339.html


元数据除了普通的基础设置之外,eureka支持自定义元数据。配置方式如下eureka: instance: metadata-map: cluster:cl1 name:zhaozhen获取元数据代码List<ServiceInstance>list=discoveryClient.getInstances("zhao-service-resume
parser:https://www.ikjzd.com/w/680
ask me:https://www.ikjzd.com/w/2459
网络星期一:https://www.ikjzd.com/w/80
作为亚马逊卖家,哪些技能是必备的呢?:https://www.ikjzd.com/articles/126846
把竞争对手从自己亚马逊Listing页面挤掉,备战2020节日季你不知道的3个广告技巧!:https://www.ikjzd.com/articles/126921
这样打造Listing,第二天他就出了200单!:https://www.ikjzd.com/articles/126920
​突发!字节跳动或同意完全放弃TikTok股份:https://www.ikjzd.com/articles/126919
欲望嫂子用修电脑引诱我:http://www.30bags.com/m/a/249581.html
女海王阵型!不告诉男朋友就勾搭富二代!数百万粉丝,Tik Tok女人,网络名人,做了很多轮:http://lady.shaoqun.com/a/410933.html
清迈的一个天桥小舒林成同志,被居民举报:http://lady.shaoqun.com/a/410934.html
女人为什么选择"暧昧"?倾听中年妇女的心声:http://lady.shaoqun.com/a/410935.html
独立站高昂流量成废品,如何操作才能起死回生?(上):https://www.ikjzd.com/articles/146487

没有评论:

发表评论

跨境电商资讯:外贸宣传平台有哪些(出口的

现在很多做外贸的人都非常关注 外贸企业怎么推广 ,而现在推广的途径和平台有很多,企业如果都做,成本和时间精力是一个问题,而且并不是所有的推广渠道都是有用的。今天云程网络就来为大家盘点几个有效的外贸推广渠道。 一、海外社交媒体营销 Facebook,领英等海外社交媒体营销在近几年得...