event-stream/laravel

Latest stable version: v1.0.0

Composer install command:

composer require event-stream/laravel

Package description

Subscribable, resumable server-sent event streams over Redis for Laravel, with an optional synced state-graph layer.

README

Subscribable, resumable server-sent event (SSE) streams over Redis for Laravel.

Define a stream once on the server, publish typed messages to it from anywhere in your application, and let any number of browser clients subscribe to it over a single SSE connection. The server tracks a per-stream resume cursor, so a client that drops and reconnects picks up exactly where it left off, with no duplicated or missed messages. An optional state-stream layer keeps a JSON object graph automatically synced from the server to every client.

This is the backend package. The companion frontend package is event-stream-react. The two communicate over a small, versioned wire protocol and should be kept on matching major versions.

  • Multi-stream multiplexing — one browser EventSource can subscribe to many streams at once; each is tracked and resumed independently.
  • Resumable — every message carries a cursor; reconnects resume from the last delivered message using the standard Last-Event-ID mechanism.
  • Authorization per stream — each stream decides who may subscribe.
  • Interceptors — transform or suppress messages on their way to the client.
  • Preludes — hand a freshly connected client a snapshot before live messages begin.
  • State streams — mirror a server-side JSON graph to clients via granular set / delete / reset mutations instead of re-fetching.

Requirements

  • PHP 8.2+
  • Laravel 10, 11, or 12
  • A Redis connection configured in config/database.php (the streams are backed by Redis Streams)
  • A server setup that allows long-lived PHP responses (see Deployment notes)

Installation

composer require event-stream/laravel

The service provider is auto-discovered. Publish the config file:

php artisan vendor:publish --tag=event-stream-config

This writes config/event-stream.php.

Quick start

This quick-start section is shared between the backend and frontend packages so both halves of the picture are in one place.

1. Backend — define a stream

A stream is a class. It declares a pattern for its key, decides who can subscribe, and exposes typed methods that publish messages.

// app/Streams/JobProgressStream.php
namespace App\Streams;

use EventStream\EventStream;
use Illuminate\Http\Request;

class JobProgressStream extends EventStream
{
    // {jobId} is a placeholder; a key like "job.progress:abc-123"
    // makes $this->args->jobId === "abc-123".
    const streamPattern = 'job.progress:{jobId}';

    public function authorize(Request $request): bool
    {
        // Return true to allow the subscription, false for a 403.
        return $request->user() !== null;
    }

    // A typed publish method. `send()` returns the new Redis message id.
    public function sendProgress(int $percent): string
    {
        return $this->send('progress', ['percent' => $percent]);
    }
}

2. Backend — register the stream

Add the class to config/event-stream.php:

'streams' => [
    App\Streams\JobProgressStream::class,
],

3. Backend — expose the SSE route

Mount the bundled controller wherever you like, behind whatever middleware your app needs (the package intentionally does not register a route for you):

// routes/web.php
use EventStream\Http\StreamSubscriptionController;

Route::get('/streams', [StreamSubscriptionController::class, 'stream'])
    ->middleware(['web', 'auth'])
    ->name('streams.subscribe');

4. Backend — publish messages

From a job, controller, event listener — anywhere:

use App\Streams\JobProgressStream;

JobProgressStream::make('abc-123')->sendProgress(50);

5. Frontend — wrap your app once

import { EventStreamProvider } from 'event-stream-react';

function Root() {
  return (
    // `url` must match the route you mounted the controller on.
    <EventStreamProvider url="/streams">
      <App />
    </EventStreamProvider>
  );
}

6. Frontend — subscribe

import { useState } from 'react';
import { useSubscribeToStreams } from 'event-stream-react';

function JobProgress({ jobId }: { jobId: string }) {
  const [percent, setPercent] = useState(0);

  useSubscribeToStreams(
    `job.progress:${jobId}`,
    ({ message }) => {
      if (message.type === 'progress') {
        setPercent((message.data as { percent: number }).percent);
      }
    }
  );

  return <div>{percent}% complete</div>;
}

That is the whole round trip: publish on the server, receive in the browser, with resumable delivery handled for you.

State streams (synced graph)

For a server-owned object graph you want mirrored to clients without writing per-field event handling, use a state stream.

// Backend
namespace App\Streams;

use EventStream\StateStream;
use Illuminate\Http\Request;

class DocumentStateStream extends StateStream
{
    const streamPattern = 'document:{documentId}';

    public function authorize(Request $request): bool
    {
        return $request->user() !== null;
    }
}

// Mutate the graph anywhere — each call publishes a granular mutation:
$doc = DocumentStateStream::make('abc-123');
$doc->setState('title', 'Quarterly report');
$doc->setState('author.name', 'Ada');
$doc->arrayAppend('comments', ['body' => 'Looks good']);

// Frontend: `state` stays current automatically.
import { observer } from 'mobx-react-lite';
import { useStreamStateObject } from 'event-stream-react';

const Document = observer(({ id }: { id: string }) => {
  const doc = useStreamStateObject(`document:${id}`);
  return <h1>{String(doc.state.title ?? '')}</h1>;
});

Backend reference

The rest of this document covers the backend in depth. For the frontend API, see the event-stream-react README.

How it works

  1. A browser opens one EventSource to your SSE route, naming one or more streams via ?streams[]=<key> query parameters.
  2. The controller resolves each key to a registered EventStream subclass, calls authorize() on each, and opens a single batched, blocking read across all of them against Redis.
  3. As messages arrive they are run through each stream's interceptors and written to the client as SSE frames. Every live frame carries a composite id: field, which the browser sends back as Last-Event-ID and which encodes one resume cursor per stream.
  4. If the connection drops, the browser automatically reconnects and sends the most recent Last-Event-ID. The controller decodes it and resumes each stream from its own cursor.

Publishing is just an XADD to a Redis stream; subscribing is an XREAD. There is no broker process to run — Redis is the entire transport.
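
As a rough illustration of step 1, the sketch below builds the subscription URL by hand. In practice event-stream-react presumably does this for you; buildSubscribeUrl is a hypothetical helper, not part of either package, and it assumes the controller reads the keys as a standard PHP streams[] query array.

```typescript
// Hypothetical helper (not part of either package): builds the
// ?streams[]=<key> query string described in step 1.
function buildSubscribeUrl(baseUrl: string, streamKeys: string[]): string {
  if (streamKeys.length === 0) {
    throw new Error('at least one stream key is required');
  }
  // Percent-encode both the parameter name and each key; PHP decodes
  // "streams%5B%5D" back to "streams[]" when parsing the query string.
  const query = streamKeys
    .map((key) => `${encodeURIComponent('streams[]')}=${encodeURIComponent(key)}`)
    .join('&');
  const separator = baseUrl.indexOf('?') === -1 ? '?' : '&';
  return `${baseUrl}${separator}${query}`;
}
```

Calling buildSubscribeUrl('/streams', ['job.progress:abc-123']) yields /streams?streams%5B%5D=job.progress%3Aabc-123, which is the URL a single EventSource would open.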

Defining streams

The stream pattern

Every subclass declares a streamPattern constant. Only { and } are special; everything else is a literal that must match exactly.

const streamPattern = 'job.progress:{jobId}';          // one placeholder
const streamPattern = 'report:{reportId}:{format}';    // two placeholders
const streamPattern = 'system.notifications';          // no placeholders

When a client subscribes to job.progress:abc-123, the registry finds the class whose pattern matches and instantiates it with that key. Extracted placeholder values are available as $this->args:

$this->args->jobId; // "abc-123"

Construct instances with make(), passing placeholder values left to right:

JobProgressStream::make('abc-123');           // job.progress:abc-123
ReportStream::make('rpt-99', 'pdf');          // report:rpt-99:pdf
SystemNotificationsStream::make();            // system.notifications
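
To make the matching rules concrete, here is a minimal TypeScript sketch of how a pattern could be matched against a key and how a key could be built from placeholder values. It is not the package's implementation: compilePattern, matchKey, and buildKey are hypothetical names, and the sketch assumes a placeholder matches one or more characters other than ":" (the README does not say what a placeholder may contain).

```typescript
interface CompiledPattern {
  regex: RegExp;
  names: string[];
}

// Escape every regex metacharacter so literal parts match exactly.
function escapeRegExp(text: string): string {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

function compilePattern(pattern: string): CompiledPattern {
  const names: string[] = [];
  // Splitting on a capturing group puts placeholder names at odd indices.
  const parts = pattern.split(/\{([A-Za-z_][A-Za-z0-9_]*)\}/);
  let source = '';
  for (let index = 0; index < parts.length; index++) {
    const part = parts[index] ?? '';
    if (index % 2 === 1) {
      names.push(part);
      source += '([^:]+)'; // ASSUMPTION: placeholder values never contain ":"
    } else {
      source += escapeRegExp(part);
    }
  }
  return { regex: new RegExp('^' + source + '$'), names };
}

// Returns the extracted placeholder values, or null when the key does not match.
function matchKey(pattern: string, key: string): Record<string, string> | null {
  const compiled = compilePattern(pattern);
  const match = compiled.regex.exec(key);
  if (match === null) {
    return null;
  }
  const args: Record<string, string> = {};
  for (let index = 0; index < compiled.names.length; index++) {
    const argName = compiled.names[index];
    const value = match[index + 1];
    if (argName !== undefined && value !== undefined) {
      args[argName] = value;
    }
  }
  return args;
}

// Mirrors make(): fills placeholders left to right.
function buildKey(pattern: string, values: string[]): string {
  let index = 0;
  return pattern.replace(/\{([A-Za-z_][A-Za-z0-9_]*)\}/g, () => {
    const value = values[index];
    index += 1;
    if (value === undefined) {
      throw new Error('missing placeholder value');
    }
    return value;
  });
}
```

Under these assumptions, matchKey('job.progress:{jobId}', 'job.progress:abc-123') gives { jobId: 'abc-123' }, and because the dot is treated as a literal, 'systemXnotifications' does not match 'system.notifications'.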

Authorization

authorize() is required and runs once per subscription, with $this->args already populated. Return false to reject with a 403.

public function authorize(Request $request): bool
{
    return Job::query()
        ->where('id', $this->args->jobId)
        ->where('user_id', $request->user()?->id)
        ->exists();
}

Publishing

Wrap send() in typed methods so call sites read clearly. The first argument is the message type (an arbitrary string your frontend switches on); the second is the JSON-serializable data payload. send() returns the Redis message id.

public function sendCompleted(array $summary): string
{
    return $this->send('completed', ['summary' => $summary]);
}

The published wire message is { type, data }; the client receives it as message.type and message.data (see Wire protocol).
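
On the client side, one way to keep the message type and its payload in step is a discriminated union. The sketch below is an assumption about how you might type your own messages; JobMessage and describeJobMessage are illustrative names, and event-stream-react may or may not expose typing of this kind.

```typescript
// Hypothetical message catalogue for the job progress stream: one union
// member per typed publish method on the server.
type JobMessage =
  | { type: 'progress'; data: { percent: number } }
  | { type: 'completed'; data: { summary: Record<string, unknown> } };

// Switching on `type` narrows `data` to the matching payload shape.
function describeJobMessage(message: JobMessage): string {
  switch (message.type) {
    case 'progress':
      return `${message.data.percent}% complete`;
    case 'completed':
      return `done (${Object.keys(message.data.summary).length} summary fields)`;
  }
}
```

With a union like this, adding a new publish method on the server means adding one member here, and the compiler flags every switch that has not handled it yet.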

Choosing where to start: initialCursor()

By default a new subscription only receives messages published after it connects: its starting cursor is '$', the Redis ID that means "only entries added from now on". Override initialCursor() to replay history on first connect:

public function initialCursor(): string
{
    return '0-0'; // replay from the beginning of the Redis stream
}

A reconnect with a Last-Event-ID always takes precedence over this — so initialCursor() only governs the very first connection.

Preludes: a snapshot on connect

Return a payload from preludeMessage() to hand every freshly connected client a one-off message before live messages begin. A prelude is server-synthesized (not read from Redis) and never advances the resume cursor.

public function preludeMessage(): ?array
{
    return ['snapshot' => $this->currentSnapshot()];
}

On the client this arrives with meta.isPrelude === true.

Interceptors

Interceptors transform or suppress messages on their way to the client. Register them in registerInterceptors():

use EventStream\Messages\DeliveredStreamMessage;

protected function registerInterceptors(): void
{
    // Runs on every message: enrich the payload.
    $this->registerInterceptor(function (DeliveredStreamMessage $message) {
        return $message->withData([
            ...$message->content->data,
            'serverTime' => now()->toIso8601String(),
        ]);
    });

    // Runs only on a specific type: hydrate, or suppress by returning null.
    $this->registerInterceptor('progress', function (DeliveredStreamMessage $message) {
        return $this->shouldHide($message) ? null : $message;
    });
}

Return a (possibly modified) DeliveredStreamMessage to forward it, or null to drop it. Use withData() / withType() to produce modified copies. Interceptors run in registration order, each receiving the previous one's output. A suppressed message still advances the resume cursor, so it is never re-read.
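
The chaining rules above can be modelled in a few lines. This TypeScript sketch is only a model of the described behaviour, not the package's code. It assumes that a null result stops the chain immediately and that a type-specific interceptor is matched against the message's current type; neither detail is confirmed by this README.

```typescript
interface Delivered {
  id: string;
  type: string;
  data: Record<string, unknown>;
}

type Interceptor = (message: Delivered) => Delivered | null;

interface RegisteredInterceptor {
  onlyType: string | null; // null means "runs on every message"
  handler: Interceptor;
}

interface PipelineResult {
  forwarded: Delivered | null; // null when the message was suppressed
  cursor: string;              // advances even for suppressed messages
}

function runInterceptors(message: Delivered, chain: RegisteredInterceptor[]): PipelineResult {
  let current: Delivered = message;
  for (const entry of chain) {
    if (entry.onlyType !== null && entry.onlyType !== current.type) {
      continue; // type-specific interceptor that does not apply
    }
    const next = entry.handler(current);
    if (next === null) {
      // ASSUMPTION: suppression short-circuits the rest of the chain.
      return { forwarded: null, cursor: message.id };
    }
    current = next; // each interceptor sees the previous one's output
  }
  return { forwarded: current, cursor: message.id };
}
```

The cursor is taken from the original message in both branches, which is what keeps a suppressed message from being read again after a reconnect.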

Custom Redis key: redisStreamName()

The Redis stream defaults to the subscription key. Override it if the underlying key needs to differ:

public function redisStreamName(): string
{
    return 'prod:' . $this->streamKey;
}

State streams

StateStream extends EventStream to keep a JSON object graph synced to clients. The graph is stored flattened into a single Redis hash (one field per leaf, keyed by dot-notation path); every mutation also publishes a granular event describing exactly what changed, so the client patches its local copy instead of re-fetching.
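
To picture the flattened storage, the sketch below turns a JSON graph into one field per leaf, keyed by dot-notation path. It is an illustration only: flattenGraph is a hypothetical name, storing each leaf as a JSON-encoded string is an assumption about the hash format, and the sketch ignores escaping of dots inside keys.

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

function joinPath(prefix: string, segment: string): string {
  return prefix === '' ? segment : `${prefix}.${segment}`;
}

// Produces one entry per leaf value. Empty arrays and objects produce no
// entries at all, which is why their type cannot be recovered from the hash.
function flattenGraph(
  node: Json,
  prefix: string = '',
  out: Record<string, string> = {},
): Record<string, string> {
  if (node === null || typeof node !== 'object') {
    out[prefix] = JSON.stringify(node); // ASSUMPTION: leaves are JSON-encoded
    return out;
  }
  if (Array.isArray(node)) {
    for (let index = 0; index < node.length; index++) {
      const child = node[index];
      if (child !== undefined) {
        flattenGraph(child, joinPath(prefix, String(index)), out);
      }
    }
    return out;
  }
  for (const key of Object.keys(node)) {
    const child = node[key];
    if (child !== undefined) {
      flattenGraph(child, joinPath(prefix, key), out);
    }
  }
  return out;
}
```

For the graph { title: 'Draft', author: { name: 'Ada' }, tags: ['a', 'b'], comments: [] } this yields the fields title, author.name, tags.0, and tags.1; the empty comments array leaves no trace.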

Mutating the graph

$s = DocumentStateStream::make('abc-123');

$s->resetState(['title' => 'Draft', 'tags' => ['a', 'b']]); // replace whole graph
$s->setState('title', 'Final');                             // set one path
$s->setState(['title' => 'Final', 'author.name' => 'Ada']); // set many at once
$s->arrayAppend('comments', ['body' => 'Nice']);            // append to an array
$s->getState('tags');                                       // read back: ['a', 'b']
$s->getState();                                             // whole graph
$s->deleteState('title');                                   // remove a path
$s->deleteState();                                          // wipe the graph

Paths

Paths are dot-strings ('author.name') or segment arrays (['author', 'name']). Use the array form when a segment legitimately contains a dot; internally dots inside a segment are escaped so they round-trip. Setting a path first clears anything stored beneath it, so replacing a subtree never leaves stale leaves.
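
The round trip between segment arrays and dot-strings can be sketched as follows. The README does not specify the escape scheme, so the backslash escaping used here is purely an assumption, and encodePath / decodePath are hypothetical names.

```typescript
// ASSUMPTION: a dot inside a segment is escaped as "\." and a backslash
// as "\\". The package's real escape scheme is not documented here.
function encodePath(segments: string[]): string {
  return segments
    .map((segment) => segment.replace(/\\/g, '\\\\').replace(/\./g, '\\.'))
    .join('.');
}

function decodePath(path: string): string[] {
  const segments: string[] = [];
  let current = '';
  for (let i = 0; i < path.length; i++) {
    const ch = path.charAt(i);
    if (ch === '\\' && i + 1 < path.length) {
      current += path.charAt(i + 1); // take the escaped character literally
      i += 1;
    } else if (ch === '.') {
      segments.push(current); // unescaped dot ends the segment
      current = '';
    } else {
      current += ch;
    }
  }
  segments.push(current);
  return segments;
}
```

Here encodePath(['author', 'name']) is the plain 'author.name', while ['files', 'report.pdf'] encodes to a string whose inner dot is escaped and decodes back to the same two segments.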

Container types: jsonPathTypes()

Because the graph is stored flat, an empty node has no inherent array-vs-object type. By default a node is rendered as an array only when its keys form a clean 0..n list. Override jsonPathTypes() to pin specific paths, using * as a single-segment wildcard:

protected function jsonPathTypes(): array
{
    return [
        'comments'   => 'array',  // always an array, even when empty
        'author'     => 'object',
        'sections.*' => 'array',
    ];
}

The first matching pattern (in declaration order) wins. This map is shipped to the client in the prelude so it materializes empty containers with the same type.
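
The resolution order described above (pinned patterns first, in declaration order, then the key-shape fallback) can be modelled like this. It is a sketch, not the package's code: resolveContainerType is a hypothetical name, and treating an unpinned empty node as an object is an assumption.

```typescript
type ContainerType = 'array' | 'object';

// "*" matches exactly one path segment; everything else must be equal.
function patternMatches(pattern: string, path: string): boolean {
  const patternSegments = pattern.split('.');
  const pathSegments = path.split('.');
  if (patternSegments.length !== pathSegments.length) {
    return false;
  }
  for (let i = 0; i < patternSegments.length; i++) {
    const expected = patternSegments[i];
    if (expected !== '*' && expected !== pathSegments[i]) {
      return false;
    }
  }
  return true;
}

// True when the keys are exactly "0", "1", ..., "n-1" in any order.
function isCleanList(keys: string[]): boolean {
  if (keys.length === 0) {
    return false; // ASSUMPTION: an unpinned empty node defaults to object
  }
  for (let i = 0; i < keys.length; i++) {
    if (keys.indexOf(String(i)) === -1) {
      return false;
    }
  }
  return true;
}

function resolveContainerType(
  path: string,
  childKeys: string[],
  pinned: Array<[string, ContainerType]>,
): ContainerType {
  for (const [pattern, type] of pinned) {
    if (patternMatches(pattern, path)) {
      return type; // first match in declaration order wins
    }
  }
  return isCleanList(childKeys) ? 'array' : 'object';
}
```

With the map from the PHP example, an empty comments node resolves to 'array' and sections.intro matches sections.*, while an unpinned node falls back to the shape of its keys.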

Default state and TTL

// Shown by getState() (and to clients) when nothing is stored yet.
protected function defaultState(): array
{
    return ['status' => 'pending'];
}

// Sliding-window lifetime of the state hash; refreshed on every mutation.
// Return null to keep state forever. Default: 3600 seconds.
protected function stateTtlSeconds(): ?int
{
    return 3600;
}

The state hash key defaults to redisStreamName() . ':state'; override stateHashKey() to change it.

Consuming a stream from the server

Sometimes the server itself needs to read a stream (e.g. a queued job that waits for a particular message). Call connect():

$connection = JobProgressStream::make('abc-123')->connect();

while (true) {
    $message = $connection->next(blockMs: 1000);
    if ($message === null) {
        continue; // timed out this interval — heartbeat point
    }
    // handle $message (a DeliveredStreamMessage)
}

Or block until a condition is met, with a total time budget:

$message = $connection->waitUntil(
    fn ($message) => $message->content->type === 'completed',
    deadlineMs: 30_000,
);
// $message is null if the deadline passed without a match.
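
For readers wondering how a deadline-bounded wait relates to the polling loop, here is a language-neutral model written in TypeScript. It only illustrates the idea of spending a total time budget across repeated blocking reads; the Connection interface, the injectable clock, and the default blockMs are assumptions, and the package's own waitUntil() may work differently.

```typescript
interface StreamMessage {
  type: string;
}

interface Connection {
  // Blocks for up to blockMs and returns null on timeout.
  next(blockMs: number): StreamMessage | null;
}

function waitUntil(
  connection: Connection,
  matches: (message: StreamMessage) => boolean,
  deadlineMs: number,
  now: () => number = Date.now, // injectable clock, handy for tests
  blockMs: number = 1000,
): StreamMessage | null {
  const deadline = now() + deadlineMs;
  while (true) {
    const remaining = deadline - now();
    if (remaining <= 0) {
      return null; // budget spent without a match
    }
    // Never block past the deadline on the final read.
    const message = connection.next(Math.min(blockMs, remaining));
    if (message !== null && matches(message)) {
      return message;
    }
  }
}
```

Capping each read at the remaining budget is the design point: the total wait stays within deadlineMs even when individual reads time out.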

Configuration

config/event-stream.php:

return [
    // EventStream subclasses the endpoint may resolve. Patterns must be unique.
    'streams' => [
        App\Streams\JobProgressStream::class,
    ],

    // ms the server blocks per read before sending a heartbeat comment.
    'heartbeat_interval_ms' => 10_000,

    // Max seconds a connection stays open before asking the browser to
    // reconnect (it reconnects automatically). Default: 55 minutes.
    'max_connection_seconds' => 55 * 60,
];

You may also pass an explicit list of stream classes to new EventStreamRegistry([...]) directly — handy in tests.

Wire protocol

The backend and event-stream-react share this contract. Keep the two packages on matching major versions.

The data: field of each SSE message frame is JSON of this shape:

{
  "meta": {
    "streamKey": "job.progress:abc-123", // which stream this belongs to
    "messageId": "1713000000000-0",      // Redis id, or null for a prelude
    "isPrelude": false                   // true for the on-connect snapshot
  },
  "message": {
    "type": "progress",                  // your message type
    "data": { "percent": 50 }            // your payload
  }
}

  • The Last-Event-ID (and the lastEventId query fallback) is a composite cursor: streamKey1=redisId1;streamKey2=redisId2.
  • Preludes carry messageId: null and never advance the cursor.
  • State-stream mutations use reserved types: state_stream_mutation_reset, state_stream_mutation_set, state_stream_mutation_delete.
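
The composite cursor format can be sketched as a pair of encode/decode helpers. This is an illustration under stated assumptions, not the package's code: the function names are hypothetical, the sketch assumes no escaping is applied (so stream keys must not contain ";"), it splits each pair on the last "=" because Redis ids never contain one, and it assumes a stream missing from the cursor falls back to its initial cursor.

```typescript
// Joins per-stream cursors as streamKey1=redisId1;streamKey2=redisId2.
function encodeCursor(cursors: Record<string, string>): string {
  return Object.keys(cursors)
    .map((streamKey) => `${streamKey}=${cursors[streamKey]}`)
    .join(';');
}

function decodeCursor(lastEventId: string): Record<string, string> {
  const cursors: Record<string, string> = {};
  for (const part of lastEventId.split(';')) {
    // Split on the LAST "=": the Redis id side never contains one.
    const at = part.lastIndexOf('=');
    if (at <= 0 || at === part.length - 1) {
      continue; // skip malformed or empty pairs
    }
    cursors[part.slice(0, at)] = part.slice(at + 1);
  }
  return cursors;
}

// A cursor from Last-Event-ID takes precedence; otherwise use the stream's
// initial cursor ("$" by default, meaning only new messages).
function startCursorFor(
  streamKey: string,
  lastEventId: string | null,
  initialCursor: string = '$',
): string {
  if (lastEventId === null) {
    return initialCursor;
  }
  const resumed = decodeCursor(lastEventId)[streamKey];
  return resumed !== undefined ? resumed : initialCursor;
}
```

Because each stream has its own entry, a reconnect can resume one stream from its last delivered id while a newly added stream on the same connection starts from its initial cursor.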

Deployment notes

An SSE response is a long-lived HTTP request. Make sure your stack allows it:

  • Workers — each open connection holds one PHP worker (php-fpm/Octane) for its lifetime. Size your worker pool for the number of concurrent subscribers.
  • Buffering — the controller sends X-Accel-Buffering: no and disables output buffering. Ensure no proxy (nginx, load balancer) re-enables response buffering on this route.
  • Timeouts — raise proxy read timeouts above max_connection_seconds, or rely on the built-in periodic reconnect.
  • Redis — the reader uses blocking XREAD; ensure your Redis client/proxy read timeout exceeds heartbeat_interval_ms.

Publishing new versions

See PUBLISHING.md.

License

MIT

Statistics

  • Total downloads: 1
  • Monthly downloads: 0
  • Daily downloads: 0
  • Favorites: 0
  • Views: 3
  • Dependents: 0
  • Suggesters: 0

GitHub information

  • Stars: 0
  • Watchers: 0
  • Forks: 0
  • Language: PHP

Other information

  • License: MIT
  • Last updated: 2026-06-10