RocketMQ消费幂概念与使用分析

 更新时间:2023年2月15日 09:35  点击:229 作者:每天都要进步一点点

一、什么是消费幂等

幂等:如果有一个操作,多次执行与一次执行所产生的影响是相同的,我们就称这个操作是幂等的。

基于上述的概念,结合消息消费的场景,我们能够总结出消息幂等的概念:

如果消息重试多次,消费者端对该重复消息消费多次与消费一次的结果是相同的,并且多次消费没有对系统产生副作用,那么我们就称这个过程是消息幂等的。

在互联网应用中,尤其在网络不稳定的情况下,消息很有可能会出现重复发送或重复消费。如果重复的消息可能会影响业务处理,那么就应该对消息做幂等处理。

二、消息重复的场景分析

由于网络原因闪断,ACK返回失败等情况出现,不可避免的会发生消息重复的情况。最常见的有下面三种场景:

(1)、生产者发送消息时发生消息重复

当一条消息已被成功发送到RocketMQ的Broker中,并且Broker已经持久化到磁盘了,此时出现了网络闪断或者生产者宕机现象,导致Broker对生产者应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息,那么后续Consumer就一定会消费两次该消息。

(2)、消费者消费消息时发生消息重复

消息已投递到Consumer并完成业务处理,都会向RocketMQ Broker返回ACK确认响应,但是由于网络闪断等原因,可能导致Broker没能成功收到Consumer发送的消费成功ACK响应,此时Broker认为Consumer没能消费成功,为了保证消息至少被消费一次,Broker将在网络恢复后再次尝试投递之前已被处理过的消息,此时消费者就会收到与之前处理过的内容相同、Message ID也相同的消息。

(3)、负载均衡时发生消息重复

当Broker重启或Consumer重启、扩容或缩容时,都会触发重新负载均衡(Rebalance),此时Consumer去读取Broker中的offset可能还没及时更新,此时Consumer可能会收到曾经被消费过的消息。

可以看到,无论是发送时重复还是消费时重复,最终的效果均为消费者消费时收到了重复的消息,那么我们就知道:只需要在消费者端统一进行幂等处理就能够实现消息幂等。

三、如何实现消费幂等

由于做幂等操作不可避免要产生巨大的开销,RocketMQ 为了追求高性能,本身没有提供消费幂等的特性,它要求我们在业务上进行去重,也就是说自己在消费消息时要做到幂等性。RocketMQ 虽然不能严格保证不重复,但是正常情况下很少会出现重复发送、消费重复情况,只有网络异常,Consumer 启停等异常情况下会出现消息重复。 所以消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。

前面介绍到,RocketMQ的消息有消息ID(Message ID)、消息Key(Message Key)两个属性。因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是根据业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Key 进行设置:

Message message = new Message();
// 设置消息的Key
message.setKey("XXX");
mqProducer.send(message);

生产者发送消息的时候,消息已经设置了唯一的Message Key,在Consumer消费消息时,可以根据消息的Key 进行幂等处理。

// 根据业务唯一标识Key做幂等处理
mqConsumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for(MessageExt msg : msgs){
            // 获取到消息Key
            String key = msg.getKeys();
            // 伪代码如下:
            // 1. 根据消息key去redis查询是否存在的记录
            Object obj = redis.get(key);
            if (null != obj) {
                logger.info("消息重复消费了");
                // ...
            } else {
                // 2. 从数据库中查询是否存在记录
                MessageLog messageLog = messageService.getByMessageKey(key);
                if (null != messageLog) {
                     logger.info("消息重复消费了");
                     // ...
                } else {
                    // 3. 写redis、DB
                    // 业务处理
                    redis.set(xxx, xxx);
                    messageService.save(xxx);
                }
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

这里给一个通用性的解决方案 :使用数据库 + Redis实现消息消费幂等。

(1)、Consumer消费消息时,拿到唯一的业务标识---消息Key,然后根据消息Key去Redis缓存中查询是否存在对应的记录,如果存在,则说明本次操作是重复性操作;如果缓存中不存在此Key对应的记录,则执行下一步;

(2)、根据消息Key去数据库中查询是否存在对应的记录,如果存在,则说明本次操作是重复性操作;如果不存在的话,则执行下一步;

(3)、在同一个事务中完成三项操作,保证下面三项操作同时成功,同时失败:

a、进行业务处理;

b、将消息Key通过set(key, value, expireTime)写入到Redis缓存中;

c、将消息Key作为数据库表的主键或者唯一键插入到表中;

关于第二步中再次去从数据库中校验是否存在对应的记录,其实这一步也是有必要的。由于我们一般都会在缓存使用过程中设置过期时间,如果缓存一旦过期,就可能发生缓存穿透,使请求直接渗透到数据库中,所以我们此时还是要从数据库中再次校验一下,将二者结合在一起是一个比较好的方案。

到此这篇关于RocketMQ消费幂概念与使用分析的文章就介绍到这了,更多相关RocketMQ消费幂等内容请搜索猪先飞以前的文章或继续浏览下面的相关文章希望大家以后多多支持猪先飞!

原文出处:https://weishihuai.blog.csdn.net/article/details/123863198

[!--infotagslink--]

相关文章

  • docker安装RocketMQ的实现步骤

    本文主要介绍了docker安装RocketMQ的实现步骤,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-11-14
  • 超详细Docker Desktop下安装rocketmq的教程

    这篇文章主要介绍了Docker Desktop下安装rocketmq,本文内容通过图文操作命令给大家讲解的非常详细,需要的朋友可以参考下...2021-10-19
  • 解决springboot集成rocketmq关于tag的坑

    这篇文章主要介绍了解决springboot集成rocketmq关于tag的坑,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2021-08-10
  • RocketMQ获取指定消息的实现方法(源码)

    这篇文章主要给大家介绍了关于RocketMQ获取指定消息的实现方法,文中通过示例代码介绍的非常详细,对大家学习或者使用RocketMQ具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧...2020-08-16
  • RocketMQ消息重试机制原理分析讲解

    这篇文章主要介绍了RocketMQ消息重试机制,消息的发送和消费并不是百分百成功的,在出现消息推送失败时,RocketMQ有何补偿方式来进行消息重试呢?这是我们今天要一起学习的点...2023-02-15
  • RocketMQ事务消息原理与使用详解

    RocketMQ事务消息(TransactionalMessage)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似X/OpenXA的分布式事务功能,通过事务消息能达到分布式事务的最终一致...2023-02-15
  • docker安装rocketMQ和安装过程中出现问题的解决

    本文主要介绍了docker安装rocketMQ和安装过程中出现问题的解决,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-12-27
  • SpringBoot整合RocketMQ的方法详解

    这篇文章主要为大家详细介绍了SpringBoot整合RocketMQ的方法,文中的示例代码讲解详细,对我们学习有一定帮助,感兴趣的小伙伴可以了解一下...2022-08-12
  • Springboot 整合 RocketMQ 收发消息

    这篇文章主要介绍了Springboot整合RocketMQ收发消息,需要的朋友可以参考下...2021-12-29
  • RocketMQ延迟消息简明介绍

    这篇文章主要介绍了RocketMQ延迟消息,延迟消息是个啥?顾名思义,就是等一段时间再消费的消息。文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2022-08-22
  • RocketMQ消费幂概念与使用分析

    如果有⼀个操作,多次执⾏与⼀次执⾏所产⽣的影响是相同的,我们就称这个操作是幂等的。当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费⼀次的结果是相同的,并且多次消费并未对业务系统产⽣任何负⾯影响,那么这整个过程就可实现消息幂等...2023-02-15
  • 一文彻底掌握RocketMQ 的存储模型

    这篇文章主要介绍了RocketMQ的存储模型,本文的重点在于分析BrokerServer的消息存储模型,笔者按照自己的理解,尝试分析RocketMQ的存储模型,需要的朋友可以参考下...2023-01-03