余晖落尽暮晚霞,黄昏迟暮远山寻
本站
当前位置:网站首页 > 编程知识 > 正文

阅读代码深入原理16——Spring Cloud Netflix之servo

xiyangw 2022-11-24 16:32 102 浏览 0 评论

  • 度量

度量指标一般可以分为计数器(Counter)、计时器(Timer)、仪表盘(Gauge)、摘要(Summary)。

度量指标在servo里用Monitor来抽象,在spectator里用Meter来抽象。

计数器单调递增,用于记录事件发生次数,比如某个页面访问数。

计时器用于记录一个事件前后时间,获得事件处理时间。

仪表盘可增可减,用于记录某个时间点的一个事件类型的数值,比如CPU使用率。

摘要用于一段时间内的采样数据,在servo中用Counter和Gauge组合实现,在spectator为DistributionSummary。

当然,在Micrometer和Prometheus对上述内容又稍有差异,此处不做深入探究。

  • servo

spring-cloud-netflix为我们提供了ServoMetricsAutoConfiguration(条件是spectator不存在),引入了配置类MetricsInterceptorConfiguration。

在MetricsInterceptorConfiguration里定义了MetricsHandlerInterceptor负责记录接收HTTP请求前后的耗时,定义了MetricsClientHttpRequestInterceptor为使用RestTemplate发出的请求记录耗时。

而ServoMetricsAutoConfiguration本身还定义了其它bean,比如ServoMetricsConfigBean、MonitorRegistry、ServoMetricNaming、ServoMetricReader等。

netflix的代码中有使用@Monitor注解,然后注册的,比如DiscoveryClient使用EurekaHttpClients构造EurekaHttpClient时,作为包装的RetryableEurekaHttpClient、SessionedEurekaHttpClient都是此形式:

# eureka-client-1.6.2.jar!/com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient
    public RetryableEurekaHttpClient(String name,
                                     EurekaTransportConfig transportConfig,
                                     ClusterResolver clusterResolver,
                                     TransportClientFactory clientFactory,
                                     ServerStatusEvaluator serverStatusEvaluator,
                                     int numberOfRetries) {
        this.name = name;
        this.transportConfig = transportConfig;
        this.clusterResolver = clusterResolver;
        this.clientFactory = clientFactory;
        this.serverStatusEvaluator = serverStatusEvaluator;
        this.numberOfRetries = numberOfRetries;
        Monitors.registerObject(name, this);
    }
	
    @Monitor(name = METRIC_TRANSPORT_PREFIX + "quarantineSize",
            description = "number of servers quarantined", type = DataSourceType.GAUGE)
    public long getQuarantineSetSize() {
        return quarantineSet.size();
    }	


# servo-core-0.10.1.jar!/com.netflix.servo.monitor.Monitors
    public static CompositeMonitor<?> newObjectMonitor(String id, Object obj) {
        final TagList tags = getMonitorTags(obj); // 1. 反射获取类中首个有@MonitorTag字段或方法
        List<Monitor<?>> monitors = new ArrayList<Monitor<?>>();
        addMonitors(monitors, id, tags, obj); // 2. 将超类里为Monitor类型的字段,或有@Monitor注解的字段和方法也加入监控
        final Class<?> c = obj.getClass();
        final String objectId = (id == null) ? DEFAULT_ID : id;
        return new BasicCompositeMonitor(newObjectConfig(c, objectId, tags), monitors);
    }
	
    private static MonitorConfig newObjectConfig(Class<?> c, String id, TagList tags) { // 3. 创建MonitorConfig
        MonitorConfig.Builder builder = MonitorConfig.builder(id);
        builder.withTag("class", c.getSimpleName());
        if (tags != null) {
            builder.withTags(tags);
        }
        return builder.build();
    }
	
    public static void registerObject(String id, Object obj) {
        DefaultMonitorRegistry.getInstance().register(newObjectMonitor(id, obj)); // 4. 注册
    }
	
    static void addMonitors(List<Monitor<?>> monitors, String id, TagList tags, Object obj) {
        for (Class<?> c = obj.getClass(); c != null; c = c.getSuperclass()) {
            addMonitorFields(monitors, id, tags, obj, c);
            addAnnotatedFields(monitors, id, tags, obj, c);
        }
    }
	
    private static TagList getMonitorTags(Object obj) {
        try {
            Class<?> c = obj.getClass();
            Field[] fields = c.getDeclaredFields();
            for (Field field : fields) {
                final MonitorTags anno = field.getAnnotation(MonitorTags.class);
                if (anno != null) {
                    field.setAccessible(true);
                    return (TagList) field.get(obj);
                }
            }
            Method[] methods = c.getDeclaredMethods();
            for (Method method : methods) {
                final MonitorTags anno = method.getAnnotation(MonitorTags.class);
                if (anno != null) {
                    method.setAccessible(true);
                    return (TagList) method.invoke(obj);
                }
            }
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
        return null;
    }

从上述代码可知,我们可以直接使用DefaultMonitorRegistry.getInstance().register方式来注册监控。在RemoteRegionRegistry有使用MetricsCollectingEurekaHttpClient的代码如下:

# eureka-client-1.6.2.jar!/com.netflix.discovery.shared.transport.decorator.MetricsCollectingEurekaHttpClient
    public static TransportClientFactory createFactory(final TransportClientFactory delegateFactory) {
        final Map<RequestType, EurekaHttpClientRequestMetrics> metricsByRequestType = initializeMetrics(); // 1. 为不同的请求类型(Register、GetApplications、SendHeartBeat)创建对应的响应计时器、连接数计数器
        final ExceptionsMetric exceptionMetrics = new ExceptionsMetric(EurekaClientNames.METRIC_TRANSPORT_PREFIX + "exceptions");
        return new TransportClientFactory() {
            @Override
            public EurekaHttpClient newClient(EurekaEndpoint endpoint) {
                return new MetricsCollectingEurekaHttpClient( // 2. 创建EurekaHttpClient
                        delegateFactory.newClient(endpoint),
                        metricsByRequestType,
                        exceptionMetrics,
                        false
                );
            }
            @Override
            public void shutdown() {
                shutdownMetrics(metricsByRequestType);
                exceptionMetrics.shutdown();
            }
        };
    }
	
    protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
        EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType());
        Stopwatch stopwatch = requestMetrics.latencyTimer.start(); // 3. 请求前记录开始时间
        try {
            EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate); // 4. 发出请求,接收响应
            requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment(); // 5. 将对应的请求状态计数器加1
            return httpResponse;
        } catch (Exception e) {
            requestMetrics.connectionErrors.increment(); // 连接错误计数加1
            exceptionsMetric.count(e); // 异常类型对应的计数加1
            throw e;
        } finally {
            stopwatch.stop(); // 6. 请求后记录结束时间
        }
    }
    private static Map<RequestType, EurekaHttpClientRequestMetrics> initializeMetrics() {
        Map<RequestType, EurekaHttpClientRequestMetrics> result = new EnumMap<>(RequestType.class);
        try {
            for (RequestType requestType : RequestType.values()) {
                result.put(requestType, new EurekaHttpClientRequestMetrics(requestType.name()));
            }
        } catch (Exception e) {
            logger.warn("Metrics initialization failure", e);
        }
        return result;
    }	
	
    static class EurekaHttpClientRequestMetrics {
        enum Status {x100, x200, x300, x400, x500, Unknown}
        private final Timer latencyTimer;
        private final Counter connectionErrors;
        private final Map<Status, Counter> countersByStatus;
        EurekaHttpClientRequestMetrics(String resourceName) {
            this.countersByStatus = createStatusCounters(resourceName);
            latencyTimer = new BasicTimer( // 1.1 创建计时器
                    MonitorConfig.builder(EurekaClientNames.METRIC_TRANSPORT_PREFIX + "latency")
                            .withTag("id", resourceName)
                            .withTag("class", MetricsCollectingEurekaHttpClient.class.getSimpleName())
                            .build(),
                    TimeUnit.MILLISECONDS
            );
            ServoUtil.register(latencyTimer); // 1.2 注册到MonitorRegistry:DefaultMonitorRegistry.getInstance().register(latencyTimer)
            this.connectionErrors = new BasicCounter(
                    MonitorConfig.builder(EurekaClientNames.METRIC_TRANSPORT_PREFIX + "connectionErrors")
                            .withTag("id", resourceName)
                            .withTag("class", MetricsCollectingEurekaHttpClient.class.getSimpleName())
                            .build()
            );
            ServoUtil.register(connectionErrors);
        }
        void shutdown() {
            ServoUtil.unregister(latencyTimer, connectionErrors);
            ServoUtil.unregister(countersByStatus.values());
        }
        private static Map<Status, Counter> createStatusCounters(String resourceName) {
            Map<Status, Counter> result = new EnumMap<>(Status.class);
            for (Status status : Status.values()) {
                BasicCounter counter = new BasicCounter(
                        MonitorConfig.builder(EurekaClientNames.METRIC_TRANSPORT_PREFIX + "request")
                                .withTag("id", resourceName)
                                .withTag("class", MetricsCollectingEurekaHttpClient.class.getSimpleName())
                                .withTag("status", status.name())
                                .build()
                );
                ServoUtil.register(counter);
                result.put(status, counter);
            }
            return result;
        }
    }

至于默认的DefaultMonitorRegistry.getInstance,代码如下:

# servo-core-0.10.1.jar!/com.netflix.servo.DefaultMonitorRegistry
    private static final MonitorRegistry INSTANCE = new DefaultMonitorRegistry(); // 1. 预加载DefaultMonitorRegistry实例
	
    public static MonitorRegistry getInstance() {
        return INSTANCE;
    }
    DefaultMonitorRegistry() {
        this(System.getProperties()); // 2. 使用系统属性配置
    }
    DefaultMonitorRegistry(Properties props) {
        final String className = props.getProperty(REGISTRY_CLASS_PROP); // 3. spring-cloud-netflix的ServoMetricsConfigBean设置为BasicMonitorRegistry,它只负责将monitors保存在内存
        final String registryName = props.getProperty(REGISTRY_NAME_PROP, DEFAULT_REGISTRY_NAME);
        if (className != null) {
            MonitorRegistry r;
            try {
                Class<?> c = Class.forName(className);
                r = (MonitorRegistry) c.newInstance();
            } catch (Throwable t) {
                LOG.error(
                        "failed to create instance of class " + className + ", "
                                + "using default class "
                                + JmxMonitorRegistry.class.getName(),
                        t);
                r = new JmxMonitorRegistry(registryName);
            }
            registry = r;
        } else {
            registry = new JmxMonitorRegistry(registryName, // 4. 默认使用JMX查看、修改
                    getObjectNameMapper(props));
        }
    }
	
    public void register(Monitor<?> monitor) {
        registry.register(monitor);
    }

那么,JMX对应的监控注册过程如下:

# servo-core-0.10.1.jar!/com.netflix.servo.jmx.JmxMonitorRegistry
    public JmxMonitorRegistry(String name) {
        this(name, ObjectNameMapper.DEFAULT);
    }
    public JmxMonitorRegistry(String name, ObjectNameMapper mapper) {
        this.name = name;
        this.mapper = mapper;
        mBeanServer = ManagementFactory.getPlatformMBeanServer();
        monitors = new ConcurrentHashMap<MonitorConfig, Monitor<?>>();
    }
	
    public void register(Monitor<?> monitor) {
        try {
            List<MonitorMBean> beans = MonitorMBean.createMBeans(name, monitor, mapper); // 1. 创建DynamicMBean:domain:name=,tag=
            for (MonitorMBean bean : beans) {
                register(bean.getObjectName(), bean);
            }
            monitors.put(monitor.getConfig(), monitor);
            updatePending.set(true);
        } catch (Exception e) {
            LOG.warn("Unable to register Monitor:" + monitor.getConfig(), e);
        }
    }
	
    private void register(ObjectName objectName, DynamicMBean mbean) throws Exception {
        synchronized (mBeanServer) {
            if (mBeanServer.isRegistered(objectName)) {
                mBeanServer.unregisterMBean(objectName);
            }
            mBeanServer.registerMBean(mbean, objectName); // 2. 注册MBean
        }
    }
  • spectator

servo已经快被淘汰,java8的应用推荐使用spectator。

spectator没有注解可用,只能通过注入Registry方式,创建Counter、Timer、DistributionSummary,然后注册Metric。

spring-cloud-netflix给我们提供了SpectatorMetricsAutoConfiguration,在此自动配置类里定义了ServoMetricsConfigBean、MonitorRegistry、Registry、ServoMonitorCache、SpectatorMetricServices、MetricReaderPublicMetrics、MetricsTagProvider。

此外,还引入了配置类MetricsInterceptorConfiguration,即MetricsHandlerInterceptor对服务请求计时、MetricsClientHttpRequestInterceptor对使用RestTemplate请求服务计时。

其中,SpectatorMetricServices在MetricRepositoryAutoConfiguration前生效,使spring-boot中定义的GaugeService和CounterService不生效。

定义的MetricReaderPublicMetrics使用ServoMetricReader,从servo的MonitorRegistry里获取Monitor,转换为Metric,通过spring-boot的MetricsEndpoint暴露出去(EndpointAutoConfiguration定义的此bean)。

在spring-boot中,通过MetricExportAutoConfiguration定义了metricWritersMetricExporter,此bean负责将MetricReader、MetricWriter、Exporter归拢,作为SchedulingConfigurer,在触发configureTasks时,为Exporter生成IntervalTask。

这个间隔任务会以包装了Exporter的MetricExporters.ExportRunner类型作为Runnable对象,最终执行时调用Exporter.export。

如果我们的应用使用了@EnableAtlas注解,便会引入AtlasConfiguration配置类,此类定义了AtlasExporter,它使用MonitorRegistryMetricPoller从MonitorRegistry获取监控信息,使用AtlasMetricObserver来推送数据到atlas。

# spring-cloud-netflix-core-1.3.6.RELEASE.jar!/org.springframework.cloud.netflix.metrics.atlas.AtlasMetricObserver
	public void update(List<Metric> rawMetrics) {
		if (!config.isEnabled()) { // 1. netflix.atlas.enabled默认为true
			logger.debug("Atlas metric observer disabled. Not sending metrics.");
			return;
		}
		if (rawMetrics.isEmpty()) {
			logger.debug("Metrics list is empty, no data being sent to server.");
			return;
		}
		List<Metric> metrics = sanitizeTags(addTypeTagsAsNecessary(rawMetrics)); // 2. 将Metric根据类型(GAUGE/RATE/NORMALIZED,COUNTER),额外添加atlas标签(atlas.dstype),然后将Tag的key、value转换成atlas的下划线风格
		for (int i = 0; i < metrics.size(); i += config.getBatchSize()) {
			List<Metric> batch = metrics.subList(i,
					Math.min(metrics.size(), config.getBatchSize() + i)); // 3. 分批
			logger.debug("Sending a metrics batch of size " + batch.size());
			sendMetricsBatch(batch);
		}
	}
	
	PublishMetricsBatchStatus sendMetricsBatch(List<Metric> metrics) {
		try {
			ByteArrayOutputStream output = new ByteArrayOutputStream();
			JsonGenerator gen = smileFactory.createGenerator(output, JsonEncoding.UTF8); // 4. 使用jackson-smile的二进制格式减少传输大小
			gen.writeStartObject();
			writeCommonTags(gen); // 5. 写commonTags,默认为空
			if (writeMetrics(gen, metrics) == 0) // 6. 写metrics
				return PublishMetricsBatchStatus.NothingToDo; // short circuit this batch if no valid/numeric metrics existed
			gen.writeEndObject();
			gen.flush();
			HttpHeaders headers = new HttpHeaders();
			headers.setContentType(MediaType.valueOf("application/x-jackson-smile"));
			HttpEntity<byte[]> entity = new HttpEntity<>(output.toByteArray(), headers);
			try {
				ResponseEntity<Map> response = restTemplate.exchange(uri, HttpMethod.POST, entity, Map.class); // 7. POST请求atlas,推送数据
				if(response.getStatusCode() == HttpStatus.ACCEPTED) {
					// partial success processing the metrics batch
					List<String> messages = (List<String>) response.getBody().get("message");
					if(messages != null) {
						for (String message : messages) {
							logger.error("Failed to write metric to atlas: " + message);
						}
					}
					return PublishMetricsBatchStatus.PartialSuccess;
				}
			}
			catch (HttpClientErrorException e) {
				logger.error("Failed to write metrics to atlas: " + e.getResponseBodyAsString());
				return PublishMetricsBatchStatus.Failure;
			}
			catch (RestClientException e) {
				logger.error("Failed to write metrics to atlas", e);
				return PublishMetricsBatchStatus.Failure;
			}
		}
		catch (IOException e) {
			return PublishMetricsBatchStatus.Failure;
		}
		
		return PublishMetricsBatchStatus.Success;
	}

相关推荐

前端开发之用以处理表单的jQuery控件之AJAX请求

介绍介绍我们的TFUMS的网页模板基本上都做好了,但是大家都发现了我们的模板里面的表单是不能提交的,这是因为缺少驱动程序,这个驱动程序就是指Javascript代码。在用户填写完表单项之后,点击了提交...

AJAX with JSP使用jQuery示例_ajax和jquery先学哪个

在这里,您将获得使用jQuery的JSP的AJAX示例。AJAX用于从服务器发送和接收数据,而无需重新加载页面。我们可以使用jQuery轻松实现AJAX。它为AJAX功能提供了各种方法。我使用Ecli...

华杉科技-jQuery与AJAX基础入门到实战精通

华杉科技提供的“jQuery与AJAX基础入门到实战精通”课程是一个涵盖了jQuery和AJAX技术的全面学习路径。下面是该课程的一个大致的学习大纲,以帮助你了解你将学到什么。1.jQuery基础入...

jQuery实现Ajax功能分析「与Flask后台交互」

本文实例讲述了jQuery实现Ajax功能。分享给大家供大家参考,具体如下:jQuery是一个小型的JavaScript库,它通常被用来简化DOM和JavaScript操作。通过在服务器...

jQuery - AJAX load() 方法_jqueryajax全部用法

jQueryload()方法jQueryload()方法是简单但强大的AJAX方法。load()方法从服务器加载数据,并把返回的数据放入被选元素中。语法:$(selector).load...

原生异步请求方法ajax,及jQuery相关方法,如何采用ES6封装ajax

知识已经过时了,可以直接跳到文章末尾看ES6封装ajax。怀念曾经的jQuery一.ajax方法jQuery:JavaScript代码包装成拿过来就能实现特定功能的代码库,基本淘汰了;json:简单...

JS类库Jquery(二):优雅的使用JQuery写Ajax实现前后端完美交互

Jquery虽然属于比较老的技术,但是相较于原生的JS写起来还是反方便很多,现在流行使用VUE等开源的框架,但是这并非不妨碍咱们进行Jquery的学习,前端程序员成长的过程中Jquery是必须了解的类...

Python Web详解:(Ajax+JSON+JQuery)

JOSN:JavascriptObjectNotation作用:主要约束前后端交互数据的格式JSON的格式表示单个对象使用{}采用键值对的格式保存数据键必须使用双引号引起来相当于...

JavaScript、Ajax、jQuery全部知识点,1分钟速懂!

本文将详细解读JavaScript、ajax、jQuery是什么?他们可以实现什么?1、JavaScript定义:javaScript的简写形式就是JS,是由Netscape公司开发的一种脚本语言,一...

一文读懂Ajax与Axios、jquery之间的关系与区别

1、关系1)Ajax与jQuery:jQuery提供了对Ajax技术的封装,使得使用Ajax变得更加方便。jQuery中的Ajax方法是对原生的Ajax技术(基于XMLHttpRequest对象)进行...

Javascript应用-jQuery Ajax DOM 元素、遍历、数据操作和方法

jQuery库拥有完整的Ajax兼容套件。其中的函数和方法允许在不刷新浏览器的情况下从服务器加载数据,具体如下:函数描述jQuery.ajax()执行异步HTTP(Ajax)请求。.aja...

Jquery中ajax的使用_jquery.ajax

声明:本栏目所使用的素材都是凯哥学堂VIP学员所写,学员有权匿名,对文章有最终解释权;凯哥学堂旨在促进VIP学员互相学习的基础上公开笔记。Jquery包装的ajax操作如下:$get$post操作...

全新web前端开发教程之Jquery Ajax

1、$.ajaxjquery调用ajax方法:格式:$.ajax({});参数:type:请求方式GET/POSTurl:请求地址urlasync:是否异步,默认是true表示异步data:发送到服务...

jquery对ajax的支持_ajax是什么

...

jquery中的Ajax请求详解各个参数_jquery ajax实例

比较适合初学的人$.ajax({url:"接收数据的页面地址",data:{参数:值,参数:值........},type:'post',或者getdataType:'json',async:t...

取消回复欢迎 发表评论: