余晖落尽暮晚霞,黄昏迟暮远山寻
本站
当前位置:网站首页 > 编程知识 > 正文

技术干货 | 源码解析 Github 上 14.1k Star 的 RocketMQ

xiyangw 2023-05-14 11:31 10 浏览 0 评论

前言

Apache RocketMQ 作为广为人知的开源消息中间件,诞生于阿里巴巴,于 2016 年捐赠给了 Apache。从 RocketMQ 4.0 到如今最新的 v4.7.1,不论是在阿里巴巴内部还是外部社区,都赢得了广泛的关注和好评。

技术干货 | 源码解析 Github 上 14.1k Star 的 RocketMQ

本文将站在发送方视角,通过阅读 RocketMQ Producer 源码,来分析在事务消息发送中 RocketMQ 是如何工作的。

需要说明的是,本文所贴代码,均来自 4.7.1 版本的 RocketMQ 源码。本文中所讨论的发送,仅指从 Producer 发送到 Broker 的过程,并不包含 Broker 将消息投递到 Consumer 的过程。

宏观概览

RocketMQ 事务消息发送流程:

结合源码来看,RocketMQ 的事务消息 TransactionMQProducer 的 sendMessageInTransaction 方法,实际调用了 DefaultMQProducerImpl 的 sendMessageInTransaction 方法。我们进入 sendMessageInTransaction 方法,整个事务消息的发送流程清晰可见。

首先,做发送前检查,并填入必要参数,包括设 prepare 事务消息。

源码清单-1

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    TransactionListener transactionListener = getCheckListener(); 
        if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

进入发送处理流程:

源码清单-2

    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

根据 broker 返回的处理结果决策本地事务是否执行,半消息发送成功则开始本地事务执行:

源码清单-3

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) { 
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) { 
                    log.debug("Used new transaction API");
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg); 
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:  // 当备broker状态不可用时,半消息要回滚,不执行本地事务
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

本地事务执行结束,根据本地事务状态进行二阶段处理:

源码清单-4

    try {
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    // 组装发送结果
    // ...
    return transactionSendResult;
}

接下来,我们深入每个阶段代码分析。

深扒内幕

Ⅰ阶段发送

重点分析 send 方法。进入 send 方法后,我们发现,RocketMQ 的事务消息的一阶段,使用了 SYNC 同步模式:

源码清单-5

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

这一点很容易理解,毕竟事务消息是要根据一阶段发送结果来决定要不要执行本地事务的,所以一定要阻塞等待 broker 的 ack。

我们进入 DefaultMQProducerImpl.java 中去看 sendDefaultImpl 方法的实现,通过读这个方法的代码,来尝试了解在事务消息的一阶段发送过程中 producer 的行为。

值得注意的是,这个方法并非为事务消息定制,甚至不是为 SYNC 同步模式定制的,因此读懂了这段代码,基本可以对 RocketMQ 的消息发送机制有了一个较为全面的认识。

这段代码逻辑非常通畅,不忍切片。为了节省篇幅,将代码中较为繁杂但信息量不大的部分以注释代替,尽可能保留流程的完整性。个人认为较为重要或是容易被忽略的部分,以注释标出,后文还有部分细节的详细解读。

源码清单-6

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    // 一、消息有效性校验。见后文
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;

    // 获取当前topic的发送路由信息,主要是要broker,如果没找到则从namesrv获取
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // 二、发送重试机制。见后文
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            // 第一次发送是mq == null, 之后都是有broker信息的
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 三、rocketmq发送消息时如何选择队列?——broker异常规避机制 
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // 发送核心代码
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // rocketmq 选择 broker 时的规避机制,开启 sendLatencyFaultEnable == true 才生效
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

                    switch (communicationMode) {
                    // 四、RocketMQ的三种CommunicationMode。见后文
                        case ASYNC: // 异步模式
                            return null;
                        case ONEWAY: // 单向模式
                            return null;
                        case SYNC: // 同步模式
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    // ...
                    // 自动重试
                } catch (MQClientException e) {
                    // ...
                    // 自动重试
                } catch (MQBrokerException e) {
                   // ...
                    // 仅返回码==NOT_IN_CURRENT_UNIT==205 时自动重试
                    // 其他情况不重试,抛异常
                } catch (InterruptedException e) {
                   // ...
                    // 不重试,抛异常
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }

        // 组装返回的info信息,最后以MQClientException抛出
        // ... ...

        // 超时场景抛RemotingTooMuchRequestException
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        // 填充MQClientException异常信息
        // ...
    }

    validateNameServerSetting();

    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

一、消息有效性校验

源码清单-7

 Validators.checkMessage(msg, this.defaultMQProducer);

在此方法中校验消息的有效性,包括对 topic 和消息体的校验。topic 的命名必须符合规范,且避免使用内置的系统消息 TOPIC。消息体长度 > 0 && 消息体长度 <= 102410244 = 4M 。

源码清单-8

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
    throws MQClientException {
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // topic
    Validators.checkTopic(msg.getTopic());
    Validators.isNotAllowedSendTopic(msg.getTopic());

    // body
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }

    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }

    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}

二、发送重试机制

Producer 在消息发送不成功时,会自动重试,最多发送次数 = retryTimesWhenSendFailed + 1 = 3 次 。

值得注意的是,并非所有异常情况都会重试,从以上源码中可以提取到的信息告诉我们,在以下三种情况下,会自动重试:

  • 发生 RemotingException,MQClientException 两种异常之一时
  • 发生 MQBrokerException 异常,且 ResponseCode 是 NOT_IN_CURRENT_UNIT = 205 时
  • SYNC 模式下,未发生异常且发送结果状态非 SEND_OK

在每次发送消息之前,会先检查是否在前面这两步就已经耗时超长(超时时长默认 3000ms),若是,则不再继续发送并且直接返回超时,不再重试。这里说明了 2 个问题:

  • producer 内部自动重试对业务应用而言是无感知的,应用看到的发送耗时是包含所有重试的耗时在内的;
  • 一旦超时意味着本次消息发送已经以失败告终,原因是超时。这个信息最后会以 RemotingTooMuchRequestException 的形式抛出。

这里需要指出的是,在 RocketMQ 官方文档中指出,发送超时时长是 10s,即 10000ms,网上许多人对 rocketMQ 的超时时间解读也认为是 10s。然而代码中却明明白白写着 3000ms,最终我 debug 之后确认,默认超时时间确实是 3000ms。

三、broker 的异常规避机制

源码清单-9

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);  

这行代码是发送前选择 queue 的过程。

这里涉及 RocketMQ 消息发送高可用的的一个核心机制,latencyFaultTolerance。这个机制是 Producer 负载均衡的一部分,通过 sendLatencyFaultEnable 的值来控制,默认是 false 关闭状态,不启动 broker 故障延迟机制,值为 true 时启用 broker 故障延迟机制,可由 Producer 主动打开。

选择队列时,开启异常规避机制,则根据 broker 的工作状态避免选择当前状态不佳的 broker 代理,不健康的 broker 会在一段时间内被规避,不开启异常规避机制时,则按顺序选取下一个队列,但在重试场景下会尽量选择不同于上次发送 broker 的 queue。每次消息发送都会通过 updateFaultItem 方法来维护 broker 的状态信息。

源码清单-10

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        // 计算延迟多久,isolation表示是否需要隔离该broker,若是,则从30s往前找第一个比30s小的延迟值,再按下标判断规避的周期,若30s,则是10min规避;
        // 否则,按上一次发送耗时来决定规避时长;
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}  

深入到 selectOneMessageQueue 方法内部一探究竟:

源码清单-11

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        // 开启异常规避
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                // 按顺序取下一个message queue作为发送的queue
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // 当前queue所在的broker可用,且与上一个queue的broker相同,
                // 或者第一次发送,则使用这个queue
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }
    // 不开启异常规避,则随机自增选择Queue
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

四、RocketMQ 的三种 CommunicationMode

源码清单-12

 public enum CommunicationMode {
    SYNC,
    ASYNC,
    ONEWAY,
}

以上三种模式指的都是消息从发送方到达 broker 的阶段,不包含 broker 将消息投递给订阅方的过程。三种模式的发送方式的差异:

单向模式:ONEWAY。消息发送方只管发送,并不关心 broker 处理的结果如何。这种模式下,由于处理流程少,发送耗时非常小,吞吐量大,但不能保证消息可靠不丢,常用于流量巨大但不重要的消息场景,例如心跳发送等。

异步模式:ASYNC。消息发送方发送消息到 broker 后,无需等待 broker 处理,拿到的是 null 的返回值,而由一个异步的线程来做消息处理,处理完成后以回调的形式告诉发送方发送结果。异步处理时如有异常,返回发送方失败结果之前,会经过内部重试(默认 3 次,发送方不感知)。这种模式下,发送方等待时长较小,吞吐量较大,消息可靠,用于流量大但重要的消息场景。

同步模式:SYNC。消息发送方需等待 broker 处理完成并明确返回成功或失败,在消息发送方拿到消息发送失败的结果之前,也会经历过内部重试(默认 3 次,发送方不感知)这种模式下,发送方会阻塞等待消息处理结果,等待时长较长,消息可靠,用于流量不大但重要的消息场景。需要强调的是,事务消息的一阶段半事务消息的处理是同步模式。

在 sendKernelImpl 方法中也可以看到具体的实现差异。ONEWAY 模式最为简单,不做任何处理。负责发送的 sendMessage 方法参数中,相比同步模式,异步模式多了回调方法、包含 topic 发送路由元信息的 topicPublishInfo、包含发送 broker 信息的 instance、包含发送队列信息的 producer、重试次数。另外,异步模式下,会对有压缩的消息先做 copy。

源码清单-13

    switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            } 

官方文档中有这样一张图,十分清晰的描述了异步通信的详细过程:

Ⅱ 阶段发送

源码清单-3 体现了本地事务的执行,localTransactionState 将本地事务执行结果与事务消息二阶段的发送关联起来。

值得注意的是,如果一阶段的发送结果是 SLAVENOTAVAILABLE,即便 broker 不可用时,也会将 localTransactionState 置为 Rollback,此时将不会执行本地事务。之后由 endTransaction 方法负责二阶段提交,见源码清单-4。具体到 endTransaction 的实现:

源码清单-14

public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // 采用oneway的方式发送二阶段消息
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

在二阶段发送时,之所以用 oneway 的方式发送,个人理解这正是因为事务消息有一个特殊的可靠机制——回查。

消息回查

当 Broker 经过了一个特定的时间,发现依然没有得到事务消息的二阶段是否要提交或者回滚的确切信息,Broker 不知道 Producer 发生了什么情况(可能 producer 挂了,也可能 producer 发了 commit 但网络抖动丢了,也可能……于是主动发起回查。

事务消息的回查机制,更多的是在 broker 端的体现。RocketMQ 的 broker 以 Half 消息、Op 消息、真实消息三个不同的 topic 来将不同发送阶段的事务消息进行了隔离,使得 Consumer 只能看到最终确认 commit 需要投递出去的消息。其中详细的实现逻辑在本文中暂不多赘述,后续可另开一篇专门来从 Broker 视角来解读。

回到 Producer 的视角,当收到了 Broker 的回查请求,Producer 将根据消息检查本地事务状态,根据结果决定提交或回滚,这就要求 Producer 必须指定回查实现,以备不时之需。当然,正常情况下,并不推荐主动发送 UNKNOW 状态,这个状态毫无疑问会给 broker 带来额外回查开销,只在出现不可预知的异常情况时才启动回查机制,是一种比较合理的选择。

另外,4.7.1 版本的事务回查并非无限回查,而是最多回查 15 次:

源码清单-15

/**
 * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
 */
@ImportantField
private int transactionCheckMax = 15;

附录

官方给出 Producer 的默认参数如下(其中超时时长的参数,在前文中也已经提到,debug 的结果是默认 3000ms,并非 10000ms):

本文为阿里云原创内容,未经允许不得转载。

相关推荐

辞旧迎新,新手使用Containerd时的几点须知

相信大家在2020年岁末都被Kubernetes即将抛弃Docker的消息刷屏了。事实上作为接替Docker运行时的Containerd在早在Kubernetes1.7时就能直接与Kubelet集成使...

分布式日志系统ELK+skywalking分布式链路完整搭建流程

开头在分布式系统中,日志跟踪是一件很令程序员头疼的问题,在遇到生产问题时,如果是多节点需要打开多节点服务器去跟踪问题,如果下游也是多节点且调用多个服务,那就更麻烦,再者,如果没有分布式链路,在生产日志...

Linux用户和用户组管理

1、用户账户概述-AAA介绍AAA指的是Authentication、Authorization、Accounting,即认证、授权和审计。?认证:验证用户是否可以获得权限,是3A的第一步,即验证身份...

linux查看最后N条日志

其实很简单,只需要用到tail这个命令tail-100catalina.out输入以上命令,就能列出catalina.out的最后100行。...

解决linux系统日志时间错误的问题

今天发现一台虚拟机下的系统日志:/var/log/messages,文件时间戳不对,跟正常时间差了12个小时。按网上说的执行了servicersyslogrestart重启syslog服务,还是不...

全程软件测试(六十二):软件测试工作如何运用Linux—读书笔记

从事过软件测试的小伙们就会明白会使用Linux是多么重要的一件事,工作时需要用到,面试时会被问到,简历中需要写到。对于软件测试人员来说,不需要你多么熟练使用Linux所有命令,也不需要你对Linux...

Linux运维之为Nginx添加错误日志(error_log)配置

Nginx错误日志信息介绍配置记录Nginx的错误信息是调试Nginx服务的重要手段,属于核心功能模块(nginx_core_module)的参数,该参数名字为error_log,可以放在不同的虚机主...

Linux使用swatchdog实时监控日志文件的变化

1.前言本教程主要讲解在Linux系统中如何使用swatchdog实时监控日志文件的变化。swatchdog(SimpleWATCHDOG)是一个简单的Perl脚本,用于监视类Unix系统(比如...

syslog服务详解

背景:需求来自于一个客户想将服务器的日志转发到自己的日志服务器上,所以希望我们能提供这个转发的功能,同时还要满足syslog协议。1什么是syslog服务1.1syslog标准协议如下图这里的fa...

linux日志文件的管理、备份及日志服务器的搭建

日志文件存放目录:/var/log[root@xinglog]#cd/var/log[root@xinglog]#lsmessages:系统日志secure:登录日志———————————...

运维之日志管理简介

日志简介在运维过程中,日志是必不可少的东西,通过日志可以快速发现问题所在。日志分类日志分类,对不同的日志进行不同维度的分析。操作系统日志操作系统是基础,应用都是在其之上;操作系统日志的分析,可以反馈出...

Apache Log4j 爆核弹级漏洞,Spring Boot 默认日志框架就能完美躲过

这两天沸沸扬扬的Log4j2漏洞门事件炒得热火朝天:突发!ApacheLog4j2报核弹级漏洞。。赶紧修复!!|Java技术栈|Java|SpringBoot|Spring...

Linux服务器存在大量log日志,如何快速定位错误?

来源:blog.csdn.net/nan1996jiang/articlep/details/109550303针对大量log日志快速定位错误地方tail/head简单命令使用:附加针对大量log日志...

Linux中查看日志文件的正确姿势,求你别tail走天下了!

作为一个后端开发工程师,在Linux中查看查看文件内容是基本操作了。尤其是通常要分析日志文件排查问题,那么我们应该如何正确打开日志文件呢?对于我这种小菜鸡来说,第一反应就是cat,tail,vi(或...

分享几款常用的付费日志系统,献给迷茫的你!

概述在前一篇文章中,我们分享了几款免费的日志服务器。他们各有各的特点,但是大家有不同的需求,有时免费的服务器不能满足大家的需要,下面推荐几款付费的日志服务器。1.Nagios日志服务器Nagio...

取消回复欢迎 发表评论: