定制 easyswoole/queue 二次开发

按需修改功能、优化性能、对接业务系统,提供一站式技术支持

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

easyswoole/queue

Composer 安装命令:

composer require easyswoole/queue

包简介

A simple Queue implementation integrated into easySwoole

README 文档

README

安装

composer require easyswoole/queue 

队列驱动

任何队列驱动都必须实现EasySwoole\Queue\QueueDriverInterface这个接口定义。且实现的对象一定是必须可被克隆(可以看Queue中的Producer方法即知道为何)。 队列在被加载到对应topic的Producer和Consumer时,都会被分别执行一次init()方法。

创建队列

use EasySwoole\Queue\Driver\RedisQueue;
use EasySwoole\Queue\Queue;
use EasySwoole\Redis\Config;

$config = new Config([
    'host'=>"",
    'port'=>"",
    'auth'=>""
]);

$driver = new RedisQueue($config);
$queue = new Queue($driver);

普通生产

$job = new Job();
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
$queue->producer("topic")->push($job);

普通消费

$job = $queue->consumer("topic")->pop();
//或者是自定义进程中
$queue->consumer("topicName")->listen(function (Job $job){
    var_dump($job);
});

CLI 单独使用

use EasySwoole\Queue\Driver\RedisQueue;
use EasySwoole\Queue\Job;
use EasySwoole\Queue\Queue;
use EasySwoole\Redis\Config;
use Swoole\Coroutine;
use Swoole\Coroutine\Scheduler;

require "vendor/autoload.php";

$sc = new Scheduler();
$sc->add(function (){
    $config = new Config([
        'host'=>"",
        'port'=>"",
        'auth'=>""
    ]);

    $driver = new RedisQueue($config);
    $queue = new Queue($driver);

    Coroutine::create(function ()use($queue){
        while (1){
            Coroutine::sleep(3);
            $job = new Job();
            $job->setJobData("job test create at ".time());
            try {
                $queue->producer("test")->push($job);
            }catch (\Throwable $throwable){

            }
        }
    });

    Coroutine::create(function ()use($queue){
        while (1){
            Coroutine::sleep(5);
            $job = new Job();
            $job->setJobData("job another create at ".time());
            try {
                $queue->producer("another")->push($job);
            }catch (\Throwable $throwable){

            }
        }
    });

    Coroutine::create(function ()use($queue){
        $queue->consumer("test")->listen(function (Job $job){
            var_dump($job->getJobData() ." hande in test");
        },[],function (){

        });
    });

    Coroutine::create(function ()use($queue){
        $queue->consumer("another")->listen(function (Job $job){
            var_dump($job->getJobData() ." hande in another");
        },[],function (){

        });
    });

});

$sc->start();

延迟任务

$job = new Job();
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
$job->setDelayTime(5);//设置延后时间
$queue->producer("topic")->push($job);

可信任务

$job = new Job();
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
$job->setRetryTimes(3);//任务如果没有确认,则会执行三次
$job->setWaitConfirmTime(5);//如果5秒内没确认任务,会重新回到队列。默认为3秒
$queue->producer("topic")->push($job);//投递任务
//确认一个任务
$queue->consumer("topic")->confirm($job);

消费者控制

当队列未popjob时:

/** @var \EasySwoole\Queue\Queue $queue */
$queue->consumer("topic")->setOnBreak(function(\EasySwoole\Queue\Consumer $consumer) {
     // todo 
 })->listen(function (\EasySwoole\Queue\Job $job){ });

设置breakTime

/** @var \EasySwoole\Queue\Queue $queue */
$queue->consumer("topic")->setBreakTime(0.1);

统计信息

  • 总下载量: 34.47k
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 12
  • 点击次数: 1
  • 依赖项目数: 8
  • 推荐数: 0

GitHub 信息

  • Stars: 12
  • Watchers: 0
  • Forks: 10
  • 开发语言: PHP

其他信息

  • 授权协议: Apache-2.0
  • 更新时间: 2017-12-07

承接程序开发

PHP开发

VUE

Vue开发

前端开发

小程序开发

公众号开发

系统定制

数据库设计

云部署

网站建设

安全加固