余晖落尽暮晚霞,黄昏迟暮远山寻
本站
当前位置:网站首页 > 编程知识 > 正文

阅读代码深入原理16——Spring Cloud Netflix之servo

xiyangw 2022-11-24 16:32 25 浏览 0 评论

  • 度量

度量指标一般可以分为计数器(Counter)、计时器(Timer)、仪表盘(Gauge)、摘要(Summary)。

度量指标在servo里用Monitor来抽象,在spectator里用Meter来抽象。

阅读代码深入原理16——Spring Cloud Netflix之servo

计数器单调递增,用于记录事件发生次数,比如某个页面访问数。

计时器用于记录一个事件前后时间,获得事件处理时间。

仪表盘可增可减,用于记录某个时间点的一个事件类型的数值,比如CPU使用率。

摘要用于一段时间内的采样数据,在servo中用Counter和Gauge组合实现,在spectator为DistributionSummary。

当然,在Micrometer和Prometheus对上述内容又稍有差异,此处不做深入探究。

  • servo

spring-cloud-netflix为我们提供了ServoMetricsAutoConfiguration(条件是spectator不存在),引入了配置类MetricsInterceptorConfiguration。

在MetricsInterceptorConfiguration里定义了MetricsHandlerInterceptor负责记录接收HTTP请求前后的耗时,定义了MetricsClientHttpRequestInterceptor为使用RestTemplate发出的请求记录耗时。

而ServoMetricsAutoConfiguration本身还定义了其它bean,比如ServoMetricsConfigBean、MonitorRegistry、ServoMetricNaming、ServoMetricReader等。

netflix的代码中有使用@Monitor注解,然后注册的,比如DiscoveryClient使用EurekaHttpClients构造EurekaHttpClient时,作为包装的RetryableEurekaHttpClient、SessionedEurekaHttpClient都是此形式:

# eureka-client-1.6.2.jar!/com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient
    public RetryableEurekaHttpClient(String name,
                                     EurekaTransportConfig transportConfig,
                                     ClusterResolver clusterResolver,
                                     TransportClientFactory clientFactory,
                                     ServerStatusEvaluator serverStatusEvaluator,
                                     int numberOfRetries) {
        this.name = name;
        this.transportConfig = transportConfig;
        this.clusterResolver = clusterResolver;
        this.clientFactory = clientFactory;
        this.serverStatusEvaluator = serverStatusEvaluator;
        this.numberOfRetries = numberOfRetries;
        Monitors.registerObject(name, this);
    }
	
    @Monitor(name = METRIC_TRANSPORT_PREFIX + "quarantineSize",
            description = "number of servers quarantined", type = DataSourceType.GAUGE)
    public long getQuarantineSetSize() {
        return quarantineSet.size();
    }	


# servo-core-0.10.1.jar!/com.netflix.servo.monitor.Monitors
    public static CompositeMonitor<?> newObjectMonitor(String id, Object obj) {
        final TagList tags = getMonitorTags(obj); // 1. 反射获取类中首个有@MonitorTag字段或方法
        List<Monitor<?>> monitors = new ArrayList<Monitor<?>>();
        addMonitors(monitors, id, tags, obj); // 2. 将超类里为Monitor类型的字段,或有@Monitor注解的字段和方法也加入监控
        final Class<?> c = obj.getClass();
        final String objectId = (id == null) ? DEFAULT_ID : id;
        return new BasicCompositeMonitor(newObjectConfig(c, objectId, tags), monitors);
    }
	
    private static MonitorConfig newObjectConfig(Class<?> c, String id, TagList tags) { // 3. 创建MonitorConfig
        MonitorConfig.Builder builder = MonitorConfig.builder(id);
        builder.withTag("class", c.getSimpleName());
        if (tags != null) {
            builder.withTags(tags);
        }
        return builder.build();
    }
	
    public static void registerObject(String id, Object obj) {
        DefaultMonitorRegistry.getInstance().register(newObjectMonitor(id, obj)); // 4. 注册
    }
	
    static void addMonitors(List<Monitor<?>> monitors, String id, TagList tags, Object obj) {
        for (Class<?> c = obj.getClass(); c != null; c = c.getSuperclass()) {
            addMonitorFields(monitors, id, tags, obj, c);
            addAnnotatedFields(monitors, id, tags, obj, c);
        }
    }
	
    private static TagList getMonitorTags(Object obj) {
        try {
            Class<?> c = obj.getClass();
            Field[] fields = c.getDeclaredFields();
            for (Field field : fields) {
                final MonitorTags anno = field.getAnnotation(MonitorTags.class);
                if (anno != null) {
                    field.setAccessible(true);
                    return (TagList) field.get(obj);
                }
            }
            Method[] methods = c.getDeclaredMethods();
            for (Method method : methods) {
                final MonitorTags anno = method.getAnnotation(MonitorTags.class);
                if (anno != null) {
                    method.setAccessible(true);
                    return (TagList) method.invoke(obj);
                }
            }
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
        return null;
    }

从上述代码可知,我们可以直接使用DefaultMonitorRegistry.getInstance().register方式来注册监控。在RemoteRegionRegistry有使用MetricsCollectingEurekaHttpClient的代码如下:

# eureka-client-1.6.2.jar!/com.netflix.discovery.shared.transport.decorator.MetricsCollectingEurekaHttpClient
    public static TransportClientFactory createFactory(final TransportClientFactory delegateFactory) {
        final Map<RequestType, EurekaHttpClientRequestMetrics> metricsByRequestType = initializeMetrics(); // 1. 为不同的请求类型(Register、GetApplications、SendHeartBeat)创建对应的响应计时器、连接数计数器
        final ExceptionsMetric exceptionMetrics = new ExceptionsMetric(EurekaClientNames.METRIC_TRANSPORT_PREFIX + "exceptions");
        return new TransportClientFactory() {
            @Override
            public EurekaHttpClient newClient(EurekaEndpoint endpoint) {
                return new MetricsCollectingEurekaHttpClient( // 2. 创建EurekaHttpClient
                        delegateFactory.newClient(endpoint),
                        metricsByRequestType,
                        exceptionMetrics,
                        false
                );
            }
            @Override
            public void shutdown() {
                shutdownMetrics(metricsByRequestType);
                exceptionMetrics.shutdown();
            }
        };
    }
	
    protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
        EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType());
        Stopwatch stopwatch = requestMetrics.latencyTimer.start(); // 3. 请求前记录开始时间
        try {
            EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate); // 4. 发出请求,接收响应
            requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment(); // 5. 将对应的请求状态计数器加1
            return httpResponse;
        } catch (Exception e) {
            requestMetrics.connectionErrors.increment(); // 连接错误计数加1
            exceptionsMetric.count(e); // 异常类型对应的计数加1
            throw e;
        } finally {
            stopwatch.stop(); // 6. 请求后记录结束时间
        }
    }
    private static Map<RequestType, EurekaHttpClientRequestMetrics> initializeMetrics() {
        Map<RequestType, EurekaHttpClientRequestMetrics> result = new EnumMap<>(RequestType.class);
        try {
            for (RequestType requestType : RequestType.values()) {
                result.put(requestType, new EurekaHttpClientRequestMetrics(requestType.name()));
            }
        } catch (Exception e) {
            logger.warn("Metrics initialization failure", e);
        }
        return result;
    }	
	
    static class EurekaHttpClientRequestMetrics {
        enum Status {x100, x200, x300, x400, x500, Unknown}
        private final Timer latencyTimer;
        private final Counter connectionErrors;
        private final Map<Status, Counter> countersByStatus;
        EurekaHttpClientRequestMetrics(String resourceName) {
            this.countersByStatus = createStatusCounters(resourceName);
            latencyTimer = new BasicTimer( // 1.1 创建计时器
                    MonitorConfig.builder(EurekaClientNames.METRIC_TRANSPORT_PREFIX + "latency")
                            .withTag("id", resourceName)
                            .withTag("class", MetricsCollectingEurekaHttpClient.class.getSimpleName())
                            .build(),
                    TimeUnit.MILLISECONDS
            );
            ServoUtil.register(latencyTimer); // 1.2 注册到MonitorRegistry:DefaultMonitorRegistry.getInstance().register(latencyTimer)
            this.connectionErrors = new BasicCounter(
                    MonitorConfig.builder(EurekaClientNames.METRIC_TRANSPORT_PREFIX + "connectionErrors")
                            .withTag("id", resourceName)
                            .withTag("class", MetricsCollectingEurekaHttpClient.class.getSimpleName())
                            .build()
            );
            ServoUtil.register(connectionErrors);
        }
        void shutdown() {
            ServoUtil.unregister(latencyTimer, connectionErrors);
            ServoUtil.unregister(countersByStatus.values());
        }
        private static Map<Status, Counter> createStatusCounters(String resourceName) {
            Map<Status, Counter> result = new EnumMap<>(Status.class);
            for (Status status : Status.values()) {
                BasicCounter counter = new BasicCounter(
                        MonitorConfig.builder(EurekaClientNames.METRIC_TRANSPORT_PREFIX + "request")
                                .withTag("id", resourceName)
                                .withTag("class", MetricsCollectingEurekaHttpClient.class.getSimpleName())
                                .withTag("status", status.name())
                                .build()
                );
                ServoUtil.register(counter);
                result.put(status, counter);
            }
            return result;
        }
    }

至于默认的DefaultMonitorRegistry.getInstance,代码如下:

# servo-core-0.10.1.jar!/com.netflix.servo.DefaultMonitorRegistry
    private static final MonitorRegistry INSTANCE = new DefaultMonitorRegistry(); // 1. 预加载DefaultMonitorRegistry实例
	
    public static MonitorRegistry getInstance() {
        return INSTANCE;
    }
    DefaultMonitorRegistry() {
        this(System.getProperties()); // 2. 使用系统属性配置
    }
    DefaultMonitorRegistry(Properties props) {
        final String className = props.getProperty(REGISTRY_CLASS_PROP); // 3. spring-cloud-netflix的ServoMetricsConfigBean设置为BasicMonitorRegistry,它只负责将monitors保存在内存
        final String registryName = props.getProperty(REGISTRY_NAME_PROP, DEFAULT_REGISTRY_NAME);
        if (className != null) {
            MonitorRegistry r;
            try {
                Class<?> c = Class.forName(className);
                r = (MonitorRegistry) c.newInstance();
            } catch (Throwable t) {
                LOG.error(
                        "failed to create instance of class " + className + ", "
                                + "using default class "
                                + JmxMonitorRegistry.class.getName(),
                        t);
                r = new JmxMonitorRegistry(registryName);
            }
            registry = r;
        } else {
            registry = new JmxMonitorRegistry(registryName, // 4. 默认使用JMX查看、修改
                    getObjectNameMapper(props));
        }
    }
	
    public void register(Monitor<?> monitor) {
        registry.register(monitor);
    }

那么,JMX对应的监控注册过程如下:

# servo-core-0.10.1.jar!/com.netflix.servo.jmx.JmxMonitorRegistry
    public JmxMonitorRegistry(String name) {
        this(name, ObjectNameMapper.DEFAULT);
    }
    public JmxMonitorRegistry(String name, ObjectNameMapper mapper) {
        this.name = name;
        this.mapper = mapper;
        mBeanServer = ManagementFactory.getPlatformMBeanServer();
        monitors = new ConcurrentHashMap<MonitorConfig, Monitor<?>>();
    }
	
    public void register(Monitor<?> monitor) {
        try {
            List<MonitorMBean> beans = MonitorMBean.createMBeans(name, monitor, mapper); // 1. 创建DynamicMBean:domain:name=,tag=
            for (MonitorMBean bean : beans) {
                register(bean.getObjectName(), bean);
            }
            monitors.put(monitor.getConfig(), monitor);
            updatePending.set(true);
        } catch (Exception e) {
            LOG.warn("Unable to register Monitor:" + monitor.getConfig(), e);
        }
    }
	
    private void register(ObjectName objectName, DynamicMBean mbean) throws Exception {
        synchronized (mBeanServer) {
            if (mBeanServer.isRegistered(objectName)) {
                mBeanServer.unregisterMBean(objectName);
            }
            mBeanServer.registerMBean(mbean, objectName); // 2. 注册MBean
        }
    }
  • spectator

servo已经快被淘汰,java8的应用推荐使用spectator。

spectator没有注解可用,只能通过注入Registry方式,创建Counter、Timer、DistributionSummary,然后注册Metric。

spring-cloud-netflix给我们提供了SpectatorMetricsAutoConfiguration,在此自动配置类里定义了ServoMetricsConfigBean、MonitorRegistry、Registry、ServoMonitorCache、SpectatorMetricServices、MetricReaderPublicMetrics、MetricsTagProvider。

此外,还引入了配置类MetricsInterceptorConfiguration,即MetricsHandlerInterceptor对服务请求计时、MetricsClientHttpRequestInterceptor对使用RestTemplate请求服务计时。

其中,SpectatorMetricServices在MetricRepositoryAutoConfiguration前生效,使spring-boot中定义的GaugeService和CounterService不生效。

定义的MetricReaderPublicMetrics使用ServoMetricReader,从servo的MonitorRegistry里获取Monitor,转换为Metric,通过spring-boot的MetricsEndpoint暴露出去(EndpointAutoConfiguration定义的此bean)。

在spring-boot中,通过MetricExportAutoConfiguration定义了metricWritersMetricExporter,此bean负责将MetricReader、MetricWriter、Exporter归拢,作为SchedulingConfigurer,在触发configureTasks时,为Exporter生成IntervalTask。

这个间隔任务会以包装了Exporter的MetricExporters.ExportRunner类型作为Runnable对象,最终执行时调用Exporter.export。

如果我们的应用使用了@EnableAtlas注解,便会引入AtlasConfiguration配置类,此类定义了AtlasExporter,它使用MonitorRegistryMetricPoller从MonitorRegistry获取监控信息,使用AtlasMetricObserver来推送数据到atlas。

# spring-cloud-netflix-core-1.3.6.RELEASE.jar!/org.springframework.cloud.netflix.metrics.atlas.AtlasMetricObserver
	public void update(List<Metric> rawMetrics) {
		if (!config.isEnabled()) { // 1. netflix.atlas.enabled默认为true
			logger.debug("Atlas metric observer disabled. Not sending metrics.");
			return;
		}
		if (rawMetrics.isEmpty()) {
			logger.debug("Metrics list is empty, no data being sent to server.");
			return;
		}
		List<Metric> metrics = sanitizeTags(addTypeTagsAsNecessary(rawMetrics)); // 2. 将Metric根据类型(GAUGE/RATE/NORMALIZED,COUNTER),额外添加atlas标签(atlas.dstype),然后将Tag的key、value转换成atlas的下划线风格
		for (int i = 0; i < metrics.size(); i += config.getBatchSize()) {
			List<Metric> batch = metrics.subList(i,
					Math.min(metrics.size(), config.getBatchSize() + i)); // 3. 分批
			logger.debug("Sending a metrics batch of size " + batch.size());
			sendMetricsBatch(batch);
		}
	}
	
	PublishMetricsBatchStatus sendMetricsBatch(List<Metric> metrics) {
		try {
			ByteArrayOutputStream output = new ByteArrayOutputStream();
			JsonGenerator gen = smileFactory.createGenerator(output, JsonEncoding.UTF8); // 4. 使用jackson-smile的二进制格式减少传输大小
			gen.writeStartObject();
			writeCommonTags(gen); // 5. 写commonTags,默认为空
			if (writeMetrics(gen, metrics) == 0) // 6. 写metrics
				return PublishMetricsBatchStatus.NothingToDo; // short circuit this batch if no valid/numeric metrics existed
			gen.writeEndObject();
			gen.flush();
			HttpHeaders headers = new HttpHeaders();
			headers.setContentType(MediaType.valueOf("application/x-jackson-smile"));
			HttpEntity<byte[]> entity = new HttpEntity<>(output.toByteArray(), headers);
			try {
				ResponseEntity<Map> response = restTemplate.exchange(uri, HttpMethod.POST, entity, Map.class); // 7. POST请求atlas,推送数据
				if(response.getStatusCode() == HttpStatus.ACCEPTED) {
					// partial success processing the metrics batch
					List<String> messages = (List<String>) response.getBody().get("message");
					if(messages != null) {
						for (String message : messages) {
							logger.error("Failed to write metric to atlas: " + message);
						}
					}
					return PublishMetricsBatchStatus.PartialSuccess;
				}
			}
			catch (HttpClientErrorException e) {
				logger.error("Failed to write metrics to atlas: " + e.getResponseBodyAsString());
				return PublishMetricsBatchStatus.Failure;
			}
			catch (RestClientException e) {
				logger.error("Failed to write metrics to atlas", e);
				return PublishMetricsBatchStatus.Failure;
			}
		}
		catch (IOException e) {
			return PublishMetricsBatchStatus.Failure;
		}
		
		return PublishMetricsBatchStatus.Success;
	}

相关推荐

spring利用spring.handlers解析自定义配置(spring validation 自定义)

一、问题我们在spring的xml配置文件里经常定义各种各样的配置(tx、bean、mvc、bean等等)。以及集成第三方框架时,也会看到一些spring之外的配置,例如dubbo的配置、securi...

「Spring源码分析」AOP源码解析(上篇)(spring源码深度解析(第2版))

前言前面写了六篇文章详细地分析了SpringBean加载流程,这部分完了之后就要进入一个比较困难的部分了,就是AOP的实现原理分析。为了探究AOP实现原理,首先定义几个类,一个Dao接口:1&nbs...

Spring 解析注册BeanDefinition这一篇就Over
Spring 解析注册BeanDefinition这一篇就Over

一、简介:学习过Spring框架的人一定都会听过Spring的IoC(控制反转)、DI(依赖注入)这两个概念,对于初学Spring的人来说,总觉得IoC、...

2023-03-20 14:53 xiyangw

域、模块、空间、闭包,你真的懂了吗?(模块控制域与作用域的关系)

Javascript有一个特性叫做域。尽管对于初学者来说理解域是有难度的,但我会尽力用最简单的方式让你理解域。理解域能让你的代码更优秀,减少错误,及有助于你做出更强大的模式设计。什么是域域是在运行时,...

这一次搞懂Spring自定义标签以及注解解析原理
这一次搞懂Spring自定义标签以及注解解析原理

前言在上一篇文章中分析了Spring是如何解析默认标签的,并封装为BeanDefinition注册到缓存中,这一篇就来看看对于像context这种自定义标签是如...

2023-03-20 14:53 xiyangw

前端基础进阶(七)-前端工程师最容易出错的问题-this关键字
前端基础进阶(七)-前端工程师最容易出错的问题-this关键字

我们在学习JavaScript的时候,因为对一些概念不是很清楚,但是又会通过一些简洁的方式把它给记下来,那么这样自己记下来的概念和真正的概念产生了很强的偏差.当...

2023-03-20 14:52 xiyangw

深入K8s:守护进程DaemonSet及其源码分析(k8s 进程)
深入K8s:守护进程DaemonSet及其源码分析(k8s 进程)

建议学习:膜拜!阿里内部都在强推的K8S(kubernetes)学习指南,不能再详细了最近也一直在加班,处理项目中的事情,发现问题越多越是感觉自己的能力不足,...

2023-03-20 14:52 xiyangw

Spring 是如何解析 bean 标签的?(spring beans标签)
Spring 是如何解析 bean 标签的?(spring beans标签)

前情回顾上回「SpringIoC容器初始化(2)」说到了Spring如何解析我们定义的<bean>标签,代码跟进了一层又一层,跋山涉水,...

2023-03-20 14:52 xiyangw

快速了解JavaScript文本框操作(javascript文本框代码)
快速了解JavaScript文本框操作(javascript文本框代码)

HTML中使用<input>元素表示单行输入框和<textarea>元素表示多行文本框。HTML中使用的<input&...

2023-03-20 14:51 xiyangw

荐读|30道JavaOOP面试题,可以和面试官扯皮了
荐读|30道JavaOOP面试题,可以和面试官扯皮了

面试是我们每个人都要经历的事情,大部分人且不止一次,今天给大家准备了30道JavaOOP面试题,希望能够帮助到对Java感兴趣的同学,让大家在找工作的时候能够...

2023-03-20 14:51 xiyangw

源码系列——mybatis源码刨析总结,下(mybatis源码分析)
源码系列——mybatis源码刨析总结,下(mybatis源码分析)

接上文简答题一.1.Mybatis动态sql是做什么的?1.动态sql就是根据条件标签动态的拼接sql,包括判空,循环,拼接等2.哪些动态sql?动态sql大...

2023-03-20 14:50 xiyangw

Java面试题(第二弹)(java面试题及答案整理)
Java面试题(第二弹)(java面试题及答案整理)

1.抽象类和接口的区别?接口可以被多重implements,抽象类只能被单一extends接口只有定义,抽象类可以有定义和实现接口的字段定义默认为:public...

2023-03-20 14:50 xiyangw

mybatis3 源码深度解析-动态 sql 实现原理(sql数据库基础知识)
mybatis3 源码深度解析-动态 sql 实现原理(sql数据库基础知识)

大纲动态sql使用示例SqlSource和BoundSql以及实现类LanguageDriver以及实现类SqlNode以及实现类动态sql解...

2023-03-20 14:50 xiyangw

第43节 Text、Comment及CDATASection(第43节 Text、Comment及CDATASection)
第43节 Text、Comment及CDATASection(第43节 Text、Comment及CDATASection)

本内容是《Web前端开发之Javascript视频》的课件,请配合大师哥《Javascript》视频课程学习。文本节点用Text类型表示,包含的是可以按字面解释...

2023-03-20 14:49 xiyangw

Qt读写三种文件(qt读取文件数据并赋值给变量)

第一种INI配置文件.ini文件是InitializationFile的缩写,即初始化文件。除了windows现在很多其他操作系统下面的应用软件也有.ini文件,用来配置应用软件以实现不同用户的要...

取消回复欢迎 发表评论: