余晖落尽暮晚霞,黄昏迟暮远山寻
本站
当前位置:网站首页 > 编程知识 > 正文

开箱即用!动态修改RocketMQ线程池

xiyangw 2023-05-14 12:00 14 浏览 0 评论

前言

大家好,我是小郭,上一篇文章中提到了动态修改RocketMQ线程池,那我们如何才能够通过管理页面对不同消费者组的线程池进行管理自由的随着业务波动进行平滑修改,降低线程池参数修改的成本。

另外,从功能性以及健壮性而言还有许多值得我们思考的地方,可以通过查看线程池运行时指标、负载报警、配置日志管理来提高我们对线程池的管理。

开箱即用!动态修改RocketMQ线程池

SpringBoot RocketMQ 实践

版本信息:

rocketmq-spring-boot-starter : 2.2.2

spring-boot-starter :2.3.2.RELEASE

为了满足业务需求,我们初始化三个 Consumer。

并且设置成同一个 Topic 和不同的消费者组 ConsumerGroup,线程池数量都设置为 20。

@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic", consumeThreadNumber = 20)
public class MessageConsume implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info(Thread.currentThread().getName());
        log.info("Message: {}", message);
    }
}

@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic_1", consumeThreadNumber = 20)
public class MessageConsume implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info(Thread.currentThread().getName());
        log.info("Message: {}", message);
    }
}

@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic_2", consumeThreadNumber = 20)
public class MessageConsume implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info(Thread.currentThread().getName());
        log.info("Message: {}", message);
    }
}
复制代码

生产者:

@GetMapping("/message/send")
public String sendMessage() {

    IntStream.range(0, 1000).forEach(i -> {
        rocketMQTemplate.convertAndSend("test-topic", new Date().toString());
    });
    return "success";
}
复制代码

执行结果:

我们可以看到三个消费者都监听到了Topic的消息,并进行了消费。但是这个时候,由于生产消息数量暴增,需要提高消费者的并行消费速度。

思考片刻,只有最常规的使用方式就是修改线程池后,重新启动消费者。但是目前生产正在跑着业务数据,重启消费者必定会造成大量数据堆积甚至是丢失。

这时候如果能有一个页面动态修改线程池数量,那一定是非常赞的!

如何进行动态修改 RocketMQ 线程数?

思路:

  1. 收集消费者端信息。
  2. 将消费者端的线程池实例注册到服务中。
  3. 定时心跳检查实例状态。
  4. 从容器中拿到对应线程池,调用原生方法进行参数修改。

流程图:

实现逻辑:

第一步,收集消费者端信息。

第二步,通过DefaultRocketMQListenerContainer类获取到Bean对象的集合,将消费者执行器保存到容器中。

@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
    Map<String, DefaultRocketMQListenerContainer> containerMap =
            ApplicationContextHolder.getBeansOfType(DefaultRocketMQListenerContainer.class);
    try {
        for (DefaultRocketMQListenerContainer container : containerMap.values()) {
            DefaultMQPushConsumer defaultMQPushConsumer = container.getConsumer();
            if (defaultMQPushConsumer != null) {
                ConsumeMessageService consumeMessageService = defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getConsumeMessageService();
                ThreadPoolExecutor consumeExecutor = (ThreadPoolExecutor) ReflectUtil.getFieldValue(consumeMessageService, "consumeExecutor");
                rocketmqConsumeExecutor.put(container.getConsumerGroup(), consumeExecutor);
            }
        }
    } catch (Exception ex) {
        log.error("Failed to get RocketMQ thread pool.", ex);
    }
}
复制代码

第三步,将消费者端实例注册到服务中。

第四步,调用接口将服务注册。

public DiscoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo) {
    ...
    register();
}

boolean register() {
    log.info("{}{} - registering service...", PREFIX, appPathIdentifier);
    String urlPath = BASE_PATH + "/apps/register/";
    Result registerResult;
    try {
        registerResult = httpAgent.httpPostByDiscovery(urlPath, instanceInfo);
    } catch (Exception ex) {
        registerResult = Results.failure(ErrorCodeEnum.SERVICE_ERROR);
        log.error("{}{} - registration failed: {}", PREFIX, appPathIdentifier, ex.getMessage());
    }
    if (log.isInfoEnabled()) {
        log.info("{}{} - registration status: {}", PREFIX, appPathIdentifier, registerResult.isSuccess() ? "success" : "fail");
    }
    return registerResult.isSuccess();
}
复制代码

第五步,定时心跳检查消费者实例状态。

第六步,初始化定时任务线程池,实现心跳线程,检查实例信息逻辑。

public DiscoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo) {
    // ...
    initScheduledTasks();
}
// 定时任务线程池
private void initScheduledTasks() {
    scheduler.scheduleWithFixedDelay(new HeartbeatThread(), 30, 30, TimeUnit.SECONDS);
}
// 心跳线程
public class HeartbeatThread implements Runnable {

    @Override
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

private boolean renew() {
    Result renewResult;
    try {
        InstanceInfo.InstanceRenew instanceRenew = new InstanceInfo.InstanceRenew()
                .setAppName(instanceInfo.getAppName())
                .setInstanceId(instanceInfo.getInstanceId())
                .setLastDirtyTimestamp(instanceInfo.getLastDirtyTimestamp().toString())
                .setStatus(instanceInfo.getStatus().toString());
        // 检查实例信息是否正常
        renewResult = httpAgent.httpPostByDiscovery(BASE_PATH + "/apps/renew", instanceRenew);
        if (Objects.equals(ErrorCodeEnum.NOT_FOUND.getCode(), renewResult.getCode())) {
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            ThreadPoolAdapterRegister adapterRegister = ApplicationContextHolder.getBean(ThreadPoolAdapterRegister.class);
            adapterRegister.register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return renewResult.isSuccess();
    } catch (Exception ex) {
        log.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, ex);
        return false;
    }
}
// 注册实例信息
public void register() {
    Map<String, ThreadPoolAdapter> threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class);
    List<ThreadPoolAdapterCacheConfig> threadPoolAdapterCacheConfigs = getThreadPoolAdapterCacheConfigs(threadPoolAdapterMap);
    doRegister(threadPoolAdapterCacheConfigs);
}
复制代码

第七步,从容器中拿到对应线程池,调用原生方法进行参数修改。

最后,通过 threadPoolKey 拿到执行器,更新核心线程数和最大线程数。

@Override
public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
    String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
    // 通过threadPoolKey拿到执行器
    ThreadPoolExecutor rocketMQConsumeExecutor = rocketmqConsumeExecutor.get(threadPoolKey);
    if (rocketMQConsumeExecutor != null) {
        int originalCoreSize = rocketMQConsumeExecutor.getCorePoolSize();
        int originalMaximumPoolSize = rocketMQConsumeExecutor.getMaximumPoolSize();
        // 更新核心线程数和最大线程数
        rocketMQConsumeExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
        rocketMQConsumeExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
        log.info("[{}] RocketMQ consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
                threadPoolKey,
                String.format(CHANGE_DELIMITER, originalCoreSize, rocketMQConsumeExecutor.getCorePoolSize()),
                String.format(CHANGE_DELIMITER, originalMaximumPoolSize, rocketMQConsumeExecutor.getMaximumPoolSize()));
        return true;
    }
    log.warn("[{}] RocketMQ consuming thread pool not found.", threadPoolKey);
    return false;
}
复制代码

我们在消费者端建立了三个消费者 Consumer,通过前端页面可以看到对应 RocketMQ 消费者线程池。

我们现在对第一个消费者进行参数的修改。

通过后台的日志,我们可以看到已经修改成功了。

2022-12-26 15:35:35.472  INFO 9726 --- [nio-8099-exec-2] c.h.s.s.c.ThreadPoolAdapterController    : [RocketMQ] Change third-party thread pool data. key: my-consumer_test-topic, coreSize: 100, maximumSize: 100
2022-12-26 15:35:35.473  INFO 9726 --- [nio-8099-exec-2] c.h.a.r.RocketMQThreadPoolAdapter        : [my-consumer_test-topic] RocketMQ consumption thread pool parameter change. coreSize: 20 => 100, maximumSize: 20 => 100
复制代码

我们重新进行消息的生产,看看具体的效果,我们看到线程数量已经发生了变化/

利用这种方式,我们就可以解决常规重启系统来修改参数的问题。

上面我们主要利用了Hippo4j对RocketMQ线程池数量进行了修改,为了提高应用的健壮性,我们还有可以利用Hippo4j做到实时查看线程池运行时指标、负载报警、配置日志管理等事情。

如果大家对具体的代码实现感兴趣,欢迎大家到以下两个平台进行指导~

GitHub:github.com/opengoofy/h…

Gitee:gitee.com/magegoofy/h…

什么是 Hippo4j

为了避免大家重复的造轮子,大家只要在项目中引入 Hippo4j ,就能够进行 Dubbo、Hystrix、RabbitMQ、RocketMQ 等消费线程池运行时数据查看和线程数变更。

Hippo4j 通过对 JDK 线程池增强,以及扩展三方框架底层线程池等功能,为业务系统提高线上运行保障能力。

提供以下功能支持:

  • 全局管控 - 管理应用线程池实例。
  • 动态变更 - 应用运行时动态变更线程池参数,包括不限于:核心、最大线程数、阻塞队列容量、拒绝策略等。
  • 通知报警 - 内置四种报警通知策略,线程池活跃度、容量水位、拒绝策略以及任务执行时间超长。
  • 运行监控 - 实时查看线程池运行时数据,最近半小时线程池运行数据图表展示。
  • 功能扩展 - 支持线程池任务传递上下文;项目关闭时,支持等待线程池在指定时间内完成任务。
  • 多种模式 - 内置两种使用模式:依赖配置中心 和 无中间件依赖。
  • 容器管理 - Tomcat、Jetty、Undertow 容器线程池运行时查看和线程数变更。
  • 框架适配 - Dubbo、Hystrix、RabbitMQ、RocketMQ 等消费线程池运行时数据查看和线程数变更。

快速开始

对于本地演示目的,请参阅 Quick start

演示环境: console.hippo4j.cn/index.html

作者:小郭的技术笔记
链接:https://juejin.cn/post/7182017271399710778

相关推荐

辞旧迎新,新手使用Containerd时的几点须知

相信大家在2020年岁末都被Kubernetes即将抛弃Docker的消息刷屏了。事实上作为接替Docker运行时的Containerd在早在Kubernetes1.7时就能直接与Kubelet集成使...

分布式日志系统ELK+skywalking分布式链路完整搭建流程

开头在分布式系统中,日志跟踪是一件很令程序员头疼的问题,在遇到生产问题时,如果是多节点需要打开多节点服务器去跟踪问题,如果下游也是多节点且调用多个服务,那就更麻烦,再者,如果没有分布式链路,在生产日志...

Linux用户和用户组管理

1、用户账户概述-AAA介绍AAA指的是Authentication、Authorization、Accounting,即认证、授权和审计。?认证:验证用户是否可以获得权限,是3A的第一步,即验证身份...

linux查看最后N条日志

其实很简单,只需要用到tail这个命令tail-100catalina.out输入以上命令,就能列出catalina.out的最后100行。...

解决linux系统日志时间错误的问题

今天发现一台虚拟机下的系统日志:/var/log/messages,文件时间戳不对,跟正常时间差了12个小时。按网上说的执行了servicersyslogrestart重启syslog服务,还是不...

全程软件测试(六十二):软件测试工作如何运用Linux—读书笔记

从事过软件测试的小伙们就会明白会使用Linux是多么重要的一件事,工作时需要用到,面试时会被问到,简历中需要写到。对于软件测试人员来说,不需要你多么熟练使用Linux所有命令,也不需要你对Linux...

Linux运维之为Nginx添加错误日志(error_log)配置

Nginx错误日志信息介绍配置记录Nginx的错误信息是调试Nginx服务的重要手段,属于核心功能模块(nginx_core_module)的参数,该参数名字为error_log,可以放在不同的虚机主...

Linux使用swatchdog实时监控日志文件的变化

1.前言本教程主要讲解在Linux系统中如何使用swatchdog实时监控日志文件的变化。swatchdog(SimpleWATCHDOG)是一个简单的Perl脚本,用于监视类Unix系统(比如...

syslog服务详解

背景:需求来自于一个客户想将服务器的日志转发到自己的日志服务器上,所以希望我们能提供这个转发的功能,同时还要满足syslog协议。1什么是syslog服务1.1syslog标准协议如下图这里的fa...

linux日志文件的管理、备份及日志服务器的搭建

日志文件存放目录:/var/log[root@xinglog]#cd/var/log[root@xinglog]#lsmessages:系统日志secure:登录日志———————————...

运维之日志管理简介

日志简介在运维过程中,日志是必不可少的东西,通过日志可以快速发现问题所在。日志分类日志分类,对不同的日志进行不同维度的分析。操作系统日志操作系统是基础,应用都是在其之上;操作系统日志的分析,可以反馈出...

Apache Log4j 爆核弹级漏洞,Spring Boot 默认日志框架就能完美躲过

这两天沸沸扬扬的Log4j2漏洞门事件炒得热火朝天:突发!ApacheLog4j2报核弹级漏洞。。赶紧修复!!|Java技术栈|Java|SpringBoot|Spring...

Linux服务器存在大量log日志,如何快速定位错误?

来源:blog.csdn.net/nan1996jiang/articlep/details/109550303针对大量log日志快速定位错误地方tail/head简单命令使用:附加针对大量log日志...

Linux中查看日志文件的正确姿势,求你别tail走天下了!

作为一个后端开发工程师,在Linux中查看查看文件内容是基本操作了。尤其是通常要分析日志文件排查问题,那么我们应该如何正确打开日志文件呢?对于我这种小菜鸡来说,第一反应就是cat,tail,vi(或...

分享几款常用的付费日志系统,献给迷茫的你!

概述在前一篇文章中,我们分享了几款免费的日志服务器。他们各有各的特点,但是大家有不同的需求,有时免费的服务器不能满足大家的需要,下面推荐几款付费的日志服务器。1.Nagios日志服务器Nagio...

取消回复欢迎 发表评论: