承接 sbooker/command-bus 相关项目开发

从需求分析到上线部署,全程专人跟进,保证项目质量与交付效率

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

sbooker/command-bus

最新稳定版本:1.2.1

Composer 安装命令:

composer require sbooker/command-bus

包简介

Command bus

README 文档

README

Read in English

Asynchronous Command Bus (sbooker/command-bus)

Latest Version Software License PHP Version Total Downloads

Надежная, транзакционная, персистентная шина для асинхронного выполнения команд с поддержкой повторных попыток и отслеживанием состояния.

Назначение библиотеки

Эта библиотека решает две ключевые задачи в современных приложениях:

1. Надежное асинхронное выполнение задач. Библиотека позволяет атомарно сохранить команду на выполнение долгой или ненадежной операции (отправка email, запрос к API) в базу данных. Отдельный фоновый процесс (воркер) гарантированно выполнит эту команду позже.

2. Создание фундамента для отказоустойчивых Process Managers (Sagas). В event driven системах обработка доменных событий может быть хрупкой. Например, если подписчик на событие OrderPaid пытается выполнить операцию Warehouse::reserve() и получает ошибку, вся очередь обработки событий может быть заблокирована. Выход — сделать обработку событий простой и безошибочной, преобразуя событие или последовательность событий в команду(ы). Обычно для этого используют паттерн Process Manager (Saga). Ответственность за преобразование события в команду лежит на вашем коде (в подписчике на события). Сама библиотека не реализует паттерн Process Manager, но существенно облегчает его реализацию, предоставляя надежный, транзакционный механизм для асинхронного выполнения этой команды, изолируя сбои и не блокируя обработку других событий.

Ключевые особенности

  • Основа для Process Manager / Saga: Предоставляет надежный механизм выполнения команд, который является ключевой частью реализации Process Manager.
  • Транзакционная надежность: Прием команды происходит атомарно благодаря интеграции с sbooker/transaction-manager.
  • Декларативные стратегии ретраев: Логика повторных попыток не находится в обработчике. Она декларативно настраивается для каждой команды через TimeoutCalculator. Сама персистентная сущность Command отслеживает количество попыток и время следующего запуска. Воркер лишь пытается выполнить готовые к запуску команды.
  • Гарантированное выполнение (At-Least-Once): Команда будет выполняться до тех пор, пока не завершится успешно или не исчерпает все попытки.
  • Отслеживание состояния: В любой момент можно узнать статус команды (created, pending, success, fail).
  • Хуки на результат: Определяйте колбэки (Invoker) на успешное или неуспешное завершение.

Установка

composer require sbooker/command-bus

Вам также понадобятся зависимости из экосистемы и адаптеры:

composer require sbooker/transaction-manager sbooker/workflow sbooker/event-loop-worker
composer require sbooker/doctrine-transaction-handler # Если вы используете Doctrine

Быстрый старт

Сценарий 1: Асинхронное выполнение задачи

Задача: Отправить приветственный email, не заставляя пользователя ждать.

1. Команда и ее Endpoint:

// src/Mailing/Command/SendEmail.php
final class SendEmail { /* ... DTO с to, subject, body ... */ }

// src/Mailing/Endpoint/SendEmailEndpoint.php
use Sbooker\CommandBus\Endpoint;
final class SendEmailEndpoint implements Endpoint
{
    private Mailer $mailer;
    public function process(UuidInterface $id, object $payload): void
    {
        /** @var SendEmail $payload */
        $this->mailer->send(/* ... */);
    }
}

2. Отправка команды:

// src/UseCase/RegisterUser/Handler.php
/** @var Sbooker\CommandBus\CommandBus $commandBus */

$command = new SendEmail('welcome@user.com', 'Welcome!', '...');
$commandBus->accept(Uuid::uuid4(), $command); // Быстро и надежно

Сценарий 2 (Продвинутый): Построение Process Manager

Задача: После оплаты заказа (OrderPaid event) надежно зарезервировать товары на складе.

1. Команда и ее Endpoint:

// src/Warehouse/Command/ReserveItems.php
final class ReserveItems { /* ... DTO с orderId, items ... */ }

// src/Warehouse/Endpoint/ReserveItemsEndpoint.php
use Sbooker\CommandBus\Endpoint;
final class ReserveItemsEndpoint implements Endpoint
{
    private WarehouseService $warehouseService;
    public function process(UuidInterface $id, object $payload): void
    {
        /** @var ReserveItems $payload */
        $this->warehouseService->reserve(/* ... */); // Может выбросить исключение
    }
}

2. Подписчик на событие (Process Manager):

// src/Order/Subscriber/OrderPaidSubscriber.php
use Sbooker\DomainEvents\DomainEventSubscriber;
use Sbooker\CommandBus\CommandBus;

final class OrderPaidSubscriber implements DomainEventSubscriber
{
    private CommandBus $commandBus;
    public function handleEvent(DomainEvent $event): void
    {
        if (!$event instanceof OrderPaid) return;
        $command = new ReserveItems($event->getOrderId(), $event->getItems());
        $this->commandBus->accept(Uuid::uuid4(), $command);
    }
    // ...
}

Общие шаги: Конфигурация и запуск воркера

Независимо от сценария, вам нужно настроить Registry и запустить воркер.

1. Конфигурация Registry:

// bootstrap.php или ваш DI-контейнер
use Sbooker\CommandBus\Registry\Containerized\ContainerizedRegistry;
use Sbooker\CommandBus\Registry\Containerized\ContainerAdapter;

/** @var Psr\Container\ContainerInterface $container */

$endpointsMap = new ContainerAdapter($container, null, [
    'send-email' => SendEmailEndpoint::class,
    'reserve-items' => ReserveItemsEndpoint::class,
]);
$timeoutsMap = new ContainerAdapter($container, 'default_timeout');
$registry = new ContainerizedRegistry($endpointsMap, $timeoutsMap);

2. Запуск воркера:

// worker.php
/** @var Sbooker\CommandBus\Handler $handler */
while (true) {
    if (!$handler->handleNext()) sleep(5);
}
// Рекомендуется использовать sbooker/event-loop-worker для более эффективной реализации

3. Настройка Doctrine: Вам нужно предоставить маппинг для сущности Sbooker\CommandBus\Command и ее embeddables, а также зарегистрировать кастомный тип для Status. Или использовать поставляемые с библиотекой

4. Так же в комплекте:

  • Консольная команда для ручного запуска обработки команды по ее идентификатору ExecuteCommand
  • Консольная команда для очистки отработанных команд Clean

License

See LICENSE file.

统计信息

  • 总下载量: 6.15k
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 1
  • 点击次数: 0
  • 依赖项目数: 1
  • 推荐数: 0

GitHub 信息

  • Stars: 1
  • Watchers: 1
  • Forks: 1
  • 开发语言: PHP

其他信息

  • 授权协议: MIT
  • 更新时间: 2020-06-04

承接程序开发

PHP开发

VUE

Vue开发

前端开发

小程序开发

公众号开发

系统定制

数据库设计

云部署

网站建设

安全加固