- 度量
度量指标一般可以分为计数器(Counter)、计时器(Timer)、仪表盘(Gauge)、摘要(Summary)。
度量指标在servo里用Monitor来抽象,在spectator里用Meter来抽象。
计数器单调递增,用于记录事件发生次数,比如某个页面访问数。
计时器用于记录一个事件开始和结束的时间,从而得到该事件的处理耗时。
仪表盘可增可减,用于记录某个时间点的一个事件类型的数值,比如CPU使用率。
摘要用于记录一段时间内的采样数据,在servo中用Counter和Gauge组合实现,在spectator中对应DistributionSummary。
当然,Micrometer和Prometheus对上述指标的划分又稍有差异,此处不做深入探究。
- servo
spring-cloud-netflix为我们提供了ServoMetricsAutoConfiguration(条件是spectator不存在),引入了配置类MetricsInterceptorConfiguration。
在MetricsInterceptorConfiguration里定义了MetricsHandlerInterceptor负责记录接收HTTP请求前后的耗时,定义了MetricsClientHttpRequestInterceptor为使用RestTemplate发出的请求记录耗时。
而ServoMetricsAutoConfiguration本身还定义了其它bean,比如ServoMetricsConfigBean、MonitorRegistry、ServoMetricNaming、ServoMetricReader等。
netflix的代码中有先用@Monitor注解声明指标、再调用Monitors.registerObject注册的做法,比如DiscoveryClient使用EurekaHttpClients构造EurekaHttpClient时,作为包装的RetryableEurekaHttpClient、SessionedEurekaHttpClient都是此形式:
# eureka-client-1.6.2.jar!/com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient
/**
 * Stores the supplied collaborators on the new instance and then registers the
 * instance with servo through {@code Monitors.registerObject}.
 *
 * @param name                  stored on the instance and used as the servo registration id
 * @param transportConfig       stored as-is
 * @param clusterResolver       stored as-is
 * @param clientFactory         stored as-is
 * @param serverStatusEvaluator stored as-is
 * @param numberOfRetries       stored as-is
 */
public RetryableEurekaHttpClient(String name,
                                 EurekaTransportConfig transportConfig,
                                 ClusterResolver clusterResolver,
                                 TransportClientFactory clientFactory,
                                 ServerStatusEvaluator serverStatusEvaluator,
                                 int numberOfRetries) {
    // Plain field wiring; the assignments do not depend on one another.
    this.numberOfRetries = numberOfRetries;
    this.serverStatusEvaluator = serverStatusEvaluator;
    this.clientFactory = clientFactory;
    this.clusterResolver = clusterResolver;
    this.transportConfig = transportConfig;
    this.name = name;

    // Registration happens last, once every field above has been assigned.
    Monitors.registerObject(name, this);
}
/**
 * Getter annotated with {@code @Monitor} and declared as a {@code GAUGE}.
 *
 * @return the current number of entries in {@code quarantineSet}
 */
@Monitor(name = METRIC_TRANSPORT_PREFIX + "quarantineSize",
description = "number of servers quarantined", type = DataSourceType.GAUGE)
public long getQuarantineSetSize() {
return quarantineSet.size();
}
# servo-core-0.10.1.jar!/com.netflix.servo.monitor.Monitors
/**
 * Builds a composite monitor that wraps every monitor discovered on {@code obj}.
 *
 * @param id  identifier for the composite; {@code DEFAULT_ID} is used when {@code null}
 * @param obj object to inspect
 * @return a {@code BasicCompositeMonitor} holding the discovered monitors
 */
public static CompositeMonitor<?> newObjectMonitor(String id, Object obj) {
    // 1. Reflectively read the tags from the first field or method of the class
    //    that carries @MonitorTags.
    final TagList objectTags = getMonitorTags(obj);

    // 2. Walk the class and its superclasses via addMonitorFields / addAnnotatedFields;
    //    per the original note these pick up Monitor-typed fields and
    //    @Monitor-annotated fields and methods.
    final List<Monitor<?>> collected = new ArrayList<Monitor<?>>();
    addMonitors(collected, id, objectTags, obj);

    final String objectId;
    if (id == null) {
        objectId = DEFAULT_ID;
    } else {
        objectId = id;
    }

    final MonitorConfig compositeConfig = newObjectConfig(obj.getClass(), objectId, objectTags);
    return new BasicCompositeMonitor(compositeConfig, collected);
}
/**
 * 3. Creates the {@code MonitorConfig} for an object monitor.
 *
 * @param c    class whose simple name becomes the value of the {@code "class"} tag
 * @param id   name handed to {@code MonitorConfig.builder}
 * @param tags extra tags to attach; skipped when {@code null}
 * @return the built configuration
 */
private static MonitorConfig newObjectConfig(Class<?> c, String id, TagList tags) {
    final MonitorConfig.Builder configBuilder = MonitorConfig.builder(id);
    configBuilder.withTag("class", c.getSimpleName());
    if (null != tags) {
        configBuilder.withTags(tags);
    }
    final MonitorConfig config = configBuilder.build();
    return config;
}
/**
 * Builds the composite monitor for {@code obj} with {@code newObjectMonitor} and
 * registers it with the registry returned by {@code DefaultMonitorRegistry.getInstance()}.
 *
 * @param id  identifier forwarded to {@code newObjectMonitor}
 * @param obj object whose monitors are registered
 */
public static void registerObject(String id, Object obj) {
DefaultMonitorRegistry.getInstance().register(newObjectMonitor(id, obj)); // 4. Register the composite monitor
}
/**
 * Runs {@code addMonitorFields} and {@code addAnnotatedFields} for the runtime class
 * of {@code obj} and then for each of its superclasses, in that order.
 *
 * @param monitors list handed to both helpers for every class visited
 * @param id       forwarded unchanged to the helpers
 * @param tags     forwarded unchanged to the helpers
 * @param obj      object being inspected
 */
static void addMonitors(List<Monitor<?>> monitors, String id, TagList tags, Object obj) {
    Class<?> current = obj.getClass();
    while (current != null) {
        addMonitorFields(monitors, id, tags, obj, current);
        addAnnotatedFields(monitors, id, tags, obj, current);
        // Climb one level; getSuperclass() yields null above Object, which ends the walk.
        current = current.getSuperclass();
    }
}
/**
 * Finds the tag list exposed by {@code obj} through a {@code @MonitorTags} member.
 * Declared fields of the object's own class are checked first, then its declared
 * methods; the first annotated member found supplies the result. Superclasses are
 * not searched.
 *
 * @param obj object to inspect
 * @return the value of the first annotated field or zero-argument method cast to
 *         {@code TagList}, or {@code null} when no member is annotated
 */
private static TagList getMonitorTags(Object obj) {
    try {
        final Class<?> type = obj.getClass();

        for (Field candidate : type.getDeclaredFields()) {
            if (candidate.getAnnotation(MonitorTags.class) == null) {
                continue;
            }
            // Allow access to non-public fields before reading the value.
            candidate.setAccessible(true);
            return (TagList) candidate.get(obj);
        }

        for (Method candidate : type.getDeclaredMethods()) {
            if (candidate.getAnnotation(MonitorTags.class) == null) {
                continue;
            }
            candidate.setAccessible(true);
            return (TagList) candidate.invoke(obj);
        }

        return null;
    } catch (Exception e) {
        // Any failure, reflective or otherwise, is rethrown via Throwables.propagate.
        throw Throwables.propagate(e);
    }
}
从上述代码可知,我们也可以直接使用DefaultMonitorRegistry.getInstance().register的方式来注册监控。RemoteRegionRegistry中用到的MetricsCollectingEurekaHttpClient即是如此,代码如下:
# eureka-client-1.6.2.jar!/com.netflix.discovery.shared.transport.decorator.MetricsCollectingEurekaHttpClient
/**
 * Wraps {@code delegateFactory} in a factory whose clients are
 * {@code MetricsCollectingEurekaHttpClient} instances sharing one set of metrics.
 *
 * @param delegateFactory factory that creates the underlying clients
 * @return a factory that decorates each delegate client with metrics collection;
 *         its {@code shutdown()} shuts down the shared metrics only
 */
public static TransportClientFactory createFactory(final TransportClientFactory delegateFactory) {
    // 1. One metrics bundle per RequestType value (the original note cites Register,
    //    GetApplications and SendHeartBeat as examples): a latency timer, a
    //    connection-error counter and per-status request counters.
    final Map<RequestType, EurekaHttpClientRequestMetrics> sharedRequestMetrics = initializeMetrics();
    final String exceptionsMetricName = EurekaClientNames.METRIC_TRANSPORT_PREFIX + "exceptions";
    final ExceptionsMetric sharedExceptionsMetric = new ExceptionsMetric(exceptionsMetricName);

    return new TransportClientFactory() {
        @Override
        public void shutdown() {
            shutdownMetrics(sharedRequestMetrics);
            sharedExceptionsMetric.shutdown();
        }

        @Override
        public EurekaHttpClient newClient(EurekaEndpoint endpoint) {
            // 2. Create the decorated EurekaHttpClient around a fresh delegate client.
            final EurekaHttpClient delegateClient = delegateFactory.newClient(endpoint);
            // NOTE(review): the meaning of the trailing 'false' flag is not visible here;
            // presumably the client does not own the shared metrics. Confirm against
            // the constructor.
            return new MetricsCollectingEurekaHttpClient(
                    delegateClient,
                    sharedRequestMetrics,
                    sharedExceptionsMetric,
                    false
            );
        }
    };
}
/**
 * Runs the request against {@code delegate} while recording latency, a per-status
 * counter and error counters for the request's type.
 *
 * @param requestExecutor the request to run; its request type selects the metrics bundle
 * @param <R>             response payload type
 * @return the response produced by the delegate
 */
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    // NOTE: the lookup result is dereferenced without a null check.
    final EurekaHttpClientRequestMetrics metricsForType =
            metricsByRequestType.get(requestExecutor.getRequestType());

    // 3. Record the start time before the request is sent.
    final Stopwatch timing = metricsForType.latencyTimer.start();
    try {
        // 4. Send the request and receive the response.
        final EurekaHttpResponse<R> response = requestExecutor.execute(delegate);

        // 5. Increment the counter that matches the response status.
        final Counter statusCounter = metricsForType.countersByStatus.get(mappedStatus(response));
        statusCounter.increment();

        return response;
    } catch (Exception e) {
        // Count the connection error, then count the exception itself.
        metricsForType.connectionErrors.increment();
        exceptionsMetric.count(e);
        throw e;
    } finally {
        // 6. Record the end time whether the request succeeded or failed.
        timing.stop();
    }
}
/**
 * Creates one {@code EurekaHttpClientRequestMetrics} per {@code RequestType} value,
 * named after the enum constant.
 *
 * <p>Any exception raised while building the metrics is logged at WARN and swallowed,
 * so the returned map may then be missing entries.
 *
 * @return the metrics keyed by request type
 */
private static Map<RequestType, EurekaHttpClientRequestMetrics> initializeMetrics() {
    final Map<RequestType, EurekaHttpClientRequestMetrics> metricsByType = new EnumMap<>(RequestType.class);
    try {
        for (final RequestType type : RequestType.values()) {
            final EurekaHttpClientRequestMetrics perType = new EurekaHttpClientRequestMetrics(type.name());
            metricsByType.put(type, perType);
        }
    } catch (Exception e) {
        logger.warn("Metrics initialization failure", e);
    }
    return metricsByType;
}
/**
 * Servo monitors kept for one resource name: a latency timer, a connection-error
 * counter and one request counter per {@link Status}. All of them are registered
 * through {@code ServoUtil} on construction and unregistered by {@link #shutdown()}.
 */
static class EurekaHttpClientRequestMetrics {

    /** Keys of the per-status request counters. */
    enum Status {x100, x200, x300, x400, x500, Unknown}

    private final Timer latencyTimer;
    private final Counter connectionErrors;
    private final Map<Status, Counter> countersByStatus;

    /**
     * Creates and registers every monitor of this bundle.
     *
     * @param resourceName value of the {@code "id"} tag on each monitor
     */
    EurekaHttpClientRequestMetrics(String resourceName) {
        final String classTag = MetricsCollectingEurekaHttpClient.class.getSimpleName();

        // The per-status counters are created and registered first.
        this.countersByStatus = createStatusCounters(resourceName);

        // 1.1 Create the latency timer, reporting in milliseconds.
        final MonitorConfig latencyConfig = MonitorConfig
                .builder(EurekaClientNames.METRIC_TRANSPORT_PREFIX + "latency")
                .withTag("id", resourceName)
                .withTag("class", classTag)
                .build();
        this.latencyTimer = new BasicTimer(latencyConfig, TimeUnit.MILLISECONDS);
        // 1.2 Register it; per the original note this ends up in
        //     DefaultMonitorRegistry.getInstance().register(latencyTimer).
        ServoUtil.register(latencyTimer);

        final MonitorConfig errorsConfig = MonitorConfig
                .builder(EurekaClientNames.METRIC_TRANSPORT_PREFIX + "connectionErrors")
                .withTag("id", resourceName)
                .withTag("class", classTag)
                .build();
        this.connectionErrors = new BasicCounter(errorsConfig);
        ServoUtil.register(connectionErrors);
    }

    /** Unregisters the timer, the error counter and every per-status counter. */
    void shutdown() {
        ServoUtil.unregister(latencyTimer, connectionErrors);
        ServoUtil.unregister(countersByStatus.values());
    }

    /**
     * Creates and registers one request counter for each {@link Status} value.
     *
     * @param resourceName value of the {@code "id"} tag on each counter
     * @return the counters keyed by status
     */
    private static Map<Status, Counter> createStatusCounters(String resourceName) {
        final String classTag = MetricsCollectingEurekaHttpClient.class.getSimpleName();
        final Map<Status, Counter> byStatus = new EnumMap<>(Status.class);
        for (final Status bucket : Status.values()) {
            final MonitorConfig requestConfig = MonitorConfig
                    .builder(EurekaClientNames.METRIC_TRANSPORT_PREFIX + "request")
                    .withTag("id", resourceName)
                    .withTag("class", classTag)
                    .withTag("status", bucket.name())
                    .build();
            final BasicCounter requestCounter = new BasicCounter(requestConfig);
            ServoUtil.register(requestCounter);
            byStatus.put(bucket, requestCounter);
        }
        return byStatus;
    }
}
至于默认的DefaultMonitorRegistry.getInstance,代码如下:
# servo-core-0.10.1.jar!/com.netflix.servo.DefaultMonitorRegistry
private static final MonitorRegistry INSTANCE = new DefaultMonitorRegistry(); // 1. The DefaultMonitorRegistry instance is created eagerly, in a static initializer
/**
 * Returns the shared registry instance held in {@code INSTANCE}.
 *
 * @return the same {@code MonitorRegistry} on every call
 */
public static MonitorRegistry getInstance() {
return INSTANCE;
}
/**
 * Creates the registry configured from the JVM system properties by delegating to
 * the {@code Properties}-based constructor.
 */
DefaultMonitorRegistry() {
this(System.getProperties()); // 2. Configure from the system properties
}
/**
 * Chooses the underlying registry from {@code props}.
 *
 * <p>When {@code REGISTRY_CLASS_PROP} is set, that class is loaded and instantiated
 * through its no-argument constructor. If that fails for any reason the error is
 * logged and a {@code JmxMonitorRegistry} is used instead. When the property is not
 * set, a {@code JmxMonitorRegistry} with the object-name mapper derived from
 * {@code props} is used.
 *
 * @param props properties supplying the registry class name and the registry name
 */
DefaultMonitorRegistry(Properties props) {
    // 3. NOTE(review): per the original note, spring-cloud-netflix's ServoMetricsConfigBean
    //    sets this property to BasicMonitorRegistry, which only keeps monitors in memory.
    //    Not visible here; confirm.
    final String className = props.getProperty(REGISTRY_CLASS_PROP);
    final String registryName = props.getProperty(REGISTRY_NAME_PROP, DEFAULT_REGISTRY_NAME);

    final MonitorRegistry delegate;
    if (className == null) {
        // 4. No class configured: fall back to the JMX-backed registry.
        delegate = new JmxMonitorRegistry(registryName, getObjectNameMapper(props));
    } else {
        MonitorRegistry created;
        try {
            created = (MonitorRegistry) Class.forName(className).newInstance();
        } catch (Throwable t) {
            LOG.error(
                    "failed to create instance of class " + className + ", "
                            + "using default class "
                            + JmxMonitorRegistry.class.getName(),
                    t);
            // This fallback uses the single-argument constructor, so no custom mapper.
            created = new JmxMonitorRegistry(registryName);
        }
        delegate = created;
    }
    registry = delegate;
}
/**
 * Forwards the registration to the underlying {@code registry}.
 *
 * @param monitor monitor to register
 */
public void register(Monitor<?> monitor) {
registry.register(monitor);
}
那么,JMX对应的监控注册过程如下:
# servo-core-0.10.1.jar!/com.netflix.servo.jmx.JmxMonitorRegistry
/**
 * Creates a registry that uses {@code ObjectNameMapper.DEFAULT}.
 *
 * @param name registry name, forwarded to the two-argument constructor
 */
public JmxMonitorRegistry(String name) {
this(name, ObjectNameMapper.DEFAULT);
}
/**
 * Creates a registry bound to the platform MBean server, starting with an empty
 * monitor map.
 *
 * @param name   registry name, stored for use when MBeans are created
 * @param mapper mapper stored for use when MBeans are created
 */
public JmxMonitorRegistry(String name, ObjectNameMapper mapper) {
    this.name = name;
    this.mapper = mapper;
    this.monitors = new ConcurrentHashMap<MonitorConfig, Monitor<?>>();
    this.mBeanServer = ManagementFactory.getPlatformMBeanServer();
}
/**
 * Registers {@code monitor} as one or more MBeans and remembers it in
 * {@code monitors}. Failures are logged at WARN and not rethrown.
 *
 * @param monitor monitor to expose
 */
public void register(Monitor<?> monitor) {
    try {
        // 1. Create the DynamicMBeans for the monitor; per the original note their
        //    object names have the form domain:name=...,tag=...
        for (MonitorMBean mbean : MonitorMBean.createMBeans(name, monitor, mapper)) {
            register(mbean.getObjectName(), mbean);
        }
        monitors.put(monitor.getConfig(), monitor);
        updatePending.set(true);
    } catch (Exception e) {
        LOG.warn("Unable to register Monitor:" + monitor.getConfig(), e);
    }
}
/**
 * Registers {@code mbean} under {@code objectName}, first unregistering any MBean
 * already registered under that name. Both steps run while holding the
 * {@code mBeanServer} monitor.
 *
 * @param objectName name to register under
 * @param mbean      bean to register
 * @throws Exception if the MBean server rejects either operation
 */
private void register(ObjectName objectName, DynamicMBean mbean) throws Exception {
    synchronized (mBeanServer) {
        final boolean alreadyRegistered = mBeanServer.isRegistered(objectName);
        if (alreadyRegistered) {
            mBeanServer.unregisterMBean(objectName);
        }
        // 2. Register the MBean.
        mBeanServer.registerMBean(mbean, objectName);
    }
}
- spectator
servo已经快被淘汰,java8的应用推荐使用spectator。
spectator没有注解可用,只能通过注入Registry方式,创建Counter、Timer、DistributionSummary,然后注册Metric。
spring-cloud-netflix给我们提供了SpectatorMetricsAutoConfiguration,在此自动配置类里定义了ServoMetricsConfigBean、MonitorRegistry、Registry、ServoMonitorCache、SpectatorMetricServices、MetricReaderPublicMetrics、MetricsTagProvider。
此外,还引入了配置类MetricsInterceptorConfiguration,即MetricsHandlerInterceptor对服务请求计时、MetricsClientHttpRequestInterceptor对使用RestTemplate请求服务计时。
其中,SpectatorMetricServices在MetricRepositoryAutoConfiguration前生效,使spring-boot中定义的GaugeService和CounterService不生效。
其中定义的MetricReaderPublicMetrics使用ServoMetricReader,从servo的MonitorRegistry里获取Monitor,转换为Metric,再通过spring-boot的MetricsEndpoint暴露出去(该bean由EndpointAutoConfiguration定义)。
在spring-boot中,通过MetricExportAutoConfiguration定义了metricWritersMetricExporter,此bean负责将MetricReader、MetricWriter、Exporter归拢,作为SchedulingConfigurer,在触发configureTasks时,为Exporter生成IntervalTask。
这个间隔任务会以包装了Exporter的MetricExporters.ExportRunner类型作为Runnable对象,最终执行时调用Exporter.export。
如果我们的应用使用了@EnableAtlas注解,便会引入AtlasConfiguration配置类,此类定义了AtlasExporter,它使用MonitorRegistryMetricPoller从MonitorRegistry获取监控信息,使用AtlasMetricObserver来推送数据到atlas。
# spring-cloud-netflix-core-1.3.6.RELEASE.jar!/org.springframework.cloud.netflix.metrics.atlas.AtlasMetricObserver
/**
 * Prepares the given metrics and sends them in batches of at most
 * {@code config.getBatchSize()} entries. The status returned for each batch is
 * ignored.
 *
 * @param rawMetrics metrics to publish; nothing is sent when the list is empty or
 *                   the observer is disabled
 */
public void update(List<Metric> rawMetrics) {
    // 1. NOTE(review): per the original note, netflix.atlas.enabled defaults to true.
    //    Confirm against the config class.
    if (!config.isEnabled()) {
        logger.debug("Atlas metric observer disabled. Not sending metrics.");
        return;
    }
    if (rawMetrics.isEmpty()) {
        logger.debug("Metrics list is empty, no data being sent to server.");
        return;
    }

    // 2. Per the original note: adds the atlas.dstype tag according to the metric type
    //    (GAUGE/RATE/NORMALIZED, COUNTER), then converts tag keys and values to the
    //    underscore style Atlas expects. The helpers themselves are not shown here.
    final List<Metric> prepared = sanitizeTags(addTypeTagsAsNecessary(rawMetrics));

    // 3. Send the prepared list in consecutive sub-list batches.
    int start = 0;
    while (start < prepared.size()) {
        final int end = Math.min(prepared.size(), config.getBatchSize() + start);
        final List<Metric> chunk = prepared.subList(start, end);
        logger.debug("Sending a metrics batch of size " + chunk.size());
        sendMetricsBatch(chunk);
        start += config.getBatchSize();
    }
}
/**
 * Serializes one batch of metrics with the Smile generator and POSTs it to
 * {@code uri}.
 *
 * @param metrics the batch to publish
 * @return {@code NothingToDo} when {@code writeMetrics} wrote no metric,
 *         {@code PartialSuccess} when the server answers 202 ACCEPTED,
 *         {@code Failure} when serialization or the HTTP call fails,
 *         {@code Success} otherwise
 */
PublishMetricsBatchStatus sendMetricsBatch(List<Metric> metrics) {
    try {
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        // 4. Jackson Smile binary encoding; per the original note, chosen to reduce payload size.
        JsonGenerator gen = smileFactory.createGenerator(output, JsonEncoding.UTF8);
        gen.writeStartObject();
        // 5. Write the common tags (per the original note, empty by default).
        writeCommonTags(gen);
        // 6. Write the metrics.
        if (writeMetrics(gen, metrics) == 0) {
            // short circuit this batch if no valid/numeric metrics existed
            return PublishMetricsBatchStatus.NothingToDo;
        }
        gen.writeEndObject();
        gen.flush();

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.valueOf("application/x-jackson-smile"));
        HttpEntity<byte[]> entity = new HttpEntity<>(output.toByteArray(), headers);
        try {
            // 7. POST the batch to Atlas.
            ResponseEntity<Map> response = restTemplate.exchange(uri, HttpMethod.POST, entity, Map.class);
            if (response.getStatusCode() == HttpStatus.ACCEPTED) {
                // partial success processing the metrics batch
                // A 202 response may arrive without a body; treat that as "no messages"
                // instead of failing with a NullPointerException.
                Map body = response.getBody();
                List<String> messages = (body != null) ? (List<String>) body.get("message") : null;
                if (messages != null) {
                    for (String message : messages) {
                        logger.error("Failed to write metric to atlas: " + message);
                    }
                }
                return PublishMetricsBatchStatus.PartialSuccess;
            }
        }
        catch (HttpClientErrorException e) {
            logger.error("Failed to write metrics to atlas: " + e.getResponseBodyAsString());
            return PublishMetricsBatchStatus.Failure;
        }
        catch (RestClientException e) {
            logger.error("Failed to write metrics to atlas", e);
            return PublishMetricsBatchStatus.Failure;
        }
    }
    catch (IOException e) {
        // Serialization failures used to be dropped silently; log them so a Failure
        // status can be diagnosed.
        logger.error("Failed to serialize metrics batch for atlas", e);
        return PublishMetricsBatchStatus.Failure;
    }
    return PublishMetricsBatchStatus.Success;
}