yangusik/thrun-laravel
最新稳定版本:v0.2.0
Composer 安装命令:
composer require yangusik/thrun-laravel
包简介
Laravel adapter for Thrun async queue worker
README 文档
README
Laravel adapter for the async queue worker Thrun. Built on real OS threads (TrueAsync) and provides two workflow styles: clean architecture (recommended) and self-handling jobs (for simple tasks).
Benchmarks
Measured on WSL2, 8GB RAM, PHP 8.6 TrueAsync fork:
| Scenario | Config | Jobs | Time | Throughput | RSS |
|---|---|---|---|---|---|
| Horizon IO | 12 workers | 1,000 | 12.1s | 83/s | 872 MB |
| Thrun IO | 1 thread, 100 coroutines | 1,000 | 2.3s | 434/s | 80 MB |
| Horizon IO | 12 workers | 10,000 | 55.0s | 182/s | 1019 MB |
| Thrun IO | 1 thread, 100 coroutines | 10,000 | 6.3s | 1580/s | 84 MB |
| Horizon CPU | 12 workers | 100 | 18.4s | 5.4/s | 1022 MB |
| Thrun CPU | 12 threads | 100 | 16.3s | 6.1/s | 100 MB |
| Horizon CPU | 12 workers | 1,000 | 162.6s | 6.2/s | 1023 MB |
| Thrun CPU | 12 threads | 1,000 | 139.5s | 7.2/s | 101 MB |
| Horizon NOOP | 12 workers | 1,000 | 5.0s | 198/s | 656 MB |
| Thrun NOOP | 12 threads | 1,000 | 2.3s | 434/s | 103 MB |
TrueAsync 12x10 uses 17x less RSS than Horizon 12 workers, 11x more IO throughput.
Installation
composer require yangusik/thrun-laravel
Publish the configuration file:
php artisan vendor:publish --tag=thrun-config
Configuration (config/thrun.php)
Redis
'redis' => [ 'host' => env('THRUN_REDIS_HOST', '127.0.0.1'), 'port' => (int) env('THRUN_REDIS_PORT', 6379), 'prefix' => env('THRUN_REDIS_PREFIX', 'thrun:queue'), 'timeout' => 1.0, ],
Note:
prefixaffects Redis keys. Default isthrun:queue, but you can change it when multiple environments share the same Redis instance.
Queues
Each queue is a separate transport. Currently supported: redis and memory.
'queues' => [ 'emails' => ['transport' => 'redis'], 'notifications' => ['transport' => 'redis'], ],
Supervisors
Each supervisor is an isolated worker group with its own queues, strategy, and policies.
'supervisors' => [ 'default' => [ 'queues' => ['emails', 'notifications'], 'worker' => ['threads' => 2, 'concurrency' => 100], 'supervisor'=> ['max_crashes' => 3, 'restart_window' => 300, 'restart_backoff' => 1.0], 'strategy' => ['class' => PriorityStrategy::class, 'priorities' => ['emails' => 3, 'notifications' => 1]], 'policy' => ['enabled' => false, 'class' => MaxConcurrencyPolicy::class, 'options' => ['max_per_partition' => 5]], 'handlers' => [], // manual routing ], ],
Failed Jobs
When a message exhausts all retries, it is sent to the global failed job store (configured separately from supervisors):
'failed' => [ 'driver' => env('THRUN_FAILED_DRIVER', 'redis'), 'redis' => [ 'prefix' => env('THRUN_FAILED_PREFIX', 'thrun:failed'), ], ],
Supported drivers: redis (default) and null (no-op).
You can register custom failed job drivers:
use Thrun\Laravel\Transport\TransportFactory; $factory = app(TransportFactory::class); $factory->extendFailed('database', function (array $config) { return new DatabaseFailedJobSender( table: $config['database']['table'] ?? 'thrun_failed_jobs', ); });
Auto-discover
'auto_discover' => [ 'App\\Handlers', // regular Handler classes 'App\\Jobs', // self-handling Job classes ],
Two Workflow Styles
1. Clean Architecture (recommended) — Message + Handler
Message is a DTO with data. Handler is an invokable class with business logic. Pure Symfony-style separation of concerns.
Message:
namespace App\Messages; use Thrun\Laravel\Handler\Attribute\Delay; use Thrun\Laravel\Handler\Attribute\Queue; use Thrun\Laravel\Handler\Attribute\Retry; #[Queue('emails')] #[Retry(backoff: [1000, 2000, 4000], maxAttempts: 3)] #[Delay(5000)] final readonly class SendEmailMessage { public function __construct( public string $to, public string $subject, ) {} }
Handler:
namespace App\Handlers; use App\Messages\SendEmailMessage; use Thrun\Laravel\Handler\AsThrunHandler; use Thrun\Worker\Acknowledger; #[AsThrunHandler] // auto-wired to SendEmailMessage final class SendEmailHandler { public function __construct(private MailerInterface $mailer) {} public function __invoke(SendEmailMessage $message, Acknowledger $ack): void { $this->mailer->send($message->to, $message->subject); $ack->ack(); } }
Dispatch:
use Thrun\Laravel\Bus\ThrunMessageBus; use Thrun\Laravel\Bus\DispatchOptions; $bus->dispatch(new SendEmailMessage('user@test.com', 'Hello')); // or with option override: $bus->dispatch( new SendEmailMessage('user@test.com', 'Hello'), 'emails', new DispatchOptions(delayMs: 10_000, messageId: 'email-42'), );
2. Alternative — Self-handling Job
A single class acts as both Message and Handler. Data goes in the constructor, logic in __invoke(). For simple tasks when you don't want extra files.
namespace App\Jobs; use Thrun\Laravel\Handler\Attribute\Queue; use Thrun\Laravel\Handler\Attribute\Retry; use Thrun\Laravel\Handler\Attribute\ThrunJob; use Thrun\Worker\Acknowledger; #[ThrunJob] #[Queue('emails')] #[Retry(backoff: [1000, 2000, 4000], maxAttempts: 3)] final readonly class SendEmailJob { public function __construct( public string $to, public string $subject, ) {} public function __invoke(MailerInterface $mailer, Acknowledger $ack): void { $mailer->send($this->to, $this->subject); $ack->ack(); } }
// queue is taken from #[Queue] attribute $bus->dispatch(new SendEmailJob('user@test.com', 'Hello')); // or override: $bus->dispatch(new SendEmailJob('user@test.com', 'Hello'), 'urgent-emails');
Important: the constructor accepts scalar data only (
int,string,array, etc.) — these values get serialized to Redis. Never inject services or objects into the constructor. All services are injected via DI in__invoke()— LaravelContainer::call()resolves them automatically.
Attributes
| Attribute | Purpose | Applies to |
|---|---|---|
#[ThrunJob] |
Self-handling job marker | Job |
#[Queue('emails')] |
Default queue for the message | Job / Message |
#[Retry(backoff: [...], maxAttempts: 3)] |
Retry policy | Job / Message |
#[Delay(5000)] |
Delay in ms | Job / Message |
#[Timeout(30000)] |
Hard execution timeout | Job / Message |
#[AsThrunHandler(messageClass: ...)] |
Explicit Handler → Message binding | Handler |
Dispatch priority:
dispatch()argument (explicit)#[Queue]attribute on class'default'
Acknowledger
Acknowledger is the explicit processing acknowledgement object.
public function __invoke(MyMessage $message, Acknowledger $ack): void { // ... logic ... $ack->ack(); // confirm success // $ack->nack(); // reject (goes to retry or failure transport) }
Recommendation: always accept
Acknowledger $ackexplicitly and call$ack->ack(). This gives you full control over the message lifecycle.
Auto-discover
Class scanning happens automatically for namespaces listed in auto_discover.
Handlers
#[AsThrunHandler]— explicit binding tomessageClass- Naming convention:
SendEmailHandler→SendEmailMessage
Self-handling Jobs
#[ThrunJob]on an invokable class — the class registers as its own handler
Middleware
You can register worker middleware per supervisor via config. Classes are resolved through the Laravel container (constructor injection is supported).
'supervisors' => [ 'default' => [ // ... 'middleware' => [ \App\Middleware\LogMiddleware::class, \App\Middleware\MetricsMiddleware::class, ], ], ],
A middleware must implement WorkerMiddlewareInterface from the core thrun package:
namespace App\Middleware; use Thrun\Worker\Acknowledger; use Thrun\Contract\WorkerMiddlewareInterface; final class LogMiddleware implements WorkerMiddlewareInterface { public function handle(object $message, Acknowledger $ack, \Closure $next): void { try { $next($message, $ack); } catch (\Throwable $e) { // log, metrics, etc. throw $e; } } }
Failed Jobs & CLI
When a message exhausts all retries, it is persisted to the failed job store (configured in config/thrun.php under failed).
List failed jobs
php artisan thrun:failed php artisan thrun:failed --queue=emails php artisan thrun:failed --limit=100
Show details of a failed job
php artisan thrun:failed:show 019e9c83-c3d7-7216-b37d-04b1c154a5c8
Shows: type, queue, exception, message, file, line, full trace, payload, stamps.
Retry a failed job
php artisan thrun:retry 019e9c83-c3d7-7216-b37d-04b1c154a5c8 php artisan thrun:retry --all
Retry creates a new message with a fresh JobIdStamp but preserves MessageIdStamp.
Flush failed jobs
php artisan thrun:failed:flush
Flush queues
# Flush a specific queue (ready, processing, delayed) php artisan thrun:flush emails # Flush all configured queues php artisan thrun:flush # Flush queues + failed jobs php artisan thrun:flush --failed
Running the Worker
# All supervisors php artisan thrun:work # Single supervisor php artisan thrun:work --supervisor=default # With stats php artisan thrun:work --stats
DispatchOptions
Explicit stamp control when dispatching (overrides class attributes):
use Thrun\Laravel\Bus\DispatchOptions; $bus->dispatch($message, 'emails', new DispatchOptions( messageId: 'uuid-42', delayMs: 5000, retryBackoff: [1000, 2000, 4000], maxAttempts: 3, timeoutMs: 30000, ));
For edge cases you can use dispatchCustom() with a ready-made Envelope:
use Thrun\Envelope\Envelope; use Thrun\Envelope\Stamp\QueueStamp; $bus->dispatchCustom( Envelope::wrap($message, new QueueStamp('custom')), 'emails', );
Message IDs
You can generate dynamic message IDs directly from the message payload using IdentifiableMessage:
use Thrun\Laravel\Contract\IdentifiableMessage; #[Queue('emails')] final readonly class SendEmailMessage implements IdentifiableMessage { public function __construct( public string $to, public string $subject, public int $userId, public int $productId, ) {} public function getId(): string { return "{$this->userId}-{$this->productId}"; } }
$bus->dispatch(new SendEmailMessage('a@b.com', 'Hi', 42, 7)); // messageId = "42-7" automatically
Priority:
DispatchOptions->messageIdIdentifiableMessage->getId()null(no ID)
Fluent Builder
For quick one-off overrides without creating a DispatchOptions object:
$bus->builder() ->id('custom-42') ->retry([1000, 2000], 3) ->delay(5000) ->timeout(30000) ->send($message, 'emails');
Extending Transports
You can register custom transports (e.g. RabbitMQ) via closures or config drivers.
Closure driver
use Thrun\Laravel\Transport\TransportFactory; $factory = app(TransportFactory::class); $factory->extend('rabbitmq', function (string $name, array $config) { return new RabbitMQTransport( host: $config['host'], port: $config['port'], queue: $name, ); });
Config-based driver
'queues' => [ 'orders' => [ 'transport' => 'custom', 'driver' => \App\Transport\RabbitMQTransport::class, 'host' => 'localhost', ], ],
The factory will try to resolve the class via Laravel Container::make() with ['name' => ..., 'config' => ...], or fall back to new $driverClass($name, $config).
Requirements
- PHP (TrueAsync Core) ^8.6
- Laravel ^11.0
- ext-async (TrueAsync extension)
- phpredis (TrueAsync fork) (if using Redis transport)
统计信息
- 总下载量: 0
- 月度下载量: 0
- 日度下载量: 0
- 收藏数: 3
- 点击次数: 2
- 依赖项目数: 0
- 推荐数: 0
其他信息
- 授权协议: MIT
- 更新时间: 2026-06-05