C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)
1:RabbitMQ是个啥?(专业术语参考自网络)
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
RabbitMQ服务器是用Erlang语言编写的,Erlang是专门为高并发而生的语言,而集群和故障转移是构建在开发电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库
2:使用RabbitMQ有啥好处?
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。
对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。
RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式,
3:RabbitMq的安装以及环境搭建等:
网络上有很多关于怎么搭建配置RabbitMq服务环境的详细文章,也比较简单,这里不再说明,本人是Docker上面的pull RabbitMq 镜像来安装的!
3.1:运行容器的命令如下:
docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management
4:RabbitMq的使用场景主要有哪些,啥时候用或者不用?
4.1什么时候使用MQ?
1)数据驱动的任务依赖
2)上游不关心多下游执行结果
3)异步返回执行时间长
4.2什么时候不使用MQ?
需要实时关注执行结果 (eg:同步调用)
5:具体C#怎么使用RabbitMq?下面直接上code和测试截图了(Demo环境是.NetCore3.1控制台+Docker上的RabbitMQ容器来进行的)
6:sample模式,就是简单地队列模式,一进一出的效果差不多,测试截图:
Code:
//简单生产端 ui调用者 using System; namespace RabbitMqPublishDemo { using MyRabbitMqService; using System.Runtime.CompilerServices; class Program { static void Main(string[] args) { //就是简单的队列,生产者 Console.WriteLine("====RabbitMqPublishDemo===="); for (int i = 0; i < 500; i++) { ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}"); } Console.WriteLine("生成完毕!"); Console.ReadLine(); } } } /// <summary> /// 简单生产者 逻辑 /// </summary> /// <param name="queueName"></param> /// <param name="msg"></param> public static void PublishSampleMsg(string queueName, string msg) { using (IConnection conn = connectionFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); var msgBody = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody); } } } //简单消费端 using System; namespace RabbitMqConsumerDemo { using MyRabbitMqService; using System.Runtime.InteropServices; class Program { static void Main(string[] args) { Console.WriteLine("====RabbitMqConsumerDemo===="); ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr => { Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}"); }); Console.ReadLine(); } } } #region 简单生产者后端逻辑 /// <summary> /// 简单消费者 /// </summary> /// <param name="queueName">队列名称</param> /// <param name="isBasicNack">失败后是否自动放到队列</param> /// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param> public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false, { Console.WriteLine("ConsumeSampleMsg Waiting for messages...."); IConnection conn = connectionFactory.CreateConnection(); IModel channel = conn.CreateModel(); channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, ea) => { byte[] bymsg = ea.Body.ToArray(); string msg = Encoding.UTF8.GetString(bymsg); if (handleMsgStr != null) { handleMsgStr.Invoke(msg); } else { Console.WriteLine($"{DateTime.Now}->收到消息:{msg}"); } }; channel.BasicConsume(queueName, autoAck: true, consumer); } #endregion
7:Work模式
//简单生产端 ui调用者 using System; namespace RabbitMqPublishDemo { using MyRabbitMqService; using System.Runtime.CompilerServices; class Program { static void Main(string[] args) { //就是简单的队列,生产者 Console.WriteLine("====RabbitMqPublishDemo===="); for (int i = 0; i < 500; i++) { ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}"); } Console.WriteLine("生成完毕!"); Console.ReadLine(); } } } /// <summary> /// 简单生产者 逻辑 /// </summary> /// <param name="queueName"></param> /// <param name="msg"></param> public static void PublishSampleMsg(string queueName, string msg) { using (IConnection conn = connectionFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); var msgBody = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody); } } } //简单消费端 using System; namespace RabbitMqConsumerDemo { using MyRabbitMqService; using System.Runtime.InteropServices; class Program { static void Main(string[] args) { Console.WriteLine("====RabbitMqConsumerDemo===="); ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr => { Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}"); }); Console.ReadLine(); } } } #region 简单生产者后端逻辑 /// <summary> /// 简单消费者 /// </summary> /// <param name="queueName">队列名称</param> /// <param name="isBasicNack">失败后是否自动放到队列</param> /// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param> public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false, { Console.WriteLine("ConsumeSampleMsg Waiting for messages...."); IConnection conn = connectionFactory.CreateConnection(); IModel channel = conn.CreateModel(); channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, ea) => { byte[] bymsg = ea.Body.ToArray(); string msg = Encoding.UTF8.GetString(bymsg); if (handleMsgStr != null) { handleMsgStr.Invoke(msg); } else { Console.WriteLine($"{DateTime.Now}->收到消息:{msg}"); } }; channel.BasicConsume(queueName, autoAck: true, consumer); } #endregion
8:Fanout
Code:
//就如下的code, 多次生产,3个消费者都可以自动开始消费 //生产者 using System; namespace RabbitMqPublishDemo { using MyRabbitMqService; using System.Runtime.CompilerServices; class Program { static void Main(string[] args) { for (int i = 0; i < 500; i++) { ZrfRabbitMqHelper.PublishWorkQueueModel("workqueue", $" :发布消息成功{i}"); } Console.WriteLine("工作队列模式 生成完毕......!"); Console.ReadLine(); } } } //生产者后端逻辑 public static void PublishWorkQueueModel(string queueName, string msg) { using (var connection = connectionFactory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var body = Encoding.UTF8.GetBytes(msg); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body); Console.WriteLine($"{DateTime.Now},SentMsg: {msg}"); } } //work消费端 using System; namespace RabbitMqConsumerDemo { using MyRabbitMqService; using System.Runtime.InteropServices; class Program { static void Main(string[] args) { Console.WriteLine("====Work模式开启了===="); ZrfRabbitMqHelper.ConsumeWorkQueueModel("workqueue", handserMsg: msg => { Console.WriteLine($"work模式获取到消息{msg}"); }); Console.ReadLine(); } } } //work后端逻辑 public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null) { var connection = connectionFactory.CreateConnection(); var channel = connection.CreateModel(); channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new EventingBasicConsumer(channel); Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages...."); consumer.Received += (sender, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); if (handserMsg != null) { if (!string.IsNullOrEmpty(message)) { handserMsg.Invoke(message); } } channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); }
9:Direct
Code:
//同一个消息会被多个订阅者消费 //发布者 using System; namespace RabbitMqPublishDemo { using MyRabbitMqService; using System.Runtime.CompilerServices; class Program { static void Main(string[] args) { #region 发布订阅模式,带上了exchange for (int i = 0; i < 500; i++) { ZrfRabbitMqHelper.PublishExchangeModel("exchangemodel", $"发布的消息是:{i}"); } Console.WriteLine("发布ok!"); #endregion Console.ReadLine(); } } } //发布者的后端逻辑 我在这里选择了扇形: ExchangeType.Fanout public static void PublishExchangeModel(string exchangeName, string message) { using (var connection = connectionFactory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body); Console.WriteLine($" Sent {message}"); } } //订阅者 using System; namespace RabbitMqConsumerDemo { using MyRabbitMqService; using System.Runtime.InteropServices; class Program { static void Main(string[] args) { #region 发布订阅模式 Exchange ZrfRabbitMqHelper.SubscriberExchangeModel("exchangemodel", msg => { Console.WriteLine($"订阅到消息:{msg}"); }); #endregion Console.ReadLine(); } } } //订阅者后端的逻辑 public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null) { var connection = connectionFactory.CreateConnection(); var channel = connection.CreateModel(); channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);//Fanout 扇形分叉 var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: ""); Console.WriteLine(" Waiting for msg...."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); if (handlerMsg != null) { if (!string.IsNullOrEmpty(message)) { handlerMsg.Invoke(message); } } else { Console.WriteLine($"订阅到消息:{message}"); } }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); }
到此这篇关于C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)的文章就介绍到这了,更多相关C#使用RabbitMq队列内容请搜索猪先飞以前的文章或继续浏览下面的相关文章希望大家以后多多支持猪先飞!
相关文章
- 我们在使用C#做项目的时候,基本上都需要制作登录界面,那么今天我们就来一步步看看,如果简单的实现登录界面呢,本文给出2个例子,由简入难,希望大家能够喜欢。...2020-06-25
- 这篇文章主要介绍了C# 字段和属性的的相关资料,文中示例代码非常详细,供大家参考和学习,感兴趣的朋友可以了解下...2020-11-03
- 这篇文章主要介绍了C#中截取字符串的的基本方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-11-03
- 本文给大家分享C#连接SQL数据库和查询数据功能的操作技巧,本文通过图文并茂的形式给大家介绍的非常详细,需要的朋友参考下吧...2021-05-17
- 这篇文章主要介绍了C#实现简单的Http请求的方法,以实例形式较为详细的分析了C#实现Http请求的具体方法,需要的朋友可以参考下...2020-06-25
- 本文主要介绍了C#中new的几种用法,具有很好的参考价值,下面跟着小编一起来看下吧...2020-06-25
使用Visual Studio2019创建C#项目(窗体应用程序、控制台应用程序、Web应用程序)
这篇文章主要介绍了使用Visual Studio2019创建C#项目(窗体应用程序、控制台应用程序、Web应用程序),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...2020-06-25- 这篇文章主要介绍了C#开发Windows窗体应用程序的简单操作步骤,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-04-12
- 这篇文章主要介绍了C#从数据库读取图片并保存的方法,帮助大家更好的理解和使用c#,感兴趣的朋友可以了解下...2021-01-16
- 最近做一个小项目不可避免的需要前端脚本与后台进行交互。由于是在asp.net中实现,故问题演化成asp.net中jiavascript与后台c#如何进行交互。...2020-06-25
- 本文通过例子,讲述了C++调用C#的DLL程序的方法,作出了以下总结,下面就让我们一起来学习吧。...2020-06-25
- 这篇文章主要用实例讲解C#递归算法的概念以及用法,文中代码非常详细,帮助大家更好的参考和学习,感兴趣的朋友可以了解下...2020-06-25
- 轻松学习C#的基础入门,了解C#最基本的知识点,C#是一种简洁的,类型安全的一种完全面向对象的开发语言,是Microsoft专门基于.NET Framework平台开发的而量身定做的高级程序设计语言,需要的朋友可以参考下...2020-06-25
- 本文主要介绍了C#变量命名规则小结,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-09-09
- 这篇文章主要介绍了C#绘制曲线图的方法,以完整实例形式较为详细的分析了C#进行曲线绘制的具体步骤与相关技巧,具有一定参考借鉴价值,需要的朋友可以参考下...2020-06-25
- 本文主要介绍了C# 中取绝对值的函数。具有很好的参考价值。下面跟着小编一起来看下吧...2020-06-25
- 这篇文章主要介绍了c#自带缓存使用方法,包括获取数据缓存、设置数据缓存、移除指定数据缓存等方法,需要的朋友可以参考下...2020-06-25
- 这篇文章主要介绍了c#中(&&,||)与(&,|)的区别详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-06-25
- 下面小编就为大家带来一篇C#学习笔记- 随机函数Random()的用法详解。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧...2020-06-25
- 这篇文章主要介绍了C#中list用法,结合实例形式分析了C#中list排序、运算、转换等常见操作技巧,具有一定参考借鉴价值,需要的朋友可以参考下...2020-06-25