idct / php-nats-jetstream-client
Async-first NATS + JetStream client for PHP 8.2+
Package info
github.com/ideaconnect/php-nats-jetstream-client
pkg:composer/idct/php-nats-jetstream-client
Requires
- php: ^8.2
- amphp/amp: ^3.1
- amphp/socket: ^2.3
- psr/log: ^3.0
Requires (Dev)
- behat/behat: ^3.14
- friendsofphp/php-cs-fixer: ^3.64
- phpstan/phpstan: ^2.1
- phpunit/phpunit: ^11.0
This package is auto-updated.
Last update: 2026-06-07 15:43:29 UTC
README
Async-first NATS and JetStream client for PHP 8.2+ with first-class support for core NATS messaging, JetStream, KeyValue, ObjectStore, and NATS microservices.
The library is built around Amp and provides a typed, high-level API for connection management, publish/subscribe, request/reply, reconnect handling, authentication flows, and JetStream resource management without falling back to blocking I/O.
It is intended for real application use, including service-to-service messaging, event processing, JetStream-backed persistence patterns, and NATS-based microservice discovery.
Installation
Install from Packagist:
composer require idct/php-nats-jetstream-client
Package name: idct/php-nats-jetstream-client
Source repository: https://github.com/ideaconnect/php-nats-jetstream-client
Index
- Installation
- Features
- TODO
- Usage
- Authentication Options
- Connect and Publish/Subscribe
- Request/Reply
- Headers and Server Info
- JetStream Stream and Durable Consumer
- JetStream Stream Update and Consumer Info
- JetStream Pull Consumer (Fetch + ACK)
- JetStream Pull Consumer (NAK, Delayed NAK, TERM, In-Progress)
- Queue Group Subscribe
- Polling Subscribe (SubscriptionQueue)
- JetStream Push Consumer (Durable)
- JetStream Ephemeral Consumers
- Scheduled Publish Example (
@at) - KeyValue Bucket
- Object Store Bucket
- Object Store Streaming to Callback
- Object Store Streaming Upload
- Services Framework
- Services: SCHEMA Discovery
- Graceful Drain
- Ordered Consumer
- Consumer Pause/Resume
- Fetch Batch
- Stream Purge and List
- Consumer List
- Stream Message Get
- JetStream Direct Get
- Credentials File Authentication
- Typed Stream Configuration
- Pull Consumer Batching/Iteration
- Stream Mirroring and Sourcing
- Republish and Subject Transform
- Compatibility Mapping
- Behavior Notes
- Configuration Option Mapping
- Performance Benchmark Recipe
- Testing
- Contributing and contributors
- Current Test Baseline
Features
Current functionality includes:
- Core NATS connect/disconnect with graceful drain
- Publish and subscribe
- Request/reply with timeout and cancellation
- Reconnect with exponential backoff, server rotation, validated subscription replay, and async INFO updates
- Ping/pong heartbeat with
maxPingsOutdetection max_payloadenforcement andno_respondersnegotiation- Subject validation against NATS naming rules
- JetStream account info
- JetStream stream CRUD (create, update, get, delete, purge, list)
- JetStream consumer CRUD (durable + ephemeral, pull + push, list)
- JetStream pull consumers (fetch next, fetch batch, ACK/NAK/TERM/WPI, delayed NAK)
- JetStream push consumers with heartbeat/flow-control handling
- JetStream ordered consumers with automatic sequence tracking and gap recovery
- JetStream consumer pause/resume
- JetStream publish ACK
- JetStream stream message get by sequence — both the regular
STREAM.MSG.GETrequest and the Direct Get API (directGetStreamMessage()/directGetLastMessageForSubject()) - Scheduled publish (
@atsupport) - KeyValue API (bucket lifecycle with history/TTL/storage options, put/get/update/delete/purge, watch, getAll/status)
- ObjectStore API (bucket lifecycle, put/get/delete/list/watch, chunked uploads, streaming upload via
putStream(), SHA-256 digest verification) - Connection
flush()(PING/PONG round-trip) to confirm the server has processed prior writes - Microservices framework (service registration, PING/INFO/STATS/SCHEMA discovery, grouped endpoints)
- Server authorization methods: token, username/password, JWT + nonce signer, built-in NKey seed signer, credentials file parser
- Standalone NKey authentication (Ed25519 challenge signing without JWT)
no_echoCONNECT optiontlsHandshakeFirstTLS option- Typed JetStream configuration enums (RetentionPolicy, StorageBackend, DiscardPolicy, DeliverPolicy, AckPolicy, ReplayPolicy)
- Max frame size limit in protocol parser (DoS protection)
- Queue-based polling subscribe API (
SubscriptionQueuewithfetch(),next(),fetchAll()) - Pull-consumer batching/iteration chain API (
PullConsumerIteratorwithsetBatching(),setIterations(),handle()) - Stream mirroring and sourcing configuration helpers (
StreamSource) - Republish and subject transform configuration helpers (
Republish,SubjectTransform)
Current scheduling note: scheduled messages are implemented with NATS scheduler headers and currently accept only @at expressions.
Use IDCT\\NATS\\JetStream\\Schedule::at(...) or Schedule::atTimestamp(...) to generate valid @at expressions.
TODO
- Align
ProtocolParseroperation detection more closely with the NATS wire spec by accepting case-insensitive operation names (verbs are currently matched case-sensitively and must be followed by a space). Field separators within a recognized control line already accept any whitespace, including tabs.
🚀 This project looks for funding. Love my work? Support it! 💖
-
☕ Buy me a coffee: https://buymeacoffee.com/idct
-
💝 Sponsor: https://github.com/sponsors/ideaconnect
-
🪙 BTC: bc1qntms755swm3nplsjpllvx92u8wdzrvs474a0hr
-
💎 ETH: 0x08E27250c91540911eD27F161572aFA53Ca24C0a
-
⚡ TRX: TVXWaU4ScNV9RBYX5RqFmySuB4zF991QaE
-
🚀 LTC: LN5ApP1Yhk4iU9Bo1tLU8eHX39zDzzyZxB
Usage
Authentication Options
Verified by: NkeySeedSignerTest, NatsConnectionTest::testConnectIncludesJwtSignatureFromInfoNonce; NatsClientIntegrationTest (testTokenAuthSuccessAndFailure, testUserPasswordAuthSuccessAndFailure, testJwtNonceAuthenticationFlow, testTlsHandshakeFirstConnection); features/auth/.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\Auth\NkeySeedSigner; // Token auth. $tokenClient = new NatsClient(new NatsOptions( servers: ['nats://127.0.0.1:4222'], token: 's3cr3t-token', )); // Username/password. $passwordClient = new NatsClient(new NatsOptions( servers: ['nats://127.0.0.1:4222'], username: 'alice', password: 's3cr3t', )); $signer = new NkeySeedSigner('SU...USER NKEY SEED...'); $jwtClient = new NatsClient(new NatsOptions( servers: ['nats://127.0.0.1:4222'], jwt: 'your-jwt-token', nkey: $signer->publicKey(), nonceSigner: $signer, )); // TLS with CA and client cert/key. $tlsClient = new NatsClient(new NatsOptions( servers: ['tls://127.0.0.1:4222'], tlsRequired: true, tlsCaFile: '/path/to/ca.pem', tlsCertFile: '/path/to/client-cert.pem', tlsKeyFile: '/path/to/client-key.pem', ));
NkeySeedSigner derives the public NKey from an encoded seed and emits the base64url Ed25519 nonce signature expected by NATS servers.
NkeySeedSigner requires the PHP sodium extension because NATS NKey authentication uses Ed25519 challenge signing.
Connect and Publish/Subscribe
Verified by: NatsClientTest::testClientSubscribeAndProcessIncoming; features/core/connection.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\Core\NatsMessage; $client = new NatsClient(new NatsOptions(servers: ['nats://127.0.0.1:4222'])); $client->connect()->await(); $sid = $client->subscribe('orders.created', static function (NatsMessage $message): void { // Handle delivery. echo $message->payload . PHP_EOL; })->await(); $client->publish('orders.created', '{"id":123}')->await(); $client->processIncoming()->await(); $client->unsubscribe($sid)->await(); $client->disconnect()->await();
Request/Reply
Verified by: NatsClientTest::testClientRequestReturnsReply, NatsConnectionTest::testRequestReturnsFirstReplyMessage; features/core/request_reply.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $reply = $client->request('svc.echo', '{"hello":"world"}', 2000)->await(); echo $reply->payload . PHP_EOL; $client->disconnect()->await();
Headers and Server Info
Verified by: NatsClientTest::testClientPublishWithHeadersAndRequestWithHeaders, NatsConnectionTest::testProcessIncomingDispatchesHmsgWithRawHeaders; features/core/headers_queueing.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $client->publishWithHeaders('events.orders', '{"id":123}', [ 'Nats-Msg-Id' => 'orders-123', 'Content-Type' => 'application/json', ])->await(); $reply = $client->requestWithHeaders('svc.echo', 'hello', [ 'X-Request-Id' => 'req-123', ], 2000)->await(); echo $reply->payload . PHP_EOL; echo $client->serverInfo()?->serverName . PHP_EOL; $client->disconnect()->await();
JetStream Stream and Durable Consumer
Verified by: JetStreamContextTest (testStreamCrud, testConsumerCrud, testPublishWithAck, testCreateConsumerDefaultsAckPolicyToExplicit); JetStreamIntegrationTest::testJetStreamConsumerAndPublishAck; features/jetstream-core/stream_lifecycle.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); $js->createStream('ORDERS', ['orders.>'])->await(); // If you omit ack_policy, helper methods default it to explicit. // Pass ack_policy explicitly when you need none/all. $js->createConsumer('ORDERS', 'PROC', 'orders.created')->await(); $ack = $js->publish('orders.created', '{"id":123}')->await(); echo $ack->stream . ':' . $ack->seq . PHP_EOL; $js->deleteConsumer('ORDERS', 'PROC')->await(); $js->deleteStream('ORDERS')->await(); $client->disconnect()->await();
JetStream Stream Update and Consumer Info
Verified by: JetStreamContextTest (testUpdateStream, testConsumerCrud); JetStreamIntegrationTest::testJetStreamUpdateStreamConfiguration; features/jetstream-core/management.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); $js->createStream('ORDERS', ['orders.created'])->await(); $js->updateStream('ORDERS', [ 'subjects' => ['orders.created', 'orders.updated'], ])->await(); $js->createConsumer('ORDERS', 'PROC', 'orders.created')->await(); $consumerInfo = $js->getConsumer('ORDERS', 'PROC')->await(); echo $consumerInfo->streamName . PHP_EOL; echo $consumerInfo->name . PHP_EOL; $js->deleteConsumer('ORDERS', 'PROC')->await(); $js->deleteStream('ORDERS')->await(); $client->disconnect()->await();
JetStream Pull Consumer (Fetch + ACK)
Verified by: JetStreamContextTest (testFetchNext, testFetchBatchThrowsTerminalStatusDescription); JetStreamIntegrationTest::testJetStreamPullFetchAndAck; features/jetstream-core/consumer_helpers.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); $js->createStream('ORDERS', ['orders.created'])->await(); $js->createConsumer('ORDERS', 'PULL', 'orders.created')->await(); $js->publish('orders.created', '{"id":123}')->await(); $message = $js->fetchNext('ORDERS', 'PULL', 3000)->await(); $js->ack($message)->await(); $client->disconnect()->await();
When a pull request ends with a terminal JetStream status frame and no user message is delivered, fetchNext() / fetchBatch() raise JetStreamException with the server status code and description, for example JetStream pull request ended with status 404: No Messages.
JetStream Pull Consumer (NAK, Delayed NAK, TERM, In-Progress)
Verified by: JetStreamContextTest::testAckHelpersPublishProtocolTokens; JetStreamIntegrationTest (testJetStreamPullNakWithDelayRedelivery, testJetStreamTermAndInProgressTokens); features/jetstream-core/consumer_helpers.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); $js->createStream('JOBS', ['jobs.>'])->await(); $js->createConsumer('JOBS', 'WORKER', 'jobs.>')->await(); $js->publish('jobs.process', '{"task":"rebuild"}')->await(); $message = $js->fetchNext('JOBS', 'WORKER', 3000)->await(); // Signal work-in-progress to extend the ack deadline. $js->inProgress($message)->await(); // NAK: redeliver the message immediately. $js->nak($message)->await(); // NAK with delay: redeliver after 5 seconds. // $js->nakWithDelay($message, 5000)->await(); // TERM: terminate delivery, do not redeliver. // $js->term($message)->await(); $js->deleteConsumer('JOBS', 'WORKER')->await(); $js->deleteStream('JOBS')->await(); $client->disconnect()->await();
Queue Group Subscribe
Verified by: SubscriptionQueueTest::testSubscribeQueueWithQueueGroup; features/core/headers_queueing.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\Core\NatsMessage; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); // Subscribe with a queue group for load-balanced delivery across workers. $sid = $client->subscribe('tasks.process', static function (NatsMessage $message): void { echo 'Worker received: ' . $message->payload . PHP_EOL; }, queue: 'workers')->await(); $client->publish('tasks.process', '{"job":"build"}')->await(); $client->processIncoming()->await(); $client->unsubscribe($sid)->await(); $client->disconnect()->await();
Polling Subscribe (SubscriptionQueue)
Verified by: SubscriptionQueueTest (fetch/next/fetchAll/setTimeout); features/core/headers_queueing.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); // subscribeQueue() returns a SubscriptionQueue for polling-style consumption. $queue = $client->subscribeQueue('events.>', queue: 'workers')->await(); $queue->setTimeout(5.0); // Non-blocking fetch — returns null if nothing available. $msg = $queue->fetch(); // Blocking fetch — waits up to the configured timeout, returns null on timeout. // With no timeout configured it performs a single processIncoming() cycle (like fetch()). $msg = $queue->next(); // Batch fetch — collects up to 10 messages within the timeout window. $messages = $queue->fetchAll(limit: 10); $client->unsubscribe($queue->sid)->await(); $client->disconnect()->await();
JetStream Push Consumer (Durable)
Verified by: JetStreamContextTest (testSubscribePushConsumerHandlesFlowControl, testSubscribePushConsumerIgnoresHeartbeat); JetStreamIntegrationTest (testJetStreamPushConsumerHelperDelivery, testJetStreamPushFlowControlAndHeartbeat); features/jetstream-core/consumer_helpers.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\Core\NatsMessage; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); $js->createStream('ORDERS', ['orders.created'])->await(); $sid = $js->subscribePushConsumer( stream: 'ORDERS', consumer: 'PUSH_PROC', handler: static function (NatsMessage $message) use ($js): void { // Heartbeats / flow-control are handled automatically by helper. $js->ack($message)->await(); }, filterSubject: 'orders.created', )->await(); $js->publish('orders.created', '{"id":123}')->await(); $client->processIncoming()->await(); $client->unsubscribe($sid)->await(); $js->deleteConsumer('ORDERS', 'PUSH_PROC')->await(); $js->deleteStream('ORDERS')->await(); $client->disconnect()->await();
JetStream Ephemeral Consumers
Verified by: JetStreamContextTest (testCreateEphemeralConsumer, testSubscribeEphemeralPushConsumer); JetStreamIntegrationTest (testJetStreamEphemeralPullConsumerFetchAndAck, testJetStreamEphemeralPushConsumerDelivery); features/jetstream-core/consumer_helpers.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\Core\NatsMessage; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); $js->createStream('ORDERS', ['orders.created'])->await(); // Ephemeral pull consumer. $ephemeral = $js->createEphemeralConsumer('ORDERS', 'orders.created')->await(); $js->publish('orders.created', '{"id":123}')->await(); $pullMessage = $js->fetchNext('ORDERS', $ephemeral->name)->await(); $js->ack($pullMessage)->await(); // Ephemeral push consumer. $js->subscribeEphemeralPushConsumer( stream: 'ORDERS', handler: static function (NatsMessage $message) use ($js): void { $js->ack($message)->await(); }, filterSubject: 'orders.created', )->await(); $client->disconnect()->await();
Scheduled Publish Example (@at)
Verified by: ScheduleTest, JetStreamContextTest (testPublishScheduled, testPublishScheduledOmitsTtlWhenNotProvided, testPublishScheduledRejectsUnsupportedPattern); JetStreamIntegrationTest (testJetStreamScheduledPublish, testJetStreamScheduledPublishWithPerMessageTtl, testJetStreamScheduledPublishRejectsUnsupportedPatterns); features/jetstream-data/scheduled_publish.feature.
Prerequisites: the backing stream must be created with allow_msg_schedules: true, and because this example sets scheduleTtl, also allow_msg_ttl: true. The stream's subject list must cover both the schedule subject and the target subject. Without these flags the server rejects the publish with message schedules is disabled or per-message TTL is disabled.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\JetStream\Schedule; use DateTimeImmutable; $client = new NatsClient(new NatsOptions(servers: ['nats://127.0.0.1:4222'])); $client->connect()->await(); $jetStream = $client->jetStream(); // The backing stream must cover the schedule and target subjects and enable scheduling. // allow_msg_schedules is required for scheduled publish; allow_msg_ttl is required when // you pass scheduleTtl. $jetStream->createStream('ORDERS', [ 'schedules.orders.one', 'events.orders', ], [ 'allow_msg_schedules' => true, 'allow_msg_ttl' => true, ])->await(); $jetStream->publishScheduled( scheduleSubject: 'schedules.orders.one', targetSubject: 'events.orders', payload: json_encode(['id' => 123], JSON_THROW_ON_ERROR), schedule: Schedule::at(new DateTimeImmutable('+30 seconds')), scheduleTtl: '5m', )->await(); $client->disconnect()->await();
KeyValue Bucket
Verified by: KeyValueBucketTest; JetStreamIntegrationTest (testJetStreamKeyValueLifecycle, testJetStreamKeyValueAdvancedParityOperations); features/jetstream-data/key_value.feature.
<?php declare(strict_types=1); use Amp\CancelledException; use Amp\TimeoutCancellation; use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\JetStream\KeyValue\KeyValueEntry; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $kv = $client->jetStream()->keyValue('cfg'); $kv->create()->await(); // Register the watcher BEFORE the writes it should observe: watch() delivers live updates only // (deliver_policy=new) and does not replay pre-existing values. Each entry carries its revision. $watchSid = $kv->watch(static function (KeyValueEntry $entry): void { echo $entry->key . ':' . ($entry->value ?? '<deleted>') . ' (rev ' . ($entry->revision ?? 0) . ')' . PHP_EOL; }, 'theme')->await(); $kv->put('theme', 'dark')->await(); $entry = $kv->get('theme')->await(); echo $entry?->value . PHP_EOL; if ($entry !== null) { $kv->update('theme', 'light', $entry->revision ?? 1)->await(); } $all = $kv->getAll()->await(); echo ($all['theme'] ?? '') . PHP_EOL; $status = $kv->getStatus()->await(); echo $status['stream'] . PHP_EOL; $kv->delete('theme')->await(); $kv->purge('theme')->await(); // Drive delivery so the watcher receives the buffered updates, bounded so it cannot block forever. try { $cancellation = new TimeoutCancellation(2.0); while (true) { $client->processIncoming($cancellation)->await(); } } catch (CancelledException) { } $client->unsubscribe($watchSid)->await(); $kv->deleteBucket()->await(); $client->disconnect()->await();
Object Store Bucket
Verified by: ObjectStoreBucketTest; JetStreamIntegrationTest::testJetStreamObjectStoreLifecycle; features/jetstream-data/object_store.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $store = $client->jetStream()->objectStore('assets'); $store->create()->await(); $stored = $store->put('logo.txt', 'hello-object', ['content-type' => 'text/plain'])->await(); echo $stored->name . PHP_EOL; $info = $store->info('logo.txt')->await(); echo $info?->digest . PHP_EOL; $objectData = $store->get('logo.txt')->await(); echo $objectData?->data . PHP_EOL; $objects = $store->list()->await(); foreach ($objects as $object) { echo $object->name . PHP_EOL; } $store->delete('logo.txt')->await(); $store->deleteBucket()->await(); $client->disconnect()->await();
Object Store Streaming to Callback
Verified by: ObjectStoreBucketTest (testGetToCallbackStreamsChunks, testGetToCallbackInvokesCallbackOncePerChunk); features/jetstream-data/object_store.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $store = $client->jetStream()->objectStore('assets'); $store->create()->await(); $store->put('logo.txt', 'hello-object')->await(); // getToCallback streams the object chunk-by-chunk: the callback is invoked once per stored // chunk as it is downloaded (the whole object is never buffered in memory), and the SHA-256 // digest is verified incrementally after the final chunk. $info = $store->getToCallback('logo.txt', static function (string $chunk): void { echo $chunk; })->await(); echo PHP_EOL; echo $info?->name . PHP_EOL; $store->deleteBucket()->await(); $client->disconnect()->await();
Object Store Streaming Upload
Verified by: ObjectStoreBucketTest (testPutStreamReChunksAndComputesDigestIncrementally, testPutStreamReChunksLargeBlockAcrossManyChunks); JetStreamIntegrationTest (testJetStreamObjectStorePutStreamRoundTrip).
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $store = $client->jetStream()->objectStore('assets'); $store->create()->await(); // putStream() pulls the object's bytes from a producer callback (return the next block, or null at // end of stream), so the whole payload is never held in memory. Blocks of any size are re-chunked to // the bucket's chunk size, published in bounded in-flight windows, and the SHA-256 digest is computed // incrementally — the streaming counterpart to getToCallback(). $handle = fopen('/path/to/large.bin', 'rb'); $info = $store->putStream('large.bin', static function () use ($handle): ?string { $block = fread($handle, 1 << 16); return ($block === '' || $block === false) ? null : $block; })->await(); fclose($handle); echo $info->size . ' bytes in ' . $info->chunks . ' chunks' . PHP_EOL; $store->deleteBucket()->await(); $client->disconnect()->await();
Services Framework
Verified by: ServiceTest; NatsClientIntegrationTest (testServiceDiscoveryAndEndpoint, testServiceMultipleEndpoints, testServiceGroupedEndpointsHierarchy, testServiceEndpointsLoadBalanceAcrossInstances); features/services/.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\Core\NatsMessage; $serviceClient = new NatsClient(new NatsOptions()); $serviceClient->connect()->await(); $service = $serviceClient->service('echo', '1.0.0', 'Echo demo') ->addEndpoint('echo', 'svc.echo', static function (NatsMessage $message): string { return 'reply:' . $message->payload; }); // Handlers can also be provided as objects implementing // IDCT\NATS\Services\ServiceEndpointHandlerInterface or class-string adapters. $service->addGroup('svc')->addGroup('v1')->addEndpoint( 'echo-v1', 'echo', static function (NatsMessage $message): string { return 'v1:' . $message->payload; }, ); $service->start()->await(); // In another client you can call discovery or endpoint subjects: // - $SRV.PING.echo // - $SRV.INFO.echo // - $SRV.STATS.echo // - $SRV.SCHEMA.echo // - svc.echo $service->stop()->await(); $serviceClient->disconnect()->await(); // Optional runtime helper: start + process loop + auto-stop on timeout. // $service->run(timeoutSeconds: 30.0)->await();
Services: SCHEMA Discovery
Verified by: ServiceTest (schema validation + lifecycle observers), BasicJsonSchemaValidatorTest; NatsClientIntegrationTest::testServiceStatsAndObserversWithHeaders; features/services/service_discovery.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\Core\NatsMessage; use IDCT\NATS\Services\BasicJsonSchemaValidator; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $service = $client->service('calc', '1.0.0', 'Calculator') ->withSchemaValidator(new BasicJsonSchemaValidator()) ->addObserver(static function (string $event, $endpoint, NatsMessage $message, array $context): void { // Example events: request_start, request_error, request_end // Example context key: correlation_id (from X-Request-Id/traceparent headers) }) ->addEndpoint('add', 'calc.add', static function (NatsMessage $message): string { return 'result'; }, schema: [ 'type' => 'object', 'required' => ['a', 'b'], 'properties' => [ 'a' => ['type' => 'integer'], 'b' => ['type' => 'integer'], ], ]); $service->start()->await(); // Another client can discover the schema: // $reply = $client->request('$SRV.SCHEMA.calc', '')->await(); // The response includes endpoint schemas in the JSON payload. // Invalid request payloads receive a structured envelope: // {"type":"io.nats.micro.v1.error","code":"VALIDATION_ERROR","message":"...","error":"...","correlation_id":"..."} $service->stop()->await(); $client->disconnect()->await();
Graceful Drain
Verified by: NatsConnectionTest (testDrainUnsubscribesAllAndCloses, testDrainDeliversBufferedMessagesBeforeClosing); features/resilience/client_resilience.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\Core\NatsMessage; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $client->subscribe('events.>', static function (NatsMessage $message): void { echo $message->payload . PHP_EOL; })->await(); // Gracefully drain: unsubscribes all SIDs, delivers pending messages, then closes. $client->drain()->await();
Ordered Consumer
Verified by: JetStreamContextTest (testSubscribeOrderedConsumerSendsCorrectConfig, testSubscribeOrderedConsumerRecreatesOnSequenceGap, testSubscribeOrderedConsumerDeliversFilteredMessagesWithoutSpuriousRecreate); JetStreamIntegrationTest::testJetStreamOrderedConsumerWithFilteredSubjectAfterPriorMessages; features/jetstream-core/consumer_helpers.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\Core\NatsMessage; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); $js->createStream('EVENTS', ['events.>'])->await(); // Ordered consumer: ephemeral push consumer with flow control, // idle heartbeat, and ack_policy=none for ordered delivery. $sid = $js->subscribeOrderedConsumer( stream: 'EVENTS', handler: static function (NatsMessage $message): void { echo $message->payload . PHP_EOL; }, filterSubject: 'events.>', )->await(); $js->publish('events.order', '{"id":1}')->await(); $client->processIncoming()->await(); $client->unsubscribe($sid)->await(); $js->deleteStream('EVENTS')->await(); $client->disconnect()->await();
Consumer Pause/Resume
Verified by: JetStreamContextTest (testPauseConsumerSendsCorrectPayload, testResumeConsumerSendsEmptyBody); JetStreamIntegrationTest::testJetStreamPauseAndResumeConsumer; features/jetstream-core/consumer_helpers.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); $js->createStream('ORDERS', ['orders.>'])->await(); $js->createConsumer('ORDERS', 'PROC', 'orders.created')->await(); // Pause the consumer until a specific time (ISO 8601 format). $js->pauseConsumer('ORDERS', 'PROC', '2026-03-12T00:00:00Z')->await(); // Resume the consumer immediately. $js->resumeConsumer('ORDERS', 'PROC')->await(); $js->deleteConsumer('ORDERS', 'PROC')->await(); $js->deleteStream('ORDERS')->await(); $client->disconnect()->await();
Fetch Batch
Verified by: JetStreamContextTest (testFetchBatch, testFetchBatchIgnoresTerminalStatusFrames, testFetchBatchThrowsWhenNoMessagesArrive); JetStreamIntegrationTest::testJetStreamFetchBatchHandlesStatusFrames; features/jetstream-core/consumer_helpers.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); $js->createStream('LOGS', ['logs.>'])->await(); $js->createConsumer('LOGS', 'BATCH', 'logs.>')->await(); for ($i = 0; $i < 5; $i++) { $js->publish('logs.app', "log entry $i")->await(); } // Fetch up to 5 messages in one batch. $messages = $js->fetchBatch('LOGS', 'BATCH', batch: 5, expiresMs: 3000)->await(); foreach ($messages as $message) { $js->ack($message)->await(); } $js->deleteConsumer('LOGS', 'BATCH')->await(); $js->deleteStream('LOGS')->await(); $client->disconnect()->await();
Notes:
- A partial batch is valid. If the server delivers some messages and then ends the pull with a terminal status, the delivered messages are returned.
- A terminal status only becomes an exception when no user message was delivered for that pull request.
Stream Purge and List
Verified by: JetStreamContextTest (testPurgeStream, testPurgeStreamWithSubjectFilter, testListStreams); JetStreamIntegrationTest (testJetStreamPurgeStreamByFilter, testJetStreamListStreams); features/jetstream-core/management.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); $js->createStream('LOGS', ['logs.>'])->await(); $js->publish('logs.app', 'entry 1')->await(); $js->publish('logs.app', 'entry 2')->await(); // Purge all messages from the stream. $result = $js->purgeStream('LOGS')->await(); echo 'Purged: ' . $result['purged'] . PHP_EOL; // Purge by subject filter. // $js->purgeStream('LOGS', ['filter' => 'logs.app'])->await(); // List all streams. $streams = $js->listStreams()->await(); foreach ($streams as $stream) { echo $stream->name . PHP_EOL; } $js->deleteStream('LOGS')->await(); $client->disconnect()->await();
Consumer List
Verified by: JetStreamContextTest::testListConsumers; JetStreamIntegrationTest::testJetStreamListConsumers; features/jetstream-core/management.feature.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); $js->createStream('ORDERS', ['orders.>'])->await(); $js->createConsumer('ORDERS', 'PROC_A', 'orders.created')->await(); $js->createConsumer('ORDERS', 'PROC_B', 'orders.updated')->await(); $consumers = $js->listConsumers('ORDERS')->await(); foreach ($consumers as $consumer) { echo $consumer->name . ' (push=' . ($consumer->push ? 'yes' : 'no') . ')' . PHP_EOL; } $js->deleteConsumer('ORDERS', 'PROC_A')->await(); $js->deleteConsumer('ORDERS', 'PROC_B')->await(); $js->deleteStream('ORDERS')->await(); $client->disconnect()->await();
Stream Message Get
Verified by: JetStreamContextTest (testGetStreamMessage, testGetStreamMessagePreservesZeroPayload, testGetStreamMessageDecodesHeaders); JetStreamIntegrationTest (testJetStreamGetStreamMessage, testJetStreamGetStreamMessagePreservesZeroAndHeaders).
getStreamMessage() fetches a stored message by sequence using the standard JetStream
$JS.API.STREAM.MSG.GET API. The returned NatsMessage preserves the stored subject, payload
(including a body that is exactly "0"), and any stored headers on rawHeaders.
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); $js->createStream('EVENTS', ['events.>'])->await(); $js->publish('events.order', '{"id":1}')->await(); // Fetch message by stream sequence number. $message = $js->getStreamMessage('EVENTS', 1)->await(); echo $message->payload . PHP_EOL; $js->deleteStream('EVENTS')->await(); $client->disconnect()->await();
JetStream Direct Get
directGetStreamMessage() and directGetLastMessageForSubject() use the JetStream Direct Get API
($JS.API.DIRECT.GET), which requires the stream to be created with allow_direct: true. Unlike
getStreamMessage() (regular $JS.API.STREAM.MSG.GET, served by the stream leader), Direct Get can
be answered by any replica. The server returns the stored message directly: the payload is the raw
body and the stream/sequence/subject/timestamp travel as Nats-* headers (preserved on rawHeaders).
A miss is raised as a JetStreamException (for example 404 Message Not Found).
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); $js->createStream('EVENTS', ['events.>'], ['allow_direct' => true])->await(); $js->publish('events.order', '{"id":1}')->await(); // Direct Get by stream sequence. $bySeq = $js->directGetStreamMessage('EVENTS', 1)->await(); echo $bySeq->subject . ': ' . $bySeq->payload . PHP_EOL; // Direct Get the last message stored on a subject. $last = $js->directGetLastMessageForSubject('EVENTS', 'events.order')->await(); echo $last->payload . PHP_EOL; $js->deleteStream('EVENTS')->await(); $client->disconnect()->await();
Verified by: JetStreamContextTest (testDirectGetStreamMessageReturnsRawBodyAndHeaders, testDirectGetLastMessageForSubjectRequestsLastBySubj, testDirectGetStreamMessageThrowsOnNotFound); JetStreamIntegrationTest::testJetStreamDirectGetStreamMessage.
Credentials File Authentication
Verified by: CredentialsParserTest (testParseAcceptsCanonicalNscMarkersWithSixDashEnd, testFromFileParsesRealNscFixtureWhenPresent), NkeySeedSignerTest; features/auth/jwt_and_nkey_auth.feature.
<?php declare(strict_types=1); use IDCT\NATS\Auth\CredentialsParser; use IDCT\NATS\Auth\NkeySeedSigner; use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; // Parse a .creds file to extract JWT and NKey seed. $creds = CredentialsParser::fromFile('/path/to/user.creds'); $signer = new NkeySeedSigner($creds['nkeySeed']); $client = new NatsClient(new NatsOptions( servers: ['nats://127.0.0.1:4222'], jwt: $creds['jwt'], nkey: $signer->publicKey(), nonceSigner: $signer, ));
Typed Stream Configuration
Verified by: the JetStream enums are exercised end-to-end (all six via ->value) in features/jetstream-core/management.feature and JetStreamContextTest.
Stream and consumer configuration supports typed enums for type-safe options:
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\JetStream\Enum\AckPolicy; use IDCT\NATS\JetStream\Enum\DeliverPolicy; use IDCT\NATS\JetStream\Enum\DiscardPolicy; use IDCT\NATS\JetStream\Enum\ReplayPolicy; use IDCT\NATS\JetStream\Enum\RetentionPolicy; use IDCT\NATS\JetStream\Enum\StorageBackend; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); // Create stream with typed configuration. $js->createStream('ORDERS', ['orders.>'], [ 'retention' => RetentionPolicy::Limits->value, 'storage' => StorageBackend::Memory->value, 'discard' => DiscardPolicy::Old->value, 'max_msgs' => 100_000, 'max_bytes' => 50 * 1024 * 1024, 'max_age' => 86_400_000_000_000, // 24h in nanoseconds 'num_replicas' => 1, 'duplicate_window' => 120_000_000_000, // 2 min in nanoseconds ])->await(); // Create consumer with typed configuration. $js->createConsumer('ORDERS', 'PROC', 'orders.created', [ 'deliver_policy' => DeliverPolicy::New->value, 'ack_policy' => AckPolicy::Explicit->value, 'replay_policy' => ReplayPolicy::Instant->value, 'max_deliver' => 5, 'max_ack_pending' => 1000, 'ack_wait' => 30_000_000_000, // 30s in nanoseconds ])->await(); $js->deleteConsumer('ORDERS', 'PROC')->await(); $js->deleteStream('ORDERS')->await(); $client->disconnect()->await();
Pull Consumer Batching/Iteration
Verified by: PullConsumerIteratorTest; JetStreamIntegrationTest::testJetStreamPullIteratorBatching; features/jetstream-core/consumer_helpers.feature.
The fluent PullConsumerIterator wraps fetchBatch() with configurable batch size, iterations, and a handler callback:
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\Core\NatsMessage; use IDCT\NATS\JetStream\JetStreamContext; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); // Process messages in batches of 10, up to 5 iterations. $totalProcessed = $js->pullConsumer('ORDERS', 'PROC') ->setBatching(10) ->setExpiresMs(5000) ->setIterations(5) ->handle(function (NatsMessage $msg, JetStreamContext $js): void { echo 'Processing: ' . $msg->payload . PHP_EOL; $js->ack($msg)->await(); })->await(); echo "Processed {$totalProcessed} messages total." . PHP_EOL; $client->disconnect()->await();
Stream Mirroring and Sourcing
Verified by: StreamSourceTest; features/jetstream-core/config_helpers.feature (live source filtering + mirror replication).
Use StreamSource to build mirror/source configuration arrays:
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\JetStream\Configuration\StreamSource; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $mirror = StreamSource::mirror('ORDERS')->toArray(); $aggregateSources = [ StreamSource::source('ORDERS')->filterSubject('orders.>')->toArray(), StreamSource::source('PAYMENTS')->startSeq(100)->toArray(), ]; $remoteMirror = StreamSource::mirror('ORIGIN') ->external('$JS.hub.API', '_DELIVER.hub') ->toArray(); var_dump($mirror, $aggregateSources, $remoteMirror); $client->disconnect()->await();
Use those arrays in createStream() or updateStream() options. Source configurations work with the current high-level API and are covered against the live fixture stack. Mirror-only stream configs also work through createStream() when you pass an empty subjects list together with the mirror configuration.
Republish and Subject Transform
Verified by: RepublishAndTransformTest; features/jetstream-core/config_helpers.feature (live republish + subject transform).
Configure republish rules and subject transforms on streams:
<?php declare(strict_types=1); use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\JetStream\Configuration\Republish; use IDCT\NATS\JetStream\Configuration\SubjectTransform; $client = new NatsClient(new NatsOptions()); $client->connect()->await(); $js = $client->jetStream(); // Republish all order messages to a monitoring subject. $js->createStream('ORDERS', ['orders.>'], [ 'republish' => Republish::create('orders.>', 'monitor.orders.>')->toArray(), ])->await(); // Republish headers only (strip payload) for lightweight notifications. $js->createStream('EVENTS', ['events.>'], [ 'republish' => Republish::create('events.>', 'notify.events.>')->headersOnly()->toArray(), ])->await(); // Apply a subject transform to remap subjects on ingest. $js->createStream('MAPPED', ['raw.>'], [ 'subject_transform' => SubjectTransform::create('raw.>', 'processed.>')->toArray(), ])->await(); $client->disconnect()->await();
Compatibility Mapping
This repository tracks parity against the basis-company nats.php README examples while exposing an Amp-first API tailored to this library.
| Section | Status | Notes |
|---|---|---|
| Connecting and Auth | workflow parity | Basic, token, username/password, JWT nonce signing, credentials file, and TLS CA/cert/key options are supported. |
| Publish Subscribe | workflow parity | Callback, queue-group, and polling queue (SubscriptionQueue with fetch()/next()/fetchAll()) patterns are supported. |
| Request Response | workflow parity | Awaited request/reply with timeout and cancellation is covered, but the API shape differs from basis-company's dispatch() and callback request helpers. |
| JetStream API Usage | workflow parity | Stream/consumer lifecycle, pull/push flows, ephemeral consumers, scheduling, ordered-consumer helpers, batching/iteration chain API, republish/subject-transform live behavior, mirror/source live behavior, and typed enums are covered. |
| Microservices | workflow parity | Service registration, discovery (PING/INFO/STATS/SCHEMA), grouped hierarchy, enriched endpoint stats (requests/errors/last-error/processing time), reset API, opt-in schema validation hook with built-in adapter, handler adapters (callable/object/class-string), request lifecycle observers, standardized error envelopes, and run-loop helper are covered. |
| Key Value Storage | workflow parity | Core KV flows plus update/purge/getAll/status parity are covered. |
| Object Store | parity | Uses the official on-wire layout (meta subjects keyed by base64url(name), chunks under a per-object NUID subject, SHA-256=<base64url> digest, rollup meta), so buckets interoperate with the nats CLI and other clients. Bucket/object lifecycle, streaming download, object listing, chunked uploads, and digest verification are covered. Overwrites and deletes purge the previous revision's chunks. |
Behavior Notes
processIncoming()
Verified by: NatsConnectionTest (testProcessIncomingDispatchesMsgToSubscriber, testProcessIncomingUpdatesServerInfoFromAsyncInfoFrame).
processIncoming() reads a single transport chunk, parses all complete frames from it, and dispatches them to subscription callbacks. It is non-blocking — if no data is available, it returns immediately with a frame count of 0. Because one read returns only a single chunk (and TCP may coalesce several protocol messages into one chunk), call it in a loop to process all available messages:
// Process all available messages for up to 1 second. $deadline = microtime(true) + 1.0; while (microtime(true) < $deadline) { $frames = $client->processIncoming()->await(); if ($frames === 0) { break; } }
The client also applies asynchronous INFO updates received after connect, so serverInfo() can change during the lifetime of an open connection when the server advertises updated capabilities such as max_payload or cluster topology details.
Heartbeat and Request Timeouts
Verified by: NatsConnectionTest (testIdleConnectionStaysOpenViaHeartbeatSelfRead, testHeartbeatReadHandlesEmptyErrorAndFatalFrames, testRequestTimeoutCancelsReadAndAllowsSubsequentRequest).
The heartbeat timer answers its own PONG: after sending a PING it performs a short, bounded read to consume the reply (unless an application processIncoming() read is already in flight). Liveness detection therefore does not depend on the application continuously calling processIncoming(), so an otherwise idle connection (for example a pure publisher) is not closed by spurious maxPingsOut detection. Only an actual server PONG resets the outstanding-ping counter — other inbound frames (data, INFO, PING) do not — so a server that keeps sending data but stops answering PINGs is still detected via maxPingsOut.
Request and pull-fetch timeouts cancel the underlying socket read rather than leaving it pending. A timed-out request() (or fetchBatch()/fetchNext()) cleanly releases the read, so it cannot orphan an in-flight read or trigger a spurious reconnect on the next operation.
Reconnect Behavior
Verified by: NatsConnectionTest (testBackoffDelayIsExponential, testConnectRotatesServersOnReconnectAttempts, testReconnectRetriesWhenResubscribeGetsFatalServerError).
When a connection drops and reconnectEnabled is true:
- Exponential backoff: delay is computed as
reconnectDelayMs * 2^(attempt - 1), capped atreconnectMaxDelayMs, with random jitter up toreconnectJitterMs. - Server rotation: the client cycles through configured servers in order.
- Subscription replay: all active subscriptions are replayed (SUB commands resent) after reconnect.
- Replay validation: reconnect does not treat replayed subscriptions as successful if the server immediately answers with a fatal
-ERRduring replay. In that case reconnect keeps retrying until a healthy server accepts the replay or attempts are exhausted. - Published messages during reconnect are lost: there is no outbound buffer for in-flight publishes. Only subscriptions are restored.
Recoverable server -ERR frames such as Invalid Subject or Permissions Violation for Publish/Subscription to ... do not automatically close an already-open connection. Fatal connection-level errors still do.
The initial handshake is bounded by connectTimeoutMs, not by a fixed number of transport reads. During bootstrap the client will also answer server PING frames and process async INFO updates while waiting for the initial PONG.
Ordered Consumer Gap Recovery
Verified by: JetStreamContextTest (testSubscribeOrderedConsumerRecreatesOnSequenceGap, testSubscribeOrderedConsumerDeliversFilteredMessagesWithoutSpuriousRecreate); JetStreamIntegrationTest::testJetStreamOrderedConsumerWithFilteredSubjectAfterPriorMessages.
subscribeOrderedConsumer() tracks the JetStream consumer delivery sequence (which is contiguous per delivery, even for a filtered consumer over a stream that also carries non-matching subjects). If a push is missed — the consumer sequence skips — the consumer is transparently deleted and recreated starting just after the last in-order message; the out-of-order message that exposed the gap is discarded (not forwarded), and the recreated consumer replays the missing range in order. Delivery to your callback therefore stays in order and gap-free, with no duplicates and no recreate storm. If the restart point has been pruned/expired, recovery resumes from the next available message. If the recreate itself fails (for example the stream was deleted or a leadership change is in progress), the error is contained to this ordered consumer rather than disrupting delivery to other subscriptions on the connection.
Configuration Option Mapping
NatsOptions fields and defaults (verified by NatsOptionsTest):
| Option | Type | Default | Notes |
|---|---|---|---|
servers |
list<string> |
['nats://127.0.0.1:4222'] |
Supports nats:// and tls:// endpoints. |
name |
string |
idct-php-nats-client |
Sent in CONNECT payload. |
inboxPrefix |
string |
_INBOX |
Prefix for generated request inbox subjects. |
connectTimeoutMs |
int |
5000 |
Transport connect timeout in milliseconds. |
requestTimeoutMs |
int |
10000 |
Default request/reply timeout. |
reconnectEnabled |
bool |
true |
Enables reconnect flow. |
maxReconnectAttempts |
int |
10 |
Max reconnect attempts before closing. |
reconnectDelayMs |
int |
100 |
Base reconnect backoff delay. |
reconnectMaxDelayMs |
int |
10000 |
Maximum reconnect delay (caps exponential backoff). |
reconnectJitterMs |
int |
50 |
Random jitter added to reconnect delay. |
pingIntervalSeconds |
int |
30 |
Client heartbeat interval setting. |
maxPingsOut |
int |
2 |
Max outstanding pings before failure. |
verbose |
bool |
false |
NATS verbose protocol mode. |
pedantic |
bool |
false |
NATS pedantic protocol mode. |
noEcho |
bool |
false |
Suppresses server echo of own published messages. |
tlsRequired |
bool |
false |
Forces TLS context in transport. |
tlsHandshakeFirst |
bool |
false |
When true, performs the TLS handshake immediately after connecting (before server INFO). When false (default), the client uses the standard NATS flow: read the plaintext INFO, then upgrade to TLS when TLS is required (option, tls:// scheme, or server tls_required). |
tlsCaFile |
?string |
null |
CA bundle path for peer verification. |
tlsCertFile |
?string |
null |
Client certificate path. |
tlsKeyFile |
?string |
null |
Client private key path. |
tlsKeyPassphrase |
?string |
null |
Passphrase for encrypted key file. |
tlsPeerName |
?string |
null |
Overrides TLS peer name (SNI/verification). |
tlsVerifyPeer |
bool |
true |
Enables certificate verification. |
token |
?string |
null |
Token auth, encoded as auth_token. |
username |
?string |
null |
Username auth field. |
password |
?string |
null |
Password auth field. |
jwt |
?string |
null |
JWT user credential. |
nkey |
?string |
null |
Public NKey for JWT auth mode or standalone NKey challenge-response auth. |
nonceSigner |
?NonceSignerInterface |
null |
Signs the server nonce for JWT or standalone NKey auth. |
maxPendingMessagesPerSubscription |
int |
1024 |
Slow consumer queue bound per SID. |
slowConsumerPolicy |
SlowConsumerPolicy |
DropOldest |
One of DropOldest, DropNewest, Error. |
Performance Benchmark Recipe
Quick local publish/request benchmark (single process).
The responder is pumped by a single long-lived background loop rather than one
processIncoming() call per request. processIncoming() consumes one transport chunk, and TCP
can coalesce several protocol messages into one chunk, so a one-call-per-request responder
desynchronizes and stalls. A continuous read loop (cancelled when the run finishes) avoids that:
<?php declare(strict_types=1); use Amp\CancelledException; use Amp\DeferredCancellation; use IDCT\NATS\Connection\NatsOptions; use IDCT\NATS\Core\NatsClient; use IDCT\NATS\Core\NatsMessage; use function Amp\async; $iterations = 5000; $subject = 'bench.echo'; $server = new NatsClient(new NatsOptions()); $client = new NatsClient(new NatsOptions()); $server->connect()->await(); $client->connect()->await(); $server->subscribe($subject, static function (NatsMessage $message) use ($server): void { if ($message->replyTo !== null) { $server->publish($message->replyTo, 'ok')->await(); } })->await(); // Drive the responder from one continuous background read loop. $serverCancel = new DeferredCancellation(); $serverLoop = async(static function () use ($server, $serverCancel): void { $cancellation = $serverCancel->getCancellation(); while (!$cancellation->isRequested()) { try { $server->processIncoming($cancellation)->await(); } catch (CancelledException) { break; } } }); $start = hrtime(true); for ($i = 0; $i < $iterations; $i++) { $client->request($subject, 'x', 2000)->await(); } $elapsedNs = hrtime(true) - $start; $serverCancel->cancel(); $serverLoop->await(); $totalMs = $elapsedNs / 1_000_000; $rps = $iterations / max(0.001, ($elapsedNs / 1_000_000_000)); echo 'iterations=' . $iterations . PHP_EOL; echo 'total_ms=' . number_format($totalMs, 2, '.', '') . PHP_EOL; echo 'req_per_sec=' . number_format($rps, 2, '.', '') . PHP_EOL; $client->disconnect()->await(); $server->disconnect()->await();
Run recipe:
docker compose up -d php -d zend.assertions=1 path/to/benchmark.php docker compose down
Testing
Typical local workflow:
composer install composer test:unit composer test:bdd composer stan composer test:e2e
Additional useful commands:
composer test
RUN_INTEGRATION=1 composer test:integration
composer test:bdd
BEHAT_SUITE=core composer test:bdd
composer test:integration:repeat
composer fixture:jwt:check
composer fixture:jwt
composer fix
composer test:e2e is the preferred compose-backed validation path. It checks the committed JWT fixtures, starts the local NATS stack, waits for readiness, runs unit tests, runs integration tests, runs the Behat feature suite, and tears the stack down again.
composer test:bdd runs only the Behat feature suite against the same local Docker Compose fixtures. Use BEHAT_SUITE=core composer test:bdd to run a narrower slice while iterating locally, or BEHAT_SUITE=core composer test:e2e to keep the rest of the e2e flow and narrow only the Behat stage.
Base integration endpoint:
NATS_URL(default:nats://127.0.0.1:14222)
When you run docker compose up -d in this repository, additional local auth fixtures are available by default:
- token auth:
nats://127.0.0.1:14223with tokenlocal-test-token - username/password auth:
nats://127.0.0.1:14224withlocal-user/local-pass - TLS handshake-first auth:
tls://127.0.0.1:14225using the generated files underbuild/tls/ - JWT auth:
nats://127.0.0.1:14227using the generated operator/account resolver chain underbuild/nats/jwt/ - standalone NKey auth:
nats://127.0.0.1:14226with seedSUACSSL3UAHUDXKFSNVUZRF5UHPMWZ6BFDTJ7M6USDXIEDNPPQYYYCU3VY
The integration tests use those defaults automatically. Override them with environment variables when you want to target external infrastructure instead.
To regenerate the committed local JWT fixture artifacts and resolver config intentionally, run:
composer fixture:jwt
If the local nats-jwt compose service is already running, the regeneration script recreates it so the server picks up the new operator/account resolver state immediately.
To verify the committed JWT fixture is in sync with the regeneration script, run:
composer fixture:jwt:check
NATS_TLS_URL: TLS-enabled server URL used bytestTlsHandshakeFirstConnectionNATS_TLS_CA_FILE: optional CA bundle path for TLS verificationNATS_TLS_CERT_FILE: optional client certificate path for TLS/mTLSNATS_TLS_KEY_FILE: optional client private key path for TLS/mTLSNATS_TLS_SKIP_VERIFY: set to1to disable peer verification in the TLS integration testNATS_TOKEN_URL: token-auth server URL used bytestTokenAuthSuccessAndFailureNATS_TOKEN: valid token for the token-auth endpointNATS_TOKEN_INVALID: invalid token used for the negative token-auth pathNATS_USERPASS_URL: username/password-auth server URL used bytestUserPasswordAuthSuccessAndFailureNATS_USERNAME: valid username for the user/password endpointNATS_PASSWORD: valid password for the user/password endpointNATS_BAD_PASSWORD: invalid password used for the negative user/password pathNATS_JWT_URL: JWT-auth server URL used bytestJwtNonceAuthenticationFlow(default:nats://127.0.0.1:14227)NATS_JWT: user JWT presented in the CONNECT payload (defaults tobuild/nats/jwt/user.jwt)NATS_JWT_NKEY_SEED: encoded user seed used byNkeySeedSignerto derive the public NKey and sign the server nonce (defaults tobuild/nats/jwt/user.seed)NATS_NKEY_URL: standalone NKey-auth server URL used bytestStandaloneNkeyAuthenticationFlowNATS_NKEY_SEED: encoded user seed used byNkeySeedSignerfor standalone NKey challenge-response auth
Example overrides for external infrastructure:
# Base server override. RUN_INTEGRATION=1 \ NATS_URL=nats://demo.example.net:4222 \ composer test:integration # Token auth override. RUN_INTEGRATION=1 \ NATS_TOKEN_URL=nats://token.example.net:4222 \ NATS_TOKEN=prod-token-value \ NATS_TOKEN_INVALID=wrong-token \ ./vendor/bin/phpunit --testsuite integration --filter testTokenAuthSuccessAndFailure # Username/password override. RUN_INTEGRATION=1 \ NATS_USERPASS_URL=nats://auth.example.net:4222 \ NATS_USERNAME=alice \ NATS_PASSWORD=s3cr3t \ NATS_BAD_PASSWORD=wrong-pass \ ./vendor/bin/phpunit --testsuite integration --filter testUserPasswordAuthSuccessAndFailure # TLS override with strict verification. RUN_INTEGRATION=1 \ NATS_TLS_URL=tls://tls.example.net:4222 \ NATS_TLS_CA_FILE=/path/to/ca.pem \ NATS_TLS_CERT_FILE=/path/to/client-cert.pem \ NATS_TLS_KEY_FILE=/path/to/client-key.pem \ ./vendor/bin/phpunit --testsuite integration --filter 'testTlsHandshakeFirstConnection|testTlsConnectionFailsWithoutClientCertificate|testTlsConnectionFailsWithWrongCa|testTlsConnectionFailsWithPeerNameMismatch' # JWT auth override. RUN_INTEGRATION=1 \ NATS_JWT_URL=nats://jwt.example.net:4222 \ NATS_JWT="$(cat /path/to/user.jwt)" \ NATS_JWT_NKEY_SEED="$(cat /path/to/user.seed)" \ ./vendor/bin/phpunit --testsuite integration --filter testJwtNonceAuthenticationFlow # Standalone NKey auth override. RUN_INTEGRATION=1 \ NATS_NKEY_URL=nats://nkey.example.net:4222 \ NATS_NKEY_SEED="$(cat /path/to/user.seed)" \ ./vendor/bin/phpunit --testsuite integration --filter testStandaloneNkeyAuthenticationFlow
Focused auth/TLS integration run:
RUN_INTEGRATION=1 ./vendor/bin/phpunit --testsuite integration --filter 'testTlsHandshakeFirstConnection|testTlsConnectionFailsWithoutClientCertificate|testTlsConnectionFailsWithWrongCa|testTlsConnectionFailsWithPeerNameMismatch|testTokenAuthSuccessAndFailure|testUserPasswordAuthSuccessAndFailure|testJwtNonceAuthenticationFlow|testStandaloneNkeyAuthenticationFlow'
To do a quick local flake check against the compose-backed environment, run:
composer test:integration:repeat
The CI workflow also exposes a manual workflow_dispatch soak job named integration-soak. When triggered from GitHub Actions, it runs scripts/repeat-integration.sh with a configurable repeat count on PHP 8.5.
Contributing and contributors
Contributions should keep changes focused and paired with the narrowest useful verification.
- Add or update tests when behavior changes.
- Prefer
composer test:unitfor small local changes andcomposer test:e2efor auth, transport, protocol, JetStream, or integration-fixture changes. - Do not hand-edit generated JWT fixture files under
build/nats/jwt/; regenerate them withcomposer fixture:jwt. - Run
composer stanfor code changes andcomposer fixwhen style adjustments are needed. - Review
AGENTS.mdfor repository structure, standards, and continuation guidance before larger changes.
Many thanks for all the contributions:
Current Test Baseline
- Unit tests cover protocol encoding/parsing, handshake/state transitions, subscriptions, backpressure policies, request/reply flows, reconnect/server-rotation behavior, and exponential backoff.
- Unit tests also cover JetStream account info, stream and consumer CRUD, publish acknowledgments, API error mapping, fetch batch, ordered consumers, consumer pause/resume, KV bucket options, ObjectStore chunking and digest verification.
- Unit tests cover microservices framework including PING/INFO/STATS/SCHEMA discovery and grouped endpoint hierarchy.
- Integration tests cover live connect/disconnect, publish-subscribe roundtrip, request-reply, connection rotation fallback, JetStream stream/consumer lifecycle with publish-ack flow, KV operations, ObjectStore operations, and service discovery.
- Integration tests also cover local token auth, username/password auth, TLS handshake-first auth including strict peer-validation, hostname mismatch, and missing-client-cert failures, resolver-backed JWT auth, and standalone NKey auth.
- Static analysis runs with PHPStan level 8.