PHP实现RabbitMQ消息列队的示例代码

 更新时间:2022年5月10日 23:24  点击:582 作者:PHP开源社区

业务场景

项目公司是主php做开发的,框架为thinkphp。众所周知,php本身的运行效率存在一定的缺陷,所以如果有一个很复杂很耗时的业务时,必须开发一个常驻内存的程序。首先我想到了php的workerman与swoole,但是这里应上面的标题哈,想将耗时任务交给另一个服务器,同时列队处理。所以这里我想独立部署一个rabbitMQ服务器用于处理列队任务。

当rabbitMQ服务器我们准备好了,建立了一个持久化命名为ceshi的列队,如下:

项目上生产者和消费者的开发我这里全部采用tinkphp6+workerman,为便于管理。这里这么做也是因为发现workerman中对rabbitMQ的文档解释太少了!

所以开始踩坑!

1、首先部署好thinkphp6框架

过程去看thinkphp6手册

2、安装workerman扩展

过程去看thinkphp6手册

3、生产者

配置一个workerman类

创建的Send类代码如下:

<?php

namespace app\workerman;
use Bunny\Channel;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Send extends Server
{
    //websocket地址,一会用于测试。
    protected $socket = 'websocket://127.0.0.1:2345';

    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($connection, $data)
{
        //websocket发送过来的消息
        $connection->send('我收到你的信息了:'.$data);
        //rabbitMQ配置
        $options = [
            'host'=>'127.0.0.1',//rabbitMQ IP
            'port'=>5672,//rabbitMQ 通讯端口
            'user'=>'admin',//rabbitMQ 账号
            'password'=>'123456'//rabbitMQ 密码
        ];
        (new Client($options))->connect()->then(function (Client $client) {
            return $client->channel();
        })->then(function (Channel $channel) {
            /**
             * 创建队列(Queue)
             * name: ceshi         // 队列名称
             * passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
             * durable: true       // 是否持久化,设置false是存放到内存中RabbitMQ重启后会丢失,
             *                        设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue
             * exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
             *  auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
             */
            return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (Channel $channel) use($data){
            echo "发送消息内容:".$data."\n";

            /**
             * 发送消息
             * body 发送的数据
             * headers 数据头,建议 ['content_type' => 'text/plain'],这样消费端是springboot注解接收直接是字符串类型
             * exchange 交换器名称
             * routingKey 路由key
             * mandatory
             * immediate
             * @return bool|PromiseInterface|int
             */

            return $channel->publish($data, ['content_type' => 'text/plain'], '', 'ceshi')->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (Channel $channel) {
            //echo " [x] Sent 'Hello World!'\n";
            $client = $channel->getClient();
            return $channel->close()->then(function () use ($client) {
                return $client;
            });
        })->then(function (Client $client) {
            $client->disconnect();
        });
    }

    /**
     * 当连接建立时触发的回调函数
     * @param $connection
     */
    public function onConnect($connection)
{

    }

    /**
     * 当连接断开时触发的回调函数
     * @param $connection
     */
    public function onClose($connection)
{

    }
    /**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
{
        echo "error $code $msg\n";
    }

    /**
     * 每个进程启动
     * @param $worker
     */
    public function onWorkerStart($worker)
{


    }
}

上述都OK以后咱们可以项目路径下通过命令启动这个生产者:

php think worker:server

测试发送数据:

通过这个网站

连接【ws://127.0.0.1:2345】后发送数据!

前往rabbitMQ控制台

列队中有一条消息产生并且等待了!

这个时候你可能问,如果我发送数据不想通过ws发送而是接口发送怎么办?

笨思路呗:接口给内置服务器发消息->内置服务去发消息给rabbitMQ

将协议改为tcp

然后重新启动服务

然后去tp6创建一个路由接口

接口代码

<?php
namespace app\controller;

use app\BaseController;

class Index extends BaseController
{
    public function index(string $msg)
{
        //连接本地tcp服务
        $client = stream_socket_client('tcp://127.0.0.1:2345', $errno, $errmsg, 1);
        //发送字符串
        fwrite($client, $msg."\n");
        //断开服务
        fclose($client);
        return 'OK';
    }

}

执行结果:

说明接口成功的将数据发送给了本地内置的tcp服务。

同时,内置服务将收到的数据给了rabbitMQ服务列队中。

生产者完成。

4、消费者

同生产者一样新创建一个thinkphp6及安装workerman扩展,注意端口别和生产者冲突!这里我设置的是2346端口

创建的Receive类代码如下:

<?php

namespace app\workerman;
use Bunny\Channel;
use Bunny\Message;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Receive extends Server
{
    protected $socket = 'tcp://127.0.0.1:2346';

    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($connection, $data)
{

    }

    /**
     * 当连接建立时触发的回调函数
     * @param $connection
     */
    public function onConnect($connection)
{

    }

    /**
     * 当连接断开时触发的回调函数
     * @param $connection
     */
    public function onClose($connection)
{

    }
    /**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
{
        echo "error $code $msg\n";
    }

    /**
     * 每个进程启动
     * @param $worker
     */
    public function onWorkerStart($worker)
{
        //rabbitMQ配置
        $options = [
            'host'=>'127.0.0.1',//rabbitMQ IP
            'port'=>5672,//rabbitMQ 通讯端口
            'user'=>'admin',//rabbitMQ 账号
            'password'=>'123456'//rabbitMQ 密码
        ];
        (new Client($options))->connect()->then(function (Client $client) {
            return $client->channel();
        })->then(function (Channel $channel) {
            /**
             * 创建队列(Queue)
             * name: ceshi         // 队列名称
             * passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
             * durable: true       // 是否持久化,设置false是存放到内存中RabbitMQ重启后会丢失,
             *                        设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue
             * exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
             *  auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
             */
            return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (Channel $channel) {
            echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
            $channel->consume(
                function (Message $message, Channel $channel, Client $client) {
                    echo "接收消息内容:", $message->content, "\n";
                },
                'ceshi',
                '',
                false,
                true
            );
        });

    }
}

都OK以后咱们可以项目路径下通过命令启动这个消费者:

php think worker:server

此时应该会自动消费掉rabbitMQ中等待的消息!

到这里消费者也就结束啦!

5、整体测试

接下来我用cmd来启动两个服务,然后用接口发送消息和消费测试!

至于具体怎么灵活应用自行开拓大脑哦~

比如php项目有些业务吃力,可以去做个java的消费端,让java来完成任务~

以上就是PHP实现RabbitMQ消息列队的示例代码的详细内容,更多关于PHP RabbitMQ消息列队的资料请关注猪先飞其它相关文章!

原文出处:https://mp.weixin.qq.com/s/af_NBSkHVoHyhQQkF4264Q

[!--infotagslink--]

相关文章

  • 源码分析系列之json_encode()如何转化一个对象

    这篇文章主要介绍了源码分析系列之json_encode()如何转化一个对象,对json_encode()感兴趣的同学,可以参考下...2021-04-22
  • php中去除文字内容中所有html代码

    PHP去除html、css样式、js格式的方法很多,但发现,它们基本都有一个弊端:空格往往清除不了 经过不断的研究,最终找到了一个理想的去除html包括空格css样式、js 的PHP函数。...2013-08-02
  • index.php怎么打开?如何打开index.php?

    index.php怎么打开?初学者可能不知道如何打开index.php,不会的同学可以参考一下本篇教程 打开编辑:右键->打开方式->经文本方式打开打开运行:首先你要有个支持运行PH...2017-07-06
  • PHP中func_get_args(),func_get_arg(),func_num_args()的区别

    复制代码 代码如下:<?php function jb51(){ print_r(func_get_args()); echo "<br>"; echo func_get_arg(1); echo "<br>"; echo func_num_args(); } jb51("www","j...2013-10-04
  • PHP编程 SSO详细介绍及简单实例

    这篇文章主要介绍了PHP编程 SSO详细介绍及简单实例的相关资料,这里介绍了三种模式跨子域单点登陆、完全跨单点域登陆、站群共享身份认证,需要的朋友可以参考下...2017-01-25
  • PHP实现创建以太坊钱包转账等功能

    这篇文章主要介绍了PHP实现创建以太坊钱包转账等功能,对以太坊感兴趣的同学,可以参考下...2021-04-20
  • php微信公众账号开发之五个坑(二)

    这篇文章主要为大家详细介绍了php微信公众账号开发之五个坑,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2016-10-02
  • PHP如何通过date() 函数格式化显示时间

    这篇文章主要介绍了PHP如何通过date() 函数格式化显示时间,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下...2020-11-13
  • ThinkPHP使用心得分享-ThinkPHP + Ajax 实现2级联动下拉菜单

    首先是数据库的设计。分类表叫cate.我做的是分类数据的二级联动,数据需要的字段有:id,name(中文名),pid(父id). 父id的设置: 若数据没有上一级,则父id为0,若有上级,则父id为上一级的id。数据库有内容后,就可以开始写代码,进...2014-05-31
  • C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

    这篇文章主要介绍了C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2020-12-08
  • C#操作RabbitMQ的完整实例

    这篇文章主要为大家详细介绍了C#操作RabbitMQ的完整实例,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2020-06-25
  • golang与php实现计算两个经纬度之间距离的方法

    这篇文章主要介绍了golang与php实现计算两个经纬度之间距离的方法,结合实例形式对比分析了Go语言与php进行经纬度计算的相关数学运算技巧,需要的朋友可以参考下...2016-07-29
  • PHP+jQuery+Ajax实现多图片上传效果

    今天我给大家分享的是在不刷新页面的前提下,使用PHP+jQuery+Ajax实现多图片上传的效果。用户只需要点击选择要上传的图片,然后图片自动上传到服务器上并展示在页面上。...2015-03-15
  • PHP正则表达式过滤html标签属性(DEMO)

    这篇文章主要介绍了PHP正则表达式过滤html标签属性的相关内容,实用性非常,感兴趣的朋友参考下吧...2016-05-06
  • php构造方法中析构方法在继承中的表现

    这篇文章主要为大家详细介绍了php构造方法中析构方法在继承中的表现,感兴趣的小伙伴们可以参考一下...2016-04-15
  • PHP如何使用cURL实现Get和Post请求

    这篇文章主要介绍了PHP如何使用cURL实现Get和Post请求,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下...2020-07-11
  • thinkPHP中多维数组的遍历方法

    这篇文章主要介绍了thinkPHP中多维数组的遍历方法,以简单实例形式分析了thinkPHP中foreach语句的使用技巧,需要的朋友可以参考下...2016-01-12
  • PHP简单实现生成txt文件到指定目录的方法

    这篇文章主要介绍了PHP简单实现生成txt文件到指定目录的方法,简单对比分析了PHP中fwrite及file_put_contents等函数的使用方法,需要的朋友可以参考下...2016-04-28
  • php判断邮箱地址是否存在的方法

    这篇文章主要介绍了php判断邮箱地址是否存在的方法,php判断邮箱地址是否存在的方法有两种,感兴趣的朋友可以参考一下...2016-02-18
  • php图片添加文字水印实现代码

    这篇文章主要为大家详细介绍了php图片添加文字水印实现代码,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2016-03-17