springboot+rabbitmq实现指定消费者才能消费的方法

 更新时间:2021年11月1日 16:00  点击:2505 作者:buguge

如何保证mq队列里的消息只被测试服务器上的consumer消费,避免本地环境误消费?

程序里有一个应用场景使用到了rabbitmq——当财务确认收到企业的打款金额后,系统会把企业订单生成用户付款单。由于订单记录数据量大,改为通过mq来异步实现。即财务确认收款操作后,将企业订单数据放入mq,另一端监听mq消息队列,将收到的企业订单加工转换成用户付款单,并做持久化。

本地开发环境与测试环境共用一套rabbitmq。当项目部署到测试环境后,QA测试过程中,总是“莫名其妙”的发现所保存的用户付款单数据有问题。

当然,首先要排查程序,检查Consumer的数据处理的逻辑是否有bug。单元测试后,发现并不存在测试环境的bug。

原来,消息队列被“非正常”消费了!

Q: 什么情况?

A: 几个伙伴一起参与的项目,大家总是要调试自己的程序的。而如果碰巧本地程序监听到消息队列里有消息,那么,消息就被本地程序消费掉了。问题正是出现在这里!————团队开发,大家并不会及时检出git上最新的程序版本。如果本地的程序版本不是最新的正确的版本,势必会出现bug。

那么,怎么办?

每次你改了逻辑,告诉大家获取最新?

不现实,约定的东西往往不奏效的。

如何保证mq队列里的消息只被测试服务器上的consumer消费,避免本地环境误消费? 或者说,如何实现消息的定向消费呢?

只要肯琢磨,办法总比困难多!百思可得解!

我们知道,rabbitmq手动ack模式。这还不够,因为我们怎么让consumer来决定是否消费呢? 所以,我们需要一个标识————producer设定一个标识,consumer如果匹配这个标识,则消费,否则予以reject放回消息队列。

通过查看spring-rabbit/spring-amqp的代码,发现可以在spring-amqp里的MessageProperties上做文章。生产者与消费者每次消息传输都会携带一个MessageProperties,通常我们是不指定的,走MessageProperties的默认设置值。

我的策略:MessageProperties有一个属性叫AppId。我们程序所部署的测试机器就一台,即消息Producer和消息Consumer在一台机器上。那么,我就可以利用机器的IP来识别消息。只有Producer与Consumer的IP匹配,才消费消息。程序员本机IP与测试服务器IP不一样,就会拒绝接收消息,会把消息重新放回消息队列,等待测试服务器的Consumer消费。

话不多说,上代码吧,

生产者代码:

package com.sboot.mq;

import org.junit.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import java.net.InetAddress;
import java.util.UUID;

public class MQProducerTest extends BaseTest {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void test() throws Exception {
        for (int i = 1; i <= 5; i++) {
            MessageProperties messageProperties = new MessageProperties();
            String ip = InetAddress.getLocalHost().getHostAddress();
            messageProperties.setAppId(ip);
//            messageProperties.setUserId(String.valueOf(i));
            MessageConverter messageConverter = new SimpleMessageConverter();
            String msg = UUID.randomUUID().toString();
//            System.out.println(msg);
            Message message1 = messageConverter.toMessage(msg, messageProperties);
            rabbitTemplate.send(MessageQueueConstant.USER_SETTLEMENT_EXCHANGE, "UserSettlementRouting", message1);
            System.out.println("入队完成");
            Thread.sleep(500L);
        }
    }
}

消费者手动ACK,要实现ChannelAwareMessageListener接口,感知rabbitmq.client.Channel实例,调用channel的basicAck、basicReject等方法:

package com.sboot.mq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

import java.net.InetAddress;

@Component
@Profile(value = "dev")
@Slf4j
public class UserSettlementDevConsumer implements ChannelAwareMessageListener {

    @RabbitHandler
    @RabbitListener(queues = MessageQueueConstant.USER_SETTLEMENT_QUEUE, ackMode = "MANUAL")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.currentThread().setName(UserSettlementDevConsumer.class.getSimpleName() + System.currentTimeMillis());

        long tag = message.getMessageProperties().getDeliveryTag();
        String appId = message.getMessageProperties().getAppId();
        log.info("{}-{}, 消息出队", tag, appId);
        String receiveMsg = "";
        try {
            //核对标识,决定是否消费消息
            String ip = InetAddress.getLocalHost().getHostAddress();
            if (!ip.equals(appId)) {
                log.info("这不是我需要的消息。放回队列。{}", receiveMsg);
//                channel.basicNack(tag, false, true);
                channel.basicReject(tag, true);
//                channel.basicRecover(true);
                return;
            }

            MessageConverter messageConverter = new SimpleMessageConverter();
            receiveMsg = String.valueOf(messageConverter.fromMessage(message));
            。。。。在这里消费消息
            log.info("success " + receiveMsg);
            channel.basicAck(tag, false);

        } catch (Exception e) {
            log.error("receive message has an error, ", e);
            channel.basicNack(tag, false, true);
        }
    }

}

说明一下依赖的spring-rabbit包的版本,我的是2.2.0.RELEASE。如果是2.1.4版本里,@RabbitListener注解没有ackMode。

解决本案问题过程中的花絮:

spring-rabbit-2.1.4.RELEASEspring-rabbit-2.2.0.RELEASE

@RabbitListener的ackMode的值见枚举org.springframework.amqp.core.AcknowledgeMode

NONE-- no acks(自动消费 autoAck)MANUAL --Manual acks - user must ack/nack via a channel aware listener.(手动消费,Consumer端必须显式调用ack或nack)AUTO --

设置了手动消费,上文消费端的deliveryTag会是不同的long值。自动消费的deliveryTag是重复的1和2这样的。并且,自动消费时,如果要使用channel的ack/nack,会报异常:

2020-06-19 22:26:54.586 [AMQP Connection 192.168.40.20:5672] ERROR o.s.a.rabbit.connection.CachingConnectionFactory:1468 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-06-19 22:26:54.599 [SimpleAsyncTaskExecutor-1] ERROR c.e.z.r.p.modules.mq.UserSettlementAckConsumer:49 -
org.springframework.amqp.AmqpException: PublisherCallbackChannel is closed
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1092)

到此这篇关于springboot+rabbitmq实现指定消费者才能消费的文章就介绍到这了,更多相关springboot rabbitmq消费内容请搜索猪先飞以前的文章或继续浏览下面的相关文章希望大家以后多多支持猪先飞!

原文出处:https://www.cnblogs.com/buguge/p/13183980.html

[!--infotagslink--]

相关文章

  • 解决springboot使用logback日志出现LOG_PATH_IS_UNDEFINED文件夹的问题

    这篇文章主要介绍了解决springboot使用logback日志出现LOG_PATH_IS_UNDEFINED文件夹的问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-04-28
  • SpringBoot实现excel文件生成和下载

    这篇文章主要为大家详细介绍了SpringBoot实现excel文件生成和下载,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-02-09
  • 详解springBoot启动时找不到或无法加载主类解决办法

    这篇文章主要介绍了详解springBoot启动时找不到或无法加载主类解决办法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-09-16
  • SpringBoot集成Redis实现消息队列的方法

    这篇文章主要介绍了SpringBoot集成Redis实现消息队列的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-02-10
  • 解决Springboot get请求是参数过长的情况

    这篇文章主要介绍了解决Springboot get请求是参数过长的情况,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-09-17
  • Spring Boot项目@RestController使用重定向redirect方式

    这篇文章主要介绍了Spring Boot项目@RestController使用重定向redirect方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2021-09-02
  • Springboot+TCP监听服务器搭建过程图解

    这篇文章主要介绍了Springboot+TCP监听服务器搭建过程,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2020-10-28
  • springBoot 项目排除数据库启动方式

    这篇文章主要介绍了springBoot 项目排除数据库启动方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2021-09-10
  • 详解SpringBoot之访问静态资源(webapp...)

    这篇文章主要介绍了详解SpringBoot之访问静态资源(webapp...),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-09-14
  • SpringBoot接口接收json参数解析

    这篇文章主要介绍了SpringBoot接口接收json参数解析,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2021-10-19
  • springboot中使用@Transactional注解事物不生效的坑

    这篇文章主要介绍了springboot中使用@Transactional注解事物不生效的原因,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-01-26
  • springboot多模块包扫描问题的解决方法

    这篇文章主要介绍了springboot多模块包扫描问题的解决方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-09-16
  • C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

    这篇文章主要介绍了C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2020-12-08
  • C#操作RabbitMQ的完整实例

    这篇文章主要为大家详细介绍了C#操作RabbitMQ的完整实例,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2020-06-25
  • Springboot实现多线程注入bean的工具类操作

    这篇文章主要介绍了Springboot实现多线程注入bean的工具类操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-08-27
  • Springboot mybatis plus druid多数据源解决方案 dynamic-datasource的使用详解

    这篇文章主要介绍了Springboot mybatis plus druid多数据源解决方案 dynamic-datasource的使用,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2020-11-18
  • Springboot+MDC+traceId日志中打印唯一traceId

    本文主要介绍了Springboot+MDC+traceId日志中打印唯一traceId,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-10-17
  • SpringBoot部署到Linux读取resources下的文件及遇到的坑

    本文主要给大家介绍SpringBoot部署到Linux读取resources下的文件,在平时业务开发过程中,很多朋友在获取到文件内容乱码或者文件读取不到的问题,今天给大家分享小编遇到的坑及处理方案,感兴趣的朋友跟随小编一起看看吧...2021-06-21
  • 关于springboot中nacos动态路由的配置

    这篇文章主要介绍了springboot中nacos动态路由的配置方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2021-09-11
  • SpringBoot高版本修改为低版本时测试类报错的解决方案

    这篇文章主要介绍了SpringBoot高版本修改为低版本时测试类报错的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2021-09-18