rasuvaeff/yii3-outbox
Latest stable version: v1.0.0
Composer install command:
composer require rasuvaeff/yii3-outbox
Package description
Transactional outbox pattern for Yii3
README
Transactional outbox pattern implementation for Yii3. Provides a stateless core for reliably publishing messages with configurable retry policies.
Using an AI coding assistant? llms.txt has a compact API reference you can use.
Requirements
- PHP 8.3+
- psr/clock ^1.0
- psr/log ^3.0
Installation
composer require rasuvaeff/yii3-outbox
Usage
Recording a message
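    use DateTimeImmutable;
    use Psr\Clock\ClockInterface;
    use Rasuvaeff\Yii3Outbox\InMemoryStorage;
    use Rasuvaeff\Yii3Outbox\Outbox;

    // Minimal system clock; any PSR-20 clock implementation works here.
    $clock = new class implements ClockInterface {
        public function now(): DateTimeImmutable
        {
            return new DateTimeImmutable();
        }
    };

    // InMemoryStorage keeps the example short; use a persistent
    // StorageInterface implementation in production.
    $storage = new InMemoryStorage();
    $outbox = new Outbox(storage: $storage, clock: $clock);

    $message = $outbox->record(
        type: 'order.created',
        // JSON_THROW_ON_ERROR avoids silently passing `false` as the payload.
        payload: json_encode(['orderId' => 42], JSON_THROW_ON_ERROR),
        aggregateId: 'order-42',
    );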
Implementing storage
    use Rasuvaeff\Yii3Outbox\StorageInterface;
    use Rasuvaeff\Yii3Outbox\OutboxMessage;

    final class DbStorage implements StorageInterface
    {
        public function save(OutboxMessage $message): void
        {
            // INSERT INTO outbox ... ON CONFLICT(id) DO UPDATE ...
        }

        public function findPending(array $types = [], int $limit = 1000): array
        {
            // SELECT * FROM outbox WHERE status = 'pending'
            // [AND type IN (:types)] LIMIT :limit -- empty $types = all types
            // For retry support, also return status = 'pending' with attempts > 0
        }

        public function markPublished(OutboxMessage $message): void
        {
            // UPDATE outbox SET status = 'published' WHERE id = ?
        }

        public function markFailed(OutboxMessage $message): void
        {
            // UPDATE outbox SET status = 'failed' WHERE id = ?
        }

        public function getById(string $id): ?OutboxMessage
        {
            // SELECT * FROM outbox WHERE id = ?
        }
    }
Implementing a publisher
    use Rasuvaeff\Yii3Outbox\PublisherInterface;
    use Rasuvaeff\Yii3Outbox\OutboxMessage;
    use Rasuvaeff\Yii3Outbox\PublishException;

    final class RabbitPublisher implements PublisherInterface
    {
        public function publish(OutboxMessage $message): void
        {
            try {
                // publish to RabbitMQ, Kafka, etc.
            } catch (\Throwable $e) {
                // Wrap transport errors so the processor can apply the retry policy.
                throw new PublishException(
                    message: $e->getMessage(),
                    outboxMessage: $message,
                    previous: $e,
                );
            }
        }
    }
Processing the outbox
    use Rasuvaeff\Yii3Outbox\Processor;
    use Rasuvaeff\Yii3Outbox\RetryPolicy;

    $processor = new Processor(
        storage: $storage,
        publisher: $publisher,
        retryPolicy: new RetryPolicy(maxAttempts: 3, delaySeconds: 60),
        clock: $clock,
        batchSize: 100,
    );

    $result = $processor->process();

    // $result->published: successfully published
    // $result->failed: publish exceptions (message kept Pending if retries remain)
    // $result->skipped: not yet ready for retry
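To make the three counters concrete, here is a minimal, self-contained sketch of how one processing pass might tally them. It is not the package's `Processor`: the function `processBatch`, the array-shaped messages, and the `readyAt` field are all hypothetical stand-ins chosen so the snippet runs without the library.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch of a single processing pass (not the package's code).
 *
 * @param list<array{id: string, readyAt: int}> $messages readyAt is a Unix timestamp
 * @param callable(array): void $publish throws RuntimeException on failure
 * @return array{published: int, failed: int, skipped: int}
 */
function processBatch(array $messages, callable $publish, int $now): array
{
    $result = ['published' => 0, 'failed' => 0, 'skipped' => 0];

    foreach ($messages as $message) {
        // Not yet due for a retry: leave it untouched and count it as skipped.
        if ($message['readyAt'] > $now) {
            $result['skipped']++;
            continue;
        }

        try {
            $publish($message);
            $result['published']++;
        } catch (RuntimeException) {
            // A publish exception counts as failed for this run.
            $result['failed']++;
        }
    }

    return $result;
}

$now = 1_000;
$messages = [
    ['id' => 'ok', 'readyAt' => 900],
    ['id' => 'broken', 'readyAt' => 900],
    ['id' => 'later', 'readyAt' => 1_060],
];

// Assumed publisher: fails only for the message with id "broken".
$publish = static function (array $message): void {
    if ($message['id'] === 'broken') {
        throw new RuntimeException('broker unavailable');
    }
};

$result = processBatch($messages, $publish, $now);
```

In this sketch each message lands in exactly one counter per run, which is why summing the three gives the number of messages examined.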
Retry behaviour
When a publish fails:
- If attempts < `maxAttempts` → message stays `Pending` and will be retried after `delaySeconds`
- If attempts >= `maxAttempts` → message is marked `Failed` (terminal)
    $policy = new RetryPolicy(maxAttempts: 3, delaySeconds: 60);

    $policy->shouldRetry($message);           // bool: attempts remaining?
    $policy->isReadyForRetry($message, $now); // bool: delay elapsed?
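The two rules can be sketched as plain functions. This is an illustration of the documented behaviour, not the package's `RetryPolicy` source. The helper names `statusAfterFailure` and `delayElapsed` are hypothetical, and two points are assumptions: that the attempt counter already includes the attempt that just failed, and that a message with no previous attempt is always ready.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: status of a message after a failed publish.
 * Assumption: $attempts already includes the attempt that just failed.
 */
function statusAfterFailure(int $attempts, int $maxAttempts): string
{
    return $attempts < $maxAttempts ? 'pending' : 'failed';
}

/**
 * Hypothetical helper: has the retry delay elapsed since the last attempt?
 * Assumption: a message that was never attempted is always ready.
 */
function delayElapsed(
    ?DateTimeImmutable $lastAttemptAt,
    DateTimeImmutable $now,
    int $delaySeconds,
): bool {
    if ($lastAttemptAt === null) {
        return true;
    }

    return $now->getTimestamp() - $lastAttemptAt->getTimestamp() >= $delaySeconds;
}

$lastAttempt = new DateTimeImmutable('2025-01-01 12:00:00 UTC');

$afterSecondFailure = statusAfterFailure(attempts: 2, maxAttempts: 3); // 'pending'
$afterThirdFailure = statusAfterFailure(attempts: 3, maxAttempts: 3);  // 'failed'

$tooEarly = delayElapsed($lastAttempt, $lastAttempt->modify('+59 seconds'), 60); // false
$dueNow = delayElapsed($lastAttempt, $lastAttempt->modify('+60 seconds'), 60);   // true
```

Under these assumptions, `maxAttempts: 3` allows three publish attempts in total, with at least 60 seconds between consecutive attempts.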
Using InMemoryStorage for tests
    use Rasuvaeff\Yii3Outbox\InMemoryStorage;

    $storage = new InMemoryStorage();

    $storage->save($message);
    $pending = $storage->findPending();
    $storage->count();
    $storage->clear();
API reference
Outbox
| Method | Description |
|---|---|
| `__construct(storage, clock)` | Main entry point |
| `record(type, payload, aggregateId?)` | Create and persist message, returns `OutboxMessage` |
OutboxMessage
| Method | Description |
|---|---|
| `create(type, payload, aggregateId?, createdAt?)` | Factory with auto-generated ID |
| `getId()` | Message ID (32-char hex) |
| `getType()` | Message type |
| `getPayload()` | Raw payload string |
| `getStatus()` | `OutboxStatus` enum |
| `getCreatedAt()` | `DateTimeImmutable` |
| `getAttempts()` | Number of publish attempts |
| `getLastAttemptAt()` | `?DateTimeImmutable` |
| `getAggregateId()` | `?string` |
| `withStatus(status)` | Returns new instance with status |
| `withAttempt(at)` | Returns new instance with incremented attempts and timestamp |
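The `with*` methods describe an immutable value object: each call returns a new instance and leaves the original untouched. The sketch below shows that pattern together with one way to produce a 32-character hex ID. `SketchMessage` is a hypothetical stand-in, not the package's `OutboxMessage`, and `bin2hex(random_bytes(16))` is an assumed ID scheme that merely matches the documented length.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for illustration only; the real OutboxMessage may differ.
final readonly class SketchMessage
{
    public function __construct(
        public string $id,
        public string $status = 'pending',
        public int $attempts = 0,
        public ?DateTimeImmutable $lastAttemptAt = null,
    ) {
    }

    public static function create(): self
    {
        // 16 random bytes encode to a 32-character hex string.
        return new self(id: bin2hex(random_bytes(16)));
    }

    public function withStatus(string $status): self
    {
        // Build a copy instead of mutating $this.
        return new self($this->id, $status, $this->attempts, $this->lastAttemptAt);
    }

    public function withAttempt(DateTimeImmutable $at): self
    {
        // Increment the counter and record the timestamp on the copy.
        return new self($this->id, $this->status, $this->attempts + 1, $at);
    }
}

$original = SketchMessage::create();
$retried = $original->withAttempt(new DateTimeImmutable());
$failed = $retried->withStatus('failed');
```

Because every step yields a new object, a storage implementation can safely hold on to `$original` while the processor works with `$retried`.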
OutboxStatus
| Case | Value |
|---|---|
| `Pending` | `'pending'` |
| `Published` | `'published'` |
| `Failed` | `'failed'` |
RetryPolicy
| Method | Description |
|---|---|
| `__construct(maxAttempts, delaySeconds)` | Default: 3 attempts, 60s delay |
| `shouldRetry(message)` | Checks attempt count |
| `isReadyForRetry(message, now)` | Checks attempts + delay elapsed |
Processor
| Method | Description |
|---|---|
| `__construct(storage, publisher, retryPolicy, clock, batchSize, logger)` | Default batch: 100 |
| `process()` | Returns `ProcessingResult` |
ProcessingResult
| Property/Method | Description |
|---|---|
| `$published` | Count of successfully published messages |
| `$failed` | Count of publish exceptions this run |
| `$skipped` | Count of messages not ready for retry |
| `total()` | Sum of all counters |
Serializer
| Method | Description |
|---|---|
| `serialize(message)` | Message to JSON |
| `deserialize(data)` | JSON to Message |
Security
- Storage implementations must use parameterized queries for all user values.
- Message payload is stored as-is; validate before saving if needed.
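Both points can be illustrated with plain PDO. The sketch below assumes the `pdo_sqlite` extension, an in-memory database, and a hypothetical `outbox` table; the helpers `insertPending` and `findPendingRows` are invented for this example and are not part of the package. Payload validation uses `json_validate()`, available from PHP 8.3, on the assumption that payloads are JSON.

```php
<?php

declare(strict_types=1);

// Hypothetical schema and helper names, for illustration only.
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->exec(
    'CREATE TABLE outbox (
        id TEXT PRIMARY KEY,
        type TEXT NOT NULL,
        payload TEXT NOT NULL,
        status TEXT NOT NULL
    )'
);

function insertPending(PDO $pdo, string $id, string $type, string $payload): void
{
    // Validate before saving: the payload is stored verbatim.
    if (!json_validate($payload)) {
        throw new InvalidArgumentException('Payload is not valid JSON.');
    }

    $stmt = $pdo->prepare(
        'INSERT INTO outbox (id, type, payload, status) VALUES (?, ?, ?, ?)'
    );
    $stmt->execute([$id, $type, $payload, 'pending']);
}

/** @param list<string> $types An empty list means "all types". */
function findPendingRows(PDO $pdo, array $types = [], int $limit = 1000): array
{
    $sql = "SELECT id, type, payload FROM outbox WHERE status = 'pending'";
    if ($types !== []) {
        // One placeholder per type; values are bound, never concatenated into SQL.
        $sql .= ' AND type IN (' . implode(', ', array_fill(0, count($types), '?')) . ')';
    }
    $sql .= ' LIMIT ?';

    $stmt = $pdo->prepare($sql);
    $position = 1;
    foreach ($types as $type) {
        $stmt->bindValue($position++, $type, PDO::PARAM_STR);
    }
    $stmt->bindValue($position, $limit, PDO::PARAM_INT);
    $stmt->execute();

    return $stmt->fetchAll(PDO::FETCH_ASSOC);
}

insertPending($pdo, 'a1', 'order.created', '{"orderId": 42}');
// A hostile-looking type is stored as plain data because it is bound, not interpolated.
insertPending($pdo, 'b2', "x'; DROP TABLE outbox; --", '{"note": "hostile type value"}');

$rows = findPendingRows($pdo, ['order.created']);
```

Binding the limit with `PDO::PARAM_INT` keeps the behaviour predictable across drivers, some of which reject a string-typed `LIMIT` value.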
Examples
See examples/ for complete usage examples.
Development
make install
make build
make cs-fix
make test
make test-coverage
make mutation
make release-check
make test-coverage and make mutation bootstrap pcov inside the
composer:2 container because the base image has no coverage driver.
License
BSD-3-Clause. See LICENSE.md.
Other information
- License: BSD-3-Clause
- Last updated: 2026-06-12