qxsch/worker-pool

Latest stable version: v2.0.2

Composer install command:

composer require qxsch/worker-pool

Package description

Runs tasks in a parallel processing workerpool.

README


Parallel Processing WorkerPool for PHP

10K downloads within 4 months, thank you very much! We add features as users request them.

Examples

The WorkerPool class provides a very simple interface for passing data to a pool of worker processes and having it processed in parallel. You can fetch the results from the workers at any time. Each worker child process can receive and return any value that can be serialized.
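Because inputs and results must be serializable, it can help to check a value before handing it to the pool. The following is a minimal sketch, assuming the values are transported with PHP's native serialize() (an assumption, not something this README states). The function name canCrossProcessBoundary is hypothetical and not part of the library.

```php
<?php
// Hypothetical helper, not part of qxsch/worker-pool.
// Assumption: values are transported with PHP's native serialize()/unserialize().
function canCrossProcessBoundary($value): bool
{
    try {
        $copy = unserialize(serialize($value));
    } catch (\Throwable $e) {
        // Closures, for example, refuse to be serialized and throw here.
        return false;
    }
    // Loose comparison: objects come back as equal copies, not the same instance.
    return $copy == $value;
}

var_dump(canCrossProcessBoundary(['id' => 7, 'tags' => ['a', 'b']])); // plain nested array
var_dump(canCrossProcessBoundary(function () {}));                     // closure
```

Scalars, arrays and simple value objects pass this check, while closures do not, so such things are better created inside the worker than passed in as input.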

A simple example

<?php

$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
   ->create(new \QXS\WorkerPool\ClosureWorker(
        /**
         * @param mixed $input the input from the WorkerPool::run() Method
         * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
         * @param \ArrayObject $storage a persistent storage for the current child process
         */
        function($input, $semaphore, $storage) {
            echo "[".getmypid()."]"." hi $input\n";
            sleep(rand(1,3)); // this is the working load!
            return $input; // return null here, in case you do not want to pass any data to the parent
        }
    )
);

for($i=0; $i<10; $i++) {
    $wp->run($i);
}

$wp->waitForAllWorkers(); // wait for all workers

foreach($wp as $val) {
    echo $val->dump() . "\n"; // dump the returned values
    // var_dump($val); // dump the returned values
}

A more sophisticated example

<?php

use QXS\WorkerPool\WorkerPool;
use QXS\WorkerPool\WorkerInterface;
use QXS\WorkerPool\Semaphore;

/**
 * Our Worker Class
 */
class MyWorker implements WorkerInterface {
    protected $sem;

    /**
     * after the worker has been forked into another process
     *
     * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to run synchronized tasks
     * @throws \Exception in case of a processing Error an Exception will be thrown
     */
    public function onProcessCreate(Semaphore $semaphore) {
        // semaphore can be used in the run method to synchronize the workers
        $this->sem=$semaphore;
        // write something to the stdout
        echo "\t[".getmypid()."] has been created.\n";
        // initialize mt_rand
        list($usec, $sec) = explode(' ', microtime());
        mt_srand((int)( (float) $sec + ((float) $usec * 100000) ));
    }

    /**
     * before the worker process is getting destroyed
     *
     * @throws \Exception in case of a processing Error an Exception will be thrown
     */
    public function onProcessDestroy() {
        // write something to the stdout
        echo "\t[".getmypid()."] will be destroyed.\n";
    }

    /**
     * run the work
     *
     * @param mixed $input the data (any serializable value) that the worker should process
     * @return mixed Returns the result (any serializable value)
     * @throws \Exception in case of a processing Error an Exception will be thrown
     */
    public function run($input) {
        $input=(string)$input;
        echo "\t[".getmypid()."] Hi $input\n";
        sleep(mt_rand(0,10)); // this is the workload!
        // and sometimes exceptions might occur
        if(mt_rand(0,10)==9) {
            throw new \RuntimeException('We have a problem for '.$input.'.');
        }
        return "Hi $input"; // return null here, in case you do not want to pass any data to the parent
    }
}

$wp=new WorkerPool();
$wp->setWorkerPoolSize(10)
   ->create(new MyWorker());

// produce some tasks
for($i=1; $i<=50; $i++) {
    $wp->run($i);
}

// some statistics
echo "Busy Workers:".$wp->getBusyWorkers()."  Free Workers:".$wp->getFreeWorkers()."\n";

// wait for completion of all tasks
$wp->waitForAllWorkers();

// collect all the results
foreach($wp as $val) {
    if(isset($val['data'])) {
        echo "RESULT: ".$val['data']."\n";
    }
    elseif(isset($val['workerException'])) {
        echo "WORKER EXCEPTION: ".$val['workerException']['class'].": ".$val['workerException']['message']."\n".$val['workerException']['trace']."\n";
    }
    elseif(isset($val['poolException'])) {
        echo "POOL EXCEPTION: ".$val['poolException']['class'].": ".$val['poolException']['message']."\n".$val['poolException']['trace']."\n";
    }
}

// write something, before the parent exits
echo "ByeBye\n";
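The result loop in the example distinguishes three cases by key: 'data', 'workerException' and 'poolException'. A small sketch of how that branching could be pulled into a reusable function is shown here. The function name describeResult is hypothetical, and the sample values below are made up for illustration. Only the key names come from the example.

```php
<?php
// Hypothetical helper, not part of qxsch/worker-pool.
// The keys 'data', 'workerException' and 'poolException' are taken from the
// example's result loop; everything else here is illustrative.
function describeResult($val): string
{
    if (isset($val['data'])) {
        return 'RESULT: ' . $val['data'];
    }
    // Both exception kinds carry 'class' and 'message' in the example.
    foreach (['workerException' => 'WORKER EXCEPTION', 'poolException' => 'POOL EXCEPTION'] as $key => $label) {
        if (isset($val[$key])) {
            return $label . ': ' . $val[$key]['class'] . ': ' . $val[$key]['message'];
        }
    }
    // The worker returned null (nothing to pass to the parent).
    return 'NO DATA';
}

// Made-up sample values shaped like the results in the example.
echo describeResult(['data' => 'Hi 7']), "\n";
echo describeResult(['workerException' => [
    'class' => 'RuntimeException',
    'message' => 'We have a problem for 9.',
    'trace' => '',
]]), "\n";
```

Keeping the branching in one place makes it easier to log or count failures separately from successful results.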

Synchronize your workers

If you need to access shared resources, you can synchronize your workers.

<?php

$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
   ->create(new \QXS\WorkerPool\ClosureWorker(
        /**
         * @param mixed $input the input from the WorkerPool::run() Method
         * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
         * @param \ArrayObject $storage a persistent storage for the current child process
         */
        function($input, $semaphore, $storage) {
            $semaphore->synchronizedBegin();
            try {
                // this code is being synchronized across all workers
                // so here we have just one worker at a time
                echo "[A][".getmypid()."]"." hi $input\n";
            }
            finally {
                $semaphore->synchronizedEnd();
            }

            // alternative example
            $semaphore->synchronize(function() use ($input, $storage) {
                // this code is being synchronized across all workers
                // so here we have just one worker at a time
                echo "[B][".getmypid()."]"." hi $input\n";
            });

            sleep(rand(1,3)); // this is the working load!
            return $input;
        }
    )
);

for($i=0; $i<10; $i++) {
    $wp->run($i);
}

$wp->waitForAllWorkers(); // wait for all workers

foreach($wp as $val) {
    var_dump($val); // dump the returned values
}
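The first variant in the example wraps the critical section in try/finally so that synchronizedEnd() is called even if the protected code throws. The sketch below shows that control flow in isolation. RecordingSemaphore and criticalSection are hypothetical stand-ins written for this illustration. RecordingSemaphore is not \QXS\WorkerPool\Semaphore and only records which methods were called, so it runs without forking.

```php
<?php
// Stand-in for illustration only; this is NOT \QXS\WorkerPool\Semaphore.
// It only records calls so the control flow can be observed in one process.
class RecordingSemaphore
{
    public $log = [];

    public function synchronizedBegin() { $this->log[] = 'begin'; }
    public function synchronizedEnd()   { $this->log[] = 'end'; }
}

// Hypothetical helper: runs $work between begin/end, releasing on every path.
function criticalSection(RecordingSemaphore $semaphore, callable $work)
{
    $semaphore->synchronizedBegin();
    try {
        return $work();
    } finally {
        // Runs on normal return and when $work throws.
        $semaphore->synchronizedEnd();
    }
}

$sem = new RecordingSemaphore();
try {
    criticalSection($sem, function () {
        throw new \RuntimeException('boom');
    });
} catch (\RuntimeException $e) {
    // The exception still propagates to the caller after the release.
}
echo implode(',', $sem->log), "\n"; // prints "begin,end"
```

Without the finally block, an exception inside the critical section would skip the release, and with a real semaphore the other workers could then wait indefinitely.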

Disable semaphore (ability to synchronize workers)

You can disable the semaphore if you do not need it. Some users objected to semaphores being opened that they never use.

<?php

$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
   ->disableSemaphore() // <--- this disables the semaphore support (you can still use it in the worker, but it will have no effect)
   ->create(new \QXS\WorkerPool\ClosureWorker(
        /**
         * @param mixed $input the input from the WorkerPool::run() Method
         * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
         * @param \ArrayObject $storage a persistent storage for the current child process
         */
        function($input, $semaphore, $storage) {
            echo "[".getmypid()."]"." hi $input\n";
            sleep(rand(1,3)); // this is the working load!
            return $input; // return null here, in case you do not want to pass any data to the parent
        }
    )
);

for($i=0; $i<10; $i++) {
    $wp->run($i);
}

$wp->waitForAllWorkers(); // wait for all workers

foreach($wp as $val) {
    var_dump($val); // dump the returned values
}

Automatic respawn

You can choose to automatically respawn dead workers.

<?php

$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
   ->respawnAutomatically()
   ->create(new \QXS\WorkerPool\ClosureWorker(
        /**
         * @param mixed $input the input from the WorkerPool::run() Method
         * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
         * @param \ArrayObject $storage a persistent storage for the current child process
         */
        function($input, $semaphore, $storage) {
            echo "[".getmypid()."]"." hi $input\n";
            sleep(rand(1,3)); // this is the working load!
            // Simulate unexpected worker death
            if (rand(1, 10) > 5) {
                exit;
            }
            return $input; // return null here, in case you do not want to pass any data to the parent
        }
    )
);

for($i=0; $i<10; $i++) {
    $wp->run($i);
}

$wp->waitForAllWorkers(); // wait for all workers

foreach($wp as $val) {
    var_dump($val); // dump the returned values
}

Each time a worker dies, a new one will be created with an incremented index.

Ideally workers should not die in the first place, but automatic respawning can be a useful workaround until you fix the underlying cause.
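When workers can die mid-task, it may be useful to find out which inputs never produced a result so they can be submitted again. The sketch below assumes, as in the example, that the worker returns its input unchanged under the 'data' key. What the pool reports for a task whose worker died is not described here, so the sample results are made up. The function name findInputsWithoutResult is hypothetical.

```php
<?php
// Hypothetical helper, not part of qxsch/worker-pool.
// Assumption: each successful result carries the original input under 'data',
// as in the example where the worker simply returns $input.
function findInputsWithoutResult(array $submitted, iterable $results): array
{
    $seen = [];
    foreach ($results as $val) {
        if (isset($val['data'])) {
            $seen[] = $val['data'];
        }
    }
    // Inputs that were submitted but never came back as data.
    return array_values(array_diff($submitted, $seen));
}

// Made-up results: inputs 1 and 3 produced no data.
$results = [
    ['data' => 0],
    ['data' => 2],
    ['data' => 4],
];
print_r(findInputsWithoutResult(range(0, 4), $results));
```

The returned inputs could then be passed to run() again, provided the work is safe to repeat.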

Transparent output to ps

See what is happening by running ps:

root 2378 \_ simpleExample.php: Parent
root 2379     \_ simpleExample.php: Worker 1 of QXS\WorkerPool\ClosureWorker [busy]
root 2380     \_ simpleExample.php: Worker 2 of QXS\WorkerPool\ClosureWorker [busy]
root 2381     \_ simpleExample.php: Worker 3 of QXS\WorkerPool\ClosureWorker [free]
root 2382     \_ simpleExample.php: Worker 4 of QXS\WorkerPool\ClosureWorker [free]
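Since the process titles follow a regular pattern, they can be summarized with a short script. This is a minimal sketch that counts busy and free workers in ps output. It assumes the title format shown in the sample and that only the two states "busy" and "free" need counting. The function name countWorkerStates is hypothetical.

```php
<?php
// Hypothetical helper, not part of qxsch/worker-pool.
// Assumption: titles look like "Worker 3 of Some\Class [free]" as in the
// sample output, and only the states "busy" and "free" are of interest.
function countWorkerStates(string $psOutput): array
{
    $counts = ['busy' => 0, 'free' => 0];
    preg_match_all('/Worker \d+ of \S+ \[(busy|free)\]/', $psOutput, $matches);
    foreach ($matches[1] as $state) {
        $counts[$state]++;
    }
    return $counts;
}

// Sample taken from the ps listing in this README.
$sample = <<<'PS'
root 2378 \_ simpleExample.php: Parent
root 2379     \_ simpleExample.php: Worker 1 of QXS\WorkerPool\ClosureWorker [busy]
root 2380     \_ simpleExample.php: Worker 2 of QXS\WorkerPool\ClosureWorker [busy]
root 2381     \_ simpleExample.php: Worker 3 of QXS\WorkerPool\ClosureWorker [free]
root 2382     \_ simpleExample.php: Worker 4 of QXS\WorkerPool\ClosureWorker [free]
PS;

print_r(countWorkerStates($sample));
```

Inside the parent process itself, getBusyWorkers() and getFreeWorkers() give the same numbers directly. Parsing ps output is mainly useful for monitoring from outside the script.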

Documentation

The documentation can be found at http://qxsch.github.io/WorkerPool/doc/

Statistics

  • Total downloads: 319.63k
  • Monthly downloads: 0
  • Daily downloads: 0
  • Favorites: 108
  • Views: 3
  • Dependents: 2
  • Suggesters: 1

GitHub information

  • Stars: 105
  • Watchers: 8
  • Forks: 21
  • Language: PHP

Other information

  • License: GPL-3.0
  • Last updated: 2026-01-04
