• 展开微博窗口
  • QQ:52619941
  • 微信:cnmemory
  • 展开分类目录
  • 还没有账号?

Memory

利用swoole的task进程和redis实现队列任务

swoole简介

面向生产环境的 PHP 异步网络通信引擎,可以通过该扩展快速创建TCP/UDP服务端、Http服务端、websocket服务端。

Task进程简介

如果没有接触过swoole可以先上swoole官网了解swoole的进程模型

swoole_server中提供了task的进程,用来异步处理耗时的任务,只需要配置swoole_server时设置task_worker_num即可启用task进程,在worker进程中使用$server->task(array $data)进行任务的投递,但是在任务耗时且量大的情况下会出现任务丢失的现象,所以决定采用第三方来实现队列,来确保数据的完整和可靠性。

队列的实现

通过redislist结构可以很方便的实现一个FIFO队列。利用lpush向队列中插入数据,brpop从队列中取出数据,并且是个阻塞的函数。

实现生产消费

我的目的是通过http请求能够实现任务的投递,所以采用swoole_http_server来实现一个HTTP服务器,代码很简单。

$server = new swoole_http_server('0.0.0.0', 9001);

这里处理HTTP请求的进程我们称之为worker进程,所以在worker进程中我们处理任务投递,即将数据插入队列,在task进程中以while(true)的方式实现任务的消费。

先说任务的消费,我们需要在onWokerStart(woker进程启动)事件中让task进程不断的(while(true))从队列中读取数据,这里会有一个问题,由于TaskWorker进程同属于Manager进程的子进程,所以在执行onWorkerStart事件的时候,taskworker进程都会执行里面的代码片段,我们需要区分来确保worker进程只负责投递任务,task进程只负责消费队列中的数据。通过以下代码可以进行区分

$worker_id >= $server->setting['worker_num'] ? "Task" : "Worker";

每个worker/task进程都会有一个唯一的id(并非pid),当这个id大于设置的woker进程数时就是task进程。所以接下来的onWokerStart中的代码应该是这样的

$server->on('workerstart', function($server){
    $redis = new Redis(); // 为每个进程创建一个redis链接,当然你也可以实现一个redis连接池来充分利用redis的连接资源
    $redis->connect('127.0.0.1');
    if ($woker_id >= $server->setting['worker_num']) {
        //Task进程消费
        while(true) {
            //brpop第二个参数5表示超时(阻塞等待)时间
            if (($message = $redis->brpop('queue', 5))===null) {
                sleep(1); //如果没有数据,休息一会
                continue;
            }
            $data = $message[1];
            //处理数据,我们就简单的echo一下吧
            echo $data . PHP_EOL;
        }
    } else {
        //woker进程赋值一下server方便之后调用redis连接
        $server->redis = &$redis;
    }
});

为什么不采用$server->task(...)的方式投递任务?因为上面提到的,在耗时且任务量大的情况下,task()函数推送任务会出现失败的情况,我们需要保证任务有序且完整的进行,则让task进程直接以while的方式从redis中pop出任务数据。

投递任务

request事件中,即worker进程中处理任务的投递。

//绑定请求事件,当接受到http请求时会执行下面匿名函数中的代码
$server->on("request", function($req, $resp) use($server){
    $redis = $server->redis;
    $redis->lpush("hello");
});

总结

故事到了这里基本结束了,当http server收到请求的时候就会推送一条hello的数据到队列中,task进程就会取出这条数据并打印在屏幕上。这里只提供了一个队列插入和取出的思路,还有很多的细节没有实现,比如不同的任务执行不同的代码逻辑等等,我想我可能真的是太懒了。

码字很辛苦,转载请注明来自雨林寒舍《利用swoole的task进程和redis实现队列任务》

评论

  1. asds #1

    请问一下,如果我在 open 事件中获取到userid,然后此用户对应的队列是 user_xxx_messages ,xxx为用户的id, 如何监听到对应的队列?在workerstart中无法获取到 userid

    回复
    2019-02-20