利用swoole的task进程和redis实现队列任务
swoole简介
面向生产环境的 PHP 异步网络通信引擎,可以通过该扩展快速创建TCP/UDP服务端、Http服务端、websocket服务端。
Task进程简介
如果没有接触过swoole可以先上swoole官网了解swoole的进程模型
在swoole_server
中提供了task
的进程,用来异步处理耗时的任务,只需要配置swoole_server
时设置task_worker_num
即可启用task进程,在worker
进程中使用$server->task(array $data)
进行任务的投递,但是在任务耗时且量大的情况下会出现任务丢失的现象,所以决定采用第三方来实现队列,来确保数据的完整和可靠性。
队列的实现
通过redis
的list
结构可以很方便的实现一个FIFO
队列。利用lpush
向队列中插入数据,brpop
从队列中取出数据,并且是个阻塞的函数。
实现生产消费
我的目的是通过http请求能够实现任务的投递,所以采用swoole_http_server
来实现一个HTTP服务器,代码很简单。
$server = new swoole_http_server('0.0.0.0', 9001);
这里处理HTTP请求的进程我们称之为worker
进程,所以在worker
进程中我们处理任务投递,即将数据插入队列,在task
进程中以while(true)
的方式实现任务的消费。
先说任务的消费,我们需要在onWokerStart
(woker进程启动)事件中让task
进程不断的(while(true)
)从队列中读取数据,这里会有一个问题,由于Task
和Worker
进程同属于Manager进程
的子进程,所以在执行onWorkerStart
事件的时候,task
和worker
进程都会执行里面的代码片段,我们需要区分来确保worker
进程只负责投递任务,task
进程只负责消费队列中的数据。通过以下代码可以进行区分
$worker_id >= $server->setting['worker_num'] ? "Task" : "Worker";
每个worker/task
进程都会有一个唯一的id(并非pid
),当这个id大于设置的woker进程数时就是task进程。所以接下来的onWokerStart
中的代码应该是这样的
$server->on('workerstart', function($server){
$redis = new Redis(); // 为每个进程创建一个redis链接,当然你也可以实现一个redis连接池来充分利用redis的连接资源
$redis->connect('127.0.0.1');
if ($woker_id >= $server->setting['worker_num']) {
//Task进程消费
while(true) {
//brpop第二个参数5表示超时(阻塞等待)时间
if (($message = $redis->brpop('queue', 5))===null) {
sleep(1); //如果没有数据,休息一会
continue;
}
$data = $message[1];
//处理数据,我们就简单的echo一下吧
echo $data . PHP_EOL;
}
} else {
//woker进程赋值一下server方便之后调用redis连接
$server->redis = &$redis;
}
});
为什么不采用$server->task(...)的方式投递任务?因为上面提到的,在耗时且任务量大的情况下,task()函数推送任务会出现失败的情况,我们需要保证任务有序且完整的进行,则让task进程直接以while的方式从redis中pop出任务数据。
投递任务
在request
事件中,即worker
进程中处理任务的投递。
//绑定请求事件,当接受到http请求时会执行下面匿名函数中的代码
$server->on("request", function($req, $resp) use($server){
$redis = $server->redis;
$redis->lpush("hello");
});
总结
故事到了这里基本结束了,当http server
收到请求的时候就会推送一条hello
的数据到队列中,task
进程就会取出这条数据并打印在屏幕上。这里只提供了一个队列插入和取出的思路,还有很多的细节没有实现,比如不同的任务执行不同的代码逻辑等等,我想我可能真的是太懒了。
请问一下,如果我在 open 事件中获取到userid,然后此用户对应的队列是 user_xxx_messages ,xxx为用户的id, 如何监听到对应的队列?在workerstart中无法获取到 userid