kafka并发写大消息异常TimeoutException排查记录

 更新时间:2022年2月28日 10:21  点击:291 作者:kl

前言

先简单介绍下我们的使用场景,线上5台Broker节点的kafka承接了所有binlog订阅的数据,用于Flink组件接收数据做数据中台的原始数据。昨儿开发反馈,线上的binlog大量报错,都是kafka的异常,而且都是同一条topic抛的错,特征也很明显,发送的消息体非常大,主观判断肯定是写入大消息导致的超时了,异常详情如下:

thread:  kafka-producer-network-thread | producer-1
throwable:  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for BIN-LOG-DATA-center-dmz2-TABLE-kk-data-center-ods_contract_finance_info-0 due to 56352 ms has passed since last append

定位异常点

应用抛一个不常见的异常,一般操作是先去百度or谷歌搜索一番的,就上面这个timeout超时的异常,搜索引擎的结果都是producer连不上Borker导致的问题,根本不是我们这个场景的,所以其次我们就需要从源码中寻找答案了。博主使用的开发工具是IDEA,借助IDEA很容易定位到异常抛出点。首先定位TimeoutException异常类,然后按住ctrl键,点击这个类,会出现如下图所有抛TimeoutException异常的点,然后根据异常message内容,寻找相匹配的点击进去就是抛异常的地方了,如图,红色箭头所指即代码位置:

分析抛异常的逻辑

程序中的异常,一定是符合某些条件才会抛出的,想要解决异常,只要让运行时的环境不满足抛异常的条件即可,下面就是抛异常的代码:

boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
            expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
        else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
            expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time";
        else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs))
            expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time";

        boolean expired = expiryErrorMessage != null;
        if (expired)
            abortRecordAppends();
        return expired;
    }

可以看到,我们的异常是在第一个逻辑判断时候就满足了所以抛异常了。在此处有可能会抛出三个不同的timeout异常,用中文语义翻译条件分别是:

  • 没设置重试,并且发送批次(batch.size)满了,并且配置请求超时时间(request.timeout.ms)小于【当前时间减去最后追加批次的时间】
  • 没设置重试,并且配置请求超时时间(request.timeout.ms)小于【创建批次时间减去配置的等待发送的时间(linger.ms)】
  • 设置重试,并且配置请求超时时间(request.timeout.ms)小于【当前时间-最后重试时间-重试需要等待的时间(retry.backoff.ms)】

上面括号中的参数就是kafka producer中配置的相关的参数,这些参数都没有重新设置过,batch.size默认是10kb大小,而引发报错的消息都是36kb的大小,默认的request.timeout.ms超时设置是30s,所以在这个判断可能过期了的方法中,引发我们异常的主要原因是batch.size和request.timeout.ms的参数设置问题了。

真实原因-解决方案

从上面代码看表面原因是参数设置不够了,实际上呢,博主使用kafka-test启动了五个Borker集群做复现验证测试,测试写入相同的36kb的message,在所有配置也保持默认的情况下,也根本毫无压力。后面查找相关的错误日志,发现所有的TimeoutException集中在几乎同一时刻,经查明,是因为业务批量导入了数据到mysql中,造成binlog消息突然增加,高并发的往kafka写大消息导致Borker处理不过来,造成的TimeoutException超时,所以真正解决问题也可以从两个方面入手:

  • 服务端:增加Borker,并设置多个TopicPartition,平摊写入压力,这个是根本的解决问题
  • 客户端:加大request.timeout.ms、batch.size参数,或者开启消息重试,这种方案治标不治本,但是也能大概率的减少因为此类场景导致的TimeoutException

结语

异常不可怕,所有异常都是人为抛的,都是有既定的触发条件的,只要定位到触发异常的条件对症下药即可解决问题。不过博主五年来的经验发现,日志打印真的是门艺术,在这个方面,Spring框架和Dubbo以及Apollo配置中心框架就是日志打印的典范,不管发生什么异常,日志里都会输出详细的上下文环境,异常的原因,建议的解决方法,如果涉及到相关的配置,也会打印该怎么配置最好。反观kafka client的这条TimeoutException就显的信息量有点过少了,如果能把相关的配置信息和排查的方向写明会更好。最后安利一波kafka test,轻松搭建多Borker的kafka集群,一个注解就ok了。详情参考我的这篇博文《深入研究spring boot集成kafka之spring-kafka底层原理》

以上就是kafka并发写大消息异常TimeoutException排查记录的详细内容,更多关于kafka并发异常TimeoutException排查的资料请关注猪先飞其它相关文章!

原文出处:http://www.kailing.pub/article/index/arcid/259.html

[!--infotagslink--]

相关文章

  • .NET Core下使用Kafka的方法步骤

    这篇文章主要介绍了.NET Core下使用Kafka的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-09-22
  • 深入了解如何基于Python读写Kafka

    这篇文章主要介绍了深入了解如何基于Python读写Kafka,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下...2020-05-06
  • 解决kafka消息堆积及分区不均匀的问题

    这篇文章主要介绍了解决kafka消息堆积及分区不均匀的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2021-09-12
  • SpringBoot+Nacos+Kafka微服务流编排的简单实现

    本文主要介绍了SpringBoot+Nacos+Kafka微服务流编排的简单实现,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-08-20
  • .Net Core 集成 Kafka的步骤

    这篇文章主要介绍了.Net Core 集成 Kafka的实现步骤,帮助大家更好的理解和学习使用.net技术,感兴趣的朋友可以了解下...2021-09-22
  • java自己手动控制kafka的offset操作

    这篇文章主要介绍了java自己手动控制kafka的offset操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-02-20
  • docker部署kafka的方法步骤

    本文主要介绍了docker部署kafka的方法步骤,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-10-21
  • Spring Boot 集成 Kafkad的实现示例

    这篇文章主要介绍了Spring Boot 集成 Kafkad的示例,帮助大家更好的理解和学习使用Spring Boot框架,感兴趣的朋友可以了解下...2021-04-09
  • SpringBoot集成Kafka的步骤

    这篇文章主要介绍了SpringBoot集成Kafka的步骤,帮助大家更好的理解和使用SpringBoot,感兴趣的朋友可以了解下...2021-01-06
  • kafka 启动报错 missingTopicsFatal is true的解决

    这篇文章主要介绍了kafka 启动报错 missingTopicsFatal is true的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2021-07-04
  • Java Kafka 消费积压监控的示例代码

    这篇文章主要介绍了Java Kafka 消费积压监控,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-07-01
  • springboot+kafka中@KafkaListener动态指定多个topic问题

    这篇文章主要介绍了springboot+kafka中@KafkaListener动态指定多个topic问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2022-12-29
  • SpringBoot集成Kafka 配置工具类的详细代码

    spring-kafka是基于java版的kafkaclient与spring的集成,提供了KafkaTemplate,封装了各种方法,方便操作,它封装了apache的kafka-client,不需要再导入client依赖,这篇文章主要介绍了SpringBoot集成Kafka配置工具类,需要的朋友可以参考下...2022-09-26
  • spring-kafka使消费者动态订阅新增的topic问题

    这篇文章主要介绍了spring-kafka使消费者动态订阅新增的topic问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2022-12-29
  • 深入研究spring boot集成kafka之spring-kafka底层原理

    这篇文章主要深入研究了springboot集成kafka如何实现spring-kafka的底层原理分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步...2022-02-25
  • Kafka中消息队列的两种模式讲解

    这篇文章主要介绍了Kafka中消息队列的两种模式讲解,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2022-05-05
  • kafka 消息队列中点对点与发布订阅的区别说明

    这篇文章主要介绍了kafka 消息队列中点对点与发布订阅的区别说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2022-05-05
  • 原因分析IDEA导入Spring-kafka项目Gradle编译失败

    这篇文章主要为大家介绍分析了IDEA导入Spring-kafka项目Gradle中编译失败原因及解决,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步...2022-02-25
  • Kafka的监听地址配置实例详解

    这篇文章主要给大华介绍了关于Kafka监听地址配置的相关资料,文中通过实例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下...2022-01-23
  • kafka并发写大消息异常TimeoutException排查记录

    这篇文章主要为大家介绍了kafka并发写大消息异常TimeoutException的排查记录及解决方案,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步...2022-02-28