元数据
除了普通的基础设置之外,eureka支持自定义元数据。配置方式如下
eureka: instance: metadata-map: cluster: cl1 name: zhaozhen获取元数据代码
List<ServiceInstance> list = discoveryClient.getInstances("zhao-service-resume"); ServiceInstance serviceInstance = list.get(0); list.stream().forEach(s->{ System.out.println(s.getMetadata()); });在调用时通过断点可以知道具体的元数据。在实际使用过程中,我们可以针对配置的不同元数据采取不同的执行
可用性
从技术网站上搜到的一个面试题就有这样的问题:eureka怎么保证可用性.
众所周知,eureka采用的是AP模式,实现高可用最好的方式就是利用最少三台eureke server实例,实现两两之间的服务注册。从而达到同步数据的目的
那么这就涉及到如下的方面
- eureka client和eureka server之间如何进行通信
- eureka注册在客户端和服务端分别怎么操作实现可用性的
- eureka续约/心跳在客户端和服务端分别怎么操作实现可用性的
- eureka下线是怎么操作的
eureka client和eureka server之间如何进行通信
通过查询各种资料并追踪自动配置类发现,eureka和eureka之间的通信是采用类似springmvc的Jersey框架暴露接口进行通信的。通信的形式基本类似于我们使用http进行请求的方式。在EurekaServerAutoConfiguration中通过注入FilterRegistrationBean实现了在filter中加入包含了指定包名下的所有的Jersey的外部接口
/** * Register the Jersey filter */ @Bean public FilterRegistrationBean jerseyFilterRegistration( javax.ws.rs.core.Application eurekaJerseyApp) { FilterRegistrationBean bean = new FilterRegistrationBean(); bean.setFilter(new ServletContainer(eurekaJerseyApp)); bean.setOrder(Ordered.LOWEST_PRECEDENCE); bean.setUrlPatterns( Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*")); return bean; } /** * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources * required by the Eureka server. */ @Bean public javax.ws.rs.core.Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) { ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider( false, environment); // Filter to include only classes that have a particular annotation. // provider.addIncludeFilter(new AnnotationTypeFilter(Path.class)); provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class)); // Find classes in Eureka packages (or subpackages) // Set<Class<?>> classes = new HashSet<>(); for (String basePackage : EUREKA_PACKAGES) { Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage); for (BeanDefinition bd : beans) { Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader()); classes.add(cls); } } // Construct the Jersey ResourceConfig // Map<String, Object> propsAndFeatures = new HashMap<>(); propsAndFeatures.put( // Skip static content used by the webapp ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX, EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*"); DefaultResourceConfig rc = new DefaultResourceConfig(classes); rc.setPropertiesAndFeatures(propsAndFeatures); return rc; }代码中扫描的EUREKA_PACKAGES(private static final String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery",
"com.netflix.eureka" };)即是Jersey框架的具体的接口类
另外可以提一点的就是,eureka对外暴露的dashboard依然采用的是springmvc的controller形式。具体的可以看到在EurekaServerAutoConfiguration中注入的EurekaController
@Bean @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true) public EurekaController eurekaController() { return new EurekaController(this.applicationInfoManager); }感兴趣的可以再研究下后续EurekaController的内部实现
eureka注册在客户端和服务端分别怎么操作实现可用性的
服务每隔30秒会向注册中⼼续约(⼼跳)⼀次(也称为报活),如果没有续约,租约在90秒后到期,然后服务会被失效。每隔30秒的续约操作我们称之为⼼跳检测
首先在服务端,通过上述的Jersey框架暴露的接口进行注册,在ApplicationResource中通过addInstance进行注册,在这个过程中另一个eureka server也相当于是一个eureka client,同样会进行注册
通过addInstance中的register方法,一直向下调试到PeerAwareInstanceRegistryImpl的replicateInstanceActionsToPeers相互注册方法
/** * Replicates all instance changes to peer eureka nodes except for * replication traffic to this node. * */ private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: node.cancel(appName, id); break; case Heartbeat: InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: node.register(info); break; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } }此时,注册时,进入的是Register
public void register(final InstanceInfo info) throws Exception { long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); batchingDispatcher.process( taskId("register", info), new InstanceReplicationTask(targetHost, Action.Register, info, null, true) { public EurekaHttpResponse<Void> execute() { return replicationClient.register(info); } }, expiryTime ); }查阅源码可知此处的getLeaseRenewalOf(info)的默认值为90秒,这就印证了90秒到期的说法
private static int getLeaseRenewalOf(InstanceInfo info) { return (info.getLeaseInfo() == null ? Lease.DEFAULT_DURATION_IN_SECS : info.getLeaseInfo().getRenewalIntervalInSecs()) * 1000; }发起请求
@Override public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }发起请求的地址可以追踪到的是ApplicationsResource中的
@Path("{appId}") public ApplicationResource getApplicationResource( @PathParam("version") String version, @PathParam("appId") String appId) { CurrentRequestVersion.set(Version.toEnum(version)); return new ApplicationResource(appId, serverConfig, registry); }此处重新构建了一个ApplicationResource对象。并将服务的信息配置等传递到application中,等待后续使用
分析完这一段之后,我对addInstance如何接收请求的还是有疑问,经过断点调试发现,这个过程实际上是通过EurekaServerAutoConfiguration引入的 EurekaServerInitializerConfiguration来完成的,
@Configurationpublic class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {}EurekaServerInitializerConfiguration实现了SmartLifecycle方法,start方法会再容器初始化时执行。而start方法的内容
@Override public void start() { new Thread(new Runnable() { @Override public void run() { try { //TODO: is this class even needed now? eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext); log.info("Started Eureka Server"); publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig())); EurekaServerInitializerConfiguration.this.running = true; publish(new EurekaServerStartedEvent(getEurekaServerConfig())); } catch (Exception ex) { // Help! log.error("Could not initialize Eureka servlet context", ex); } } }).start(); }具体的业务内容在
public void contextInitialized(ServletContext context) { try { initEurekaEnvironment(); initEurekaServerContext(); context.setAttribute(EurekaServerContext.class.getName(), this.serverContext); } catch (Throwable e) { log.error("Cannot bootstrap eureka server :", e); throw new RuntimeException("Cannot bootstrap eureka server :", e); } }第一步initEurekaEnvironment为初始化环境,第二步initEurekaServerContext为业务操作
而随后的操作中最主要的是
int registryCount = this.registry.syncUp(); this.registry.openForTraffic(this.applicationInfoManager, registryCount); // Register all monitoring statistics. EurekaMonitors.registerAllStats();openForTraffic中主要是为开启服务通信做准备
@Override public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. this.expectedNumberOfClientsSendingRenews = count; updateRenewsPerMinThreshold(); logger.info("Got {} instances from neighboring DS node", count); logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyOnStartup = false; } DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); primeAwsReplicas(applicationInfoManager); } logger.info("Changing status to UP"); applicationInfoManager.setInstanceStatus(InstanceStatus.UP); super.postInit(); }引发向addIntsance发起请求的就是 applicationInfoManager.setInstanceStatus(InstanceStatus.UP);这个方法内部执行一串事件
其中就有向addInstance发起请求的
public synchronized void setInstanceStatus(InstanceStatus status) { InstanceStatus next = instanceStatusMapper.map(status); if (next == null) { return; } InstanceStatus prev = instanceInfo.setStatus(next); if (prev != null) { for (StatusChangeListener listener : listeners.values()) { try { listener.notify(new StatusChangeEvent(prev, next)); } catch (Exception e) { logger.warn("failed to notify listener: {}", listener.getId(), e); } } } }DiscoveryClient类内部
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } };指向
public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }<......原文转载:http://www.shaoqun.com/a/859330.html
跨境电商:https://www.ikjzd.com/
汇通天下物流:https://www.ikjzd.com/w/2055
dhl:https://www.ikjzd.com/w/516
黑石集团:https://www.ikjzd.com/w/1339.html
元数据除了普通的基础设置之外,eureka支持自定义元数据。配置方式如下eureka: instance: metadata-map: cluster:cl1 name:zhaozhen获取元数据代码List<ServiceInstance>list=discoveryClient.getInstances("zhao-service-resume
parser:https://www.ikjzd.com/w/680
ask me:https://www.ikjzd.com/w/2459
网络星期一:https://www.ikjzd.com/w/80
作为亚马逊卖家,哪些技能是必备的呢?:https://www.ikjzd.com/articles/126846
把竞争对手从自己亚马逊Listing页面挤掉,备战2020节日季你不知道的3个广告技巧!:https://www.ikjzd.com/articles/126921
这样打造Listing,第二天他就出了200单!:https://www.ikjzd.com/articles/126920
突发!字节跳动或同意完全放弃TikTok股份:https://www.ikjzd.com/articles/126919
欲望嫂子用修电脑引诱我:http://www.30bags.com/m/a/249581.html
女海王阵型!不告诉男朋友就勾搭富二代!数百万粉丝,Tik Tok女人,网络名人,做了很多轮:http://lady.shaoqun.com/a/410933.html
清迈的一个天桥小舒林成同志,被居民举报:http://lady.shaoqun.com/a/410934.html
女人为什么选择"暧昧"?倾听中年妇女的心声:http://lady.shaoqun.com/a/410935.html
独立站高昂流量成废品,如何操作才能起死回生?(上):https://www.ikjzd.com/articles/146487
没有评论:
发表评论