danil-kashin/file-queue
Latest stable version: v1.2.0
Composer install command:
composer require danil-kashin/file-queue
Package description
A simple, file-based FIFO queue that keeps messages in order across restarts.
README
A simple, file-based FIFO queue for PHP. Messages are persisted to disk using an append-only log with exclusive file locking, so the queue survives process restarts and is safe to use across multiple processes.
Requirements
- PHP 8.1+
- Extension: json
Installation
composer require danil-kashin/file-queue
Usage
1. Create a worker
Extend FileQueueWorker and implement processMessage(). Return true on success, false on failure. Failed messages are dequeued and not retried automatically.
The worker scans a directory for queue files and processes one message per tick, rotating across the queues it discovers. Within one cycle (a full pass over all queues), each queue may contribute up to 10 messages before the worker moves on to the next one. This spreads processing time evenly across queues and avoids the noisy-neighbor problem. The worker handles SIGTERM and SIGINT for graceful shutdown.
    use DanilKashin\FileQueue\Queue\QueueMessage;
    use DanilKashin\FileQueue\Workers\FileQueueWorker;

    class OrderWorker extends FileQueueWorker
    {
        protected function processMessage(QueueMessage $message): bool
        {
            return handleOrder($message->payload);
        }
    }
For each processed message the worker prints + (success) or - (failure) to stdout.
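To make the rotation concrete, here is a minimal simulation that uses in-memory arrays instead of queue files. It is a sketch of one reading of the behaviour described above, not the package's scheduler; `simulateRoundRobin()` and its parameters are hypothetical names introduced for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Simulates per-queue limits across cycles using plain arrays.
 * Hypothetical helper: it does not touch the package or the filesystem.
 *
 * @param array<string, list<string>> $queues queue name => pending messages (FIFO)
 * @param int $limit maximum messages one queue may contribute per cycle
 * @return list<string> "queue:message" entries in processing order
 */
function simulateRoundRobin(array $queues, int $limit): array
{
    if ($limit < 1) {
        throw new InvalidArgumentException('Limit must be at least 1.');
    }

    $order = [];
    // One iteration of the outer loop is one cycle (a full pass over all queues).
    while (array_filter($queues) !== []) {
        foreach (array_keys($queues) as $name) {
            // Take at most $limit messages from the front of this queue,
            // then move on so the other queues get their turn.
            $batch = array_splice($queues[$name], 0, $limit);
            foreach ($batch as $message) {
                $order[] = $name . ':' . $message;
            }
        }
    }

    return $order;
}

$order = simulateRoundRobin(
    ['busy' => ['a', 'b', 'c', 'd', 'e'], 'quiet' => ['x']],
    2
);
echo implode(' ', $order), PHP_EOL;
// prints "busy:a busy:b quiet:x busy:c busy:d busy:e"
```

With a limit of 2, the single message in the quiet queue is handled after two messages from the busy queue instead of waiting behind all five.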
Tuning
- Override getMessagesPerQueueLimit() to change the per-queue per-cycle limit (default: 10).
- Pass maxTicks as the second constructor argument to stop the worker after a fixed number of ticks. This is useful for tests, supervised one-shot runs, or external schedulers that prefer to recycle the process periodically.
    class OrderWorker extends FileQueueWorker
    {
        protected function getMessagesPerQueueLimit(): int
        {
            return 50;
        }

        protected function processMessage(QueueMessage $message): bool
        {
            return handleOrder($message->payload);
        }
    }

    $worker = new OrderWorker(queuesDir: '/var/queues', maxTicks: 1000);
    $worker->run();
2. Run the worker
The worker is a long-running process — run it in the background so your application keeps responding. Progress is written to stdout (+ / - per message and . per tick); errors go to stderr.
vendor/bin/run_worker "App\Workers\OrderWorker" --queuesDir=/var/queues > worker.log 2>&1 &
3. Enqueue messages
Messages can be enqueued from anywhere in your application, as long as the producer's baseDir points to the same directory the worker scans (queuesDir). No running worker is required at enqueue time; messages will be picked up on the next tick.
    use DanilKashin\FileQueue\Queue\FileQueue;
    use DanilKashin\FileQueue\Queue\QueueMessage;

    $queue = new FileQueue(queueName: 'orders', baseDir: '/var/queues');
    $queue->enqueue(new QueueMessage(['order_id' => 42, 'status' => 'pending']));
Architecture
flowchart LR
App["Your App"] -->|"enqueue(QueueMessage)"| FileQueue
FileQueue -->|"dequeue()"| FileQueueWorker
FileQueueWorker -->|"processMessage(QueueMessage)"| YourWorker["Your Worker"]
If you need to supply queues from a custom source (e.g. a database-backed list of queue names), extend QueueWorker directly and implement both getQueues() and processMessage().
Queue API
    $queue = new FileQueue(queueName: 'orders', baseDir: '/var/queues');

    // Write
    $queue->enqueue(new QueueMessage(['order_id' => 42]));

    // Read
    $message = $queue->dequeue(); // QueueMessage|null
    if ($message !== null) {
        $payload = $message->payload; // ['order_id' => 42]
    }

    // Inspect
    $queue->isEmpty(); // bool
    $queue->size();    // int: number of remaining (unconsumed) messages

    // Cleanup
    $queue->compact();
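Because dequeue() returns null once nothing is left, a consumer that does not use the worker can drain a queue with a plain loop. The sketch below relies only on dequeue() and the payload property shown above. `drainQueue()` and `InMemoryQueue` are hypothetical names, and the in-memory stand-in exists only so the example runs without the package; in real code you would pass a FileQueue instance.

```php
<?php
declare(strict_types=1);

/**
 * Dequeues until the queue reports that no message is left.
 * Typed as `object` so the sketch runs without the package installed.
 */
function drainQueue(object $queue, callable $handler): int
{
    $handled = 0;
    while (($message = $queue->dequeue()) !== null) {
        $handler($message->payload);
        $handled++;
    }

    return $handled;
}

// Hypothetical stand-in that mimics dequeue(); not part of the package.
final class InMemoryQueue
{
    /** @param list<object> $messages */
    public function __construct(private array $messages)
    {
    }

    public function dequeue(): ?object
    {
        // array_shift() returns null once the array is empty.
        return array_shift($this->messages);
    }
}

$seen = [];
$count = drainQueue(
    new InMemoryQueue([
        (object) ['payload' => ['order_id' => 42]],
        (object) ['payload' => ['order_id' => 43]],
    ]),
    function (array $payload) use (&$seen): void {
        $seen[] = $payload['order_id'];
    }
);
echo $count, PHP_EOL; // prints "2"
```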
Compaction
Over time the data file grows as messages are appended and consumed. compact() rewrites the file to contain only unread messages, reclaiming disk space.
You do not need to call it routinely if a FileQueueWorker runs continuously and the queues are drained regularly: when the last message is dequeued, all associated files are removed immediately, so no manual cleanup is needed.
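The idea behind compaction can be sketched in a few lines of plain PHP. This is an illustration under stated assumptions, not the package's implementation: it assumes the pointer file stores the read offset as decimal text (the README does not specify the format), it loads the whole data file into memory, and it omits the locking a real implementation needs. `compactFiles()` is a hypothetical name.

```php
<?php
declare(strict_types=1);

/**
 * Rewrites a data file so it contains only the bytes at or after the read offset.
 * Assumption: the pointer file holds the offset as decimal text.
 *
 * @return int number of bytes reclaimed
 */
function compactFiles(string $dataPath, string $pointerPath): int
{
    $offset = is_file($pointerPath) ? (int) file_get_contents($pointerPath) : 0;

    $data = file_get_contents($dataPath);
    if ($data === false) {
        throw new RuntimeException("Cannot read {$dataPath}");
    }

    // Everything before the offset has already been consumed.
    $unread = substr($data, $offset);

    // Write to a temporary file first, then swap it in with rename().
    $tmpPath = $dataPath . '.tmp';
    if (file_put_contents($tmpPath, $unread) === false || !rename($tmpPath, $dataPath)) {
        throw new RuntimeException("Cannot rewrite {$dataPath}");
    }

    // The first unread record now starts at the beginning of the file.
    if (file_put_contents($pointerPath, '0') === false) {
        throw new RuntimeException("Cannot reset {$pointerPath}");
    }

    return strlen($data) - strlen($unread);
}

// Demo with throwaway files: 8 consumed bytes followed by 4 unread bytes.
$dir = sys_get_temp_dir() . '/fq_compact_' . uniqid();
mkdir($dir);
$dataPath = $dir . '/orders.queue.data';
$pointerPath = $dir . '/orders.queue.pointer';
file_put_contents($dataPath, 'AAAABBBBCCCC');
file_put_contents($pointerPath, '8');

$reclaimed = compactFiles($dataPath, $pointerPath);
echo $reclaimed, PHP_EOL; // prints "8"
```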
How it works
Each queue is backed by three files:
| File | Purpose |
|---|---|
| {name}.queue.data | Append-only binary log of framed records |
| {name}.queue.pointer | Read offset: tracks the next unread message |
| {name}.queue.lock | Exclusive lock file: guards all mutations |
Records are size-prefixed with a 4-byte big-endian length header followed by the JSON payload. Reads advance the pointer without touching the data file. compact() rewrites the data file to drop consumed records. All mutations are guarded by an exclusive lock.
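The record layout described above (4-byte big-endian length, then JSON) maps directly onto PHP's pack('N') and unpack('N'). The following sketch shows one way to frame a record, read it back from a byte offset, and append it under an exclusive lock held on a separate lock file. All function names are hypothetical, and the error handling is simplified compared with what the package presumably does.

```php
<?php
declare(strict_types=1);

/** Frames a payload as: 4-byte big-endian length + JSON body. */
function encodeRecord(array $payload): string
{
    $json = json_encode($payload, JSON_THROW_ON_ERROR);

    return pack('N', strlen($json)) . $json; // 'N' = unsigned 32-bit, big-endian
}

/**
 * Reads the record that starts at $offset.
 *
 * @return array{0: array, 1: int}|null [payload, offset of next record], or null at end of log
 */
function decodeRecordAt(string $log, int $offset): ?array
{
    if ($offset >= strlen($log)) {
        return null; // nothing left to read
    }

    $header = substr($log, $offset, 4);
    if (strlen($header) < 4) {
        throw new UnexpectedValueException('Truncated length header');
    }

    $length = unpack('N', $header)[1];
    $json = substr($log, $offset + 4, $length);
    if (strlen($json) < $length) {
        throw new UnexpectedValueException('Truncated record body');
    }

    // The caller stores the returned offset as the new read pointer.
    return [json_decode($json, true, 512, JSON_THROW_ON_ERROR), $offset + 4 + $length];
}

/** Appends a framed record while holding an exclusive lock on the lock file. */
function appendLocked(string $dataPath, string $lockPath, string $record): void
{
    $lock = fopen($lockPath, 'c'); // create if missing, never truncate
    if ($lock === false) {
        throw new RuntimeException("Cannot open {$lockPath}");
    }

    try {
        if (!flock($lock, LOCK_EX)) {
            throw new RuntimeException('Cannot acquire exclusive lock');
        }
        if (file_put_contents($dataPath, $record, FILE_APPEND) === false) {
            throw new RuntimeException("Cannot append to {$dataPath}");
        }
    } finally {
        flock($lock, LOCK_UN);
        fclose($lock);
    }
}

// In-memory round trip: two records, read back in FIFO order.
$log = encodeRecord(['order_id' => 42]) . encodeRecord(['order_id' => 43]);
[$first, $next] = decodeRecordAt($log, 0);
[$second, $end] = decodeRecordAt($log, $next);
echo $first['order_id'], ' ', $second['order_id'], PHP_EOL; // prints "42 43"
```

Note that reading only moves the offset forward; the bytes of consumed records stay in the log until the data file is rewritten.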
Exceptions
| Class | When thrown |
|---|---|
| QueueException | I/O failure (unreadable file, write error, …) |
| CorruptedQueueException | Truncated or structurally invalid record |
CorruptedQueueException extends QueueException, so catching QueueException covers both.
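If you want to treat corruption differently from ordinary I/O failures, catch the subclass before the parent. The sketch below defines local stand-in classes that mirror the documented hierarchy so it runs without the package; the README does not state the namespace of the real exception classes, so real code would import them from wherever the package declares them. `describeFailure()` is a hypothetical helper.

```php
<?php
declare(strict_types=1);

// Local stand-ins mirroring the documented hierarchy (not the package's classes).
class QueueException extends RuntimeException
{
}

class CorruptedQueueException extends QueueException
{
}

/** Runs an operation and reports which kind of queue failure occurred, if any. */
function describeFailure(callable $operation): string
{
    try {
        $operation();

        return 'ok';
    } catch (CorruptedQueueException $e) {
        // Must come first: the parent catch below would also match this class.
        return 'corrupted: ' . $e->getMessage();
    } catch (QueueException $e) {
        return 'io: ' . $e->getMessage();
    }
}

echo describeFailure(fn () => throw new CorruptedQueueException('truncated record')), PHP_EOL;
// prints "corrupted: truncated record"
echo describeFailure(fn () => throw new QueueException('write error')), PHP_EOL;
// prints "io: write error"
```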
Statistics
- Total downloads: 46
- Monthly downloads: 0
- Daily downloads: 0
- Favorites: 0
- Views: 3
- Dependents: 0
- Suggesters: 0
Other information
- License: BSD-3-Clause
- Last updated: 2026-03-29