<?php
/**
* 子进程管理工具,处理大数据时,防止创建过多的子进程导致内存过载
* swoole 官网 https://wiki.swoole.com/
*/
namespace app\library;
use Swoole\Process;
/**
* Class SubProcess 基于swoole实现的子简单进程管理,主要用于让"子进程"完成数据处理任务。
* eg:
* $subProcess = new SubProcess(10); //最多启用10个子进程
* $id = 0;
* while(1){
* $list = $db->where(['>', 'id', $id])->limit()->queryAll();//获取一批数据
*. if(!$list) break;
*.
* $id = $list[count($list)-1]['id'];
* $subProcess->do(function()use($list){//交付给子进程执行
* Yii::$app->db->close();//重新启用db,子进程中资源类型的变量将会失效,重新获取
* //do something
* });
* }
* $subProcess->wait();//等待所有子进程执行完毕
* @package app\library
*/
Class SubProcess
{
protected $maxNum = 0;
protected $list = [];
public function __construct($maxNum = 10)
{
$this->maxNum = $maxNum;
}
/**
* 在子进程中处理
* @param $callback
*/
public function do($callback)
{
$this->canAdd();
$process = new Process(function () use ($callback) {
call_user_func($callback);
});
$process->start();
$this->list[$process->pid] = 1;
}
/**
* 等待所有子进程完成
*/
public function wait()
{
while (count($this->list)) {
$this->waitASubProcess();
}
}
/**
* 是否需要添加子进程
* @return bool
*/
protected function canAdd()
{
if (count($this->list) < $this->maxNum) {
return true;
}
$this->waitASubProcess();
return true;
}
/**
* 等待一个子进程结束
*/
protected function waitASubProcess()
{
$status = Process::wait();
unset($this->list[$status['pid']]);
}
}