解决kafka消息堆积及分区不均匀的问题

 更新时间:2021年9月12日 12:00  点击:2063

kafka消息堆积及分区不均匀的解决

我在环境中发现代码里面的kafka有所延迟,查看kafka消息发现堆积严重,经过检查发现是kafka消息分区不均匀造成的,消费速度过慢。这里由自己在虚拟机上演示相关问题,给大家提供相应问题的参考思路。

这篇文章有点遗憾并没重现分区不均衡的样例和Warning: Consumer group ‘testGroup1' is rebalancing. 这里仅将正确的方式展示,等后续重现了在进行补充。

主要有两个要点:

  • 1、一个消费者组只消费一个topic.
  • 2、factory.setConcurrency(concurrency);这里设置监听并发数为 部署单元节点*concurrency=分区数量

1、先在kafka消息中创建

对应分区数目的topic(testTopic2,testTopic3)testTopic1由代码创建

./kafka-topics.sh --create --zookeeper 192.168.25.128:2181 --replication-factor 1 --partitions 2 --topic testTopic2

2、添加配置文件application.properties

kafka.test.topic1=testTopic1
kafka.test.topic2=testTopic2
kafka.test.topic3=testTopic3
kafka.broker=192.168.25.128:9092
auto.commit.interval.time=60000
#kafka.test.group=customer-test
kafka.test.group1=testGroup1
kafka.test.group2=testGroup2
kafka.test.group3=testGroup3
kafka.offset=earliest
kafka.auto.commit=false

session.timeout.time=10000
kafka.concurrency=2

3、创建kafka工厂

package com.yin.customer.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;

/**
 * @author yin
 * @Date 2019/11/24 15:54
 * @Method
 */
@Configuration
@Component
public class KafkaConfig {
    @Value("${kafka.broker}")
    private String broker;
    @Value("${kafka.auto.commit}")
    private String autoCommit;

   // @Value("${kafka.test.group}")
    //private String testGroup;

    @Value("${session.timeout.time}")
    private String sessionOutTime;

    @Value("${auto.commit.interval.time}")
    private String autoCommitTime;

    @Value("${kafka.offset}")
    private String offset;
    @Value("${kafka.concurrency}")
    private Integer concurrency;

   @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //监听设置两个个分区
        factory.setConcurrency(concurrency);
        //打开批量拉取数据
        factory.setBatchListener(true);
        //这里设置的是心跳时间也是拉的时间,也就说每间隔max.poll.interval.ms我们就调用一次poll,kafka默认是300s,心跳只能在poll的时候发出,如果连续两次poll的时候超过
        //max.poll.interval.ms 值就会导致rebalance
        //心跳导致GroupCoordinator以为本地consumer节点挂掉了,引发了partition在consumerGroup里的rebalance。
        // 当rebalance后,之前该consumer拥有的分区和offset信息就失效了,同时导致不断的报auto offset commit failed。
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    private ConsumerFactory<String,String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
    }

   @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        //kafka的地址
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        //是否自动提交 Offset
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        // enable.auto.commit 设置成 false,那么 auto.commit.interval.ms 也就不被再考虑
        //默认5秒钟,一个 Consumer 将会提交它的 Offset 给 Kafka
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,  5000);

        //这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。
        //zookeeper.session.timeout.ms 默认值:6000
        //ZooKeeper的session的超时时间,如果在这段时间内没有收到ZK的心跳,则会被认为该Kafka server挂掉了。
        // 如果把这个值设置得过低可能被误认为挂掉,如果设置得过高,如果真的挂了,则需要很长时间才能被server得知。
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionOutTime);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //组与组间的消费者是没有关系的。
        //topic中已有分组消费数据,新建其他分组ID的消费者时,之前分组提交的offset对新建的分组消费不起作用。
        //propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup);

        //当创建一个新分组的消费者时,auto.offset.reset值为latest时,
        // 表示消费新的数据(从consumer创建开始,后生产的数据),之前产生的数据不消费。
        // https://blog.csdn.net/u012129558/article/details/80427016

        //earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
       // latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。

        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        //不是指每次都拉50条数据,而是一次最多拉50条数据()
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
        return propsMap;
    }
}

4、展示kafka消费者

@Component
public class KafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "${kafka.test.topic1}",groupId = "${kafka.test.group1}",containerFactory = "kafkaListenerContainerFactory")
    public void listenPartition1(List<ConsumerRecord<?, ?>> records,Acknowledgment ack) {
        logger.info("testTopic1 recevice a message size :{}" , records.size());

        try {
            for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                logger.info("received:{} " , record);
                if (kafkaMessage.isPresent()) {
                    Object message = record.value();
                    String topic = record.topic();
                    Thread.sleep(300);
                    logger.info("p1 topic is:{} received message={}",topic, message);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ack.acknowledge();
        }
    }

    @KafkaListener(topics = "${kafka.test.topic2}",groupId = "${kafka.test.group2}",containerFactory = "kafkaListenerContainerFactory")
    public void listenPartition2(List<ConsumerRecord<?, ?>> records,Acknowledgment ack) {
        logger.info("testTopic2 recevice a message size :{}" , records.size());

        try {
            for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                logger.info("received:{} " , record);
                if (kafkaMessage.isPresent()) {
                    Object message = record.value();
                    String topic = record.topic();
                    Thread.sleep(300);
                    logger.info("p2 topic :{},received message={}",topic, message);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ack.acknowledge();
        }
    }

    @KafkaListener(topics = "${kafka.test.topic3}",groupId = "${kafka.test.group3}",containerFactory = "kafkaListenerContainerFactory")
    public void listenPartition3(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        logger.info("testTopic3 recevice a message size :{}" , records.size());

        try {
            for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                logger.info("received:{} " , record);
                if (kafkaMessage.isPresent()) {
                    Object message = record.value();
                    String topic = record.topic();
                    logger.info("p3 topic :{},received message={}",topic, message);
                    Thread.sleep(300);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ack.acknowledge();
        }
    }
}

查看分区消费情况:

在这里插入图片描述

kafka出现若干分区不消费的现象

近日,有用户反馈kafka有topic出现某个消费组消费的时候,有几个分区一直不消费消息,消息一直积压(图1)。除了一直积压外,还有一个现象就是消费组一直在重均衡,大约每5分钟就会重均衡一次。具体表现为消费分区的owner一直在改变(图2)。

(图1)

(图2)

定位过程

业务侧没有报错,同时kafka服务端日志也一切正常,同事先将消费组的机器滚动重启,仍然还是那几个分区没有消费,之后将这几个不消费的分区迁移至别的broker上,依然没有消费。

还有一个奇怪的地方,就是每次重均衡后,不消费的那几个分区的消费owner所在机器的网络都有流量变化。按理说不消费应该就是拉取不到分区不会有流量的。于是让运维去拉了下不消费的consumer的jstack日志。一看果然发现了问题所在。

从堆栈看,consumer已经拉取到消息,然后就一直卡在处理消息的业务逻辑上。这说明kafka是没有问题的,用户的业务逻辑有问题。

consumer在拉取完一批消息后,就一直在处理这批消息,但是这批消息中有若干条消息无法处理,而业务又没有超时操作或者异常处理导致进程一直处于消费中,无法去poll下一批数据。

又由于业务采用的是autocommit的offset提交方式,而根据源码可知,consumer只有在下一次poll中才会自动提交上次poll的offset,所以业务一直在拉取同一批消息而无法更新offset。反映的现象就是该consumer对应的分区的offset一直没有变,所以有积压的现象。

至于为什么会一直在重均衡消费组的原因也很明了了,就是因为有消费者一直卡在处理消息的业务逻辑上,超过了max.poll.interval.ms(默认5min),消费组就会将该消费者踢出消费组,从而发生重均衡。

验证

让业务方去查证业务日志,验证了积压的这几个分区,总是在循环的拉取同一批消息。

解决方法

临时解决方法就是跳过有问题的消息,将offset重置到有问题的消息之后。本质上还是要业务侧修改业务逻辑,增加超时或者异常处理机制,最好不要采用自动提交offset的方式,可以手动管理。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持猪先飞。

[!--infotagslink--]

相关文章

  • .NET Core下使用Kafka的方法步骤

    这篇文章主要介绍了.NET Core下使用Kafka的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-09-22
  • 深入了解如何基于Python读写Kafka

    这篇文章主要介绍了深入了解如何基于Python读写Kafka,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下...2020-05-06
  • 阿里云主机Windows 2008服务器硬盘分区和格式化图文教程

    这篇文章主要介绍了阿里云主机Windows 2008服务器硬盘分区和格式化图文教程,本文对每一个步骤都配有图文解说,一看就会呀,需要的朋友可以参考下...2016-01-27
  • mysql分区功能详解,以及实例分析

    下面小编就为大家带来一篇mysql分区功能详解,以及实例分析。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧...2017-04-03
  • MySQL中表分区技术详细解析

    数据库分区是一种物理数据库设计技术。虽然分区技术可以实现很多效果,但其主要目的是为了在特定的SQL操作中减少数据读写的总量以缩减sql语句的响应时间,同时对于应用来说分区完全是透明的。...2016-06-24
  • Mysql临时表及分区表区别详解

    这篇文章主要介绍了Mysql临时表及分区表区别详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下...2020-09-10
  • mysql中如何判断是否支持分区

    mysql可以通过下面语句判断是否支持分区:SHOW VARIABLES LIKE '%partition%';如果输出:have_partitioning YES表示支持分区。或者通过:SHOW PLUGINS; 显示所有插件,如果有partition ACTIVE STORAGE ENGINE GPL 插件则表...2015-10-21
  • 详解MySQL分区表

    这篇文章主要介绍了MySQL分区表的相关资料,帮助大家更好的理解和学习mysql,感兴趣的朋友可以了解下...2020-08-11
  • 什么是分表和分区 MySql数据库分区和分表方法

    这篇文章主要为大家详细介绍了MySql数据库分区和分表方法,告诉大家什么是分表和分区,mysql分表和分区有什么联系,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2017-02-19
  • 解决kafka消息堆积及分区不均匀的问题

    这篇文章主要介绍了解决kafka消息堆积及分区不均匀的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2021-09-12
  • SpringBoot+Nacos+Kafka微服务流编排的简单实现

    本文主要介绍了SpringBoot+Nacos+Kafka微服务流编排的简单实现,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-08-20
  • MySQL 5.5 range分区增加删除处理的方法示例

    这篇文章主要给大家介绍了关于MySQL 5.5 range分区增加删除处理的相关资料,文中通过示例代码介绍的非常详细,对大家具有一定的参考学习价值,需要的朋友们下面来一起看看吧。...2017-06-12
  • .Net Core 集成 Kafka的步骤

    这篇文章主要介绍了.Net Core 集成 Kafka的实现步骤,帮助大家更好的理解和学习使用.net技术,感兴趣的朋友可以了解下...2021-09-22
  • MySql分表、分库、分片和分区知识深入详解

    这篇文章主要介绍了MySql分表、分库、分片和分区知识深入详解,如果有并发场景和数据量较大的场景的可以看一下文章,对你会有或多或少的帮助...2021-03-20
  • MySQL数据库分区功能的使用教程

    这篇文章主要介绍了MySQL数据库分区功能的使用教程,文中特别讲解了MySQL分表和分区的区别以及联系,需要的朋友可以参考下...2016-05-09
  • python 实现mysql自动增删分区的方法

    这篇文章主要介绍了python 实现mysql自动增删分区的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-04-01
  • java自己手动控制kafka的offset操作

    这篇文章主要介绍了java自己手动控制kafka的offset操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-02-20
  • PostgreSQL 创建表分区

    在pg里表分区是通过表继承来实现的,一般都是建立一个主表,里面是空,然后每个分区都去继承它。...2020-07-11
  • C#判断指定分区是否是ntfs格式的方法

    这篇文章主要介绍了C#判断指定分区是否是ntfs格式的方法,涉及C#中DriveFormat属性的使用技巧,具有一定参考借鉴价值,需要的朋友可以参考下...2020-06-25
  • SpringBoot集成Kafka的步骤

    这篇文章主要介绍了SpringBoot集成Kafka的步骤,帮助大家更好的理解和使用SpringBoot,感兴趣的朋友可以了解下...2021-01-06