rasuvaeff/yii3-outbox

Latest stable version: v1.0.0

Composer install command:

composer require rasuvaeff/yii3-outbox

Package description

Transactional outbox pattern for Yii3

README

Transactional outbox pattern implementation for Yii3. Provides a stateless core for reliably publishing messages with configurable retry policies.

Using an AI coding assistant? llms.txt has a compact API reference you can use.

Requirements

  • PHP 8.3+
  • psr/clock ^1.0
  • psr/log ^3.0

Installation

composer require rasuvaeff/yii3-outbox

Usage

Recording a message

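use DateTimeImmutable;
use Psr\Clock\ClockInterface;
use Rasuvaeff\Yii3Outbox\InMemoryStorage;
use Rasuvaeff\Yii3Outbox\Outbox;

// Minimal system clock; any PSR-20 ClockInterface implementation works.
$clock = new class implements ClockInterface {
    public function now(): DateTimeImmutable { return new DateTimeImmutable(); }
};

// In-memory storage keeps this example self-contained; use a
// database-backed StorageInterface implementation in production.
$storage = new InMemoryStorage();

$outbox = new Outbox(storage: $storage, clock: $clock);

$message = $outbox->record(
    type: 'order.created',
    // The payload is a raw string; fail loudly if encoding goes wrong.
    payload: json_encode(['orderId' => 42], JSON_THROW_ON_ERROR),
    aggregateId: 'order-42',
);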
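
The transactional outbox pattern only delivers its guarantee when the message row is written in the same database transaction as the business change, so both commit or both roll back. The sketch below illustrates that idea with plain PDO and an in-memory SQLite database rather than with this package's classes. The table layouts, the placeOrder() helper, and its $failAfterWrite flag are hypothetical and exist only for the demonstration; the pdo_sqlite extension is assumed.

```php
<?php

declare(strict_types=1);

// Illustrative schema only; not defined by the package.
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->exec('CREATE TABLE orders (id INTEGER PRIMARY KEY, total INTEGER NOT NULL)');
$pdo->exec('CREATE TABLE outbox (id TEXT PRIMARY KEY, type TEXT NOT NULL, payload TEXT NOT NULL)');

/**
 * Hypothetical helper: writes the order and its outbox message atomically.
 * $failAfterWrite simulates a crash between the writes and the commit.
 */
function placeOrder(PDO $pdo, int $orderId, int $total, bool $failAfterWrite = false): void
{
    $pdo->beginTransaction();
    try {
        // Business write.
        $pdo->prepare('INSERT INTO orders (id, total) VALUES (?, ?)')
            ->execute([$orderId, $total]);

        // Outbox write, on the same connection and in the same transaction.
        $pdo->prepare('INSERT INTO outbox (id, type, payload) VALUES (?, ?, ?)')
            ->execute([
                bin2hex(random_bytes(16)),
                'order.created',
                json_encode(['orderId' => $orderId], JSON_THROW_ON_ERROR),
            ]);

        if ($failAfterWrite) {
            throw new RuntimeException('simulated failure before commit');
        }

        $pdo->commit();
    } catch (Throwable $e) {
        // Neither the order nor the message survives a failure.
        $pdo->rollBack();
        throw $e;
    }
}

placeOrder($pdo, 42, 1999);

try {
    placeOrder($pdo, 43, 500, failAfterWrite: true);
} catch (RuntimeException) {
    // Expected: the second order and its message were rolled back together.
}

$orders = (int) $pdo->query('SELECT COUNT(*) FROM orders')->fetchColumn();
$messages = (int) $pdo->query('SELECT COUNT(*) FROM outbox')->fetchColumn();
```

With this package, the same effect presumably requires a StorageInterface implementation that shares the connection (and open transaction) used for the business write before Outbox::record() is called.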

Implementing storage

use Rasuvaeff\Yii3Outbox\StorageInterface;
use Rasuvaeff\Yii3Outbox\OutboxMessage;

final class DbStorage implements StorageInterface
{
    public function save(OutboxMessage $message): void
    {
        // INSERT INTO outbox ... ON CONFLICT(id) DO UPDATE ...
    }

    public function findPending(array $types = [], int $limit = 1000): array
    {
        // SELECT * FROM outbox WHERE status = 'pending'
        //   [AND type IN (:types)] LIMIT :limit  -- empty $types = all types
        // Do not filter on attempts: messages awaiting a retry stay 'pending'
        // with attempts > 0 and must be returned as well.
    }

    public function markPublished(OutboxMessage $message): void
    {
        // UPDATE outbox SET status = 'published' WHERE id = ?
    }

    public function markFailed(OutboxMessage $message): void
    {
        // UPDATE outbox SET status = 'failed' WHERE id = ?
    }

    public function getById(string $id): ?OutboxMessage
    {
        // SELECT * FROM outbox WHERE id = ?
    }
}
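
The SQL comments in the skeleton above can be fleshed out roughly as follows. This is a sketch under assumptions, not the package's reference storage: it uses PDO with in-memory SQLite (pdo_sqlite and SQLite 3.24+ for the upsert syntax), the column names are illustrative, and the upsertRow() and findPendingRows() helpers are hypothetical. It works with plain arrays because the source does not show how an OutboxMessage is rebuilt from a row. The ORDER BY created_at clause is also an addition.

```php
<?php

declare(strict_types=1);

// Assumed schema; column names are illustrative, not defined by the package.
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->exec(
    'CREATE TABLE outbox (
        id TEXT PRIMARY KEY,
        type TEXT NOT NULL,
        payload TEXT NOT NULL,
        status TEXT NOT NULL,
        attempts INTEGER NOT NULL DEFAULT 0,
        created_at TEXT NOT NULL
    )'
);

/** Insert or update one row; every value is bound, never interpolated. */
function upsertRow(PDO $pdo, array $row): void
{
    $stmt = $pdo->prepare(
        'INSERT INTO outbox (id, type, payload, status, attempts, created_at)
         VALUES (:id, :type, :payload, :status, :attempts, :created_at)
         ON CONFLICT(id) DO UPDATE SET
             status = excluded.status,
             attempts = excluded.attempts'
    );
    $stmt->execute($row);
}

/** Return pending rows, optionally restricted to the given types. */
function findPendingRows(PDO $pdo, array $types = [], int $limit = 1000): array
{
    $sql = "SELECT * FROM outbox WHERE status = 'pending'";
    $params = [];

    if ($types !== []) {
        // One positional placeholder per type keeps the IN list parameterized.
        $placeholders = implode(', ', array_fill(0, count($types), '?'));
        $sql .= " AND type IN ($placeholders)";
        $params = array_values($types);
    }

    // $limit is typed as int, so inlining it cannot inject SQL.
    $sql .= ' ORDER BY created_at LIMIT ' . $limit;

    $stmt = $pdo->prepare($sql);
    $stmt->execute($params);

    return $stmt->fetchAll(PDO::FETCH_ASSOC);
}

$base = [
    'payload' => '{}',
    'status' => 'pending',
    'attempts' => 0,
    'created_at' => '2024-01-01T00:00:00+00:00',
];

upsertRow($pdo, ['id' => 'a', 'type' => 'order.created'] + $base);
upsertRow($pdo, ['id' => 'b', 'type' => 'order.paid'] + $base);

// Saving the same id again updates the existing row instead of failing.
upsertRow($pdo, ['id' => 'a', 'type' => 'order.created', 'status' => 'published'] + $base);

$pending = findPendingRows($pdo);
$pendingCreated = findPendingRows($pdo, ['order.created']);
```

After the three writes only row b is still pending, so the unfiltered query returns one row and the query filtered to order.created returns none.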

Implementing a publisher

use Rasuvaeff\Yii3Outbox\PublisherInterface;
use Rasuvaeff\Yii3Outbox\OutboxMessage;
use Rasuvaeff\Yii3Outbox\PublishException;

final class RabbitPublisher implements PublisherInterface
{
    public function publish(OutboxMessage $message): void
    {
        try {
            // publish to RabbitMQ, Kafka, etc.
        } catch (\Throwable $e) {
            throw new PublishException(
                message: $e->getMessage(),
                outboxMessage: $message,
                previous: $e,
            );
        }
    }
}

Processing the outbox

use Rasuvaeff\Yii3Outbox\Processor;
use Rasuvaeff\Yii3Outbox\RetryPolicy;

$processor = new Processor(
    storage: $storage,
    publisher: $publisher,
    retryPolicy: new RetryPolicy(maxAttempts: 3, delaySeconds: 60),
    clock: $clock,
    batchSize: 100,
);

$result = $processor->process();
// $result->published — successfully published
// $result->failed   — publish exceptions (message kept Pending if retries remain)
// $result->skipped  — not yet ready for retry

Retry behaviour

When a publish fails:

  • If attempts < maxAttempts → message stays Pending, will be retried after delaySeconds
  • If attempts >= maxAttempts → message is marked Failed (terminal)

$policy = new RetryPolicy(maxAttempts: 3, delaySeconds: 60);

$policy->shouldRetry($message);          // bool — attempts remaining?
$policy->isReadyForRetry($message, $now); // bool — delay elapsed?
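
The two rules above can be restated as standalone functions. This is a minimal sketch of the described behaviour, not the package's RetryPolicy source: the function signatures are hypothetical, and treating a message that has never been attempted as immediately ready is an assumption.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins for illustration; not the package's own classes.

/** Retries remain while the attempt count is below the limit. */
function shouldRetry(int $attempts, int $maxAttempts): bool
{
    return $attempts < $maxAttempts;
}

/** Ready when retries remain and the delay since the last attempt has elapsed. */
function isReadyForRetry(
    int $attempts,
    ?DateTimeImmutable $lastAttemptAt,
    DateTimeImmutable $now,
    int $maxAttempts = 3,
    int $delaySeconds = 60,
): bool {
    if (!shouldRetry($attempts, $maxAttempts)) {
        return false;
    }

    // Assumption: a message with no recorded attempt has nothing to wait for.
    if ($lastAttemptAt === null) {
        return true;
    }

    return $now >= $lastAttemptAt->modify("+{$delaySeconds} seconds");
}

$last = new DateTimeImmutable('2024-01-01T12:00:30+00:00');
$now = new DateTimeImmutable('2024-01-01T12:01:00+00:00');

// false: only 30 of the 60 seconds have passed since the last attempt.
$ready = isReadyForRetry(1, $last, $now);
```

Under these assumptions, a message that is not yet ready corresponds to the skipped counter in the processing result, while one that has exhausted its attempts is never ready again.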

Using InMemoryStorage for tests

use Rasuvaeff\Yii3Outbox\InMemoryStorage;

$storage = new InMemoryStorage();
$storage->save($message);

$pending = $storage->findPending();
$storage->count();
$storage->clear();

API reference

Outbox

Method | Description
__construct(storage, clock) | Main entry point
record(type, payload, aggregateId?) | Create and persist message, returns OutboxMessage

OutboxMessage

Method | Description
create(type, payload, aggregateId?, createdAt?) | Factory with auto-generated ID
getId() | Message ID (32-char hex)
getType() | Message type
getPayload() | Raw payload string
getStatus() | OutboxStatus enum
getCreatedAt() | DateTimeImmutable
getAttempts() | Number of publish attempts
getLastAttemptAt() | ?DateTimeImmutable
getAggregateId() | ?string
withStatus(status) | Returns new instance with status
withAttempt(at) | Returns new instance with incremented attempts and timestamp
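
For storage schemas and tests it helps to know what a 32-character hex ID looks like. One common way to produce such a value is to hex-encode 16 random bytes, sketched below. Whether the package generates its IDs this way is an assumption, and generateHexId() is a hypothetical helper.

```php
<?php

declare(strict_types=1);

/** Hypothetical helper: 16 random bytes encoded as 32 lowercase hex characters. */
function generateHexId(): string
{
    return bin2hex(random_bytes(16));
}

$id = generateHexId();
```

A fixed-width column such as CHAR(32) is enough to hold an ID of this shape.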

OutboxStatus

Case | Value
Pending | 'pending'
Published | 'published'
Failed | 'failed'

RetryPolicy

Method | Description
__construct(maxAttempts, delaySeconds) | Default: 3 attempts, 60s delay
shouldRetry(message) | Checks attempt count
isReadyForRetry(message, now) | Checks attempts + delay elapsed

Processor

Method | Description
__construct(storage, publisher, retryPolicy, clock, batchSize, logger) | Default batch: 100
process() | Returns ProcessingResult

ProcessingResult

Property/Method | Description
$published | Count of successfully published messages
$failed | Count of publish exceptions this run
$skipped | Count of messages not ready for retry
total() | Sum of all counters

Serializer

Method | Description
serialize(message) | Message to JSON
deserialize(data) | JSON to Message

Security

  • Storage implementations must use parameterized queries for all user values.
  • Message payload is stored as-is; validate before saving if needed.

Examples

See examples/ for complete usage examples.

Development

make install
make build
make cs-fix
make test
make test-coverage
make mutation
make release-check

make test-coverage and make mutation bootstrap pcov inside the composer:2 container because the base image has no coverage driver.

License

BSD-3-Clause. See LICENSE.md.

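Statistics

  • Total downloads: 0
  • Monthly downloads: 0
  • Daily downloads: 0
  • Favorites: 0
  • Views: 2
  • Dependencies: 3
  • Recommendations: 0

GitHub information

  • Stars: 0
  • Watchers: 0
  • Forks: 0
  • Language: PHP

Other information

  • License: BSD-3-Clause
  • Last updated: 2026-06-12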
