idct/symfony-nats-messenger
Composer install command:
composer require idct/symfony-nats-messenger
Package description
Symfony NATS (JetStream) Messenger Bridge
README
A Symfony Messenger transport integration for NATS JetStream, enabling reliable asynchronous messaging with persistent message streaming.
Features
- 🚀 High-Performance Messaging - Leverage NATS JetStream for fast, reliable message delivery
- 📦 Symfony Integration - Implements Messenger's TransportInterface, MessageCountAwareInterface, SetupableTransportInterface, KeepaliveReceiverInterface, and CloseableTransportInterface
- ⚙️ Configurable Consumers - Support for multiple consumer strategies
- 🔄 Flexible Batching - Adjustable message batch sizes and timeouts
- 🔐 Authentication Support - Built-in support for NATS authentication
- 📊 Stream Configuration - Configurable retention policies and replication
- 🧪 Thoroughly Tested - 303 unit tests, ~99.6% coverage, mutation-tested (100% MSI)
🚀 This project is looking for funding. Love my work? Support it! 💖
- ☕ Buy me a coffee: https://buymeacoffee.com/idct
- 💝 Sponsor: https://github.com/sponsors/ideaconnect
Requirements
System Requirements
- PHP: ^8.2
- Symfony: ^7.2 || ^8
- NATS Server: ^2.9 with JetStream enabled, ^2.12 for scheduled messages support.
Installation
composer require idct/symfony-nats-messenger
Development Setup
For contributors and development:
# Install dependencies
composer install

# Run static analysis and the default unit test suite after every modification
composer test

# Start NATS server for testing
composer nats:start

# Run unit tests with coverage
composer test:unit

# Set up functional tests
composer test:functional:setup

# Run functional tests
composer test:functional

# Stop NATS server
composer nats:stop
Quick Start
1. Configure NATS Server
Ensure your NATS server has JetStream enabled:
nats-server -js
2. Set Up Transport in Symfony
Add the NATS transport to your Symfony Messenger configuration:
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            nats_transport:
                dsn: 'nats-jetstream://localhost:4222/my-stream/my-topic'
                options:
                    consumer: 'my-consumer'
                    batching: 5
                    max_batch_timeout: 1.0
        routing:
            'App\Message\MyAsyncMessage': nats_transport
Tested by: testReadmeDsnExamplesParseSuccessfully[README: quick-start transport], testReadmeConfigurationOptionsAreAccepted, testReadmeBatchingExamplesAreAccepted, testReadmeTimeoutExamplesAreAccepted
3. Configure Custom Serializers (Optional)
This transport ships with a high-performance IgbinarySerializer, but under the Symfony framework it is not selected automatically. Symfony Messenger always resolves a serializer and passes it to the transport factory - its framework default is the native PhpSerializer - so to use igbinary you must set the transport's serializer: key explicitly (shown below). The transport's own igbinary auto-selection (and the PhpSerializer fallback, with an E_USER_WARNING, when ext-igbinary is unavailable) only applies when you construct NatsTransport directly without passing a serializer, e.g. outside the framework.
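The selection rule in the paragraph above can be summarized as a small decision function. This is a toy model written for illustration, not the transport's code: `resolveSerializerName()` and its parameters are hypothetical, and it returns only a short name for the serializer that would end up in use.

```php
<?php
declare(strict_types=1);

/**
 * Toy model of the serializer selection described above (hypothetical helper).
 *
 * @param ?string $configured     serializer passed to the transport, or null
 * @param bool    $igbinaryLoaded whether ext-igbinary is available
 */
function resolveSerializerName(?string $configured, bool $igbinaryLoaded): string
{
    if ($configured !== null) {
        // Under Symfony a serializer is always passed in, so this branch wins.
        return $configured;
    }
    if ($igbinaryLoaded) {
        // Direct construction without a serializer: igbinary is auto-selected.
        return 'IgbinarySerializer';
    }
    // Direct construction without ext-igbinary: warn and fall back.
    trigger_error('ext-igbinary is unavailable, falling back to PhpSerializer', E_USER_WARNING);

    return 'PhpSerializer';
}

// Framework default passed in: igbinary is NOT chosen even though it is loaded.
echo resolveSerializerName('PhpSerializer', true), PHP_EOL;
// Direct construction with igbinary available.
echo resolveSerializerName(null, true), PHP_EOL;
```

The first call shows why the serializer: key has to be set explicitly inside the framework: a serializer that was passed in always takes precedence over auto-selection.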
Using IgbinarySerializer (recommended)
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            nats_transport:
                dsn: 'nats-jetstream://localhost:4222/my-stream/my-topic'
                serializer: 'IDCT\NatsMessenger\Serializer\IgbinarySerializer'
                options:
                    consumer: 'my-consumer'
Register the serializer service the serializer: key refers to. For example:
igbinary_serializer:
    class: IDCT\NatsMessenger\Serializer\IgbinarySerializer
or:
IDCT\NatsMessenger\Serializer\IgbinarySerializer: ~
Tested by: createTransport_UsesProvidedSerializer, serialize_WithValidEnvelope_ReturnsSerializedString, decode_WithValidEncodedEnvelope_ReturnsEnvelope, testConstructorWithoutIgbinaryDoesNotCrash
Creating Custom Serializers
You can create your own serializer by extending AbstractEnveloperSerializer:
use IDCT\NatsMessenger\Serializer\AbstractEnveloperSerializer;
use Symfony\Component\Messenger\Envelope;

class MyCustomSerializer extends AbstractEnveloperSerializer
{
    protected function serialize(Envelope $envelope): string
    {
        // Your custom serialization logic
        return serialize($envelope);
    }

    protected function deserialize(string $data): mixed
    {
        // Your custom deserialization logic
        return unserialize($data);
    }
}
Tested by: readmeCustomSerializerExample_EncodeDecode_RoundTrips, readmeCustomSerializerExample_DecodeInvalidBody_ThrowsException - the exact code above is compiled and exercised via ReadmeExampleSerializer in the unit tests.
For reference implementations, see:
- src/Serializer/IgbinarySerializer.php - Binary serialization
- src/Serializer/AbstractEnveloperSerializer.php - Base class
4. Send Messages
use App\Message\MyAsyncMessage;
use Symfony\Component\Messenger\MessageBusInterface;

class MyController
{
    public function __construct(private MessageBusInterface $bus) {}

    public function send(): void
    {
        $this->bus->dispatch(new MyAsyncMessage('Hello NATS!'));
    }
}
Tested by: testSendPublishesEncodedBodyWithoutHeaders, testSendUsesPublishWithHeadersWhenHeadersArePresent, Behat scenario "Complete message flow - send, check stats, consume, verify"
5. Handle Messages
use App\Message\MyAsyncMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class MyAsyncMessageHandler
{
    public function __invoke(MyAsyncMessage $message): void
    {
        echo "Processing: " . $message->getText();
    }
}
Tested by: Behat scenarios "Complete message flow - send, check stats, consume, verify", "Send and consume messages with a custom consumer name", and "High-volume message processing with file output verification" - handlers are exercised through real messenger:consume runs.
6. Consume Messages
symfony console messenger:consume nats_transport
Tested by: Behat scenarios "Complete message flow - send, check stats, consume, verify", "Send and consume messages with a custom consumer name", and "Partial message consumption with multiple consumers" - the Behat context runs messenger:consume as a Symfony CLI process.
Configuration Guide
DSN Format
nats-jetstream://[user:password@]host:port/stream-name/topic-name
Tested by: testBuildWithValidDsnReturnsConfiguration, testBuildWithoutPathThrowsException, testBuildWithoutTopicThrowsException, createTransport_WithValidDsn_ReturnsNatsTransportInstance
Examples:
# Default port (4222)
nats-jetstream://localhost/my-stream/my-topic

# Custom port
nats-jetstream://localhost:5000/my-stream/my-topic

# With authentication
nats-jetstream://user:password@localhost:4222/my-stream/my-topic

# With query parameters
nats-jetstream://localhost/my-stream/my-topic?consumer=worker&batching=10

# TLS transport scheme
nats-jetstream+tls://localhost:4222/my-stream/my-topic
Tested by: testReadmeDsnExamplesParseSuccessfully - each DSN above is parsed through the configuration builder via a dedicated data provider case.
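To make the DSN layout concrete, here is a minimal sketch that splits a DSN into its parts with PHP's built-in `parse_url()` and `parse_str()`. `parseNatsDsn()` is a hypothetical helper and the returned array shape is an assumption made for illustration; the bridge's own configuration builder may validate and normalize differently.

```php
<?php
declare(strict_types=1);

/**
 * Illustrative DSN splitter (hypothetical, not the bridge's configuration builder).
 *
 * @return array{host: string, port: int, stream: string, topic: string, user: ?string, options: array}
 */
function parseNatsDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || !isset($parts['host'])) {
        throw new InvalidArgumentException("Invalid DSN: $dsn");
    }

    // The path must contain exactly two segments: /stream-name/topic-name
    $segments = explode('/', trim($parts['path'] ?? '', '/'));
    if (count($segments) !== 2 || $segments[0] === '' || $segments[1] === '') {
        throw new InvalidArgumentException('DSN path must be /stream-name/topic-name');
    }

    // Query parameters become options; values arrive as strings.
    parse_str($parts['query'] ?? '', $options);

    return [
        'host' => $parts['host'],
        'port' => $parts['port'] ?? 4222, // default port from the examples above
        'stream' => $segments[0],
        'topic' => $segments[1],
        'user' => $parts['user'] ?? null,
        'options' => $options,
    ];
}

$config = parseNatsDsn('nats-jetstream://localhost/my-stream/my-topic?consumer=worker&batching=10');
echo $config['stream'], ' / ', $config['topic'], ' on port ', $config['port'], PHP_EOL;
```

Query-string values are strings at this stage, so a real builder would still need to cast options such as batching to their numeric types.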
Configuration Options
framework:
    messenger:
        transports:
            nats_transport:
                dsn: 'nats-jetstream://localhost:4222/my-stream/my-topic'
                options:
                    # Consumer Configuration
                    consumer: 'my-consumer'      # Consumer group name (default: 'client')

                    # Performance Tuning
                    batching: 5                  # Messages per batch (default: 1)
                    max_batch_timeout: 1.0       # Timeout in seconds for batch fetching (default: 1)
                    connection_timeout: 1.0      # Connection (dial) timeout in seconds (default: 1)

                    # Stream Retention Policies
                    stream_max_age: 86400                  # Max message age in seconds (0 = unlimited, default: 0)
                    stream_max_bytes: 1073741824           # Max storage size in bytes (null = unlimited)
                    stream_max_messages: 1000000           # Max number of messages in the stream (null = unlimited)
                    stream_max_messages_per_subject: 1000  # Max number of messages retained per subject (null = unlimited)

                    # Storage Backend
                    stream_storage: 'file'       # Storage type: 'file' or 'memory' (default: 'file')

                    # High Availability
                    stream_replicas: 1           # Number of replicas (default: 1)

                    # Failure Handling Strategy
                    retry_handler: 'symfony'     # symfony|nats (default: symfony)
                                                 # symfony => TERM on failed/rejected message
                                                 # nats => NAK on failed/rejected message

                    # NATS-native Redelivery Tuning (mainly relevant with retry_handler: nats)
                    nak_delay: 0                 # Seconds to wait before NATS redelivers a NAK'd
                                                 # message (default: 0 = immediate). Use to back off
                                                 # instead of hot-looping a failing message.
                    ack_wait: null               # Seconds JetStream waits for an ACK before redelivering
                                                 # (default: null = server default, ~30s). Raise it for
                                                 # slow handlers to avoid premature redelivery.
                    max_deliver: null            # Max redelivery attempts before NATS gives up
                                                 # (default: null = unlimited). Set it to stop a poison
                                                 # message redelivering forever under retry_handler: nats.
                    backoff: null                # List of per-attempt delays in seconds, e.g. [1, 5, 30].
                                                 # Pairs with max_deliver. (default: null)

                    # Acknowledgement Mode
                    ack_sync: false              # Wait for server confirmation of each ACK (default: false)
                                                 # false => fire-and-forget ACK (lower latency)
                                                 # true => JetStream double-ack; a dropped ACK cannot
                                                 # silently cause redelivery, at a latency cost

                    # Scheduled / Delayed Messages (requires NATS >= 2.12)
                    scheduled_messages: false    # Enable scheduled message support (default: false)
                                                 # When enabled, Symfony DelayStamp triggers NATS
                                                 # scheduled message delivery via Nats-Schedule headers

                    # TLS Configuration
                    tls_required: false          # Force TLS for NATS connection (default: false)
                    tls_handshake_first: false   # Use TLS-first handshake mode (default: false)
                    tls_ca_file: null            # Path to CA certificate file
                    tls_cert_file: null          # Path to client certificate file
                    tls_key_file: null           # Path to client private key
                    tls_key_passphrase: null     # Passphrase for encrypted private key
                    tls_peer_name: null          # Override TLS peer name for certificate validation
                    tls_verify_peer: true        # Verify TLS peer certificate (default: true)

                    # Additional Authentication
                    token: null                  # NATS token authentication
                    username: null               # Overrides DSN username if provided
                    password: null               # Overrides DSN password if provided
                    jwt: null                    # JWT authentication value
                    nkey: null                   # NKey public value
Tested by: testReadmeConfigurationOptionsAreAccepted (all options above), testReadmeBatchingExamplesAreAccepted, testReadmeTimeoutExamplesAreAccepted, testReadmeStreamRetentionExamplesAreAccepted, testBuildWithTlsAndAuthOptionsPropagatesToNatsOptions
Retry Handler Behavior
- retry_handler: symfony (default) sends TERM when a message fails during transport decoding or is rejected. Symfony's retry/failure transport then handles redelivery.
- retry_handler: nats sends NAK when a message fails during transport decoding or is rejected, so NATS redelivers the message itself.
When NATS manages redelivery (retry_handler: nats), tune it with nak_delay, ack_wait, max_deliver, and backoff:
- nak_delay delays each NAK so a failing message backs off instead of redelivering immediately (a hot loop).
- max_deliver caps redeliveries. ⚠️ Without it, retry_handler: nats redelivers a permanently-failing ("poison") message forever - set max_deliver in production.
- backoff sets an escalating per-attempt delay schedule (e.g. [1, 5, 30] seconds); pair it with max_deliver greater than the number of backoff steps.
- ack_wait is how long JetStream waits for an ACK before considering a delivery failed and redelivering - raise it for handlers that legitimately take a while.
Tested by: testRejectUsesTermByDefault, testRejectUsesNakWhenRetryHandlerIsNats, testHandleFailedDeliveryUsesNakWithDelayWhenConfigured, testSetupAppliesConsumerRetryTuning, testBuildAcceptsNatsRetryTuningOptions, testBuildUsesRetryHandlerFromQuery, Behat scenarios nats_nak.feature and nats_term.feature
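The tuning guidance above can be turned into a small pre-deployment check. The sketch below is not part of the bridge: `checkRetryTuning()` is a hypothetical helper that takes the transport options as a plain array and returns warnings for the pitfalls described in this section.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical lint for the redelivery options discussed above.
 *
 * @param array<string, mixed> $options transport options as written in messenger.yaml
 * @return list<string> human-readable warnings (empty when nothing looks off)
 */
function checkRetryTuning(array $options): array
{
    $warnings = [];
    if (($options['retry_handler'] ?? 'symfony') !== 'nats') {
        return $warnings; // Symfony handles retries; NATS-native tuning is secondary.
    }

    $maxDeliver = $options['max_deliver'] ?? null;
    $backoff = $options['backoff'] ?? null;

    if ($maxDeliver === null) {
        $warnings[] = 'max_deliver is unset: a poison message would be redelivered forever';
    }
    if (is_array($backoff) && $maxDeliver !== null && $maxDeliver <= count($backoff)) {
        $warnings[] = 'max_deliver should be greater than the number of backoff steps';
    }
    if ($backoff === null && ($options['nak_delay'] ?? 0) <= 0) {
        $warnings[] = 'no nak_delay or backoff: failing messages are redelivered immediately';
    }

    return $warnings;
}

foreach (checkRetryTuning(['retry_handler' => 'nats']) as $warning) {
    echo 'WARNING: ', $warning, PHP_EOL;
}
```

A check like this could run in CI against the parsed messenger configuration, so an unbounded retry_handler: nats setup is caught before it reaches production.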
Important: Consumer Strategies
This is critical to understand before setting up multiple transport instances:
⚠️ Strategy A: Same Consumer, Batching = 1
Use when: Multiple instances should cooperate on the same consumer
# All instances use the same consumer with batching=1
transports:
    nats_worker_1:
        dsn: 'nats-jetstream://localhost/my-stream/my-topic'
        options:
            consumer: 'shared-consumer'  # Same consumer name
            batching: 1                  # MUST be 1 for shared consumers
    nats_worker_2:
        dsn: 'nats-jetstream://localhost/my-stream/my-topic'
        options:
            consumer: 'shared-consumer'  # Same consumer name
            batching: 1                  # MUST be 1 for shared consumers
Why batching must be 1:
- With explicit acknowledge (ACK) mode, only messages that are explicitly acknowledged are considered processed
- Multiple instances sharing the same consumer need to ACK individually
- Batching > 1 with multiple instances causes delivery conflicts
- Each instance should fetch and ACK one message at a time
Benefits:
- Automatic load balancing across instances
- NATS handles message distribution
- Guaranteed single processing per message
Tested by: testReadmeBatchingExamplesAreAccepted (batching=1), Behat scenario "Partial message consumption with multiple consumers"
✅ Strategy B: Different Consumers, Any Batching
Use when: Each instance needs independent message processing (duplicates allowed)
# Each instance uses a different consumer
transports:
    nats_worker_1:
        dsn: 'nats-jetstream://localhost/my-stream/my-topic'
        options:
            consumer: 'worker-1-consumer'  # Unique consumer per instance
            batching: 10                   # Can use any batching
    nats_worker_2:
        dsn: 'nats-jetstream://localhost/my-stream/my-topic'
        options:
            consumer: 'worker-2-consumer'  # Unique consumer per instance
            batching: 10                   # Can use any batching
Why this works:
- Each consumer maintains its own state
- All messages are delivered to all consumers independently
- Each instance can use higher batching for better throughput
- Duplicate processing is expected (fan-out pattern)
Use cases:
- Event broadcasting to multiple systems
- Multiple independent processors
- Audit logging / event replay
Tested by: testReadmeBatchingExamplesAreAccepted (batching=10), Behat scenario "Partial message consumption with multiple consumers"
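The difference between the two strategies comes down to who owns the read position. The toy in-memory model below illustrates it; it does not talk to NATS, `simulateDelivery()` is a hypothetical function, and the strict alternation between workers is a simplification (real distribution depends on which worker fetches first).

```php
<?php
declare(strict_types=1);

/**
 * Toy model: each named consumer keeps one cursor into the stream, and
 * workers that use the same consumer name share that cursor.
 *
 * @param list<string>          $stream  messages in the stream
 * @param array<string, string> $workers worker name => consumer name
 * @return array<string, list<string>> worker name => messages it received
 */
function simulateDelivery(array $stream, array $workers): array
{
    $cursor = []; // consumer name => index of the next undelivered message
    $received = array_fill_keys(array_keys($workers), []);

    do {
        $progress = false;
        foreach ($workers as $worker => $consumer) {
            $next = $cursor[$consumer] ?? 0;
            if ($next < count($stream)) {
                // Each turn fetches exactly one message (batching = 1).
                $received[$worker][] = $stream[$next];
                $cursor[$consumer] = $next + 1;
                $progress = true;
            }
        }
    } while ($progress);

    return $received;
}

$stream = ['m1', 'm2', 'm3', 'm4'];

// Strategy A: one shared consumer, the work is split between workers.
print_r(simulateDelivery($stream, ['w1' => 'shared-consumer', 'w2' => 'shared-consumer']));

// Strategy B: one consumer per worker, every worker sees every message.
print_r(simulateDelivery($stream, ['w1' => 'worker-1-consumer', 'w2' => 'worker-2-consumer']));
```

In the shared case each message reaches exactly one worker, while in the per-worker case both workers receive all four messages, which is the fan-out behaviour described under Strategy B.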
Batching & Timeouts
Batching Explained
- Higher batching: Better throughput, slightly higher latency
- Lower batching: Lower latency, slightly reduced throughput
- Optimal batching: Depends on message size and processing time
options:
    batching: 1     # Fetch 1 message at a time (low latency)
    # batching: 5   # Fetch 5 messages (balanced)
    # batching: 20  # Fetch 20 messages (high throughput)
Tested by: testReadmeBatchingExamplesAreAccepted - values 1, 5, 10, 20, 50 are all verified.
Batch Timeout
Controls how long to wait for a batch to fill:
options:
    batching: 10
    max_batch_timeout: 0.5  # Wait max 0.5s for batch to fill
                            # Returns early if timeout reached
Tested by: testReadmeTimeoutExamplesAreAccepted - values 0.5, 1.0, 2.0 are verified. Behat scenarios: nats_batching.feature.
Example scenarios:
- If you set batching: 10 and max_batch_timeout: 0.5:
  - If 10 messages arrive quickly, all are fetched immediately
  - If only 3 messages arrive in 0.5s, those 3 are returned
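The two scenarios above can be reproduced with a toy model of a single batch fetch. It is only an illustration of the described semantics: `fetchedCount()` is a hypothetical function, and message arrival times are passed in as plain numbers instead of coming from a NATS server.

```php
<?php
declare(strict_types=1);

/**
 * Toy model of one batch fetch: how many messages does the fetch return?
 *
 * @param list<float> $arrivals        seconds after fetch start at which each message becomes available
 * @param int         $batching        maximum messages per batch
 * @param float       $maxBatchTimeout seconds to wait for the batch to fill
 */
function fetchedCount(array $arrivals, int $batching, float $maxBatchTimeout): int
{
    sort($arrivals);
    $count = 0;
    foreach ($arrivals as $at) {
        if ($count === $batching || $at > $maxBatchTimeout) {
            break; // batch is full, or the deadline passed before this message arrived
        }
        $count++;
    }

    return $count;
}

// Twelve messages arrive almost at once: the batch fills and returns 10.
echo fetchedCount(array_fill(0, 12, 0.01), 10, 0.5), PHP_EOL;

// Only three messages arrive before the 0.5s deadline: those 3 are returned.
echo fetchedCount([0.1, 0.2, 0.4, 0.9], 10, 0.5), PHP_EOL;
```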
Connection Timeout
Controls the timeout for establishing the NATS connection (initial and reconnect dial attempts):
options:
    connection_timeout: 2.0  # Connection (dial) timeout in seconds
Tested by: testReadmeTimeoutExamplesAreAccepted (1.0, 2.0, 3.0), testBuildWithConnectionTimeoutPropagatesMs
Purpose:
- Sets the timeout for the initial TCP/TLS dial and handshake when connecting to NATS
- Does not govern per-operation read/write timeouts (publish/ack/request keep the client's own request timeout); the batch fetch is bounded separately by max_batch_timeout
- Lower values fail faster on connection issues
- Higher values tolerate slower connection establishment
When to adjust:
- Increase for high-latency networks or geographically distant NATS servers
- Decrease for faster failure detection in local environments
- Default of 1 second works well for most local/regional deployments
- connection_timeout does not bound batch filling; set max_batch_timeout so a fetch does not wait forever for the batch to fill
Stream Configuration
Retention Policies
Control how long messages are kept in the stream:
options:
    # By age (24 hours)
    stream_max_age: 86400
    # By total size (1GB)
    stream_max_bytes: 1073741824
    # By total message count across the entire stream (NATS: max_msgs)
    stream_max_messages: 1000000
    # By message count per individual subject (NATS: max_msgs_per_subject)
    stream_max_messages_per_subject: 1000

    # Unlimited (default) - use these values instead of the ones above
    # stream_max_age: 0
    # stream_max_bytes: null
    # stream_max_messages: null
    # stream_max_messages_per_subject: null
Tested by: testReadmeStreamRetentionExamplesAreAccepted - all retention options above are verified. Behat scenarios: nats_stream_limits.feature.
Note:
stream_max_messages limits the total number of messages stored in the stream (maps to NATS max_msgs), while stream_max_messages_per_subject limits messages retained per individual subject (maps to NATS max_msgs_per_subject). The per-subject limit is especially useful with multi-subject streams to prevent one high-volume subject from dominating retention.
High Availability
options:
    # Single replica (no redundancy)
    stream_replicas: 1
    # 3 replicas (recommended for production)
    # stream_replicas: 3
Tested by: testReadmeStreamRetentionExamplesAreAccepted (replicas 1 and 3), testSetupPassesConfiguredStreamOptions
Testing
Unit Tests
# Install dependencies
composer install

# Run static analysis and the fast unit suite after every modification
composer test

# Run NATS
composer nats:start

# Run all unit tests with coverage (recommended)
composer test:unit

# Or run tests manually
./vendor/bin/phpunit
The target is at least 90% code coverage.
What's tested:
- DSN parsing and validation
- Configuration option handling
- Authentication support
- Port configuration
- Error handling
- Interface compliance
Mutation Tests
The unit suite is mutation-tested with Infection to ensure the tests actually detect behavioral changes (not just execute lines):
# Requires a coverage driver (xdebug or pcov)
composer test:mutation
Configuration lives in infection.json5. It enforces a minimum MSI of 90% and a minimum covered MSI of
95%; the suite currently scores 100% covered MSI with 100% mutation code coverage. CI runs it on the
PHP 8.5 job.
Functional Tests
Functional tests require a running NATS server with JetStream enabled:
# Set up functional test dependencies
composer test:functional:setup

# Start NATS server in Docker
composer nats:start

# Run functional tests
composer test:functional

# Stop NATS server
composer nats:stop
Manual approach:
# Set up NATS in Docker (optional)
cd tests/nats
docker-compose up -d

# Run functional tests
cd ../functional
./vendor/bin/behat features/

# Stop NATS
cd ../nats
docker-compose down
What's tested:
- Message publishing
- Message consumption
- Message acknowledgment
- Consumer setup
- Stream persistence
See also: tests/functional/README.md
Advanced Usage
Multiple Transports
Set up multiple independent transports for different use cases:
framework:
    messenger:
        transports:
            # High-priority, low-latency messages
            nats_fast:
                dsn: 'nats-jetstream://localhost/fast-stream/fast-topic'
                options:
                    consumer: 'fast-consumer'
                    batching: 1

            # Bulk processing, high throughput
            nats_bulk:
                dsn: 'nats-jetstream://localhost/bulk-stream/bulk-topic'
                options:
                    consumer: 'bulk-consumer'
                    batching: 50

            # Audit logging
            nats_audit:
                dsn: 'nats-jetstream://localhost/audit-stream/audit-topic'
                options:
                    consumer: 'audit-consumer'
                    stream_max_age: 2592000  # 30 days
                    stream_replicas: 3
Tested by: testReadmeDsnExamplesParseSuccessfully[README: fast transport], testReadmeDsnExamplesParseSuccessfully[README: bulk transport], testReadmeDsnExamplesParseSuccessfully[README: audit transport], testReadmeAuditTransportOptionsAreAccepted, testReadmeBatchingExamplesAreAccepted
Multi-Subject Streams
Multiple transports can share the same NATS stream with different subjects. When messenger:setup-transports runs, each transport adds its subject to the existing stream rather than overwriting it:
framework:
    messenger:
        transports:
            # Both transports share the "events" stream
            nats_orders:
                dsn: 'nats-jetstream://localhost/events/orders'
                options:
                    consumer: 'order-consumer'
                    batching: 1
                    stream_max_age: 300
            nats_payments:
                dsn: 'nats-jetstream://localhost/events/payments'
                options:
                    consumer: 'payment-consumer'
                    batching: 2
The events stream will have both orders and payments as subjects.
Tested by: testReadmeDsnExamplesParseSuccessfully[README: multi-subject orders], testReadmeDsnExamplesParseSuccessfully[README: multi-subject payments], testReadmeMultiSubjectOptionsAreAccepted, testSetupUpdatesExistingStreamMergesSubjectsAndPreservesServerConfig, Behat scenario "Setup command merges subjects for transports sharing one stream"
Note: When a stream already exists, setup reads the current JetStream configuration, merges in any new subjects, and then overlays the stream settings managed by this transport. Existing subjects are preserved, duplicate subjects are not added, and the existing storage backend is kept for already-created streams.
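The merge rules in the note above can be sketched as a pure function over configuration arrays. This is an illustration only: `mergeStreamConfig()` is a hypothetical name, and the array keys (`subjects`, `storage`, `max_age`) are simplified stand-ins for the JetStream stream configuration.

```php
<?php
declare(strict_types=1);

/**
 * Sketch of the described merge: keep existing subjects, add the new one once,
 * overlay transport-managed settings, and keep the existing storage backend.
 *
 * @param array<string, mixed> $current config fetched from the existing stream
 * @param array<string, mixed> $managed settings managed by this transport
 * @return array<string, mixed>
 */
function mergeStreamConfig(array $current, array $managed, string $subject): array
{
    // Managed settings overlay whatever the server currently has.
    $merged = array_merge($current, $managed);

    // Existing subjects are preserved and duplicates are not added.
    $merged['subjects'] = array_values(array_unique(
        array_merge($current['subjects'] ?? [], [$subject])
    ));

    // The storage backend of an already-created stream is kept.
    if (isset($current['storage'])) {
        $merged['storage'] = $current['storage'];
    }

    return $merged;
}

$existing = ['name' => 'events', 'subjects' => ['orders'], 'storage' => 'file', 'max_age' => 300];
print_r(mergeStreamConfig($existing, ['max_age' => 0, 'storage' => 'memory'], 'payments'));
```

In the printed result the stream keeps its file storage and gains the payments subject, while max_age is overlaid by the transport-managed value.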
Setup on Initialization
Automatically create streams and consumers on first run:
framework:
    messenger:
        transports:
            nats_transport:
                dsn: 'nats-jetstream://localhost/my-stream/my-topic'
                options:
                    consumer: 'my-consumer'
Then run the setup command:
symfony console messenger:setup-transports nats_transport
Tested by: testSetupCreatesStreamAndConsumer, testSetupPassesConfiguredStreamOptions, testSetupUpdatesExistingStreamMergesSubjectsAndPreservesServerConfig, Behat scenarios "Setup NATS stream with max age configuration", "Setup command handles existing streams gracefully", and "Custom consumer name is registered in JetStream"
Delayed / Scheduled Messages
Requires NATS Server >= 2.12 with JetStream enabled. If scheduled_messages is enabled against an
older server, messenger:setup-transports fails with a clear, actionable error telling you to upgrade
NATS or disable the option.
Enable scheduled_messages in the DSN to use Symfony's DelayStamp for deferred delivery:
framework:
    messenger:
        transports:
            nats_transport:
                dsn: 'nats-jetstream://localhost/my-stream/my-topic?scheduled_messages=true'
Then dispatch messages with a delay:
use Symfony\Component\Messenger\Stamp\DelayStamp;

// Deliver after 30 seconds
$bus->dispatch(new MyMessage(), [new DelayStamp(30000)]);
Tested by: testSendWithDelayStampPublishesToDelayedSubjectWithScheduleHeaders, testSendDelayedMessageSchedulesAtRequestedDelay, testSendDelayedMessageNeverSchedulesBeforeRequestedDelay, testReadmeScheduledMessagesDsnEnablesFeature, Behat scenarios "Delayed messages are delivered after the scheduled time" and "Delayed messages are not available to the consumer before the scheduled time"
When scheduled_messages is enabled and a DelayStamp is present:
- The message is published to a {topic}.delayed.{uuid} subject with Nats-Schedule and Nats-Schedule-Target headers
- The stream is created with an additional {topic}.delayed.> subject and allow_msg_schedules enabled
- NATS holds the message and delivers it to the original topic at the scheduled time
- The consumer processes it like any other message
The DelayStamp delay (milliseconds) maps onto a NATS @at schedule, which has whole-second
resolution. The delay is rounded up to the next whole second, so a message is never delivered
before the requested delay elapses (it may arrive up to ~1 second later); a sub-second delay therefore
schedules at the next whole second rather than firing immediately.
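The round-up rule can be checked with a few lines of integer arithmetic. This is a sketch under assumptions: `scheduleAtSecond()` is a hypothetical helper, and it takes the current time as a millisecond Unix timestamp so the result is deterministic; how the transport itself obtains and formats the time is not shown here.

```php
<?php
declare(strict_types=1);

/**
 * Returns the whole-second Unix timestamp at which a delayed message would be
 * scheduled, rounding UP so delivery never happens before the delay elapses.
 *
 * @param int $nowMs   current time in milliseconds since the Unix epoch
 * @param int $delayMs DelayStamp delay in milliseconds
 */
function scheduleAtSecond(int $nowMs, int $delayMs): int
{
    $targetMs = $nowMs + max(0, $delayMs);

    // Integer ceiling division: add 999 ms, then truncate to whole seconds.
    return intdiv($targetMs + 999, 1000);
}

$now = 1_700_000_000_250; // 250 ms past a whole second

echo scheduleAtSecond($now, 30_000), PHP_EOL; // 30 s delay lands on the next whole second
echo scheduleAtSecond($now, 500), PHP_EOL;    // sub-second delay still waits for a whole second
```

With these inputs the 30-second delay is scheduled 30.75 seconds after now, and the 500 ms delay 0.75 seconds after now, which matches the "never early, up to about one second late" behaviour described above.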
When scheduled_messages is disabled (the default), any DelayStamp on the envelope is silently ignored and messages are published immediately.
Running messenger:setup-transports will:
- Create the stream with configured settings
- Create the consumer with explicit ACK policy
- Verify consumer creation
Stream Monitoring
View stream and consumer information:
# List streams
nats stream list

# View stream info
nats stream info my-stream

# List consumers
nats consumer list my-stream

# View consumer info
nats consumer info my-stream my-consumer

# View message count
nats consumer info my-stream my-consumer --json | jq '.num_pending'
Manual Message Operations
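use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

// $transport is assumed to be the configured NATS transport (for example, injected as a service)
/** @var TransportInterface&MessageCountAwareInterface $transport */

// Get message count
$count = $transport->getMessageCount();

// Check if messages are pending
if ($count > 0) {
    echo "Pending messages: $count";
}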
Tested by: testGetMessageCountReturnsConsumerPendingMessages, testGetMessageCountFallsBackToStreamState, testGetMessageCountReturnsZeroWhenLookupsFail, testGetMessageCountSumsAckPendingAndPending
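The test names above suggest a lookup order: consumer counters first, stream state as a fallback, and zero when both fail. The sketch below models that order with plain callables. It is inferred from the test names rather than copied from the transport; `countMessages()` is hypothetical, and the keys `num_pending`, `num_ack_pending`, and `messages` are assumed to mirror the JetStream consumer and stream state fields.

```php
<?php
declare(strict_types=1);

/**
 * Sketch of a message-count lookup with fallbacks (hypothetical helper).
 *
 * @param callable(): array<string, int> $consumerInfo returns consumer counters or throws
 * @param callable(): array<string, int> $streamState  returns stream state or throws
 */
function countMessages(callable $consumerInfo, callable $streamState): int
{
    try {
        $info = $consumerInfo();

        // Undelivered messages plus delivered-but-unacknowledged ones.
        return ($info['num_pending'] ?? 0) + ($info['num_ack_pending'] ?? 0);
    } catch (Throwable) {
        // Consumer lookup failed; fall back to the stream's total below.
    }

    try {
        return $streamState()['messages'] ?? 0;
    } catch (Throwable) {
        return 0; // Both lookups failed.
    }
}

echo countMessages(
    fn (): array => ['num_pending' => 3, 'num_ack_pending' => 2],
    fn (): array => ['messages' => 10]
), PHP_EOL;
```

Returning zero on a double failure keeps monitoring code simple, at the cost of making an unreachable server look like an empty queue, so callers may want to log the failure separately.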
Troubleshooting
Connection Issues
Error: "Connection refused"
# Start NATS with JetStream enabled if it is not already running
nats-server --js

# Verify the host and port in your DSN, for example:
# nats-jetstream://localhost:4222/stream/topic
Tested by: Behat scenario "Setup command fails gracefully when NATS is unavailable"
Error: "Stream not found"
# Run setup command to create stream
symfony console messenger:setup-transports nats_transport
Tested by: testSetupCreatesStreamAndConsumer, testSetupUpdatesStreamWhenItAlreadyExists, Behat scenarios "Setup NATS stream with max age configuration" and "Setup command handles existing streams gracefully"
Message Processing Issues
Messages not being consumed
# Check consumer exists
nats consumer list my-stream

# View consumer status
nats consumer info my-stream my-consumer

# Inspect the consumer's pending and redelivery counters
nats consumer info my-stream my-consumer --json | jq '{num_pending, num_ack_pending, num_redelivered}'
Messages stuck in pending
# Check handler is not throwing exceptions
# Verify handler implementation
# Check application logs for errors
Architecture
The bridge consists of two main components:
NatsTransportFactory
- Handles DSN scheme detection (nats-jetstream://)
- Creates NatsTransport instances
- Validates configuration
NatsTransport
- Implements Symfony's TransportInterface, MessageCountAwareInterface, SetupableTransportInterface, KeepaliveReceiverInterface, and CloseableTransportInterface
- Manages stream and consumer connections
- Handles message serialization via a pluggable SerializerInterface (igbinary when constructed directly without one)
- Supports batching and explicit acknowledgment
Performance Tips
- Choose appropriate batching
  - Start with batching: 5 for balanced performance
  - Increase to 20+ for high throughput workloads
  - Use 1 for strict low-latency requirements
- Set reasonable timeouts
  - max_batch_timeout: 0.5 for responsive systems
  - max_batch_timeout: 2.0 for background jobs
  - connection_timeout: 1.0 for local/regional deployments
  - connection_timeout: 3.0+ for cross-region or high-latency networks
- Use appropriate replicas
  - stream_replicas: 1 for development
  - stream_replicas: 3 for production
- Monitor performance
  - Use getMessageCount() to track queue depth
  - Monitor handler execution time
  - Watch for stuck messages
Security Considerations
⚠️ Deserialization of Untrusted Data
The default IgbinarySerializer (and any serializer extending AbstractEnveloperSerializer) deserializes raw message payloads from NATS into PHP objects. PHP object unserialization is a well-known attack vector - a crafted payload can trigger arbitrary code execution via magic methods (__wakeup, __destruct, etc.).
⚠️ PhpSerializer fallback: when no serializer is configured and ext-igbinary is not installed, the transport automatically falls back to Symfony's PhpSerializer, which uses native unserialize() - the same untrusted-deserialization (object injection) risk as igbinary, not a safer alternative. The transport emits an E_USER_WARNING when this happens. Do not rely on the fallback in production: either install ext-igbinary or explicitly configure a serializer (ideally a safe-format one, per below).
If your NATS topics are not fully trusted (e.g. shared infrastructure, external publishers), you should:
- Implement a custom serializer that uses a safe format (JSON, Protobuf) instead of PHP object serialization
- Add message-level authentication (e.g. HMAC signatures) to verify publisher identity before deserializing
- Restrict NATS topic publish permissions via ACLs so only trusted services can publish
The type check (instanceof Envelope) happens after deserialization, which is too late to prevent exploitation.
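As one way to apply the message-level authentication advice above, the sketch below signs a payload with HMAC-SHA256 and verifies the tag before any deserialization takes place. `signPayload()` and `verifyPayload()` are hypothetical helpers, the tag-prefix wire layout is an assumption chosen for brevity, and key distribution is out of scope.

```php
<?php
declare(strict_types=1);

/** Prefixes the payload with a 64-character hex HMAC-SHA256 tag. */
function signPayload(string $payload, string $key): string
{
    return hash_hmac('sha256', $payload, $key) . $payload;
}

/**
 * Returns the payload if the tag verifies and throws otherwise.
 * Call this BEFORE handing the payload to any deserializer.
 */
function verifyPayload(string $signed, string $key): string
{
    $tag = substr($signed, 0, 64); // a sha256 hex digest is 64 characters
    $payload = substr($signed, 64);

    // hash_equals() compares in constant time to avoid timing leaks.
    if (strlen($tag) !== 64 || !hash_equals(hash_hmac('sha256', $payload, $key), $tag)) {
        throw new UnexpectedValueException('Signature mismatch; refusing to deserialize');
    }

    return $payload;
}

$key = 'replace-with-a-secret-from-your-environment';
$wire = signPayload(json_encode(['text' => 'Hello NATS!']), $key);

// Only a verified payload reaches the decoder, and JSON avoids object injection.
$data = json_decode(verifyPayload($wire, $key), true);
echo $data['text'], PHP_EOL;
```

In a custom serializer, signing would happen at the end of serialization and verification at the very start of deserialization, so a tampered or unsigned message is rejected before any decoding runs.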
Stream-Exists Detection During Setup
During setup(), if createStream fails the transport detects a pre-existing stream deterministically by querying JetStream stream info: a 404 means the stream is absent (so the creation error was genuine and is rethrown), while a successful lookup means the stream exists (so it is updated, reusing the fetched configuration). This relies on the JetStream stream-info API rather than matching server-specific conflict strings ("already in use" / "already exists"), whose wording varies across NATS versions.
If you experience unexpected behavior during stream setup, confirm the stream can be queried via JetStream stream-info APIs and review the exact error returned by your NATS server version.
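The create-then-inspect flow described in this section can be sketched as control flow over three callables. This is not the transport's implementation: `ensureStream()` is hypothetical, the callables stand in for the JetStream create, stream-info, and update calls, and a `null` return from the info callable models the 404 case.

```php
<?php
declare(strict_types=1);

/**
 * Control-flow sketch of stream-exists detection (hypothetical helper).
 *
 * @param callable(): void        $createStream attempts to create the stream, throws on failure
 * @param callable(): ?array      $streamInfo   returns current config, or null when the server answers 404
 * @param callable(array): void   $updateStream updates the stream starting from the fetched config
 */
function ensureStream(callable $createStream, callable $streamInfo, callable $updateStream): string
{
    try {
        $createStream();

        return 'created';
    } catch (Throwable $createError) {
        $current = $streamInfo();
        if ($current === null) {
            // The stream is absent, so the creation failure was genuine.
            throw $createError;
        }

        // The stream exists: update it, reusing the fetched configuration.
        $updateStream($current);

        return 'updated';
    }
}

echo ensureStream(
    fn () => throw new RuntimeException('stream name already in use'),
    fn (): array => ['name' => 'events', 'subjects' => ['orders']],
    function (array $config): void { /* send the update request here */ }
), PHP_EOL;
```

The decision never inspects the error message text, which is the point made above: only the stream-info lookup determines whether the failure is treated as "already exists".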
Publish Response Validation
On send(), the transport awaits the JetStream publish acknowledgement returned by the client's publish() call. The client validates that acknowledgement and raises a JetStreamException if JetStream reports an error or returns an empty/malformed response, so a proxy or protocol misconfiguration fails closed instead of silently accepting an invalid publish acknowledgement.
General Recommendations
- Authentication
  - Prefer environment variables or explicit options for credentials over hard-coded DSNs
  - If you use credentials in a DSN, avoid logging the full DSN because it may expose secrets
  - Store credentials in environment variables
  - Never commit credentials to version control
- Message Encryption
  - Encrypt sensitive data before dispatching
  - NATS can be configured with TLS for transit encryption
  - Implement application-level encryption for sensitive payloads
- Access Control
  - Restrict stream/consumer creation to authorized users
  - Use NATS access control lists (ACLs) for fine-grained permissions
  - Audit stream operations
Contributing
Contributions are welcome! Please ensure:
- Every modification runs the relevant verification commands before it is considered done
- Minimum verification for PHP changes: composer test
- All tests pass: composer test:unit
- Code coverage remains above 90%
- New features include corresponding tests
- Documentation is updated
- Functional tests pass: composer test:functional (if applicable)
- docs/TESTS.md is kept up to date when tests are added, removed, or renamed
- Each release has an entry in docs/CHANGELOG.md following Keep a Changelog format
- When a PR is merged or its features are adapted, a description is added to docs/PRs/
Quick Development Workflow
# 1. Run static analysis and the default unit suite after each modification
composer test

# 2. Set up functional tests (first time only)
composer test:functional:setup

# 3. Start NATS for functional tests
composer nats:start

# 4. Run functional tests
composer test:functional

# 5. Clean up
composer nats:stop
License
MIT License - see LICENSE file for details
Support
For issues, questions, or suggestions:
- Check the troubleshooting section
- Check existing issues on GitHub
- Create a new issue with detailed information
Statistics
- Total downloads: 6.77k
- Monthly downloads: 0
- Daily downloads: 0
- Favorites: 21
- Views: 5
- Dependents: 0
- Suggesters: 0
Other information
- License: MIT
- Last updated: 2025-11-02