etouches/phplumber

Composer install command:

composer require etouches/phplumber

Package description

Pipelining library which allows a mix of asynchronous and synchronous processes.

README

Phplumber is a very simple pipelining library for PHP which allows a mix of asynchronous and synchronous processes. Use it to break a large process into multiple steps using a queue and multi-processing. Phplumber contains no hard-coded dependencies and can be backed by any queue setup and any storage mechanism.

Requirements

  • PHP 5.3+
  • A queue, such as RabbitMQ or Redis
  • Storage, such as a relational database table or Redis

Example

Let's take creating and filling a database as an example. It takes multiple steps and some can be done concurrently.

  1. Create database (one process)
  2. Create and populate tables (one process per table)
  3. Create views dependent on multiple tables (one process, dependent on all tables existing)
                         Create table 1
                       /                \
    Create database ->   Create table 2   -> Create views
                       \                /
                         Create table 3

First we would define our processes. Sequential steps extend Process. A step that can run multiple times with different data extends MultiProcess.

class CreateDatabase extends Process
{
    public function invoke($payload)
    {
        $database_name = $payload['database_name'];
        echo "Drop database $database_name if it exists...\n";
        echo "Creating database $database_name...\n";
    }
}
class CreateTable extends MultiProcess
{
    // Determine the data we need to queue for async processes
    public function getAsyncPayloads($payload)
    {
        $database_name = $payload['database_name'];
        $table_names = array('first_table', 'second_table', 'third_table');
        
        $payloads = array();
        foreach ($table_names as $table) {
            $payloads[] = array('database_name' => $database_name, 'table_name' => $table);
        }
        return $payloads;
    }
    
    public function invoke($payload)
    {
        $database_name = $payload['database_name'];
        // The key must match the one set in getAsyncPayloads() ('table_name')
        $table_name = $payload['table_name'];
        echo "Connecting to database $database_name...\n";
        echo "Creating and populating $table_name...\n";
        
        switch ($table_name) {
            case 'first_table':
                // Create table and insert rows...
                break;
            // ...
        }
    }
}
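
The process list in this example also refers to a CreateViews step that is not shown above. A minimal sketch of what it might look like follows. The abstract Process class here is only a stand-in so the snippet runs on its own; in a real project CreateViews would extend Phplumber's Process, and the echoed messages are placeholders for the actual view-creation logic.

```php
<?php
// Stand-in base class (an assumption for this sketch, not Phplumber's own class).
// It mirrors only the invoke($payload) method used in the examples above.
abstract class Process
{
    abstract public function invoke($payload);
}

// Hypothetical final step: runs once, after every table has been created.
class CreateViews extends Process
{
    public function invoke($payload)
    {
        $database_name = $payload['database_name'];
        echo "Connecting to database $database_name...\n";
        echo "Creating views that join the populated tables...\n";
    }
}

// Direct invocation for demonstration; normally the ProcessList drives this.
$step = new CreateViews();
$step->invoke(array('database_name' => 'test_db'));
```

Because this step extends Process rather than MultiProcess, it runs a single time with the payload it is handed.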

Then we define the sequence of processes.

class CreateAndFillDatabase extends ProcessList
{
    protected function setup()
    {
        $this
            ->add('CreateDatabase')
            ->add('CreateTable')
            ->add('CreateViews');
    }
}

We can now kick off the process sequence.

$equation = new CreateAndFillDatabase();
$equation->process(array('database_name' => 'test_db'));

See the examples directory for complete, working demos.

Getting Started

  1. Implement StorageInterface. This will hold semaphore data. Suitable storage engines include relational databases, NoSQL databases, and key-value stores such as Redis.
  2. Extend the Queue class to integrate with a queue system. Appropriate queue engines include Redis and RabbitMQ.
  3. Write each process as a class that extends Process (for synchronous) or MultiProcess (for asynchronous).
  4. Implement ProcessFactoryInterface. This will create each instance of ProcessInterface, allowing you to set up your processes with any prerequisites, such as a database connection or configuration options.
  5. Put the processes together by extending ProcessList.
  6. Implement a worker daemon which instantiates your Queue implementation and calls consume() to listen for incoming messages. Each message invokes a single part of a multi-process. Run multiple workers to execute processes concurrently.
  7. Choose a place in your system to start the entire workflow, instantiate your ProcessList, and call process(), passing it the initial payload.
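
To make the roles of the queue and the semaphore storage in the steps above more concrete, here is a rough, self-contained sketch of the general idea. It does not use Phplumber's classes or reproduce its internals: an SplQueue stands in for RabbitMQ or Redis, a plain array stands in for the storage engine, and the 'create_tables' key and the $log array are names invented for illustration.

```php
<?php
// In-memory stand-ins for illustration only; none of this is Phplumber's API.
$queue = new SplQueue();   // stands in for a real queue such as RabbitMQ or Redis
$semaphores = array();     // stands in for the storage that holds semaphore data
$log = array();            // records the order in which work is done

// Fan out: queue one message per table and record how many are outstanding.
$tables = array('first_table', 'second_table', 'third_table');
$semaphores['create_tables'] = count($tables);
foreach ($tables as $table) {
    $queue->enqueue(array('database_name' => 'test_db', 'table_name' => $table));
}

// Worker loop: each message invokes one part of the multi-process.
while (!$queue->isEmpty()) {
    $payload = $queue->dequeue();
    $log[] = 'table:' . $payload['table_name'];

    // Count the finished part; the worker that reaches zero continues the sequence.
    $semaphores['create_tables']--;
    if ($semaphores['create_tables'] === 0) {
        $log[] = 'views';
    }
}

print_r($log);
```

With several real workers the decrement would need to be atomic in the shared storage, which is one reason a database or Redis is a better fit than process-local memory.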

Statistics

  • Total downloads: 8
  • Monthly downloads: 0
  • Daily downloads: 0
  • Favorites: 7
  • Clicks: 1
  • Dependents: 0
  • Suggesters: 0

GitHub information

  • Stars: 6
  • Watchers: 2
  • Forks: 1
  • Language: PHP

Other information

  • License: MIT
  • Last updated: 2017-07-27
