Java RabbitMQ的工作队列与消息应答详解
Work Queues
工作队列(任务队列)主要思想是避免立即执行资源密集型任务,而不得不等待它完成,相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
其实就是生产者发送大量的消息,发送到队列之后,由多个消费者(工作线程)来处理消息,并且每个消息只能被处理一次。
1. 轮询分发消息
多个工作线程按照次序每来一个消息执行一次。
1.1 抽取工具类
直接通过信息获取信道
/** * @Description RabbitMQ工具类 * @date 2022/3/5 10:02 */ public class RabbitMQUtils { public static Channel getChannel() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("1"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); return connection.createChannel(); } }
1.2 编写两个工作线程
Work2和Work1代码没有区别,只需要对它做出区分即可。
public class Worker1 { // 指定队列名称 private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { // 获取信道 Channel channel = RabbitMQUtils.getChannel(); // 声明:接收消息回调 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("工作线程01:"+ new String(message.getBody())); }; // 声明:取消消费回调 CancelCallback cancelCallback = consumerTag -> { System.out.println("工作线程01取消接收:"+consumerTag); }; System.out.println("工作线程01启动完成......"); channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback); } }
1.3 编写生产者
public class Producer { private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,false,false,true,null); // 消息体 Scanner scanner = new Scanner(System.in); int i = 1; while (scanner.hasNext()){ String msg = scanner.next(); msg = msg + i; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("发送成功:" + msg); } System.out.println("----------==========发送完毕==========----------"); } }
1.4 运行测试
先启动两个工作线程,再启动生产者。
出现404异常请参考下方1.6
生产者发送情况:
轮询状态下两个工作队列接收状态:
1.5 异常情况
在先启动两个消费者线程时,会提示404找不到队列。
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost '/', class-id=60, method-id=20)
发生这个情况的原因很显然是因为先启动了消费者,但是在RabbitMQ中没有创建相对应的队列名称,解决方法可以:
1.先启动生产者创建队列(也可以在RabbitMQ中创建队列);
2.再启动消费者就不会产生这个错误;
3.再在生产者中使用Scanner
类去发送消息测试。
2. 消息应答
消费者在接收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ可以删除消息。其目的就是为了保护消息在被处理之前不会消失。
2.1 自动应答
这种方式发送后就被认定为已经传送成功,所以在消息接收到之前消费者的连接或者channel关闭,那么这个消息就会丢失。其特点是消费者可以传递过载的消息,对传递的消息没有限制,但如果因内存耗尽消费者线程被系统杀死,就会使得多条消息丢失。所以这个模式需要在数据安全性和吞吐量之间选择,适合使用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
所以自动应答的方式局限性很高。
2.2 手动应答
优点:可以批量应答和减少网络拥挤。
1.channel.basicAck(long deliveryTag, boolean multiple);
:肯应应答,处理完消息之后提醒RabbitMQ可以删除当前队列,deliveryTag:当前队列中选中的消息;multiple:是否批量应答。
2.channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
:否定应答,
3.channel.basicReject(long deliveryTag, boolean requeue)
:否定并且拒绝应答。
2.3 消息自动重新入队
如果消费者因为一些原因失去了对RabbitMQ的连接,导致没有发送ACK确认,RabbitMQ就会对该消息进行重新排队,并且分发给可以处理该消息的消费者,所以即使某个消费者死亡,也可以保证消息不会丢失。
2.4 手动应答测试
测试目的:在手动应答状态下不会发生消息丢失的情况。
测试方法:
1.创建两个消费者;
2.使用工具类使线程睡眠一定时间;
3.在睡眠时关闭线程,看能否自动重新入队。
2.4.1 生产者代码
/** * @Description 手动应答生产者 * @date 2022/3/5 19:03 */ public class Producer1 { // 指定队列名 private static final String TASK_QUEUE_RES = "queue_res"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(TASK_QUEUE_RES,false,false,false,null); Scanner scanner = new Scanner(System.in); int i = 0; while (scanner.hasNext()){ i++; String msg = scanner.next() + i; channel.basicPublish("",TASK_QUEUE_RES,null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("发送消息:'" + msg + "'成功"); } } }
2.4.2 消费者代码
/** * @Description 手动应答消费者1 * @date 2022/3/5 19:17 */ public class Worker1 { private static final String TASK_QUEUE_RES = "queue_res"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMQUtils.getChannel(); System.out.println("线程A等待接收......"); DeliverCallback deliverCallback = (consumerTag, message) -> { // 模拟并发沉睡一秒 try { Thread.sleep(1000); System.out.println("线程A接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8)); /** * basicAck: * 1. 消息标记 * 2. 是否批量 */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } }; channel.basicConsume(TASK_QUEUE_RES,false,deliverCallback, consumerTag -> { System.out.println(consumerTag + "消费者取消消费"); }); } }
Worker2类和1区别不大,将名称改成B再将睡眠事件改成30即可。
2.4.3 测试
测试方法:
1.先启动生产者创建队列;
2.启动两个消费者接收消息;
3.因为是轮询方式,所以A线程接收之后肯定是B线程接收,在睡眠时关闭B线程,如果A线程接收到说明测试成功。
发送消息:
线程A接收:
再发送消息:
关闭线程B线程A接收到消息:
测试成功!
总结
本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注猪先飞的更多内容!
原文出处:https://blog.csdn.net/weixin_44289860/article/details/123289
相关文章
C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)
这篇文章主要介绍了C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2020-12-08- 这篇文章主要为大家详细介绍了C#操作RabbitMQ的完整实例,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2020-06-25
springboot+rabbitmq实现指定消费者才能消费的方法
当项目部署到测试环境后,QA测试过程中,总是“莫名其妙”的发现所保存的用户付款单数据有问题。这篇文章主要介绍了springboot+rabbitmq实现指定消费者才能消费,需要的朋友可以参考下...2021-11-01- 这篇文章主要介绍了SpringMVC和rabbitmq集成的使用案例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-01-20
- 这篇文章主要介绍了SpringBoot整合RabbitMQ实现消息确认机制,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-08-31
- 这篇文章主要为大家详细介绍了RabbitMQ .NET消息队列使用方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-09-22
运用.net core中实例讲解RabbitMQ高可用集群构建
这篇文章主要介绍了运用.net core中实例讲解RabbitMQ高可用集群构建,文中相关示例代码讲解的非常清晰,感兴趣的小伙伴可以参考一下这篇文章,相信可以帮助到你...2021-09-07- 最近在使用RabbitMQ来实现延迟任务的时候发现,这其中的知识点还是挺多的,所以下面这篇文章主要给大家介绍了关于利用RabbitMQ实现延迟任务的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下。...2021-09-22
- 这篇文章主要介绍了C#调用RabbitMQ实现消息队列的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-06-25
- RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统,下面这篇文章主要给大家介绍了关于.Net使用RabbitMQ即时发消息的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下...2021-09-22
- 这篇文章主要介绍了asp.net微信开发中有关消息应答的相关内容,需要的朋友可以参考下...2021-09-22
- 本篇文章主要介绍了C#实现rabbitmq 延迟队列功能实例代码,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。...2020-06-25
- RabbitMQ Management插件还提供了基于RESTful风格的HTTP API接口来方便调用。本文就主要介绍了.Net RabbitMQ实现HTTP API接口调用,感兴趣的可以了解一下...2021-09-22
- 这篇文章主要整对rabbitmq学习后封装RabbitMQ.Client的一个分享,文章最后,我会把封装组件和demo奉上,对.net平台的rabbitmq使用封装相关知识感兴趣的朋友一起看看吧...2021-09-14
- 这篇文章主要介绍了SpringBoot中使用 RabbitMQ的教程详解,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2020-11-16
springboot中rabbitmq实现消息可靠性机制详解
这篇文章主要介绍了springboot中rabbitmq实现消息可靠性机制详解,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下...2021-09-25- 这篇文章主要介绍了解决SpringMVC项目连接RabbitMQ出错的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-01-20
运用.NetCore实例讲解RabbitMQ死信队列,延时队列
这篇文章主要运用.NetCore实例讲解RabbitMQ死信队列,延时队列,,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下...2021-09-22- 这篇文章主要给大家介绍了关于RabbitMQ的配置与安装的相关资料,文中通过示例代码以及图文介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-06-25
- 当使用docker部署rabbitmq时遇到两个问题,访问交换机时报错,另一种是访问channel时报错,本文给大家分享解决方案,感兴趣的朋友跟随小编一起看看吧...2021-07-12