RocketMQ消息重试机制原理分析讲解

 更新时间:2023年2月15日 09:35  点击:702 作者:每天都要进步一点点

一、概述

由于网络抖动、服务宕机等一些不确定的因素,RocketMQ在发送消息的时候很有可能出现消息发送或者消费失败的问题。

Consumer消费消息失败通常可以认为有以下几种情况:

  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
  • 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。

如果没有消息重试机制,就可能产生消息丢失的问题,这样就会对系统产生较大的影响。RocketMQ内部封装了消息重试的处理流程,无需开发人员手动处理,并且支持了生产端、消费端两端的重试机制。

二、生产端的消息重试

生产端的消息重试是指:Producer往Broker上发消息没有发送成功,比如网络原因导致生产者发送消息到MQ失败,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。

生产者端的消息重试配置比较简单,只需要在定义生产者的时候,调用producer.setRetryTimesWhenSendFailed(xxx)方法设置消息发送失败的最大重试次数。如下:

// 同步发送消息,如果5秒内没有发送成功,则重试3次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(3);
producer.send(msg, 5000L);

三、消费端的消息重试

同样的,由于网络原因,Broker发送消息给消费者后,没有受到消费端的ACK响应,所以Broker又会尝试将消息重新发送给Consumer,在实际开发过程中,我们更应该考虑的是消费端的重试。消费端的消息重试可以分为顺序消息的重试以及无序消息的重试。

(1)、顺序消息的重试

对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

(2)、无序消息的重试

对于无序消息(普通、延时、事务消息),当消费者消费消息失败时,可以通过设置返回状态达到消息重试的结果。

需要注意的是:无序消息的重试只会针对集群消费方式(MessageModel.CLUSTERING)生效;广播方式不提供失败重试特性,即消费失败后,失败的消息不再重试,继续消费新的消息。

四、消息重试次数

RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:

第几次重试与上次重试的间隔时间第几次重试与上次重试的间隔时间
110 秒97 分钟
230 秒108 分钟
31 分钟119 分钟
42 分钟1210 分钟
53 分钟1320 分钟
64 分钟1430 分钟
75 分钟151 小时
86 分钟162 小时

如果消息重试 16 次后仍然失败,消息将不再投递。

注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。所以就需要我们消费者端做好消费幂等操作。

五、消息重试配置

集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(下述三种方式任选一种):

  • 返回 Action.ReconsumeLater (推荐);
  • 返回 Null;
  • 抛出异常;

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        //处理消息
        //.....
        //方式1:返回 Action.ReconsumeLater,消息将重试
        return Action.ReconsumeLater;
        //方式2:返回 null,消息将重试
        return null;
        //方式3:直接抛出异常, 消息将重试
        throw new RuntimeException("消费消息发生异常");
    }
}

集群消费方式下,如果希望消息失败后,不进行消息重试,那么我们可以捕获消费逻辑中可能抛出的异常,然后返回Action.CommitMessage,那么这条消息将不会再重试。如下:

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            // 消费消息....
        } catch (Throwable e) {
            // 捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
            return Action.CommitMessage;
        }
        // 消息处理正常,直接返回 Action.CommitMessage;
        return Action.CommitMessage;
    }
}

当然,RocketMQ也允许Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:

  • 最大重试次数小于等于 16 次,则重试时间间隔如目录四:消息重试次数的描述;
  • 最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时;

Properties properties = new Properties();
//  配置对应 Group ID的最大消息重试次数为 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");
Consumer consumer =ONSFactory.createConsumer(properties);

注意:

  • 消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效;
  • 如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效;
  • 配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置;

六、消息重试原理

RocketMQ会为每个消费者组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。

考虑到异常恢复需要一些时间,RocketMQ会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。

到此这篇关于RocketMQ消息重试机制原理分析讲解的文章就介绍到这了,更多相关RocketMQ消息重试内容请搜索猪先飞以前的文章或继续浏览下面的相关文章希望大家以后多多支持猪先飞!

原文出处:https://weishihuai.blog.csdn.net/article/details/124496524

[!--infotagslink--]

相关文章

  • C#微信开发之发送模板消息

    这篇文章主要为大家详细介绍了C#微信开发之发送模板消息的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2020-06-25
  • WM_CLOSE、WM_DESTROY、WM_QUIT及各种消息投递函数详解

    这篇文章主要介绍了WM_CLOSE、WM_DESTROY、WM_QUIT及各种消息投递函数,有助于读者更好的理解windows程序的消息机制,需要的朋友可以参考下...2020-04-25
  • PHP的运行机制与原理(底层)

    说到php的运行机制还要先给大家介绍php的模块,PHP总共有三个模块:内核、Zend引擎、以及扩展层;PHP内核用来处理请求、文件流、错误处理等相关操作;Zend引擎(ZE)用以将源文件转换成机器语言,然后在虚拟机上运行它;扩展层是一组...2015-11-24
  • 简单用VBS调用企业微信机器人发定时消息的方法

    这篇文章主要介绍了简单用VBS调用企业微信机器人发定时消息的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-12-08
  • python实现企业微信定时发送文本消息的实例代码

    这篇文章主要介绍了python实现企业微信定时发送文本消息的实例代码,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2020-11-25
  • jquery插件jquery.confirm弹出确认消息

    这篇文章介绍了插件jquery.confirm弹出确认消息的实现方法,感兴趣的小伙伴们可以参考一下...2015-12-24
  • 详解C语言进程同步机制

    这篇文章主要介绍了详解C语言进程同步机制的的相关资料,文中代码非常详细,帮助大家更好的理解和学习,感兴趣的朋友可以了解下...2020-06-18
  • 通过实例了解Python异常处理机制底层实现

    这篇文章主要介绍了通过实例了解Python异常处理机制底层实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下...2020-07-24
  • jsp 自动编译机制详细介绍

    这篇文章主要介绍了 Jasper的自动检测实现的机制比较简单,依靠某后台线程不断检测JSP文件与编译后的class文件的最后修改时间是否相同,若相同则认为没有改动,但倘若不同则需要重新编译,需要的朋友可以参考下...2016-12-02
  • Windows窗口消息实例详解

    这篇文章主要介绍了Windows窗口消息,以实例形式详细罗列了Windows窗口消息,非常具有实用价值,需要的朋友可以参考下...2020-04-25
  • 理解js回收机制通俗易懂版

    这篇文章主要帮助大家更好的理解js回收机制,通俗易懂,便于大家理解,感兴趣的小伙伴们可以参考一下...2016-03-01
  • 帝国CMS会员注册字段增加编辑器、发送短消息改为编辑框

    通过本教程可以实现帝国CMS后台给前台注册用户发消息,把内容输入框改为编辑器,可上传图片,等打开文件\e\admin\member\SendMsg.php 大约84行<textarea name="msgtext" cols="6...2016-01-27
  • c#如何用好垃圾回收机制GC

    这篇文章主要介绍了c# 如何用好垃圾回收机制GC,帮助大家更好的理解和学习c#,感兴趣的朋友可以了解下...2020-11-03
  • 跟我学Node.js(四)---Node.js的模块载入方式与机制

    其它的如通过NPM安装的第三方模块(third-party modules)或本地模块(local modules),每个模块都会暴露一个公开的API。以便开发者可以导入。如复制代码 代码如下:var mod = require('module_name')此句执行后,Node内部会载入...2014-06-07
  • 详解JAVA 反射机制

    这篇文章主要介绍了JAVA 反射机制的相关知识,文中讲解的非常细致,代码帮助大家更好的理解学习,感兴趣的朋友可以了解下...2020-06-17
  • 微信自动回复消息

    在微信回复信息,POST的XML数据如下图所示 ToUserName 为接收者名称 FromUserName 为发送者名称 被动回复,响应的信息如下: 这里要注意,ToUserName和FromU...2016-05-19
  • Go语言使用钉钉机器人推送消息的实现示例

    本文主要介绍了Go语言使用钉钉机器人推送消息的实现示例,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-09-23
  • 详解CocosCreator项目结构机制

    这篇文章主要介绍了详解CocosCreator项目结构机制,只有了解这些机制后,才能更好的进行项目开发,避免潜在错误,并且快速的除错...2021-04-15
  • .NET Core中本地化机制的深入讲解

    这篇文章主要给大家介绍了关于.NET Core中本地化机制的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-09-22
  • Delphi实现获取句柄并发送消息的方法

    这篇文章主要介绍了Delphi实现获取句柄并发送消息的方法,需要的朋友可以参考下...2020-06-30