如何用RabbitMQ和Swoole实现一个异步任务系统

 更新时间:2021年5月30日 22:19  点击:1406

系统介绍

从图中可以看到,我们这个系统是一个基于事件的异步任务系统。就是说当一个事件产生时,生产者将事件抛给调度器,调度器负责查询事件下有哪些任务,然后将这些任务丢到相应的队列中,最后由消费者消费任务队列中的任务。

在整个系统中主要分为三大部分

1.事件生产者,即产生消息事件的一方。

2.任务调度器(Scheduler),负责注册事件并调度任务。

3.消费者(Worker),负责消费任务队列中的任务。

事件生产者

事件生产者很简单,在业务系统中直接调用即可,代码如下。

<?php
 
require_once DIR.'/../autoload.php';
 
use Asynclib\Ebats\Event;
 
try{
 
    $event = new Event('order_paied');  //定义事件
 
    $event->setOptions(['order_id' => 'FB138020392193312']); //事件产生的参数
 
    $event->publish();
 
}catch (Exception $exc){
 
    echo $exc->getMessage();
 
}

任务调度器

调度器主要做两件事,一是注册事件,另一个是调度任务。

注册事件代码如下:

//注册事件
 
EventManager::register('order_create', 'closeOrder', 'demo', 10);//关闭未付款订单(延迟任务)
 
EventManager::register('order_paied', 'virtualShipping', 'demo'); //虚拟商品自动发货

这样就注册了两个事件,事件下各有一个任务。

具体调度部分代码很简单,就不多赘述,有兴趣的可以去看代码。

消费者

重头戏来了,一个异步任务系统最重要的就是消费端了,现在让我们来看下Worker的流程图。

可以看到,在这里我们采用了两个交换器和两个队列,一个负责处理正常的任务即ntask,另一个负责处理需要延迟执行的任务即dtask。简单描述下一个任务的生命周期。

正常任务

1、task产生,进入正常任务的交换器Exchange[ebats_core_ntask]

2、交换器根据topic将任务分发到对应的队列中

3、子进程ntask阻塞等待成功获取到task,并执行该任务

4、执行失败,需要重试时抛出RetryException,不需要重试时抛出TaskException

5、子进程ntask捕获到重试异常将任务抛给延迟任务的交换器Exchange[ebats_core_dtask]

6、将任务执行信息回调给上层开发者以便保存查看

延迟任务

1、子进程dtask阻塞等待成功获取到task,并执行该任务
2、执行失败,需要重试时抛出RetryException,不需要重试时抛出TaskException
3、子进程dtask捕获到重试异常将任务抛给延迟任务的交换器Exchange[ebats_core_dtask]
4、将任务执行信息回调给上层开发者以便保存查看

消费者代码如下:

require_once DIR.'/../autoload.php';
 
require_once DIR.'/task/TaskDemoModel.php';
 
use Asynclib\Ebats\Worker;
 
  
 
//执行结果回调函数
 
$callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){
 
  
 
};
 
$worker = new Worker($callback);  //支持多进程消费默认为1
 
$worker->setQueue('demo');  //队列名和事件的topic一一对应
 
$worker->run();

自定义调度器

一般来说这是一个基于事件的任务系统,那么能不能直接产生任务呢。答案是肯定的。

只需要创建一个自定义调度器,由您自行实现调度逻辑,最终生成一个任务即可。代码如下:

<?php
 
require_once DIR.'/../autoload.php';
 
use Asynclib\Ebats\Task;
 
use Asynclib\Core\Consumer;
 
use Asynclib\Amq\ExchangeTypes;
 
use Asynclib\Exception\ExceptionInterface;
 
  
 
/**
 
 * 本示例演示了如何创建一个自定义调度器,开发者可以根据自身需求开发自己的任务调度器
 
 */
 
try{
 
    $worker = new Consumer();
 
    $worker->setExchange('order_fanout', ExchangeTypes::TOPIC);
 
    $worker->setQueue('shzf_order_paied', ['*.*.WAIT_SELLER_SEND_GOODS']);
 
    $worker->run(function($key, $msg){
 
        $order_data = json_encode($msg);
 
        echo " [$key] $order_data \n";
 
        Task::create('demo', 'orderAsync', $msg);//创建任务,之后消息将作为参数由任务接管处理
 
    });
 
}catch (ExceptionInterface $exc){
 
    echo $exc->getMessage();
 
}

这样,当接收到消息时就会产生一个orderAsync的任务,您只需要启动一个用来消费这个Topic的Worker即可。

也许你会觉得这里直接写业务逻辑的代码就可以了,实际上也确实可以。当你可以忍受一个进程慢慢消费的时候是可以这样做的。但大多数情况下我们还是希望它能够尽快的消费掉,所以建议这里只负责创建任务,具体任务的业务逻辑由worker去执行。

以上就是如何用RabbitMQ和Swoole实现一个异步任务系统的详细内容,更多关于用RabbitMQ和Swoole实现一个异步任务系统的资料请关注猪先飞其它相关文章!

[!--infotagslink--]

相关文章

  • 深入分析C#中的异步和多线程

    这篇文章主要介绍了C#中异步和多线程的相关资料,帮助大家更好的理解和学习c#,感兴趣的朋友可以了解下...2021-01-16
  • C#多线程与异步的区别详解

    多线程和异步操作两者都可以达到避免调用线程阻塞的目的,从而提高软件的可响应性。甚至有些时候我们就认为多线程和异步操作是等同的概念。但是,多线程和异步操作还是有一些区别的。而这些区别造成了使用多线程和异步操作的时机的区别...2020-06-25
  • 详解C# Socket异步通信实例

    本篇文章主要介绍了C# Socket异步通信,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...2020-06-25
  • c# winform异步不卡界面的实现方法

    这篇文章主要给大家介绍了关于c# winform异步不卡界面的实现方法,文中通过示例代码介绍的非常详细,对大家学习或者使用c#具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧...2020-06-25
  • JS异步的执行原理和回调详解

    这篇文章主要给大家介绍了关于JS异步的执行原理和回调的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-03-08
  • JQuery基于FormData异步提交数据文件

    这篇文章主要介绍了JQuery基于FormData异步提交数据文件,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下...2020-09-02
  • C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

    这篇文章主要介绍了C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2020-12-08
  • C#操作RabbitMQ的完整实例

    这篇文章主要为大家详细介绍了C#操作RabbitMQ的完整实例,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2020-06-25
  • 如何使用PHP+jQuery+MySQL实现异步加载ECharts地图数据(附源码下载)

    ECharts地图主要用于地理区域数据的可视化,展示不同区域的数据分布信息,通过本文给大家介绍如何使用PHP+jQuery+MySQL实现异步加载ECharts地图数据,需要的朋友参考下吧...2016-02-26
  • JQuery异步提交表单与文件上传功能示例

    这篇文章主要介绍了JQuery异步提交表单与文件上传功能,结合实例形式分析了jQuery表单提交及文件传输操作的相关实现技巧,需要的朋友可以参考下...2017-01-16
  • C#异步的世界(下)

    这篇文章主要介绍了C#异步的世界(下),对异步感兴趣的同学,可以参考下...2021-04-26
  • springboot+rabbitmq实现指定消费者才能消费的方法

    当项目部署到测试环境后,QA测试过程中,总是“莫名其妙”的发现所保存的用户付款单数据有问题。这篇文章主要介绍了springboot+rabbitmq实现指定消费者才能消费,需要的朋友可以参考下...2021-11-01
  • c#异步操作后台运行(backgroundworker类)示例

    这篇文章主要介绍了c#异步操作后台运行(backgroundworker类)示例,需要的朋友可以参考下...2020-06-25
  • SpringMVC和rabbitmq集成的使用案例

    这篇文章主要介绍了SpringMVC和rabbitmq集成的使用案例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-01-20
  • ASP.Net中的async+await异步编程的实现

    这篇文章主要介绍了ASP.Net中的async+await异步编程的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-09-22
  • Spring @Async无法实现异步的解决方案

    这篇文章主要介绍了Spring @Async无法实现异步的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2021-10-02
  • 又拍云异步上传实例教程详解

    这篇文章主要介绍了又拍云异步上传实例教程详解的相关资料,需要的朋友可以参考下...2016-04-23
  • 深入浅析NodeJs并发异步的回调处理

    这篇文章主要介绍了NodeJs并发异步的回调处理的相关资料,需要的朋友可以参考下...2015-12-24
  • Yii2.0中使用js异步删除示例

    本篇文章主要介绍了Yii2.0中使用js异步删除示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧 ...2017-03-12
  • C#实现异步发送邮件的方法

    这篇文章主要介绍了C#实现异步发送邮件的方法,涉及C#异步操作与邮件发送的技巧,非常具有实用价值,需要的朋友可以参考下...2020-06-25