zaeem2396 / laravel-nats
Laravel NATS - v2 basis stack (basis-company/nats), subscriber, NatsV2 JetStream helpers, queue driver, legacy JetStream, Artisan commands
Requires
- php: ^8.2
- ext-json: *
- basis-company/nats: ^1.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.0
- laravel/framework: ^10.0|^11.0|^12.0
- mockery/mockery: ^1.6
- orchestra/testbench: ^8.0|^9.0|^10.0
- pestphp/pest: ^2.0|^3.0
- pestphp/pest-plugin-laravel: ^2.0|^3.0
- phpstan/phpstan: ^1.10
Last update: 2026-03-31 19:35:53 UTC
README
A native NATS integration for Laravel that feels like home. Publish, subscribe, and request/reply with a familiar, expressive API.
NatsV2 (basis stack; package 1.3.0+ for pub/sub; 1.4.0+ for JetStream on the basis client, the nats_basis queue driver, optional idempotency, and observability including nats:ping; 1.5.0+ for security validation, a TLS production guard, optional subject ACL, and nats:v2:config:validate): Laravel wrapper on basis-company/nats. Docs: Guide · Subscriber · JetStream · Queue · Idempotency · Observability · Security · FAQ · Migration. The legacy Nats facade API for subscribe, queue, and JetStream is also documented in this README.
Requirements
- PHP 8.2+ (required for package and basis-company/nats)
- Laravel 10.x, 11.x, or 12.x
- NATS Server 2.x
Installation
composer require zaeem2396/laravel-nats
- v1.1.1+ for Phase 4 queue/worker commands (nats:work, nats:consume).
- v1.3.0+ for NatsV2::publish / NatsV2::subscribe, ConnectionManager, JSON envelope (config/nats_basis.php), InboundMessage, and nats:v2:listen.
- v1.4.0+ (first release after 1.3.0 on Packagist) for NatsV2::jetstream(), jetStreamPublish / jetStreamPull, stream presets, nats:v2:jetstream:* commands (docs/v2/JETSTREAM.md), the nats_basis queue driver (docs/v2/QUEUE.md), optional idempotency (docs/v2/IDEMPOTENCY.md), and observability (docs/v2/OBSERVABILITY.md).
- v1.5.0+ for config validation on boot, TLS expectations in production, optional subject ACL for v2 publish/subscribe/JetStream publish, and php artisan nats:v2:config:validate (docs/v2/SECURITY.md).
The service provider is auto-discovered. To publish configuration (includes nats_basis for v2):
php artisan vendor:publish --tag=nats-config
Configuration
Configure your NATS connection in config/nats.php or via environment variables:
NATS_HOST=localhost
NATS_PORT=4222
NATS_USER=
NATS_PASSWORD=
NATS_TOKEN=
# v2 basis client (`NatsV2`): password env is NATS_PASS (see docs/v2/MIGRATION.md)
# NATS_PASS=
NatsV2 foundation (basis-company/nats, package 1.3.0+)
Full write-up: docs/v2/GUIDE.md, FAQ.
NatsV2 adds a Laravel wrapper on basis-company/nats: configuration, NatsV2, and the JSON envelope live in this package; the dependency handles the wire protocol. Settings go in config/nats_basis.php (merged automatically; publish with nats-config to get the file on disk).
Envelope (JSON body): { "id": "<uuid>", "type": "<subject>", "version": "v1", "data": { ... } }
use LaravelNats\Laravel\Facades\NatsV2;

NatsV2::publish('orders.created', ['order_id' => 123], ['X-Request-Id' => 'abc']);

// Underlying: Basis\Nats\Client - use NatsV2::connection() for advanced APIs.
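To make the envelope shape concrete, the following plain-PHP sketch builds a JSON body with the same four fields. It is only an illustration: the helpers uuidV4 and buildEnvelope are hypothetical names that do not come from the package, and the package's own envelope code may differ in detail (for example in how the id is generated).

```php
<?php

declare(strict_types=1);

/**
 * Generate a random (version 4) UUID string. Hypothetical helper.
 */
function uuidV4(): string
{
    $bytes = random_bytes(16);
    $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40); // set version nibble to 4
    $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80); // set RFC 4122 variant bits
    $hex = bin2hex($bytes);

    return sprintf(
        '%s-%s-%s-%s-%s',
        substr($hex, 0, 8),
        substr($hex, 8, 4),
        substr($hex, 12, 4),
        substr($hex, 16, 4),
        substr($hex, 20, 12)
    );
}

/**
 * Build the JSON envelope described above: id, type, version, data.
 * Hypothetical helper, not the package's implementation.
 */
function buildEnvelope(string $subject, array $data): string
{
    return json_encode([
        'id' => uuidV4(),
        'type' => $subject, // the subject doubles as the message type
        'version' => 'v1',
        'data' => $data,
    ], JSON_THROW_ON_ERROR);
}

$json = buildEnvelope('orders.created', ['order_id' => 123]);
$decoded = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
```

A consumer that receives such a body can decode it and read the application payload from the data key, leaving id and version for tracing and compatibility checks.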
NatsV2 JetStream (basis client, package 1.4.0+)
Use NatsV2::jetstream() for Basis\Nats\Api, synchronous stream publish with optional envelope, one-shot pull batches, config presets, and CLI helpers. See docs/v2/JETSTREAM.md (legacy Nats::jetstream() remains on the native client).
Multiple Connections
// config/nats.php
'connections' => [
    'default' => [
        'host' => env('NATS_HOST', 'localhost'),
        'port' => (int) env('NATS_PORT', 4222),
    ],
    'secondary' => [
        'host' => env('NATS_SECONDARY_HOST', 'nats-2.example.com'),
        'port' => 4222,
    ],
],
Quick Start
Publishing Messages
use LaravelNats\Laravel\Facades\Nats;

// Publish with array payload (auto-serialized to JSON)
Nats::publish('orders.created', [
    'order_id' => 123,
    'customer' => 'John Doe',
]);

// Publish to a specific connection
Nats::connection('secondary')->publish('events', $data);
Subscribing to Messages
use LaravelNats\Laravel\Facades\Nats;

// Subscribe to a subject
Nats::subscribe('orders.*', function ($message) {
    $payload = $message->getDecodedPayload();
    logger('Order received', $payload);
});

// Process incoming messages
Nats::process(1.0); // Wait up to 1 second for messages
Queue Groups (Load Balancing)
// Messages are distributed across subscribers in the same queue group
Nats::subscribe('orders.process', function ($message) {
    // Process order
}, 'order-workers');
Request/Reply
use LaravelNats\Laravel\Facades\Nats;

// Send a request and wait for a reply
$reply = Nats::request('users.get', ['id' => 42], timeout: 5.0);
$user = $reply->getDecodedPayload();
Wildcards
NATS supports two wildcards for subscriptions:
- * matches a single token: orders.* matches orders.created, orders.updated
- > matches one or more tokens: orders.> matches orders.created, orders.us.created

Nats::subscribe('logs.>', function ($message) {
    // Receives all log messages
});
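The two wildcard rules can be checked locally with a small matcher. This is a minimal sketch in plain PHP for reasoning about which subjects a subscription would receive; subjectMatches is a hypothetical helper, not a package function, and the NATS server performs the real matching.

```php
<?php

declare(strict_types=1);

/**
 * Return true when $subject matches the subscription $pattern.
 * '*' matches exactly one token; '>' matches one or more trailing tokens.
 */
function subjectMatches(string $pattern, string $subject): bool
{
    $patternTokens = explode('.', $pattern);
    $subjectTokens = explode('.', $subject);

    foreach ($patternTokens as $i => $token) {
        if ($token === '>') {
            // '>' is only valid as the last token and needs at least one token to consume
            return $i === count($patternTokens) - 1 && count($subjectTokens) > $i;
        }
        if (!isset($subjectTokens[$i])) {
            return false; // subject is shorter than the pattern
        }
        if ($token !== '*' && $token !== $subjectTokens[$i]) {
            return false; // literal token mismatch
        }
    }

    // Without '>', both sides must have the same number of tokens
    return count($patternTokens) === count($subjectTokens);
}

$single = subjectMatches('orders.*', 'orders.created');      // true
$tooDeep = subjectMatches('orders.*', 'orders.us.created');  // false
$multi = subjectMatches('orders.>', 'orders.us.created');    // true
$bare = subjectMatches('orders.>', 'orders');                // false
```

Note that orders.> does not match the bare subject orders, because > requires at least one further token.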
Queue Driver
Use NATS as a Laravel queue backend. Two drivers are available:
- nats: legacy LaravelNats\Core\Client (connection options in config/queue.php as below).
- nats_basis: Basis\Nats\Client via ConnectionManager (package 1.4.0+); configure config/queue.php + config/nats_basis.php. Full reference: docs/v2/QUEUE.md.
Configuration
Add the NATS connection to your config/queue.php:
'connections' => [
    // ... other connections ...
    'nats' => [
        'driver' => 'nats',
        'host' => env('NATS_HOST', 'localhost'),
        'port' => env('NATS_PORT', 4222),
        'user' => env('NATS_USER'),
        'password' => env('NATS_PASSWORD'),
        'token' => env('NATS_TOKEN'),
        'queue' => env('NATS_QUEUE', 'default'),
        'retry_after' => 60,
    ],
],
Usage
// Dispatch a job to the NATS queue
dispatch(new ProcessOrder($order))->onConnection('nats');

// Or set NATS as default in .env
// QUEUE_CONNECTION=nats
Job Lifecycle & Retry
Configure retry behavior on your jobs:
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;

class ProcessOrder implements ShouldQueue
{
    use InteractsWithQueue;

    public $tries = 5; // Maximum attempts

    public $backoff = [10, 30, 60]; // Linear backoff: 10s, 30s, 60s delays

    // Or use a single integer for a fixed delay
    // public $backoff = 60; // Fixed 60s delay between retries
}
The queue driver supports:
- Configurable max attempts ($tries or maxTries)
- Linear backoff (array of delays)
- Fixed delay (integer delay)
- Retry deadlines (retryUntil)
- Exception limits (maxExceptions)
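How an array or integer $backoff maps to a concrete delay can be sketched as follows. The sketch mirrors the behaviour of Laravel's standard worker, where the entry at position attempt - 1 is used and the last entry is reused once the array is exhausted; that this package's driver applies exactly the same rule is an assumption, and backoffDelay is a hypothetical helper.

```php
<?php

declare(strict_types=1);

/**
 * Resolve the retry delay (in seconds) for a given attempt number.
 *
 * @param int|array<int, int> $backoff Fixed delay, or a list of per-attempt delays
 * @param int                 $attempt 1-based number of the attempt that just failed
 */
function backoffDelay(int|array $backoff, int $attempt): int
{
    if (is_int($backoff)) {
        return $backoff; // fixed delay between all retries
    }

    if ($backoff === []) {
        return 0; // no delays configured: retry immediately
    }

    $delays = array_values($backoff);
    // Clamp the attempt into the list so later attempts reuse the last delay
    $index = min(max($attempt, 1), count($delays)) - 1;

    return (int) $delays[$index];
}

$first = backoffDelay([10, 30, 60], 1);  // 10
$second = backoffDelay([10, 30, 60], 2); // 30
$fifth = backoffDelay([10, 30, 60], 5);  // 60 (last entry reused)
$fixed = backoffDelay(60, 4);            // 60
```

With $tries = 5 and $backoff = [10, 30, 60], the fourth and fifth attempts would therefore both wait 60 seconds under this rule.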
Failed Jobs
Failed jobs are automatically stored in Laravel's failed_jobs table when:
- Maximum attempts are exceeded
- An exception is thrown during job execution
- The job explicitly calls $this->fail($exception)
Handling Failed Jobs
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Throwable;

class ProcessOrder implements ShouldQueue
{
    use InteractsWithQueue;

    public function failed(Throwable $exception): void
    {
        // Handle the failure
        logger()->error('Order processing failed', [
            'order_id' => $this->orderId,
            'exception' => $exception->getMessage(),
        ]);
    }
}
Dead Letter Queue (DLQ)
Configure a Dead Letter Queue to route failed jobs to a separate NATS subject:
// config/queue.php
'nats' => [
    'driver' => 'nats',
    'host' => env('NATS_HOST', 'localhost'),
    'port' => env('NATS_PORT', 4222),
    'queue' => env('NATS_QUEUE', 'default'),
    'retry_after' => 60,
    'dead_letter_queue' => env('NATS_QUEUE_DLQ', 'failed'), // Optional
],
When a job fails, it will be:
- Stored in the failed_jobs database table
- Routed to the DLQ subject (if configured) with enhanced metadata:
  - Original queue name
  - Failure exception message
  - Failure timestamp
  - Stack trace
You can subscribe to the DLQ to process failed jobs:
use LaravelNats\Laravel\Facades\Nats;

Nats::subscribe('laravel.queue.failed', function ($message) {
    $payload = $message->getDecodedPayload();

    // Process failed job
    logger()->error('Failed job received', [
        'original_queue' => $payload['original_queue'],
        'failure_message' => $payload['failure_message'],
    ]);
});
Delayed Jobs (JetStream)
Delayed jobs require JetStream to be enabled on your NATS server. When enabled, jobs dispatched with later() are stored in a JetStream stream and delivered to the queue when due.
1. Enable delayed jobs in your queue connection or in config/nats.php:
// config/queue.php: enable on the connection
'nats' => [
    'driver' => 'nats',
    'host' => env('NATS_HOST', 'localhost'),
    'port' => env('NATS_PORT', 4222),
    'queue' => env('NATS_QUEUE', 'default'),
    'retry_after' => 60,
    'delayed' => [
        'enabled' => true,
        'stream' => env('NATS_QUEUE_DELAYED_STREAM', 'laravel_delayed'),
        'subject_prefix' => env('NATS_QUEUE_DELAYED_SUBJECT_PREFIX', 'laravel.delayed.'),
        'consumer' => env('NATS_QUEUE_DELAYED_CONSUMER', 'laravel_delayed_worker'),
    ],
],
Defaults for stream, subject_prefix, and consumer are also defined under queue.delayed in config/nats.php (or via NATS_QUEUE_DELAYED_* env vars).
2. Use later() to schedule jobs:
use Illuminate\Support\Facades\Queue;

// Run job in 5 minutes
Queue::connection('nats')->later(now()->addMinutes(5), new SendReminder($user));

// Or with a delay in seconds
Queue::connection('nats')->later(60, new ProcessOrder($order));
When delayed is enabled, the connector automatically ensures the JetStream delay stream and durable consumer exist at connect time. A delay processor (or worker) consumes from the delay stream and pushes jobs to the main queue when they are due.
Running Queue Workers
You can use either Laravel's standard queue:work or the package's dedicated nats:work command (Phase 4.1), which defaults to the NATS connection and supports a PID file for process management:
# Dedicated NATS worker (Phase 4.1) - defaults to connection "nats", queue "default"
php artisan nats:work

# With PID file for process managers (Supervisor, systemd)
php artisan nats:work --pidfile=/var/run/nats-worker.pid

# Process one job and exit
php artisan nats:work --once
Or use Laravel's standard queue worker:
# Start a queue worker
php artisan queue:work nats

# Process jobs from a specific queue
php artisan queue:work nats --queue=high,default

# Set maximum job attempts
php artisan queue:work nats --tries=3

# Set job timeout
php artisan queue:work nats --timeout=60

# Set memory limit
php artisan queue:work nats --memory=128
Supported Worker Options (queue:work and nats:work):
- --queue - Specify which queues to process
- --tries - Maximum number of attempts for a job
- --timeout - Seconds a child process can run
- --memory - Memory limit in megabytes
- --sleep - Seconds to sleep when no job available
- --once - Process a single job and exit
- nats:work only: --connection, --name, --pidfile, --stop-when-empty. Graceful shutdown via SIGTERM/SIGINT (Laravel Worker).
Subject-based consumer (Phase 4.2)
Consume messages from NATS subjects (with optional queue group and handler class):
# Consume a subject (messages printed to console)
php artisan nats:consume "orders.*"

# With queue group for load balancing
php artisan nats:consume "events.>" --queue=workers

# Dispatch each message to a handler class (must implement MessageHandlerInterface)
php artisan nats:consume "notifications.email" --handler=App\\Handlers\\EmailNotificationHandler

# Multiple subjects (comma-separated in --subjects=)
php artisan nats:consume "alerts" --subjects="alerts.critical,alerts.info"
Implement a handler by creating a class that implements LaravelNats\Contracts\Messaging\MessageHandlerInterface (define handle(MessageInterface $message): void) and pass it with --handler=YourClass. The handler is resolved from the Laravel container (dependency injection supported).
nats:consume options: --connection= (NATS connection name), --queue= (queue group for load balancing), --handler= (class implementing MessageHandlerInterface), --subjects= (comma-separated additional subjects). Supports wildcards * (single token) and > (one or more tokens). Use Ctrl+C for graceful shutdown.
Current Limitations
- Delayed jobs: Require JetStream; enable via queue.delayed.enabled (see Delayed Jobs (JetStream)).
- Queue size: Returns 0 (NATS Core doesn't track queue size)
- Priority queues: Not supported in NATS Core
Authentication
Username/Password
NATS_USER=myuser
NATS_PASSWORD=mypassword
Token
NATS_TOKEN=my-secret-token
Testing
v2 testing details: docs/v2/GUIDE.md.
This package uses Pest PHP for testing.
Running Tests
# Start the NATS server (requires Docker; from package root)
docker compose up -d

# Run all tests
composer test

# Run with coverage
composer test:coverage
Static Analysis
composer analyse
Code Style
# Check code style
composer format:check

# Fix code style
composer format
Troubleshooting
Connection Refused
Error: Connection to localhost:4222 refused
Solution: Ensure the NATS server is running:
# Using Docker
docker run -d --name nats -p 4222:4222 -p 8222:8222 nats:2.10

# Or from this package's compose file
docker compose up -d
Authentication Failed
Error: Authorization Violation
Solutions:
- Verify credentials in your .env file match the NATS server configuration
- Check if using token auth vs. username/password auth
- Ensure the NATS server is configured for the authentication method you're using
Queue Jobs Not Processing
Possible causes:
- Worker not running: Start the queue worker:
  php artisan queue:work nats
- Wrong queue name: Ensure your job is dispatched to the correct queue:
  dispatch(new MyJob())->onQueue('high');
  Then process that queue:
  php artisan queue:work nats --queue=high
- NATS connection issues: Check NATS server logs for errors
Message Size Limits
NATS has a default maximum message size of 1MB. For larger payloads:
- Store the data externally (S3, database) and pass a reference
- Configure NATS server with a higher max_payload setting
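The first option (store externally, pass a reference) can be sketched like this. It is an illustration under assumptions: the 1 MB default is taken as 1048576 bytes, toPublishablePayload and the payload_ref key are hypothetical names, and the $storeExternally callable stands in for whatever storage you use (S3, database). The measured size is the JSON-encoded array only; any envelope or job wrapper the package adds comes on top.

```php
<?php

declare(strict_types=1);

const DEFAULT_MAX_PAYLOAD_BYTES = 1048576; // 1 MB, the default limit mentioned above

/**
 * Check whether the JSON-encoded payload would exceed the size limit.
 */
function exceedsMaxPayload(array $payload, int $maxBytes = DEFAULT_MAX_PAYLOAD_BYTES): bool
{
    return strlen(json_encode($payload, JSON_THROW_ON_ERROR)) > $maxBytes;
}

/**
 * Return the payload unchanged when it fits, otherwise store it externally
 * and return a small reference payload instead.
 *
 * @param callable(string): string $storeExternally Receives the JSON body, returns a reference
 */
function toPublishablePayload(
    array $payload,
    callable $storeExternally,
    int $maxBytes = DEFAULT_MAX_PAYLOAD_BYTES
): array {
    if (!exceedsMaxPayload($payload, $maxBytes)) {
        return $payload;
    }

    $reference = $storeExternally(json_encode($payload, JSON_THROW_ON_ERROR));

    return ['payload_ref' => $reference]; // hypothetical key name
}

// Stand-in storage for the example: pretend the blob was saved and return its key
$fakeStore = fn (string $json): string => 'blob-1';

$small = toPublishablePayload(['order_id' => 123], $fakeStore);
$large = toPublishablePayload(['report' => str_repeat('x', 2_000_000)], $fakeStore);
```

The subscriber then needs the matching step: when it sees the reference key, it loads the full body from storage before processing.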
API Stability
This package follows Semantic Versioning. After v1.0.0:
- Stable API: Classes in the LaravelNats\Laravel namespace
  - Nats facade
  - NatsManager
  - NatsQueue, NatsJob, NatsConnector
  - Configuration structure
- Internal API: Classes in the LaravelNats\Core namespace
  - May change in minor versions
  - Use the facade for stability
JetStream Support
JetStream is NATS's persistence and streaming layer. This package provides access to JetStream functionality.
Prerequisites
Ensure your NATS server has JetStream enabled:
# Docker example
docker run -d --name nats -p 4222:4222 -p 8222:8222 nats:2.10 --jetstream
Basic Usage
use LaravelNats\Laravel\Facades\Nats;

// Get JetStream client
$js = Nats::jetstream();

// Check if JetStream is available
if ($js->isAvailable()) {
    // Use JetStream features
}

// Get account information (memory, storage, streams, consumers, limits)
$accountInfo = $js->getAccountInfo();
Configuration
Configure JetStream in config/nats.php:
'jetstream' => [
    'domain' => env('NATS_JETSTREAM_DOMAIN'), // Optional: for multi-tenancy
    'timeout' => (float) env('NATS_JETSTREAM_TIMEOUT', 5.0),
],
Domain Support
For multi-tenant setups, you can use JetStream domains:
$js = Nats::jetstream(null, new \LaravelNats\Core\JetStream\JetStreamConfig('my-domain'));
Stream Management
Create and manage JetStream streams:
use LaravelNats\Core\JetStream\StreamConfig;
use LaravelNats\Laravel\Facades\Nats;

$js = Nats::jetstream();

// Create a stream (the parentheses around `new` are required before PHP 8.4)
$config = (new StreamConfig('my-stream', ['events.>']))
    ->withDescription('Event stream')
    ->withMaxMessages(10000)
    ->withMaxBytes(104857600) // 100MB
    ->withStorage(StreamConfig::STORAGE_FILE);
$info = $js->createStream($config);

// List streams (paged)
$result = $js->listStreams(offset: 0);
// $result has: total, offset, limit, streams

// Get stream information
$info = $js->getStreamInfo('my-stream');
echo $info->getMessageCount(); // Number of messages
echo $info->getByteCount(); // Total bytes stored

// Update stream configuration
$updated = $config->withMaxMessages(20000);
$info = $js->updateStream($updated);

// Purge all messages
$js->purgeStream('my-stream');

// Delete stream
$js->deleteStream('my-stream');
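Because listStreams is paged, collecting every stream means repeating the call with an increasing offset. The sketch below shows one way to do that with a generic loop; collectAllPages is a hypothetical helper, and it assumes each page is an array with the total key and an items key (streams here) as listed in the comment above. A fake page source is used so the loop can be followed without a server.

```php
<?php

declare(strict_types=1);

/**
 * Collect all items from a paged listing.
 *
 * @param callable(int): array $fetchPage Receives an offset, returns one page
 * @param string               $itemsKey  Key holding the items, e.g. 'streams'
 */
function collectAllPages(callable $fetchPage, string $itemsKey): array
{
    $items = [];
    $offset = 0;

    do {
        $page = $fetchPage($offset);
        $batch = $page[$itemsKey] ?? [];
        $items = array_merge($items, $batch);
        $offset += count($batch);
        // Stop on an empty page or once the reported total has been reached
    } while ($batch !== [] && $offset < ($page['total'] ?? 0));

    return $items;
}

// Fake page source: five names served two at a time
$names = ['a', 'b', 'c', 'd', 'e'];
$fakeFetch = fn (int $offset): array => [
    'total' => count($names),
    'offset' => $offset,
    'limit' => 2,
    'streams' => array_slice($names, $offset, 2),
];

$allStreams = collectAllPages($fakeFetch, 'streams');

// Against a real server this would presumably look like:
// collectAllPages(fn (int $offset) => $js->listStreams(offset: $offset), 'streams');
```

The empty-page check guards against an endless loop if the reported total ever disagrees with what the pages actually contain.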
Stream Operations
Get and delete individual messages:
// Get message by sequence number
$message = $js->getMessage('my-stream', 123);

// Delete message by sequence number
$js->deleteMessage('my-stream', 123);
Consumer Management
Create and manage durable consumers on a stream:
use LaravelNats\Core\JetStream\ConsumerConfig;
use LaravelNats\Laravel\Facades\Nats;

$js = Nats::jetstream();

// Create a durable consumer
$config = (new ConsumerConfig('my-consumer'))
    ->withFilterSubject('events.>')
    ->withDeliverPolicy(ConsumerConfig::DELIVER_NEW)
    ->withAckPolicy(ConsumerConfig::ACK_EXPLICIT);
$info = $js->createConsumer('my-stream', 'my-consumer', $config);

// Get consumer information
$info = $js->getConsumerInfo('my-stream', 'my-consumer');
echo $info->getNumPending(); // Messages awaiting delivery
echo $info->getNumAckPending(); // Messages awaiting ack

// List consumers (paged)
$result = $js->listConsumers('my-stream', offset: 0);
// $result has: total, offset, limit, consumers
foreach ($result['consumers'] as $consumer) {
    echo $consumer->getName();
}

// Delete a consumer
$js->deleteConsumer('my-stream', 'my-consumer');
Pull consumer and acknowledgements
Consume messages from a pull consumer and acknowledge them:
use LaravelNats\Core\JetStream\ConsumerConfig;
use LaravelNats\Core\JetStream\JetStreamConsumedMessage;
use LaravelNats\Laravel\Facades\Nats;

$js = Nats::jetstream();

// Create a pull consumer (no deliver_subject) with explicit ack
$config = (new ConsumerConfig('my-consumer'))
    ->withAckPolicy(ConsumerConfig::ACK_EXPLICIT)
    ->withDeliverPolicy(ConsumerConfig::DELIVER_ALL);
$js->createConsumer('my-stream', 'my-consumer', $config);

// Fetch next message (returns null when no_wait and no message)
$msg = $js->fetchNextMessage('my-stream', 'my-consumer', timeout: 5.0, noWait: true);

if ($msg instanceof JetStreamConsumedMessage) {
    echo $msg->getPayload();

    $js->ack($msg); // Positive ack
    // $js->nak($msg); // Redeliver
    // $js->nak($msg, 30_000_000_000); // Redeliver after 30s (nanoseconds)
    // $js->term($msg); // Terminate (do not redeliver)
    // $js->inProgress($msg); // Extend ack wait (work in progress)
}
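The delayed nak above takes its delay in nanoseconds, which is easy to get wrong by a few zeros. A small conversion helper keeps call sites readable; secondsToNanoseconds is a hypothetical name, not part of the package, and the sketch assumes a 64-bit PHP build.

```php
<?php

declare(strict_types=1);

/**
 * Convert a delay in seconds to whole nanoseconds.
 */
function secondsToNanoseconds(int|float $seconds): int
{
    return (int) round($seconds * 1_000_000_000);
}

$thirtySeconds = secondsToNanoseconds(30); // same value as 30_000_000_000
$halfSecond = secondsToNanoseconds(0.5);   // same value as 500_000_000
```

A call such as $js->nak($msg, secondsToNanoseconds(30)) would then be equivalent to the literal used in the example above.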
Artisan Commands (JetStream and Worker)
Subject-based consumer (Phase 4.2):
php artisan nats:consume {subject} [--connection=] [--queue=] [--handler=] [--subjects=]
Queue worker (Phase 4.1):
php artisan nats:work [--connection=nats] [--queue=default] [--pidfile=] ...
Manage streams and consumers from the CLI:
# Streams
php artisan nats:stream:list [--connection=] [--offset=0]
php artisan nats:stream:info {stream} [--connection=]
php artisan nats:stream:create {name} {subjects*} [--connection=] [--description=] [--storage=file|memory] [--retention=limits|interest|workqueue]
php artisan nats:stream:update {stream} [--connection=] [--description=] [--storage=] [--retention=] [--max-messages=] [--max-bytes=] [--max-age=]
php artisan nats:stream:purge {stream} [--connection=] [--force]
php artisan nats:stream:delete {stream} [--connection=] [--force]

# Consumers
php artisan nats:consumer:list {stream} [--connection=] [--offset=0]
php artisan nats:consumer:info {stream} {consumer} [--connection=]
php artisan nats:consumer:create {stream} {name} [--connection=] [--filter-subject=] [--deliver-policy=all|last|new] [--ack-policy=none|all|explicit]
php artisan nats:consumer:delete {stream} {consumer} [--connection=] [--force]

# JetStream account
php artisan nats:jetstream:status [--connection=] [--json]
Use --connection= to target a non-default NATS connection from config/nats.php. For subject-based consumption (Phase 4.2), see Subject-based consumer.
Summary of Phase 4 Commands
| Command | Phase | Purpose |
|---|---|---|
| nats:work | 4.1 | NATS queue worker (PID file, signals) |
| nats:consume {subject} | 4.2 | Subject-based consumer (handler, queue group, wildcards) |
For release notes and version history, see CHANGELOG.md.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
The MIT License (MIT). Please see License File for more information.
