承接 danil-kashin/file-queue 相关项目开发

从需求分析到上线部署,全程专人跟进,保证项目质量与交付效率

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

danil-kashin/file-queue

最新稳定版本:v1.2.0

Composer 安装命令:

composer require danil-kashin/file-queue

包简介

A simple, file-based FIFO queue that keeps messages in order across restarts.

README 文档

README

A simple, file-based FIFO queue for PHP. Messages are persisted to disk using an append-only log with exclusive file locking, so the queue survives process restarts and is safe to use across multiple processes.

Requirements

  • PHP 8.1+
  • Extension: json

Installation

composer require danil-kashin/file-queue

Usage

1. Create a worker

Extend FileQueueWorker and implement processMessage(). Return true on success, false on failure. Failed messages are dequeued and not retried automatically.

The worker scans a directory for queue files and processes one message per tick, rotating across discovered queues. Each queue may contribute up to 10 messages per cycle (a full pass over all queues) before the worker moves on, which ensures equal resource distribution across queues and avoids the noisy-neighbor problem. The worker handles SIGTERM/SIGINT for graceful shutdown.

use DanilKashin\FileQueue\Queue\QueueMessage;
use DanilKashin\FileQueue\Workers\FileQueueWorker;

class OrderWorker extends FileQueueWorker
{
    protected function processMessage(QueueMessage $message): bool
    {
        return handleOrder($message->payload);
    }
}

For each processed message the worker prints + (success) or - (failure) to stdout.

Tuning

  • Override getMessagesPerQueueLimit() to change the per-queue per-cycle limit (default: 10).
  • Pass maxTicks as the second constructor argument to stop the worker after a fixed number of ticks — useful for tests, supervised one-shot runs, or external schedulers that prefer to recycle the process periodically.
class OrderWorker extends FileQueueWorker
{
    protected function getMessagesPerQueueLimit(): int
    {
        return 50;
    }

    protected function processMessage(QueueMessage $message): bool
    {
        return handleOrder($message->payload);
    }
}

$worker = new OrderWorker(queuesDir: '/var/queues', maxTicks: 1000);
$worker->run();

2. Run the worker

The worker is a long-running process — run it in the background so your application keeps responding. Progress is written to stdout (+ / - per message and . per tick); errors go to stderr.

vendor/bin/run_worker "App\Workers\OrderWorker" --queuesDir=/var/queues > worker.log 2>&1 &

3. Enqueue messages

Messages can be enqueued from anywhere in your application as long as they share the same baseDir. No running worker is required at enqueue time; messages will be picked up on the next tick.

use DanilKashin\FileQueue\Queue\FileQueue;
use DanilKashin\FileQueue\Queue\QueueMessage;

$queue = new FileQueue(queueName: 'orders', baseDir: '/var/queues');
$queue->enqueue(new QueueMessage(['order_id' => 42, 'status' => 'pending']));

Architecture

flowchart LR
    App["Your App"] -->|"enqueue(QueueMessage)"| FileQueue
    FileQueue -->|"dequeue()"| FileQueueWorker
    FileQueueWorker -->|"processMessage(QueueMessage)"| YourWorker["Your Worker"]
Loading

If you need to supply queues from a custom source (e.g. a database-backed list of queue names), extend QueueWorker directly and implement both getQueues() and processMessage().

Queue API

$queue = new FileQueue(queueName: 'orders', baseDir: '/var/queues');

// Write
$queue->enqueue(new QueueMessage(['order_id' => 42]));

// Read
$message = $queue->dequeue(); // QueueMessage|null
if ($message !== null) {
    $payload = $message->payload; // ['order_id' => 42]
}

// Inspect
$queue->isEmpty(); // bool
$queue->size();    // int — counts remaining (unconsumed) messages

// Cleanup
$queue->compact();

Compaction

Over time the data file grows as messages are appended and consumed. compact() rewrites the file to contain only unread messages, reclaiming disk space.

No need to call it regularly if your FileQueueWorker runs continuously and the queues are emptied regularly. When the last message is dequeued, all associated files are removed immediately, so no manual cleanup is needed.

How it works

Each queue is backed by three files:

File Purpose
{name}.queue.data Append-only binary log of framed records
{name}.queue.pointer Read offset — tracks the next unread message
{name}.queue.lock Exclusive lock file — guards all mutations

Records are size-prefixed with a 4-byte big-endian length header followed by the JSON payload. Reads advance the pointer without touching the data file. compact() rewrites the data file to drop consumed records. All mutations are guarded by an exclusive lock.

Exceptions

Class When thrown
QueueException I/O failure (unreadable file, write error, …)
CorruptedQueueException Truncated or structurally invalid record

CorruptedQueueException extends QueueException, so catching QueueException covers both.

统计信息

  • 总下载量: 46
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 0
  • 点击次数: 3
  • 依赖项目数: 0
  • 推荐数: 0

GitHub 信息

  • Stars: 0
  • Watchers: 0
  • Forks: 0
  • 开发语言: PHP

其他信息

  • 授权协议: BSD-3-Clause
  • 更新时间: 2026-03-29

承接程序开发

PHP开发

VUE

Vue开发

前端开发

小程序开发

公众号开发

系统定制

数据库设计

云部署

网站建设

安全加固