event-stream/laravel
Latest stable version: v1.0.0
Subscribable, resumable server-sent event (SSE) streams over Redis for Laravel.
Define a stream once on the server, publish typed messages to it from anywhere in your application, and let any number of browser clients subscribe to it over a single SSE connection. Every message carries a per-stream resume cursor, so a client that drops and reconnects picks up exactly where it left off, with no duplicated and no missed messages. An optional state-stream layer keeps a JSON object graph automatically synced from the server to every client.
This is the backend package. The companion frontend package is event-stream-react. The two communicate over a small, versioned wire protocol and should be kept on matching major versions.
- Multi-stream multiplexing: one browser EventSource can subscribe to many streams at once; each is tracked and resumed independently.
- Resumable: every message carries a cursor; reconnects resume from the last delivered message using the standard Last-Event-ID mechanism.
- Authorization per stream: each stream decides who may subscribe.
- Interceptors: transform or suppress messages on their way to the client.
- Preludes: hand a freshly connected client a snapshot before live messages begin.
- State streams: mirror a server-side JSON graph to clients via granular set/delete/reset mutations instead of re-fetching.
Requirements
- PHP 8.2+
- Laravel 10, 11, or 12
- A Redis connection configured in config/database.php (the streams are backed by Redis Streams)
- A server setup that allows long-lived PHP responses (see Deployment notes)
Installation
composer require event-stream/laravel
The service provider is auto-discovered. Publish the config file:
php artisan vendor:publish --tag=event-stream-config
This writes config/event-stream.php.
Quick start
This quick-start section is shared between the backend and frontend packages so both halves of the picture are in one place.
1. Backend — define a stream
A stream is a class. It declares a pattern for its key, decides who can subscribe, and exposes typed methods that publish messages.
// app/Streams/JobProgressStream.php
namespace App\Streams;

use EventStream\EventStream;
use Illuminate\Http\Request;

class JobProgressStream extends EventStream
{
    // {jobId} is a placeholder; a key like "job.progress:abc-123"
    // makes $this->args->jobId === "abc-123".
    const streamPattern = 'job.progress:{jobId}';

    public function authorize(Request $request): bool
    {
        // Return true to allow the subscription, false for a 403.
        return $request->user() !== null;
    }

    // A typed publish method. `send()` returns the new Redis message id.
    public function sendProgress(int $percent): string
    {
        return $this->send('progress', ['percent' => $percent]);
    }
}
2. Backend — register the stream
Add the class to config/event-stream.php:
'streams' => [
    App\Streams\JobProgressStream::class,
],
3. Backend — expose the SSE route
Mount the bundled controller wherever you like, behind whatever middleware your app needs (the package intentionally does not register a route for you):
// routes/web.php
use EventStream\Http\StreamSubscriptionController;
use Illuminate\Support\Facades\Route;

Route::get('/streams', [StreamSubscriptionController::class, 'stream'])
    ->middleware(['web', 'auth'])
    ->name('streams.subscribe');
4. Backend — publish messages
From a job, controller, event listener — anywhere:
use App\Streams\JobProgressStream;

JobProgressStream::make('abc-123')->sendProgress(50);
5. Frontend — wrap your app once
import { EventStreamProvider } from 'event-stream-react';

function Root() {
    return (
        // `url` must match the route you mounted the controller on.
        <EventStreamProvider url="/streams">
            <App />
        </EventStreamProvider>
    );
}
6. Frontend — subscribe
import { useState } from 'react';
import { useSubscribeToStreams } from 'event-stream-react';

function JobProgress({ jobId }: { jobId: string }) {
    const [percent, setPercent] = useState(0);

    useSubscribeToStreams(
        `job.progress:${jobId}`,
        ({ message }) => {
            if (message.type === 'progress') {
                setPercent((message.data as { percent: number }).percent);
            }
        }
    );

    return <div>{percent}% complete</div>;
}
That is the whole round trip: publish on the server, receive in the browser, with resumable delivery handled for you.
State streams (synced graph)
For a server-owned object graph you want mirrored to clients without writing per-field event handling, use a state stream.
// Backend
namespace App\Streams;

use EventStream\StateStream;
use Illuminate\Http\Request;

class DocumentStateStream extends StateStream
{
    const streamPattern = 'document:{documentId}';

    public function authorize(Request $request): bool
    {
        return $request->user() !== null;
    }
}

// Mutate the graph anywhere; each call publishes a granular mutation:
$doc = DocumentStateStream::make('abc-123');
$doc->setState('title', 'Quarterly report');
$doc->setState('author.name', 'Ada');
$doc->arrayAppend('comments', ['body' => 'Looks good']);
// Frontend: `state` stays current automatically.
import { observer } from 'mobx-react-lite';
import { useStreamStateObject } from 'event-stream-react';

const Document = observer(({ id }: { id: string }) => {
    const doc = useStreamStateObject(`document:${id}`);
    return <h1>{String(doc.state.title ?? '')}</h1>;
});
Backend reference
The rest of this document covers the backend in depth. For the frontend API, see the event-stream-react README.
How it works
- A browser opens one EventSource to your SSE route, naming one or more streams via ?streams[]=<key> query parameters.
- The controller resolves each key to a registered EventStream subclass, calls authorize() on each, and opens a single batched, blocking read across all of them against Redis.
- As messages arrive they are run through each stream's interceptors and written to the client as SSE frames. Every live frame carries a composite id: (the Last-Event-ID) encoding a resume cursor per stream.
- If the connection drops, the browser automatically reconnects and replays the last Last-Event-ID. The controller decodes it and resumes each stream from its own cursor.
Publishing is just an XADD to a Redis stream; subscribing is an XREAD. There
is no broker process to run — Redis is the entire transport.
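To make the first step concrete, here is a minimal sketch of how a subscription URL with repeated streams[] parameters could be assembled. The helper name buildSubscribeUrl is hypothetical and not part of either package; the frontend package normally builds this URL for you.

```typescript
// Hypothetical helper: builds the URL a browser would hand to EventSource.
// Repeating the `streams[]` parameter lets Laravel read the values as an array.
function buildSubscribeUrl(route: string, streamKeys: string[]): string {
  const name = encodeURIComponent("streams[]"); // "streams%5B%5D"
  const query = streamKeys
    .map((key) => `${name}=${encodeURIComponent(key)}`)
    .join("&");
  return query === "" ? route : `${route}?${query}`;
}

// In a browser this would be used roughly as:
//   new EventSource(buildSubscribeUrl("/streams", ["job.progress:abc-123"]));
```

Percent-encoding the keys keeps characters such as ":" safe inside the query string; PHP decodes both the parameter name and its values before the controller sees them.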
Defining streams
The stream pattern
Every subclass declares a streamPattern constant. Only { and } are
special; everything else is a literal that must match exactly.
const streamPattern = 'job.progress:{jobId}';       // one placeholder
const streamPattern = 'report:{reportId}:{format}'; // two placeholders
const streamPattern = 'system.notifications';       // no placeholders
When a client subscribes to job.progress:abc-123, the registry finds the
class whose pattern matches and instantiates it with that key. Extracted
placeholder values are available as $this->args:
$this->args->jobId; // "abc-123"
Construct instances with make(), passing placeholder values left to right:
JobProgressStream::make('abc-123');  // job.progress:abc-123
ReportStream::make('rpt-99', 'pdf'); // report:rpt-99:pdf
SystemNotificationsStream::make();   // system.notifications
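The matching and key-building rules above can be sketched in a few lines. This is an illustration only, not the package's actual matcher: matchStreamKey and buildStreamKey are hypothetical names, and the rule that a placeholder never spans a ":" is an assumption made for the sketch.

```typescript
// Escape every regex metacharacter so literal pattern text matches exactly.
function escapeRegExp(literal: string): string {
  return literal.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}

// Returns the extracted placeholder values, or null when the key does not match.
function matchStreamKey(pattern: string, key: string): Record<string, string> | null {
  const names: string[] = [];
  const source = pattern
    .split(/(\{[A-Za-z_][A-Za-z0-9_]*\})/) // keep the {placeholder} parts
    .map((part) => {
      const m = /^\{([A-Za-z_][A-Za-z0-9_]*)\}$/.exec(part);
      if (m === null) return escapeRegExp(part);
      names.push(m[1]);
      return "([^:]+)"; // ASSUMPTION: a placeholder never spans a ':'
    })
    .join("");
  const match = new RegExp(`^${source}$`).exec(key);
  if (match === null) return null;
  const args: Record<string, string> = {};
  names.forEach((name, i) => {
    args[name] = match[i + 1];
  });
  return args;
}

// Fills placeholders left to right, mirroring what make() is described as doing.
function buildStreamKey(pattern: string, ...values: string[]): string {
  let i = 0;
  return pattern.replace(/\{[A-Za-z_][A-Za-z0-9_]*\}/g, () => values[i++]);
}
```

Because only { and } are special, the "." in job.progress is escaped and treated as a literal character rather than a wildcard.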
Authorization
authorize() is required and runs once per subscription, with $this->args
already populated. Return false to reject with a 403.
public function authorize(Request $request): bool
{
    return Job::query()
        ->where('id', $this->args->jobId)
        ->where('user_id', $request->user()?->id)
        ->exists();
}
Publishing
Wrap send() in typed methods so call sites read clearly. The first argument is
the message type (an arbitrary string your frontend switches on); the second
is the JSON-serializable data payload. send() returns the Redis message id.
public function sendCompleted(array $summary): string
{
    return $this->send('completed', ['summary' => $summary]);
}
The published wire message is { type, data }; the client receives it as
message.type and message.data (see Wire protocol).
Choosing where to start: initialCursor()
By default a new subscription only receives messages published after it
connects ('$'). Override initialCursor() to replay history on first connect:
public function initialCursor(): string
{
    return '0-0'; // replay from the beginning of the Redis stream
}
A reconnect with a Last-Event-ID always takes precedence over this — so
initialCursor() only governs the very first connection.
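The precedence described above fits in one function. The sketch below is illustrative: resolveStartCursor is a hypothetical name, and the map stands in for whatever the controller decodes from Last-Event-ID.

```typescript
// Hypothetical helper: picks the cursor a stream should start reading from.
function resolveStartCursor(
  streamKey: string,
  resumeCursors: Map<string, string>, // decoded from Last-Event-ID; empty on first connect
  initialCursor: string = "$",        // what the stream's initialCursor() returns
): string {
  // A resume cursor always wins; initialCursor only applies when there is none.
  return resumeCursors.get(streamKey) ?? initialCursor;
}
```

A stream that is newly added to an existing connection has no entry in the map, so under this sketch it falls back to its own initial cursor while the other streams resume.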
Preludes: a snapshot on connect
Return a payload from preludeMessage() to hand every freshly connected client
a one-off message before live messages begin. A prelude is server-synthesized
(not read from Redis) and never advances the resume cursor.
public function preludeMessage(): ?array
{
    return ['snapshot' => $this->currentSnapshot()];
}
On the client this arrives with meta.isPrelude === true.
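A short sketch of what "never advances the resume cursor" means for code that tracks cursors by hand. The StreamFrame shape follows the frame layout in the Wire protocol section; advanceCursor is a hypothetical helper, and the frontend package presumably does this bookkeeping for you.

```typescript
// Frame shape as documented in the wire protocol section.
interface StreamFrame {
  meta: { streamKey: string; messageId: string | null; isPrelude: boolean };
  message: { type: string; data: unknown };
}

// Hypothetical helper: returns the cursor to remember after handling a frame.
function advanceCursor(current: string | null, frame: StreamFrame): string | null {
  // Preludes are synthesized by the server and carry no Redis id,
  // so the previously remembered cursor is kept unchanged.
  if (frame.meta.isPrelude || frame.meta.messageId === null) {
    return current;
  }
  return frame.meta.messageId;
}
```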
Interceptors
Interceptors transform or suppress messages on their way to the client. Register
them in registerInterceptors():
use EventStream\Messages\DeliveredStreamMessage;

protected function registerInterceptors(): void
{
    // Runs on every message: enrich the payload.
    $this->registerInterceptor(function (DeliveredStreamMessage $message) {
        return $message->withData([
            ...$message->content->data,
            'serverTime' => now()->toIso8601String(),
        ]);
    });

    // Runs only on a specific type: hydrate, or suppress by returning null.
    $this->registerInterceptor('progress', function (DeliveredStreamMessage $message) {
        return $this->shouldHide($message) ? null : $message;
    });
}
Return a (possibly modified) DeliveredStreamMessage to forward it, or null
to drop it. Use withData() / withType() to produce modified copies.
Interceptors run in registration order, each receiving the previous one's output.
A suppressed message still advances the resume cursor, so it is never re-read.
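The chaining rule can be modelled as a small pipeline. This is a sketch of the described behaviour, not the package's code: the type names are invented, and two details are assumptions, namely that a null result ends the chain and that type filters are checked against the message as it currently stands.

```typescript
interface Message {
  type: string;
  data: Record<string, unknown>;
}

type Interceptor = (message: Message) => Message | null;

// `type: null` means "run on every message".
interface RegisteredInterceptor {
  type: string | null;
  run: Interceptor;
}

function runInterceptors(
  message: Message,
  interceptors: RegisteredInterceptor[],
): Message | null {
  let current: Message = message;
  for (const { type, run } of interceptors) {
    // ASSUMPTION: the filter sees the current (possibly rewritten) type.
    if (type !== null && type !== current.type) continue;
    const next = run(current);
    // ASSUMPTION: once suppressed, later interceptors are skipped.
    if (next === null) return null;
    current = next; // each interceptor receives the previous one's output
  }
  return current;
}
```

The caller would still record the message id after a null result, which is what keeps a suppressed message from being re-read on reconnect.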
Custom Redis key: redisStreamName()
The Redis stream defaults to the subscription key. Override it if the underlying key needs to differ:
public function redisStreamName(): string
{
    return 'prod:' . $this->streamKey;
}
State streams
StateStream extends EventStream to keep a JSON object graph synced to
clients. The graph is stored flattened into a single Redis hash (one field per
leaf, keyed by dot-notation path); every mutation also publishes a granular
event describing exactly what changed, so the client patches its local copy
instead of re-fetching.
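To picture the flattened layout, the sketch below turns a nested graph into one field per leaf, keyed by dot-notation path. It is an illustration under assumptions: flattenGraph is a hypothetical name, leaves are assumed to be stored JSON-encoded, and segment escaping is left out for brevity.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

function joinPath(prefix: string, key: string): string {
  return prefix === "" ? key : `${prefix}.${key}`;
}

// Produces the field/value pairs a Redis hash could hold for this graph.
function flattenGraph(
  node: Json,
  prefix: string = "",
  out: Record<string, string> = {},
): Record<string, string> {
  if (Array.isArray(node)) {
    node.forEach((value, index) => flattenGraph(value, joinPath(prefix, String(index)), out));
  } else if (node !== null && typeof node === "object") {
    for (const key of Object.keys(node)) {
      flattenGraph(node[key], joinPath(prefix, key), out);
    }
  } else {
    out[prefix] = JSON.stringify(node); // ASSUMPTION: leaves are JSON-encoded strings
  }
  return out;
}
```

An empty array or object contributes no fields at all in this layout, which is why container types have to be declared separately for paths that may be empty.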
Mutating the graph
$s = DocumentStateStream::make('abc-123');

$s->resetState(['title' => 'Draft', 'tags' => ['a', 'b']]); // replace whole graph
$s->setState('title', 'Final');                             // set one path
$s->setState(['title' => 'Final', 'author.name' => 'Ada']); // set many at once
$s->arrayAppend('comments', ['body' => 'Nice']);            // append to an array

$s->getState('tags'); // read back: ['a', 'b']
$s->getState();       // whole graph

$s->deleteState('title'); // remove a path
$s->deleteState();        // wipe the graph
Paths
Paths are dot-strings ('author.name') or segment arrays (['author', 'name']).
Use the array form when a segment legitimately contains a dot; internally dots
inside a segment are escaped so they round-trip. Setting a path first clears
anything stored beneath it, so replacing a subtree never leaves stale leaves.
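The round trip between segment arrays and dot-strings might look like the following. The escaping scheme here (a backslash before "." and "\") is an assumption chosen for illustration; the package's internal encoding is not documented in this README, and encodePath and decodePath are hypothetical names.

```typescript
// ASSUMPTION: a backslash escapes '.' and '\' inside a segment.
function encodePath(segments: string[]): string {
  return segments.map((segment) => segment.replace(/[\\.]/g, "\\$&")).join(".");
}

function decodePath(path: string): string[] {
  const segments: string[] = [];
  let current = "";
  for (let i = 0; i < path.length; i++) {
    const ch = path[i];
    if (ch === "\\" && i + 1 < path.length) {
      current += path[++i]; // take the escaped character literally
    } else if (ch === ".") {
      segments.push(current); // an unescaped dot ends the segment
      current = "";
    } else {
      current += ch;
    }
  }
  segments.push(current);
  return segments;
}
```

With a scheme like this, a segment such as report.pdf survives as one segment instead of being split into report and pdf.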
Container types: jsonPathTypes()
Because the graph is stored flat, an empty node has no inherent array-vs-object
type. By default a node is rendered as an array only when its keys form a clean
0..n list. Override jsonPathTypes() to pin specific paths, using * as a
single-segment wildcard:
protected function jsonPathTypes(): array
{
    return [
        'comments'   => 'array', // always an array, even when empty
        'author'     => 'object',
        'sections.*' => 'array',
    ];
}
The first matching pattern (in declaration order) wins. This map is shipped to the client in the prelude so it materializes empty containers with the same type.
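A sketch of how such a map could be evaluated on either side of the wire. Both function names are hypothetical, and treating an empty, unpinned node as "not a list" is an assumption; the source only says a node renders as an array when its keys form a clean 0..n list.

```typescript
type ContainerType = "array" | "object";

// Rules are kept as ordered pairs so declaration order is explicit.
function resolveContainerType(
  path: string,
  rules: Array<[string, ContainerType]>,
): ContainerType | null {
  const segments = path.split(".");
  for (const [pattern, type] of rules) {
    const parts = pattern.split(".");
    const matches =
      parts.length === segments.length &&
      parts.every((part, i) => part === "*" || part === segments[i]); // '*' = one segment
    if (matches) return type; // first match in declaration order wins
  }
  return null; // no rule: fall back to the key-shape heuristic
}

// Default heuristic: the child keys are exactly 0, 1, ..., n-1.
function looksLikeList(keys: string[]): boolean {
  const sorted = [...keys].sort((a, b) => Number(a) - Number(b));
  return sorted.length > 0 && sorted.every((key, i) => key === String(i));
}
```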
Default state and TTL
// Shown by getState() (and to clients) when nothing is stored yet.
protected function defaultState(): array
{
    return ['status' => 'pending'];
}

// Sliding-window lifetime of the state hash; refreshed on every mutation.
// Return null to keep state forever. Default: 3600 seconds.
protected function stateTtlSeconds(): ?int
{
    return 3600;
}
The state hash key defaults to redisStreamName() . ':state'; override
stateHashKey() to change it.
Consuming a stream from the server
Sometimes the server itself needs to read a stream (e.g. a queued job that waits
for a particular message). Call connect():
$connection = JobProgressStream::make('abc-123')->connect();

while (true) {
    $message = $connection->next(blockMs: 1000);

    if ($message === null) {
        continue; // timed out this interval: heartbeat point
    }

    // handle $message (a DeliveredStreamMessage)
}
Or block until a condition is met, with a total time budget:
$message = $connection->waitUntil(
    fn ($message) => $message->content->type === 'completed',
    deadlineMs: 30_000,
);

// $message is null if the deadline passed without a match.
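Conceptually, waiting with a total time budget is a loop over short blocking reads. The sketch below models that idea only; it is not the package's implementation. The next callback stands in for $connection->next(), the 1000 ms slice is an arbitrary choice, and the injectable clock exists purely to make the loop testable.

```typescript
function waitUntil<T>(
  next: (blockMs: number) => T | null, // stand-in for a blocking read with timeout
  predicate: (message: T) => boolean,
  deadlineMs: number,
  now: () => number = Date.now,
): T | null {
  const deadline = now() + deadlineMs;
  while (true) {
    const remaining = deadline - now();
    if (remaining <= 0) return null; // budget spent without a match
    // Never block past the deadline, and wake at least once per second.
    const message = next(Math.min(remaining, 1000));
    if (message !== null && predicate(message)) return message;
  }
}
```

Messages that do not satisfy the predicate are simply read and skipped, so the budget covers the whole wait rather than any single read.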
Configuration
config/event-stream.php:
return [
    // EventStream subclasses the endpoint may resolve. Patterns must be unique.
    'streams' => [
        App\Streams\JobProgressStream::class,
    ],

    // ms the server blocks per read before sending a heartbeat comment.
    'heartbeat_interval_ms' => 10_000,

    // Max seconds a connection stays open before asking the browser to
    // reconnect (it reconnects automatically). Default: 55 minutes.
    'max_connection_seconds' => 55 * 60,
];
You may also pass an explicit list of stream classes to
new EventStreamRegistry([...]) directly — handy in tests.
Wire protocol
The backend and event-stream-react share this contract. Keep the two
packages on matching major versions.
Each SSE message frame's data: is JSON of the shape:
{
"meta": {
"streamKey": "job.progress:abc-123", // which stream this belongs to
"messageId": "1713000000000-0", // Redis id, or null for a prelude
"isPrelude": false // true for the on-connect snapshot
},
"message": {
"type": "progress", // your message type
"data": { "percent": 50 } // your payload
}
}
- The Last-Event-ID (and the lastEventId query fallback) is a composite cursor: streamKey1=redisId1;streamKey2=redisId2.
- Preludes carry messageId: null and never advance the cursor.
- State-stream mutations use reserved types: state_stream_mutation_reset, state_stream_mutation_set, state_stream_mutation_delete.
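The composite cursor format is simple enough to encode and decode by hand, for example when debugging a reconnect. The helpers below are a sketch with hypothetical names; they assume stream keys and Redis ids never contain ";" and that no additional escaping is applied, neither of which this README confirms.

```typescript
// Serializes per-stream cursors as "streamKey1=redisId1;streamKey2=redisId2".
function encodeCursor(cursors: Map<string, string>): string {
  const parts: string[] = [];
  cursors.forEach((redisId, streamKey) => {
    parts.push(`${streamKey}=${redisId}`);
  });
  return parts.join(";");
}

function decodeCursor(lastEventId: string): Map<string, string> {
  const cursors = new Map<string, string>();
  for (const pair of lastEventId.split(";")) {
    // Split on the LAST '=' since Redis ids never contain one.
    const at = pair.lastIndexOf("=");
    if (at <= 0) continue; // skip empty or malformed entries
    cursors.set(pair.slice(0, at), pair.slice(at + 1));
  }
  return cursors;
}
```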
Deployment notes
An SSE response is a long-lived HTTP request. Make sure your stack allows it:
- Workers: each open connection holds one PHP worker (php-fpm/Octane) for its lifetime. Size your worker pool for the number of concurrent subscribers.
- Buffering: the controller sends X-Accel-Buffering: no and disables output buffering. Ensure no proxy (nginx, load balancer) re-enables response buffering on this route.
- Timeouts: raise proxy read timeouts above max_connection_seconds, or rely on the built-in periodic reconnect.
- Redis: the reader uses blocking XREAD; ensure your Redis client/proxy read timeout exceeds heartbeat_interval_ms.
Publishing new versions
See PUBLISHING.md.
License
- License: MIT
- Last updated: 2026-06-10