bschmitt / laravel-amqp
AMQP wrapper for Laravel and Lumen to publish and consume messages
Requires
- php: ^7.3|^8.0
- php-amqplib/php-amqplib: ^3.0
Requires (Dev)
- illuminate/config: ^7|^8|^9|^10|^11|^12|^13
- illuminate/console: ^7|^8|^9|^10|^11|^12|^13
- illuminate/container: ^7|^8|^9|^10|^11|^12|^13
- illuminate/queue: ^7|^8|^9|^10|^11|^12|^13
- illuminate/support: ^7|^8|^9|^10|^11|^12|^13
- mockery/mockery: ^1.3
- phpoption/phpoption: ^1.7
- phpunit/phpunit: ^9.6|^10.5|^11.5|^12.0
- squizlabs/php_codesniffer: ^3.5
- vlucas/phpdotenv: ^5.3
This package is auto-updated.
Last update: 2026-06-02 13:21:50 UTC
A detailed AMQP wrapper for Laravel and Lumen to publish and consume messages, especially from RabbitMQ. This package provides full support for RabbitMQ features including RPC patterns, management operations, message properties, and more.
Features
Core Features
- Advanced queue configuration
- Easy message publishing to queues
- Flexible queue consumption with useful options
- Support for all RabbitMQ exchange types (topic, direct, fanout, headers)
- Full AMQP message properties support
Version 3.1.0+ New Features
- RPC Pattern Support - Built-in request-response patterns with rpc() and reply() methods
- Queue Management - Programmatic control (purge, delete, unbind)
- Management HTTP API - Full integration with RabbitMQ Management API
- Policy Management - Create, update, and delete policies programmatically
- Feature Flags - Query RabbitMQ feature flags
- Enhanced Message Properties - Full support for priority, correlation_id, headers, etc.
- Listen Method - Auto-create queues and bind to multiple routing keys
- Connection Configuration Helper - Easy access to connection configs
Advanced Features
- Publisher Confirms - Guaranteed message delivery
- Consumer Prefetch (QoS) - Rate limiting and flow control
- Queue Types - Classic, Quorum, and Stream queues
- Dead Letter Exchanges - Message routing for failed messages
- Advanced retry & dead-letter abstractions - Declarative RetryPolicy + DeadLetterTopology + RetryHandler with fixed/exponential backoff and auto-routing to DLQ when retries exhaust (see Retry & DLQ Abstractions)
- Delayed messaging & publish backoff - publishLater() / publishTypedLater() with TTL+DLX or rabbitmq-delayed-message-exchange plugin strategies, plus PublishBackoff for publisher-side transient retries (see Delayed Messaging)
- Typed message contracts - MessageContractInterface, TypedMessage base class, publishTyped() / consumeTyped() with pluggable MessageSerializerInterface (JSON by default) (see Typed Messaging)
- JSON schema validation - Zero-dependency SchemaValidator (Draft 7 subset) validates payloads against contract schema() definitions (see JSON Schema Validation)
- Message Priority - Priority-based message processing
- TTL Support - Message and queue expiration
- Lazy Queues - Disk-based message storage
- Alternate Exchange - Unroutable message handling
- Native Laravel Queue integration - Use amqp as a config/queue.php driver with queue:work
- Artisan commands - amqp:work (with --retry / --contract / --validate-schema), amqp:consume, amqp:listen, amqp:publish (with --delay-ms), and amqp:purge
- Exchange & topology builders - ExchangeTopology declarative exchange + queue bindings with declareExchangeTopology() (see Production Infrastructure)
- Quorum & priority queue profiles - QueueProfile::quorum(), priority(), and quorumWithPriority() presets for queue_properties (see Production Infrastructure)
- Resilient connections & pooling - ResilientConnectionManager auto-reconnect with heartbeat staleness checks; ConnectionPool for persistent worker channels (see Production Infrastructure)
- Distributed tracing - W3C traceparent propagation via TracePropagatorInterface (OTel bridge via CallbackTracePropagator); enable with propagate_trace on publish/consume (see Production Infrastructure)
- Correlation ID propagation - CorrelationContext with propagate_correlation on publish/consume (see Production Infrastructure)
- Consumer lifecycle hooks - ConsumerLifecycle graceful shutdown, signal handlers, and consumeWithLifecycle() (see Production Infrastructure)
- SAGA workflow helpers - Saga step/compensation orchestrator with SagaResult reporting (see SAGA, Events, Middleware & Testing)
- Laravel events & consume middleware - MessagePublishing / MessagePublished / MessageReceived / MessageHandled / MessageFailed events plus ConsumePipeline / ConsumeMiddlewareInterface and consumeWithMiddleware() (see SAGA, Events, Middleware & Testing)
- Fake AMQP test driver - Amqp::fake() / FakeAmqp with assertPublished(), assertPublishedCount(), assertNothingPublished() (see SAGA, Events, Middleware & Testing)
- Publisher confirms & async publishing - persistent-channel AsyncPublisher with batched confirms via Amqp::asyncPublisher() (see SAGA, Events, Middleware & Testing)
- RPC abstraction helpers - RpcClient / RpcServer with RpcCallResult, JSON mode, and Amqp::rpcClient() / rpcServer() (see Scale & Interop)
- Cross-service / polyglot messaging - InteropEnvelope standard headers (x-message-type, x-schema-version, x-source-service) via publishInterop() / consumeInterop() (see Scale & Interop)
- Observability & queue metrics - MetricsCollector, QueueMetrics, Amqp::metrics(), queueMetrics() / getQueueStats() (see Scale & Interop)
- High-performance workers - WorkerOptions, HighPerformanceWorker, consumeOptimized(), and amqp:work --optimized (see Scale & Interop)
- gRPC-lite typed RPC - Rpc::call(UserService::class, GetUserRequest::make([...])) with typed request/response DTOs, service registries, and Rpc::serve() on the server (see gRPC-lite RPC)
- Service discovery - Rpc::service('payments')->call(...) resolves a short name to a registered RpcService class; opt-in via static alias() on the service (see Messaging Platform)
- Saga facade - Saga::make()->step(...)->compensate(...) fluent syntax with reverse-order compensation on failure (see Messaging Platform)
- Message contracts dispatch - OrderCreated::dispatch(['orderId' => 'o-1']) auto-serializes and publishes typed messages via the Laravel container (see Messaging Platform)
- Dead-letter management - Amqp::deadLetters()->for('orders.dlq')->count() / peek() / summarize() / messages() / replayTo() / purge() + php artisan amqp:dlq (see Messaging Platform)
- Declarative retry attribute - #[Retry(attempts: 5, strategy: RetryStrategy::EXPONENTIAL)] on handlers, hydrated via RetryPolicy::fromAttribute() (see Messaging Platform)
- Monitoring dashboard - Amqp::dashboard($queues)->snapshot() with lag, DLQ summaries, and RPC histograms; php artisan amqp:monitor / amqp:dlq (see Messaging Platform)
- RPC latency tracking - RpcLatencyRecorder, per-call durationMs on RpcCallResult, RpcCallStarted / RpcCallCompleted / RpcCallFailed events (see gRPC-lite RPC)
- Causation ID propagation - CorrelationContext now propagates both correlation_id and x-causation-id so consumers can chain "this happened because of that" (see Messaging Platform)
- MessageStore - MessageStoreInterface + InMemoryMessageStore; opt-in audit log of every publish/consume via Amqp::setMessageStore() (see Messaging Platform)
- Async Laravel events - mark events with ShouldPublishToAmqpInterface and enable amqp.broadcast_laravel_events to auto-publish event(new OrderCreated()) to RabbitMQ (see Messaging Platform)
- Laravel Pulse integration - AmqpPulseRecorder auto-records publish/handle/fail/RPC/DLQ events to Pulse when laravel/pulse is installed; opt out with amqp.pulse_integration => false (see Messaging Platform)
- OpenTelemetry bridge - OpenTelemetryTracePropagator injects the active OTel span context into AMQP headers (with W3C fallback) when open-telemetry/api is installed (see Production Infrastructure)
- Correlation ID visualisation - CorrelationChain::tree() / render() reconstruct causation graphs from the MessageStore; php artisan amqp:trace <correlation_id> prints an ASCII tree or JSON (see Messaging Platform)
- Kubernetes liveness / readiness probes - HealthState + HealthCheck + Http\Controllers\HealthController; opt-in HTTP routes (GET {prefix}/live|ready) plus php artisan amqp:health for exec probes (see Kubernetes & Cloud Native)
- Consumer autoscaling recommendations - AutoscalingAdvisor (depth + lag heuristics, KEDA trigger spec) and php artisan amqp:scale CLI (see Kubernetes & Cloud Native)
- Laravel Cloud compatibility - LaravelCloud detector + AMQP_URL / CLOUDAMQP_URL / RABBITMQ_URL DSN auto-hydration on register (see Kubernetes & Cloud Native)
- Multi-region deployment support - MultiRegionConnection resolver with locality preference, cool-down blacklist, and withFailover() retry loop across region-scoped connection keys (see Kubernetes & Cloud Native)
Planned Features
Status legend: [x] shipped · [~] partial / building blocks shipped (full feature still planned) · [ ] not started.
Many "partial" items already ship as a programmatic API or CLI; the outstanding work is usually a UI, native integration, or codegen layer. See the Features section above for the full list of already-shipped capabilities.
Observability
- Laravel Pulse integration for AMQP metrics - AmqpPulseRecorder auto-subscribes to publish/handle/fail/RPC/DLQ events when laravel/pulse is installed; disable via amqp.pulse_integration => false
- Native OpenTelemetry exporter support - OpenTelemetryTracePropagator bridges to open-telemetry/api when installed (active span context auto-injected); falls back to W3C generation otherwise. CallbackTracePropagator remains for custom APMs
- Correlation ID visualization - CorrelationChain::tree() / render() reconstruct causation graphs from the MessageStore; php artisan amqp:trace <correlation_id> prints an ASCII tree or JSON
- Queue throughput monitoring - MetricsCollector, QueueMetrics, Amqp::metrics() / queueMetrics(), MonitoringDashboard
- Consumer lag monitoring - QueueMetrics::lag(), lagSeconds(), isLagging(); --lag-threshold / --lag-seconds / --lag-age on amqp:monitor
- Dead-letter queue monitoring - DeadLetterManager::peek() / summarize(), dead_letters block in dashboard, php artisan amqp:dlq
- RPC latency tracking - RpcLatencyRecorder, RpcCallResult::durationMs(), RpcCallCompleted / RpcCallFailed events, --rpc on amqp:monitor
- Distributed trace propagation - TraceContext, W3cTracePropagator, propagate_trace on publish/consume
Developer Experience
- AMQP Explorer (Telescope-like message inspector) - php artisan amqp:explore (with --id, filters, JSON mode)
- Message replay tooling - php artisan amqp:replay + MessageStoreInterface source + target/exchange overrides
- Failed message browser - php artisan amqp:explore + amqp:dlq (messages, peek, summarize) for CLI inspection
- Live queue inspector - php artisan amqp:inspect watch loop over queueMetrics() / Management API
- Message payload diff viewer - php artisan amqp:diff {left} {right} + structural JSON/text diffing
- Schema validation debugger - php artisan amqp:schema:debug (interactive/file/store payload sources)
- Interactive RPC testing console - php artisan amqp:rpc:console with JSON/raw payload modes
Kubernetes & Cloud Native
- Kubernetes-ready consumer lifecycle management - ConsumerLifecycle::withHealth() stamps HealthState on every start/stop/message/error
- Graceful shutdown support - ConsumerLifecycle signal handlers (SIGTERM / SIGINT via pcntl) + cooperative requestStop()
- Readiness probe endpoint - GET {prefix}/ready HTTP route + php artisan amqp:health --probe=ready for exec probes
- Liveness probe endpoint - GET {prefix}/live HTTP route + php artisan amqp:health --probe=live
- Auto-recovery after broker failures - ResilientConnectionManager (reconnect + heartbeat staleness) and ConnectionPool
- Consumer autoscaling recommendations - AutoscalingAdvisor + php artisan amqp:scale (depth/lag heuristics, KEDA-ready trigger output)
- Laravel Cloud compatibility - LaravelCloud detector + AMQP_URL / CLOUDAMQP_URL / RABBITMQ_URL auto-hydration
- Multi-region deployment support - MultiRegionConnection resolver with locality preference, cool-down blacklist, and withFailover() retry loop
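To illustrate what hydrating a connection from a DSN such as AMQP_URL can involve, here is a minimal sketch using only PHP's built-in parse_url(). The function name amqpConfigFromDsn() and the returned keys are assumptions made for this example; they are not the package's actual hydration code.

```php
<?php
// Hypothetical helper: turns an amqp:// or amqps:// DSN into a config array.
// The array keys mirror config/amqp.php, but the function is illustrative only.
function amqpConfigFromDsn($dsn)
{
    $parts = parse_url($dsn);
    if ($parts === false || !isset($parts['scheme'], $parts['host'])) {
        throw new InvalidArgumentException('Malformed AMQP DSN.');
    }

    $tls = $parts['scheme'] === 'amqps';
    // Strip the leading slash; the vhost may be percent-encoded (e.g. %2f).
    $path = isset($parts['path']) ? (string) substr($parts['path'], 1) : '';

    return [
        'host'     => $parts['host'],
        'port'     => isset($parts['port']) ? $parts['port'] : ($tls ? 5671 : 5672),
        'username' => isset($parts['user']) ? rawurldecode($parts['user']) : 'guest',
        'password' => isset($parts['pass']) ? rawurldecode($parts['pass']) : 'guest',
        // Simplification: a missing or empty path falls back to the default vhost.
        'vhost'    => $path === '' ? '/' : rawurldecode($path),
        'tls'      => $tls,
    ];
}

$config = amqpConfigFromDsn('amqps://app:s%40cret@broker.example.com/orders');
echo $config['host'], ':', $config['port'], ' vhost=', $config['vhost'], "\n";
```

The sketch treats "no path" and "empty path" the same way, which is a shortcut; a stricter parser would distinguish them.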
Enterprise Messaging
- [~] Dead-letter queue management UI - DeadLetterManager API + amqp:dlq / amqp:monitor CLI; web UI still planned
- Scheduled message delivery (absolute time) - only relative delays today via publishLater() / dispatchLater()
- Delayed message support - DelayedPublisher, Amqp::publishLater() / publishTypedLater(), TypedMessage::dispatchLater(), amqp:publish --delay-ms
- Message priority queues - QueueProfile::priority() / quorumWithPriority(), x-max-priority, publish priority
- Bulk publish operations - Amqp::batchBasicPublish() / batchPublish() + BatchManager
- [~] Consumer rate limiting - QoS / prefetch only (basic_qos, --prefetch-count, WorkerOptions::throughput() / lowLatency()); no app-level msgs/sec throttling
- Circuit breaker support
- [~] Retry policy dashboard - RetryPolicy, #[Retry], RetryHandler, consumeWithRetry(); metrics via amqp:monitor (no dedicated retry UI)
Polyglot Microservices
- Contract generation for TypeScript
- Contract generation for Go
- Contract generation for Python
- [~] JSON Schema export - contracts can declare schema() and validate in-process; no export CLI yet
- AsyncAPI specification generation
- Service registry integration - ServiceRegistry, Rpc::services()->register() / autodiscover(), Rpc::service('alias')->call(...)
- [~] Cross-language RPC contracts - InteropEnvelope standard headers (x-message-type, x-schema-version, x-source-service); no codegen for foreign languages
Operations
- [~] Horizon-style AMQP dashboard - MonitoringDashboard snapshot + php artisan amqp:monitor [--json]; full web UI still planned
- [~] Queue health monitoring - Management API stats + amqp:monitor (no dedicated health view)
- [~] Consumer health monitoring - consumer counts / rates via Management API; no consumer health UI
- Queue topology visualizer (ExchangeTopology / DeadLetterTopology are code builders, not a visualizer)
- Exchange / routing-key explorer
- [~] Production diagnostics commands - amqp:monitor, amqp:publish, amqp:consume, amqp:listen, amqp:purge, amqp:work; no dedicated amqp:diagnose
- [~] Broker connectivity diagnostics - ResilientConnectionManager handles reconnection; no standalone diagnostic command
Security
- Message encryption support
- Message signing & verification
- Sensitive payload masking
- [~] Audit trail integration - MessageStoreInterface + InMemoryMessageStore (opt-in append log of every publish/consume)
- Per-consumer access controls
- Security scanning for message contracts
AI & Modern Features
- AI-powered message anomaly detection
- AI-assisted queue optimization recommendations
- Natural language queue diagnostics
- Intelligent retry recommendations
Requirements
- PHP 7.3 through 8.5 (composer.json: ^7.3|^8.0)
- Laravel 8.x through 13.x (or Lumen 8.x+)
- RabbitMQ 3.x (tested with the rabbitmq:3-management Docker image)
| Laravel | Minimum PHP | Notes |
|---|---|---|
| 8.x | 7.3 | Last Laravel version for PHP 7.3 / 7.4 |
| 9.x | 8.0.2 | Use PHP 8.0.2+ (not 8.0.0/8.0.1) |
| 10.x | 8.1 | |
| 11.x / 12.x | 8.2 | |
| 13.x | 8.3 | |
Config supports both use + properties (current) and legacy default + connections layouts.
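As a rough illustration of how the two layouts relate, the sketch below resolves the active connection settings from either shape. The helper resolveAmqpConnection() and its fallback key names are assumptions for this example and are not part of the package.

```php
<?php
// Hypothetical helper, not part of the package: returns the active connection
// settings from either the current (use + properties) or legacy
// (default + connections) layout.
function resolveAmqpConnection(array $config, $name = null)
{
    if (isset($config['properties'])) {
        // Current layout: 'use' names the active key inside 'properties'.
        $key = $name !== null ? $name : (isset($config['use']) ? $config['use'] : 'production');
        $pool = $config['properties'];
    } elseif (isset($config['connections'])) {
        // Legacy layout: 'default' names the active key inside 'connections'.
        $key = $name !== null ? $name : (isset($config['default']) ? $config['default'] : 'default');
        $pool = $config['connections'];
    } else {
        throw new InvalidArgumentException('No AMQP connection layout found.');
    }

    if (!isset($pool[$key])) {
        throw new InvalidArgumentException("AMQP connection [{$key}] is not configured.");
    }

    return $pool[$key];
}

$current = ['use' => 'production', 'properties' => ['production' => ['host' => 'rabbit-a']]];
$legacy  = ['default' => 'main', 'connections' => ['main' => ['host' => 'rabbit-b']]];

echo resolveAmqpConnection($current)['host'], "\n";
echo resolveAmqpConnection($legacy)['host'], "\n";
```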
Installation
Composer
composer require bschmitt/laravel-amqp
For Laravel 5.5+:
"bschmitt/laravel-amqp": "^3.1"
For Laravel < 5.5:
"bschmitt/laravel-amqp": "^2.0"
Quick Start
Publishing Messages
use Bschmitt\Amqp\Facades\Amqp;

// Basic publish
Amqp::publish('routing-key', 'message');

// Publish with queue creation
Amqp::publish('routing-key', 'message', ['queue' => 'queue-name']);

// Publish with message properties
Amqp::publish('routing-key', 'message', [
    'priority' => 10,
    'correlation_id' => 'unique-id',
    'reply_to' => 'reply-queue',
    'application_headers' => [
        'X-Custom-Header' => 'value',
    ],
]);
Consuming Messages
use Bschmitt\Amqp\Facades\Amqp;

// Consume and acknowledge (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
    echo $message->body;
    $resolver->acknowledge($message);
    $resolver->stopWhenProcessed();
});

// Consume forever
$amqp = app('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
    processMessage($message->body);
    $resolver->acknowledge($message);
}, ['persistent' => true]);

// Alternative: Using resolve() helper
$amqp = resolve('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
    processMessage($message->body);
    $resolver->acknowledge($message);
});
RPC Pattern
// Client side - Make RPC call (using dynamic call)
$amqp = app('Amqp');
$response = $amqp->rpc('rpc-queue', 'request-data', [], 30);

// Server side - Process and reply (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('rpc-queue', function ($message, $resolver) {
    $result = processRequest($message->body);
    $resolver->reply($message, $result);
    $resolver->acknowledge($message);
});
Listen to Multiple Routing Keys
$amqp = app('Amqp');
$amqp->listen(['key1', 'key2', 'key3'], function ($message, $resolver) {
    processMessage($message->body);
    $resolver->acknowledge($message);
});
Artisan Commands
This section covers the package's five core console commands: amqp:work, amqp:consume, amqp:listen, amqp:publish, and amqp:purge. Handler classes must implement Bschmitt\Amqp\Contracts\MessageHandlerInterface or expose an __invoke($message, $resolver) method. The $resolver is the active consumer and provides acknowledge(), reject(), reply(), and stopWhenProcessed().
amqp:work — long-running worker
php artisan amqp:work my-queue --handler="App\\Messaging\\ProcessOrderHandler"
| Option | Description |
|---|---|
| --handler= | Required. FQCN of your message handler |
| --connection= | Connection name from config/amqp.php |
| --exchange= / --exchange-type= | Override exchange settings |
| --routing-key=* | Routing key(s) to bind (repeatable) |
| --prefetch-count= | Enable QoS with this prefetch count |
| --max-messages=0 | Stop after N messages (0 = unlimited) |
| --max-time=0 | Stop after N seconds |
| --memory=128 | Exit if memory usage exceeds this many MB |
| --stop-when-empty | Exit when the queue is drained instead of waiting |
| --requeue-on-error | Requeue messages when the handler throws |
amqp:consume — process a fixed number of messages
php artisan amqp:consume my-queue --handler="App\\Messaging\\ProcessOrderHandler" --max-messages=10
php artisan amqp:consume my-queue --handler="App\\Messaging\\ProcessOrderHandler" --all
Defaults to one message per invocation. Use --all to drain the queue.
amqp:listen — listen on routing keys
php artisan amqp:listen order.created order.updated --handler="App\\Messaging\\OrderHandler"
Creates an auto-deleted queue (unless --queue= or --no-auto-delete is set) and binds it to every supplied routing key.
amqp:publish — publish from the CLI
php artisan amqp:publish order.created --body='{"id":42}' --exchange=orders --priority=5
php artisan amqp:publish order.created --file=./payload.json --headers='{"X-Source":"cli"}'
php artisan amqp:publish order.created --body='{"id":42}' --delay-ms=5000 --exchange=orders
Use --delay-ms to schedule delivery (TTL+DLX by default, or --delay-strategy=plugin when the delayed-message exchange plugin is installed).
amqp:purge — empty a queue
php artisan amqp:purge my-queue --force
Retry options on amqp:work
| Option | Description |
|---|---|
| --retry=N | Wraps the handler in a RetryHandler and configures up to N retries (0 disables retries) |
| --retry-delay=ms | Base delay between retries in milliseconds (default 1000) |
| --retry-backoff=fixed\|exponential | Backoff strategy (default fixed) |
| --retry-multiplier=2.0 | Growth factor for exponential backoff |
| --retry-max-delay=ms | Cap for the computed retry delay (0 = uncapped) |
| --retry-jitter=ms | Random jitter added to each retry delay |
| --dlq=name | Override the dead-letter queue name (default {queue}.dlq) |
| --declare-topology | Pre-declare the work + DLQ + retry queues before consuming |
| --contract= | FQCN of a MessageContractInterface to deserialize bodies into (passed as 3rd handler arg) |
| --validate-schema | Validate inbound JSON against the contract's schema() before invoking the handler |
See Retry & Dead-Letter Abstractions for the full picture.
Example handler
namespace App\Messaging;

use Bschmitt\Amqp\Contracts\ConsumerInterface;
use Bschmitt\Amqp\Contracts\MessageHandlerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class ProcessOrderHandler implements MessageHandlerInterface
{
    public function handle(AMQPMessage $message, ConsumerInterface $resolver, $typed = null): void
    {
        // Prefer the typed contract (set when --contract is used); otherwise decode the raw JSON body.
        $order = $typed !== null
            ? $typed->toPayload()
            : json_decode($message->body, true);

        // ... process $order ...

        $resolver->acknowledge($message);
    }
}
Laravel Queue Driver
Use this package as a native Laravel queue backend so jobs can be dispatched with dispatch(), Queue::push(), and processed with php artisan queue:work.
1. Publish AMQP config
php artisan vendor:publish --provider="Bschmitt\Amqp\Providers\AmqpServiceProvider"
2. Add queue connection
Merge the example from config/queue-amqp.php into config/queue.php:
'connections' => [
    // ...
    'amqp' => [
        'driver' => 'amqp',
        'connection' => env('AMQP_ENV', 'production'), // key in config/amqp.php properties
        'queue' => env('AMQP_QUEUE', 'default'),
        'retry_after' => 90,
    ],
],
3. Set default queue connection (optional)
QUEUE_CONNECTION=amqp
4. Run the worker
php artisan queue:work amqp --queue=default
Jobs are published to your configured exchange with the queue name as the routing key. Delayed jobs use a TTL dead-letter queue per delay interval.
Delayed & released jobs
ProcessOrder::dispatch($order)->delay(now()->addMinutes(5));
AmqpQueue::later() publishes to a per-TTL delay queue ({queue}.delay.{ttl_ms}) with
x-dead-letter-exchange / x-message-ttl so RabbitMQ delivers the job back to the
main queue when the delay expires. $job->release($seconds) uses the same mechanism.
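The per-TTL delay queue described above can be sketched in plain PHP as follows. The naming pattern and the x-message-ttl / x-dead-letter-exchange arguments come from the text; the function delayQueueSpec() and the use of x-dead-letter-routing-key to steer the job back to the main queue are assumptions for illustration, not the driver's actual code.

```php
<?php
// Sketch only: builds the name and arguments for a per-TTL delay queue.
// delayQueueSpec() is a hypothetical helper, not an AmqpQueue method.
function delayQueueSpec($queue, $exchange, $delaySeconds)
{
    $ttlMs = (int) round($delaySeconds * 1000);

    return [
        'name' => sprintf('%s.delay.%d', $queue, $ttlMs),
        'arguments' => [
            // Messages expire after the delay...
            'x-message-ttl' => $ttlMs,
            // ...and are dead-lettered to the main exchange,
            'x-dead-letter-exchange' => $exchange,
            // routed with the main queue's name (assumed routing convention).
            'x-dead-letter-routing-key' => $queue,
        ],
    ];
}

$spec = delayQueueSpec('default', 'amq.topic', 300);
echo $spec['name'], "\n";
```

Because every distinct delay gets its own queue, messages in a given delay queue all share one TTL and expire in order, which avoids the head-of-line blocking that per-message TTLs on a shared queue would cause.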
Verify the driver
vendor/bin/phpunit --testdox \
--filter 'AmqpQueue|AmqpJob|AmqpConnector|AmqpServiceProviderQueue|QueueConfigResolver|LaravelQueue'
Full setup, architecture and troubleshooting: docs/content/queue-driver.md or the interactive docs site (docs/index.html).
Configuration
Laravel
Publish the configuration file:
php artisan vendor:publish --provider="Bschmitt\Amqp\Providers\AmqpServiceProvider"
Or manually copy vendor/bschmitt/laravel-amqp/config/amqp.php to config/amqp.php.
Lumen
Create a config folder in your Lumen root and copy the configuration file:
mkdir config
cp vendor/bschmitt/laravel-amqp/config/amqp.php config/amqp.php
Register the service provider in bootstrap/app.php:
$app->configure('amqp');
$app->register(Bschmitt\Amqp\Providers\LumenServiceProvider::class);

// For Lumen 5.2+, enable facades
$app->withFacades(true, [
    'Bschmitt\Amqp\Facades\Amqp' => 'Amqp',
]);
Configuration Example
return [
    'use' => env('AMQP_ENV', 'production'),

    'properties' => [
        'production' => [
            'host' => env('AMQP_HOST', 'localhost'),
            'port' => env('AMQP_PORT', 5672),
            'username' => env('AMQP_USER', 'guest'),
            'password' => env('AMQP_PASSWORD', 'guest'),
            'vhost' => env('AMQP_VHOST', '/'),
            'exchange' => env('AMQP_EXCHANGE', 'amq.topic'),
            'exchange_type' => env('AMQP_EXCHANGE_TYPE', 'topic'),
            'consumer_tag' => 'consumer',
            'ssl_options' => [],
            'connect_options' => [],
            'queue_properties' => ['x-ha-policy' => ['S', 'all']],
            'exchange_properties' => [],
            'timeout' => 0,

            // Management API (optional)
            'management_api_url' => env('AMQP_MANAGEMENT_URL', 'http://localhost:15672'),
            'management_api_user' => env('AMQP_MANAGEMENT_USER', 'guest'),
            'management_api_password' => env('AMQP_MANAGEMENT_PASSWORD', 'guest'),
        ],
    ],
];
Documentation
Comprehensive Guides
- User Manual - Complete usage guide
- Release Notes - Version 3.4.0 changelog (latest: 3.4.0 minor release)
- FAQ - Common questions and answers
Wiki Documentation
- Getting Started - Installation and first steps
- Configuration - Configuration guide
- Publishing Messages - Publishing guide
- Consuming Messages - Consumption guide
- RPC Pattern - Request-response patterns
- Queue Management - Queue operations
- Management API - HTTP API integration
- Message Properties - Message properties
- Advanced Features - Advanced usage
- Architecture - Package architecture
- Testing - Testing guide
Module Documentation
See docs/modules/ for detailed module documentation:
- RPC Module
- Management Operations
- Management API
- Message Properties
- Consumer Prefetch
- And more...
Examples
Fanout Exchange
// Publishing
Amqp::publish('', 'message', [
    'exchange_type' => 'fanout',
    'exchange' => 'amq.fanout',
]);

// Consuming (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('', function ($message, $resolver) {
    echo $message->body;
    $resolver->acknowledge($message);
}, [
    'routing' => '',
    'exchange' => 'amq.fanout',
    'exchange_type' => 'fanout',
    'queue_force_declare' => true,
    'queue_exclusive' => true,
    'persistent' => true,
]);
Queue Management
// Get Amqp instance
$amqp = app('Amqp');

// Purge queue
$amqp->queuePurge('my-queue', ['queue' => 'my-queue']);

// Delete queue
$amqp->queueDelete('my-queue', ['queue' => 'my-queue']);

// Get queue statistics
$stats = $amqp->getQueueStats('my-queue', '/');
Management API
// Get Amqp instance
$amqp = app('Amqp');

// Get queue statistics
$stats = $amqp->getQueueStats('my-queue', '/');

// List connections
$connections = $amqp->getConnections();

// Create policy
$amqp->createPolicy('my-policy', [
    'pattern' => '^my-queue$',
    'definition' => ['max-length' => 1000],
], '/');
Retry & Dead-Letter Abstractions
Three small primitives let you build production-grade retry pipelines without hand-rolling DLX wiring:
1. Bschmitt\Amqp\Support\RetryPolicy - declarative max attempts + backoff strategy (fixed, exponential, immediate, none) with optional cap and jitter.
2. Bschmitt\Amqp\Support\DeadLetterTopology - describes the work queue, the DLQ, and the per-delay retry queues. Produces ready-to-use property arrays for publish() / consume().
3. Bschmitt\Amqp\Support\RetryHandler - decorator that wraps your handler. On exception it republishes the message to a TTL'd retry queue (which dead-letters back to the work queue when the TTL expires) and acknowledges the original delivery. When the retry budget is spent it rejects without requeue so RabbitMQ routes the message to the DLQ via the x-dead-letter-exchange configured on the work queue.
Declare the topology once
use Bschmitt\Amqp\Support\DeadLetterTopology;
use Bschmitt\Amqp\Support\RetryPolicy;

$amqp = app('Amqp');

// RetryPolicy::exponential($maxAttempts, $baseDelayMs, $multiplier, $maxDelayMs)
$policy = RetryPolicy::exponential(5, 1000, 2.0, 60000);

$topology = DeadLetterTopology::for('orders.process', $policy)
    ->on('app.events', 'topic')
    ->withRoutingKey('orders.process');

// Idempotently creates: orders.process, orders.process.dlq,
// and orders.process.retry.{1000,2000,4000,8000,16000} (capped at 60000).
$amqp->declareRetryTopology($topology);
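The retry queue names in the comment above follow from the exponential schedule. As a minimal sketch, assuming the delay for attempt n is base × multiplier^(n-1) clamped to the cap, the schedule can be reproduced in plain PHP; exponentialDelays() is a hypothetical helper, not a RetryPolicy method.

```php
<?php
// Hypothetical helper: computes the delay (ms) for each retry attempt using
// base * multiplier^(attempt - 1), clamped to $maxDelayMs when it is > 0.
function exponentialDelays($maxAttempts, $baseDelayMs, $multiplier, $maxDelayMs = 0)
{
    $delays = [];
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        $delay = (int) round($baseDelayMs * pow($multiplier, $attempt - 1));
        if ($maxDelayMs > 0) {
            $delay = min($delay, $maxDelayMs);
        }
        $delays[] = $delay;
    }

    return $delays;
}

// Same arguments as RetryPolicy::exponential(5, 1000, 2.0, 60000) above.
foreach (exponentialDelays(5, 1000, 2.0, 60000) as $delayMs) {
    echo 'orders.process.retry.', $delayMs, "\n";
}
```

With a lower cap, later attempts collapse onto the same delay and would therefore share a retry queue.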
Consume with auto-retry / DLQ routing
$amqp->consumeWithRetry($topology, function ($message, $resolver) {
    processOrder(json_decode($message->body, true));
    $resolver->acknowledge($message);
});
When the handler throws:
1. RetryHandler reads (and bumps) the x-retry-attempt application header.
2. If the next attempt still fits the policy, the message is republished to orders.process.retry.{delayMs} with the computed TTL. RabbitMQ's DLX on that queue routes the message back to orders.process once the TTL expires.
3. When the retry budget is exhausted, the handler rejects the message without requeue and RabbitMQ forwards it to orders.process.dlq via the work queue's x-dead-letter-exchange.
4. The x-first-failed-at and x-last-error headers carry diagnostics forward across retries so DLQ inspection is meaningful.
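The decision made on each failure can be sketched as a pure function over the message headers. This is an illustration of the flow described above, not RetryHandler's actual implementation; decideRetry() and the shape of its return value are assumptions.

```php
<?php
// Hypothetical decision function mirroring the steps above.
// $delaysMs holds one delay per allowed retry, e.g. [1000, 2000, 4000].
function decideRetry(array $headers, array $delaysMs, $workQueue, $error)
{
    $attempt = isset($headers['x-retry-attempt']) ? (int) $headers['x-retry-attempt'] : 0;

    if ($attempt >= count($delaysMs)) {
        // Budget spent: reject without requeue so the broker dead-letters it.
        return ['action' => 'reject', 'requeue' => false];
    }

    $delay = $delaysMs[$attempt];
    $headers['x-retry-attempt'] = $attempt + 1; // bump the counter
    $headers['x-last-error'] = $error;          // carry diagnostics forward

    return [
        'action'  => 'republish',
        'queue'   => sprintf('%s.retry.%d', $workQueue, $delay),
        'ttl_ms'  => $delay,
        'headers' => $headers,
    ];
}

$first = decideRetry([], [1000, 2000], 'orders.process', 'timeout');
echo $first['action'], ' -> ', $first['queue'], "\n";

$last = decideRetry(['x-retry-attempt' => 2], [1000, 2000], 'orders.process', 'timeout');
echo $last['action'], "\n";
```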
Pick a policy
use Bschmitt\Amqp\Support\RetryPolicy;

RetryPolicy::fixed(3, 1000);                    // 3 retries, 1s apart
RetryPolicy::exponential(5, 500, 2.0, 30000);   // 500ms doubling, capped at 30s
RetryPolicy::immediate(2);                      // 2 retries with zero delay
RetryPolicy::none();                            // failures go straight to the DLQ
Wrap an existing handler manually
use Bschmitt\Amqp\Support\RetryHandler;
use Illuminate\Support\Facades\Log;

// The third argument is an optional logger callback.
$wrapped = $amqp->retryHandler($yourHandler, $topology, function ($level, $message, $context) {
    Log::log($level, $message, $context);
});

$amqp->consume('orders.process', $wrapped, $topology->toWorkProperties());
Driving the worker from the CLI
php artisan amqp:work orders.process \
--handler="App\\Messaging\\ProcessOrderHandler" \
--retry=5 \
--retry-backoff=exponential \
--retry-delay=1000 \
--retry-multiplier=2.0 \
--retry-max-delay=60000 \
--dlq=orders.process.failed \
--declare-topology
See docs/content/advanced.md and the unit tests under test/Unit/Retry* /
test/Unit/DeadLetterTopologyTest.php for more examples.
Delayed Messaging & Publish Backoff
Schedule messages for later delivery or absorb transient broker errors on publish.
publishLater() — schedule delivery
$amqp = app('Amqp');

// TTL + dead-letter exchange (works on stock RabbitMQ)
$amqp->publishLater('orders.reminder', json_encode(['orderId' => 42]), 60000, [
    'exchange' => 'shop.events',
]);

// rabbitmq-delayed-message-exchange plugin (exchange must be x-delayed-message)
$amqp->publishLater('orders.reminder', $body, 60000, [
    'exchange' => 'shop.delayed',
    'delay_strategy' => 'plugin',
]);
DelayedPublisher creates a per-delay queue ({routing}.delayed.{ms}) with x-message-ttl and DLX routing back to the target exchange when using the default TTL strategy.
PublishBackoff — retry failed publishes
use Bschmitt\Amqp\Support\RetryPolicy;

// $payload must be captured by the closure alongside $amqp.
$amqp->withPublishBackoff(RetryPolicy::exponential(3, 100, 2.0))->run(function () use ($amqp, $payload) {
    return $amqp->publish('orders.created', $payload);
});
This is separate from consumer-side RetryHandler — it retries the publish call itself when the broker throws.
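The general technique, retrying a failing call with growing pauses, can be sketched without the package. The function runWithBackoff() below is a hypothetical stand-in that shows the idea; it is not PublishBackoff's implementation, and the injectable $sleep callback exists only so the example can run without real waiting.

```php
<?php
// Hypothetical helper: runs $operation, retrying up to $maxRetries times on
// exception, sleeping base * multiplier^n milliseconds before retry n + 1.
function runWithBackoff(callable $operation, $maxRetries, $baseDelayMs, $multiplier, $sleep = null)
{
    if ($sleep === null) {
        $sleep = function ($ms) {
            usleep($ms * 1000);
        };
    }

    $attempt = 0;
    while (true) {
        try {
            return $operation();
        } catch (Exception $e) {
            if ($attempt >= $maxRetries) {
                throw $e; // retries exhausted: surface the last error
            }
            $sleep((int) round($baseDelayMs * pow($multiplier, $attempt)));
            $attempt++;
        }
    }
}

// Simulated publish that fails twice before succeeding.
$calls = 0;
$slept = [];
$result = runWithBackoff(function () use (&$calls) {
    $calls++;
    if ($calls < 3) {
        throw new RuntimeException('broker unavailable');
    }
    return 'published';
}, 3, 100, 2.0, function ($ms) use (&$slept) {
    $slept[] = $ms; // record instead of sleeping
});

echo $result, ' after ', $calls, ' calls', "\n";
```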
Typed Message Contracts & DTO Serialization
Define message shapes as plain PHP classes and let the package handle JSON encoding/decoding.
use Bschmitt\Amqp\Support\TypedMessage;

class OrderCreated extends TypedMessage
{
    public $orderId;
    public $total;
    public $currency;

    public function __construct($orderId = null, $total = null, $currency = null)
    {
        $this->orderId = $orderId;
        $this->total = $total;
        $this->currency = $currency;
    }

    public static function routingKey()
    {
        return 'orders.created';
    }

    public static function exchange()
    {
        return 'shop.events';
    }
}
$amqp = app('Amqp');

// Publish: picks up routing key + exchange from the contract
$amqp->publishTyped(new OrderCreated('order-1', 19.99, 'USD'));

// Consume: callback receives ($typed, $message, $resolver)
$amqp->consumeTyped('orders.queue', OrderCreated::class, function ($order, $message, $resolver) {
    processOrder($order->orderId);
    $resolver->acknowledge($message);
});

// Delayed typed publish
$amqp->publishTypedLater(new OrderCreated('order-2', 9.99, 'USD'), 30000);
Swap the serializer via $amqp->setSerializer($mySerializer) when you need MessagePack, Avro, etc.
JSON Schema Validation
Contracts may expose a static schema() method returning a JSON Schema-style array. The package validates payloads on publish and consume using the bundled SchemaValidator (no external dependencies).
class OrderCreated extends TypedMessage
{
    // ...properties...

    public static function schema()
    {
        return [
            'type' => 'object',
            'required' => ['orderId', 'total', 'currency'],
            'additionalProperties' => false,
            'properties' => [
                'orderId' => ['type' => 'string', 'minLength' => 1],
                'total' => ['type' => 'number', 'minimum' => 0],
                'currency' => ['type' => 'string', 'enum' => ['USD', 'EUR', 'GBP']],
            ],
        ];
    }
}
Invalid payloads raise Bschmitt\Amqp\Exception\SchemaValidationException with a list of pointer-style error messages. On the CLI, combine --contract with --validate-schema on amqp:work.
Supported keywords include type, required, properties, additionalProperties, enum, const, minimum/maximum, minLength/maxLength, pattern, format (email, uri, uuid, date, date-time), items, oneOf/anyOf/allOf/not, and more — see docs/content/advanced.md.
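To show how a handful of these keywords translate into checks, here is a deliberately small validator in plain PHP. It is not the bundled SchemaValidator; validatePayload() is a hypothetical function covering only type (object, string, number), required, properties, additionalProperties => false, enum, minimum, and minLength, and its error message format is an assumption.

```php
<?php
// Illustrative mini-validator; NOT the package's SchemaValidator.
// Returns a list of pointer-style error strings (empty when valid).
function validatePayload(array $schema, $value, $path = '')
{
    $at = $path === '' ? '/' : $path;
    $type = isset($schema['type']) ? $schema['type'] : null;

    $typeOk = true;
    if ($type === 'object') {
        $typeOk = is_array($value);
    } elseif ($type === 'string') {
        $typeOk = is_string($value);
    } elseif ($type === 'number') {
        $typeOk = is_int($value) || is_float($value);
    }
    if (!$typeOk) {
        return ["{$at}: expected {$type}"];
    }

    $errors = [];
    if (isset($schema['enum']) && !in_array($value, $schema['enum'], true)) {
        $errors[] = "{$at}: value not in enum";
    }
    if (isset($schema['minimum']) && (is_int($value) || is_float($value)) && $value < $schema['minimum']) {
        $errors[] = "{$at}: below minimum {$schema['minimum']}";
    }
    // strlen() counts bytes; a full validator would count characters.
    if (isset($schema['minLength']) && is_string($value) && strlen($value) < $schema['minLength']) {
        $errors[] = "{$at}: shorter than minLength {$schema['minLength']}";
    }

    if ($type === 'object') {
        $properties = isset($schema['properties']) ? $schema['properties'] : [];
        $required = isset($schema['required']) ? $schema['required'] : [];
        $closed = isset($schema['additionalProperties']) && $schema['additionalProperties'] === false;

        foreach ($required as $key) {
            if (!array_key_exists($key, $value)) {
                $errors[] = "{$path}/{$key}: required property missing";
            }
        }
        foreach ($value as $key => $item) {
            if (isset($properties[$key])) {
                $errors = array_merge($errors, validatePayload($properties[$key], $item, "{$path}/{$key}"));
            } elseif ($closed) {
                $errors[] = "{$path}/{$key}: additional property not allowed";
            }
        }
    }

    return $errors;
}

// Same schema as the OrderCreated contract above.
$schema = [
    'type' => 'object',
    'required' => ['orderId', 'total', 'currency'],
    'additionalProperties' => false,
    'properties' => [
        'orderId' => ['type' => 'string', 'minLength' => 1],
        'total' => ['type' => 'number', 'minimum' => 0],
        'currency' => ['type' => 'string', 'enum' => ['USD', 'EUR', 'GBP']],
    ],
];

$bad = ['orderId' => '', 'total' => -1, 'currency' => 'JPY', 'extra' => true];
foreach (validatePayload($schema, $bad) as $error) {
    echo $error, "\n";
}
```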
Production Infrastructure
Exchange topology builder
Declare an exchange and multiple bound queues in one fluent builder:
use Bschmitt\Amqp\Facades\Amqp;
use Bschmitt\Amqp\Support\ExchangeTopology;
use Bschmitt\Amqp\Support\QueueProfile;

$topology = ExchangeTopology::exchange('events', 'topic')
    ->bindQueue('orders.created', 'order.created')
    ->bindQueue('orders.shipped', 'order.shipped', QueueProfile::quorum());

Amqp::declareExchangeTopology($topology);

// Publish using properties for a specific queue in the topology
Amqp::publish('order.created', $payload, $topology->propertiesForQueue('orders.created'));
Shortcut: Amqp::exchangeTopology('events', 'topic')->bindQueue(...).
Quorum & priority queues
use Bschmitt\Amqp\Support\QueueProfile;

Amqp::publish('jobs', $payload, QueueProfile::quorumWithPriority(10)->mergeInto([
    'queue' => 'jobs',
    'routing' => 'jobs',
]));
Resilient connections & connection pool
use Bschmitt\Amqp\Facades\Amqp;
use Bschmitt\Amqp\Managers\ConnectionPool;

// Per-request resilient manager (reconnect + heartbeat staleness)
$resilient = Amqp::resilientConnection(['host' => 'rabbitmq'], [
    'max_reconnect_attempts' => 5,
    'heartbeat' => 30,
]);
$channel = $resilient->getChannel();

// Long-lived worker pool (persistent keys survive disconnectAll(false))
$pool = Amqp::connectionPool();
$manager = $pool->connection('worker', ['use' => 'production', 'resilient' => true], true);
Correlation ID & distributed tracing
use Bschmitt\Amqp\Support\CorrelationContext;

CorrelationContext::set('request-abc-123');

Amqp::publish('orders.created', $payload, [
    'propagate_correlation' => true,
    'propagate_trace' => true,
]);

Amqp::consumeWithLifecycle('orders.created', function ($message, $resolver) {
    // CorrelationContext::get() is populated when propagate_* flags are used
}, null, [
    'propagate_correlation' => true,
    'propagate_trace' => true,
]);
Bridge OpenTelemetry (or any APM) without a hard dependency:
use Bschmitt\Amqp\Support\CallbackTracePropagator;

Amqp::setTracePropagator(new CallbackTracePropagator(
    function (array $carrier, $context) {
        // inject active span into $carrier
        return $carrier;
    },
    function (array $carrier) {
        // extract TraceContext from $carrier or return null
        return null;
    }
));
Consumer lifecycle
use Bschmitt\Amqp\Support\ConsumerLifecycle;

$lifecycle = (new ConsumerLifecycle())
    ->registerSignalHandlers()
    ->onStopping(function ($lifecycle) {
        // flush buffers, close DB connections, etc.
    });

Amqp::consumeWithLifecycle('jobs', $handler, $lifecycle);
See docs/content/production-features.md for the full reference.
SAGA, Events, Middleware & Testing
SAGA workflow
use Bschmitt\Amqp\Facades\Amqp;

$saga = Amqp::saga('checkout')
    ->step('reserveStock', $reserveStock, $releaseStock)
    ->step('chargeCard', $chargeCard, $refundCard)
    ->step('shipOrder', $shipOrder);

$result = $saga->execute(['orderId' => 42]);

if ($result->failed()) {
    Log::error('Saga failed', [
        'step' => $result->getFailedStep(),
        'compensated' => $result->getCompensatedSteps(),
        'error' => $result->getException()->getMessage(),
    ]);
}
Compensations only run for steps that completed before the failure, in reverse order.
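The reverse-order rule can be illustrated with a small stand-alone runner. This is a sketch under stated assumptions, not the package's saga implementation: `runSaga` and its `[name, action, compensation]` step shape are hypothetical names chosen for the example.

```php
<?php

/**
 * Hypothetical stand-alone saga runner (NOT the package's implementation).
 * Each step is [name, action, compensation|null].
 *
 * @return array{failed: ?string, compensated: string[]}
 */
function runSaga(array $steps, array $context): array
{
    $completed = [];

    foreach ($steps as [$name, $action, $compensate]) {
        try {
            $action($context);
            $completed[] = [$name, $compensate];
        } catch (\Throwable $e) {
            $compensated = [];
            // Undo only the steps that finished, newest first.
            foreach (array_reverse($completed) as [$doneName, $undo]) {
                if ($undo !== null) {
                    $undo($context);
                    $compensated[] = $doneName;
                }
            }

            return ['failed' => $name, 'compensated' => $compensated];
        }
    }

    return ['failed' => null, 'compensated' => []];
}
```

The failing step itself is never compensated, because it is only added to the completed list after its action returns.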
Laravel events
The package dispatches the following events through \Illuminate\Support\Facades\Event (and a local listener registry as a fallback for non-Laravel contexts):
| Event | When |
|---|---|
| Bschmitt\Amqp\Events\MessagePublishing | Right before a publish is sent |
| Bschmitt\Amqp\Events\MessagePublished | After a successful publish |
| Bschmitt\Amqp\Events\MessageReceived | When a message is received by the consume pipeline |
| Bschmitt\Amqp\Events\MessageHandled | After the handler completes |
| Bschmitt\Amqp\Events\MessageFailed | When the handler throws |
Listen in Laravel as usual:
Event::listen(\Bschmitt\Amqp\Events\MessageFailed::class, function ($event) {
    Log::warning('AMQP handler failed', ['error' => $event->exception->getMessage()]);
});
Consume middleware
Wrap the consume handler with a pipeline:
use Bschmitt\Amqp\Facades\Amqp;

Amqp::consumeWithMiddleware('orders', function ($message, $resolver) {
    // handle...
}, [
    function ($message, $next) {
        $start = microtime(true);
        $next($message);
        Log::info('handled', ['duration_ms' => (microtime(true) - $start) * 1000]);
    },
    // ...or a ConsumeMiddlewareInterface instance
]);
Each middleware receives (AMQPMessage $message, callable $next) and can short-circuit by not calling $next.
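How such a pipeline is typically assembled can be sketched in plain PHP. The helper below is hypothetical (`buildPipeline` is not a package function) and only shows the general technique: folding the middleware list around the handler so the first entry becomes the outermost layer.

```php
<?php

/**
 * Hypothetical sketch (NOT the package's pipeline): fold middleware around
 * a handler so the first middleware in the list is the outermost layer.
 */
function buildPipeline(array $middleware, callable $handler): callable
{
    return array_reduce(
        array_reverse($middleware),
        function (callable $next, callable $layer): callable {
            // Each layer receives the message and the rest of the chain.
            return function ($message) use ($layer, $next) {
                return $layer($message, $next);
            };
        },
        $handler
    );
}
```

A layer that returns without calling `$next` stops the message from reaching the inner layers and the handler, while outer layers still finish their own "after" logic.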
Fake AMQP driver
In tests, replace the bound singleton with a recording fake:
use Bschmitt\Amqp\Core\Amqp;

public function test_publishes_order_created()
{
    $fake = Amqp::fake();

    (new CreateOrder)->handle();

    $fake->assertPublished('orders.created');
    $fake->assertPublishedCount(1, 'orders.created');
    $fake->assertNotPublished('orders.shipped');
}
The fake records both publish() and publishLater() calls and never touches the broker.
Async publishing with publisher confirms
$async = Amqp::asyncPublisher(['exchange' => 'events'])
    ->onAck(function ($tag) { /* metric: published */ })
    ->onNack(function ($tag) { /* metric: failed */ });

foreach ($messages as $m) {
    $async->publish('events.created', json_encode($m));
}

if (!$async->flush(30)) {
    Log::warning('Some publisher confirms timed out');
}

$async->close();
AsyncPublisher keeps a single channel open with confirm_select and only waits for confirmations on flush(), so high-throughput publishers don't block on the per-message round-trip.
Scale & Interop
RPC abstraction
// Client
$result = Amqp::rpcClient(['exchange' => 'rpc'])->asJson()->timeout(10)
    ->call('users.lookup', ['id' => 42]);

if ($result->succeeded()) {
    $user = $result->body();
}

// Server
Amqp::rpcServer()->asJson()->serve('rpc.users', function ($request, $consumer) {
    return ['id' => $request['id'], 'name' => 'Ada'];
});
RpcCallResult exposes succeeded(), timedOut(), and body().
Cross-service / polyglot messaging
Amqp::publishInterop(
    'orders.created',
    ['orderId' => 99, 'total' => 12.50],
    'orders.created',
    'billing-service',
    ['exchange' => 'events'],
    '2.0'
);

Amqp::consumeInterop('events.orders', function ($interop, $raw, $resolver) {
    $payload = \Bschmitt\Amqp\Support\InteropEnvelope::decodePayload($interop);
    // $interop->messageType, $interop->sourceService, $interop->schemaVersion
});
Standard headers (x-message-type, x-schema-version, x-source-service) let Node, Go, or Java consumers route messages without PHP DTOs.
Observability & queue metrics
// In-process counters (per worker / request)
$stats = Amqp::metrics()->snapshot();

// Broker-side queue depth + rates (Management API)
$metrics = Amqp::queueMetrics('orders', '/');
Log::info('queue depth', $metrics->toArray());
Publish/consume paths increment MetricsCollector automatically when using publish(), consumeWithMiddleware(), or HighPerformanceWorker.
High-performance workers
Amqp::consumeOptimized('jobs', $handler, ['exchange' => 'work']);

// Or explicitly:
Amqp::highPerformanceWorker(
    \Bschmitt\Amqp\Support\WorkerOptions::throughput(100)
)->run('jobs', $handler);
CLI: php artisan amqp:work jobs --handler=App\\Handlers\\JobHandler --optimized
See docs/content/scale-and-interop.md for the full reference.
gRPC-lite RPC
A typed, service-oriented RPC layer that feels like gRPC but rides on RabbitMQ. Define a service once, then call it from any process with typed DTOs.
Define the service contract
use Bschmitt\Amqp\Rpc\RpcService;
use Bschmitt\Amqp\Rpc\RpcRequest;
use Bschmitt\Amqp\Rpc\RpcResponse;

class UserService extends RpcService
{
    public static function queue(): string { return 'rpc.user-service'; }

    public static function methods(): array
    {
        return [
            GetUserRequest::class => 'getUser',
            CreateUserRequest::class => 'createUser',
        ];
    }
}

class GetUserRequest extends RpcRequest
{
    public $id;

    public function __construct($id = null) { $this->id = $id; }

    public static function responseClass() { return GetUserResponse::class; }
}

class GetUserResponse extends RpcResponse
{
    public $id;
    public $name;

    public function __construct($id = null, $name = null)
    {
        $this->id = $id;
        $this->name = $name;
    }
}
Call from any client
use Rpc; // facade alias auto-registered

$response = Rpc::call(
    UserService::class,
    GetUserRequest::make(['id' => 5])
);

echo $response->name; // GetUserResponse instance, hydrated for you
Rpc::call() automatically:
- Resolves the queue from the service contract.
- JSON-encodes the request DTO.
- Issues a synchronous AMQP RPC round-trip via the existing primitive.
- Hydrates the reply into the request's responseClass() (or returns the raw decoded array).
It throws RpcTimeoutException if no reply arrives, or RpcException if the server returned an error envelope.
Serve on the server side
use Rpc;

class UserServiceHandler
{
    public function getUser(GetUserRequest $request): GetUserResponse
    {
        $user = User::findOrFail($request->id);

        return GetUserResponse::make([
            'id' => $user->id,
            'name' => $user->name,
        ]);
    }

    public function createUser(CreateUserRequest $request): GetUserResponse
    {
        $user = User::create(['name' => $request->name]);

        return GetUserResponse::make(['id' => $user->id, 'name' => $user->name]);
    }
}

Rpc::register(UserService::class, UserServiceHandler::class)
    ->serve(UserService::class);
The handler may be an instance or a container-resolvable FQCN (Laravel only). Handler exceptions are wrapped into an _rpc_error envelope so the client raises a typed RpcException with the original message and class name.
Configurable
// Global default timeout
Rpc::defaultTimeout(10);

// Per-call timeout + extra publish properties
Rpc::call(UserService::class, GetUserRequest::make(['id' => 1]), 5, [
    'exchange' => 'rpc.svc',
]);
RPC latency & events
Every Rpc::call() records timing in Amqp::rpcMetrics() and dispatches Laravel events you can wire to Pulse, logs, or APM:
use Bschmitt\Amqp\Events\RpcCallCompleted;
use Bschmitt\Amqp\Events\RpcCallFailed;

Event::listen(RpcCallCompleted::class, fn ($e) => Log::info('rpc.ok', [
    'service' => $e->service,
    'request' => $e->request,
    'ms' => $e->durationMs,
]));

$stats = Amqp::rpcMetrics()->snapshot();
// ['UserService::GetUserRequest' => ['count' => 42, 'p95_ms' => 12.5, 'error_rate' => 0.02, ...]]
Lower-level RpcClient::call() also returns RpcCallResult::durationMs().
See docs/content/grpc-lite-rpc.md for the full reference.
Laravel Messaging Platform
A set of higher-level building blocks that turn the package from "an AMQP client" into a full microservice toolkit: service discovery, sagas, message contracts, dead-letter management, declarative retry, monitoring, automatic context propagation, an audit log, and an event bridge.
Service Discovery (Rpc::service(...))
Skip exchange/routing-key/queue gymnastics — register a short name and call by that name.
use Bschmitt\Amqp\Facades\Rpc;

// Either: explicit registration
Rpc::services()->register('payments', PaymentsService::class);

// Or: opt-in auto-discovery (service exposes `public static function alias()`)
class PaymentsService extends RpcService
{
    public static function queue(): string { return 'rpc.payments'; }
    public static function methods(): array { return [GetPayment::class => 'find']; }
    public static function alias(): ?string { return 'payments'; }
}

Rpc::services()->autodiscover([PaymentsService::class]);

$response = Rpc::service('payments')
    ->timeout(5)
    ->call(GetPayment::make(['id' => 123]));
Rpc::service() accepts an alias or a service FQCN.
Saga Facade
Saga::make()->step()->compensate() builds a saga fluently; when a step throws, the compensations of the completed steps run in reverse order.
use Bschmitt\Amqp\Facades\Saga;

$result = Saga::make('checkout')
    ->step('reserve', fn($ctx) => $stock->reserve($ctx['orderId']))
    ->compensate(fn($ctx) => $stock->release($ctx['orderId']))
    ->step('charge', fn($ctx) => $payments->charge($ctx['amount']))
    ->compensate(fn($ctx, $tx) => $payments->refund($tx))
    ->execute(['orderId' => 1, 'amount' => 49.99]);

if (!$result->succeeded()) {
    Log::error('Saga failed at ' . $result->getFailedStep(), [
        'compensated' => $result->getCompensatedSteps(),
    ]);
}
Message Contracts (OrderCreated::dispatch(...))
TypedMessage now exposes make() and dispatch() (and dispatchLater() for the delayed-queue variant).
use Bschmitt\Amqp\Support\TypedMessage;

class OrderCreated extends TypedMessage
{
    public $orderId;
    public $total;

    public static function name(): string { return 'orders.created'; }
}

OrderCreated::dispatch(['orderId' => 'o-1', 'total' => 9.99]);
OrderCreated::dispatchLater(['orderId' => 'o-1'], 2_000); // 2s delay
Dead-Letter Management
use Bschmitt\Amqp\Facades\Amqp;

Amqp::deadLetters()->for('orders.dlq')->count();          // 17
Amqp::deadLetters()->for('orders.dlq')->peek(20);         // non-destructive sample
Amqp::deadLetters()->for('orders.dlq')->summarize(100);   // group by reason / error
Amqp::deadLetters()->for('orders.dlq')->messages(10);     // drain & inspect (destructive)
Amqp::deadLetters()->for('orders.dlq')->replayTo('orders', 50);
Amqp::deadLetters()->for('orders.dlq')->purge();
CLI:
php artisan amqp:dlq inspect orders.dlq
php artisan amqp:dlq summary orders.dlq --limit=200 --json
php artisan amqp:dlq replay orders.dlq --target=orders --limit=50
php artisan amqp:dlq purge orders.dlq --force
Lifecycle events: DeadLetterDetected, DeadLetterReplayed, DeadLetterPurged.
Declarative Retry (#[Retry])
use Bschmitt\Amqp\Attributes\Retry;
use Bschmitt\Amqp\Support\RetryStrategy;
use Bschmitt\Amqp\Support\RetryPolicy;

class CreateOrderHandler
{
    #[Retry(attempts: 5, strategy: RetryStrategy::EXPONENTIAL, delayMs: 500)]
    public function handle($message): void { /* ... */ }
}

$policy = RetryPolicy::fromAttribute(CreateOrderHandler::class, 'handle');
$amqp->consumeWithRetry('orders', $handler, $policy);
On PHP 7.x the attribute parses as a comment (the package still loads); call sites that want the attribute need PHP 8+.
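For intuition about what an exponential strategy with `delayMs: 500` could produce, here is a small sketch of one common schedule: the base delay doubles on every attempt and is capped. The doubling factor, the cap, and the function name `exponentialDelayMs` are assumptions; the package's actual formula is not stated here and may differ.

```php
<?php

/**
 * Hypothetical delay schedule for an exponential retry strategy:
 * attempt 1 waits $baseMs and each further attempt doubles the wait,
 * capped at $maxMs. The package's real formula may differ.
 */
function exponentialDelayMs(int $attempt, int $baseMs, int $maxMs = 60000): int
{
    if ($attempt < 1) {
        throw new \InvalidArgumentException('attempt must be >= 1');
    }

    // Bound the exponent so the multiplication cannot overflow an int.
    $exponent = min($attempt - 1, 30);

    return (int) min($maxMs, $baseMs * (2 ** $exponent));
}
```

With a 500 ms base, five attempts under this schedule would wait 500, 1000, 2000, 4000, and 8000 ms.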
Monitoring Dashboard
$snapshot = Amqp::dashboard(['orders', 'orders.dlq'])
    ->deadLetters(['orders.dlq'])
    ->lagThresholds(1000, 60.0, 300)
    ->snapshot();
// process, queues (with lag / lag_seconds / lagging), dead_letters, rpc, lagging[], generated
CLI:
php artisan amqp:monitor --queue=orders --queue=orders.dlq --json
php artisan amqp:monitor --queue=orders --dlq=orders.dlq --rpc
php artisan amqp:monitor --queue=orders --lag-threshold=1000 --lag-seconds=60
# exits 1 when any queue breaches a lag threshold (cron-friendly)
Wire the snapshot into any HTTP route (Laravel, Symfony, Slim) to expose a JSON dashboard.
Causation ID Propagation
CorrelationContext::inheritFromMessage() now picks up the inbound message_id as the causation_id for everything published afterwards, so downstream services can trace "this happened because of that" through a chain.
CorrelationContext::inheritFromMessage($incoming);

Amqp::publish('orders.created', $body, [
    'propagate_correlation' => true,
    'message_id' => uniqid('msg_', true),
]);
// outbound has `correlation_id`, `x-correlation-id`, and `x-causation-id` set
Correlation Chain Visualisation
CorrelationChain walks the MessageStore, groups entries by correlation_id, and rebuilds the causation tree using the x-causation-id header — no UI server required.
use Bschmitt\Amqp\Support\CorrelationChain;

$chain = new CorrelationChain($amqp->messageStore());

$summary = $chain->summarize('corr_abc123');
// total, published, consumed, routings, first_at, last_at, duration_ms

$tree = $chain->tree('corr_abc123');
// nested ['entry' => ..., 'children' => [...]]

echo $chain->render($tree); // ASCII tree, perfect for logs
CLI:
php artisan amqp:trace corr_abc123
php artisan amqp:trace corr_abc123 --summary
php artisan amqp:trace corr_abc123 --json --limit=50
Sample output:
correlation_id: corr_abc123
messages: 4 (published=3, consumed=1)
span: 18.42 ms
routings: orders.created(1), orders.shipped(2), orders.invoiced(1)
[published] >> orders.created (msg=msg_root)
├── [published] >> orders.shipped (msg=msg_a)
│ └── [published] >> orders.invoiced (msg=msg_grand)
└── [consumed] << orders.shipped (msg=msg_b)
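The tree above can be rebuilt from a flat list of entries by grouping them on their causation id. The sketch below is not the package's CorrelationChain; the function name `buildCausationTree` and the entry keys `message_id` and `causation_id` are assumptions made for the example, and it assumes the entries contain no causation cycles.

```php
<?php

/**
 * Hypothetical sketch (NOT CorrelationChain itself): rebuild a causation
 * tree from flat entries. Each entry is assumed to carry `message_id` and
 * `causation_id` (null for a root). Assumes the data has no cycles.
 */
function buildCausationTree(array $entries): array
{
    // Index children by the id of the message that caused them.
    $childrenOf = [];
    foreach ($entries as $entry) {
        $childrenOf[$entry['causation_id'] ?? ''][] = $entry;
    }

    $attach = function ($parentId) use (&$attach, $childrenOf): array {
        $nodes = [];
        foreach ($childrenOf[$parentId] ?? [] as $entry) {
            $nodes[] = [
                'entry' => $entry,
                'children' => $attach($entry['message_id']),
            ];
        }

        return $nodes;
    };

    return $attach(''); // roots are the entries without a causation id
}
```

The result uses the same nested `['entry' => ..., 'children' => [...]]` shape mentioned for `tree()`, which makes it straightforward to render recursively.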
Laravel Pulse Integration
When laravel/pulse is installed the package auto-registers AmqpPulseRecorder and records the following metric types so they show up under Pulse::values($type) and in custom cards:
| Type | Key | Value |
|---|---|---|
| amqp_publish | routing key | 1 (count) |
| amqp_handle | queue | duration (ms) |
| amqp_fail | queue | 1 (count) |
| amqp_rpc | Service::Request (short name) | duration (ms) |
| amqp_rpc_fail | Service::Request | 1 (count) |
| amqp_dlq | dead-letter queue | sampled msg count |
Disable the auto-subscription in config/amqp.php:
return [
    // ...
    'pulse_integration' => false,
];
The recorder is a silent no-op when Pulse is not installed — no exceptions, no log spam.
OpenTelemetry Bridge
OpenTelemetryTracePropagator plugs the open-telemetry/api SDK into the package's TracePropagatorInterface so that the active OTel span context is auto-injected into every AMQP traceparent / tracestate header.
use Bschmitt\Amqp\Contracts\TracePropagatorInterface;
use Bschmitt\Amqp\Support\OpenTelemetryTracePropagator;

// In a service provider:
$this->app->singleton(TracePropagatorInterface::class, function () {
    return new OpenTelemetryTracePropagator();
    // Or pass an explicit \OpenTelemetry\Context\Propagation\TextMapPropagatorInterface
});
When the SDK is absent the propagator falls back to W3C generation (W3cTracePropagator), so the same wiring works on stripped-down environments and in CI.
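For reference, a W3C Trace Context `traceparent` value has four dash-separated fields: a version, a 16-byte trace id, an 8-byte parent (span) id, and trace flags. The snippet below is a generic sketch of producing one; `generateTraceparent` is a hypothetical helper and not the W3cTracePropagator's own code.

```php
<?php

/**
 * Generic sketch (NOT the package's W3cTracePropagator): build a
 * W3C Trace Context `traceparent` value with version "00".
 */
function generateTraceparent(bool $sampled = true): string
{
    // An all-zero id would be invalid; with random bytes that is
    // practically impossible, so it is not checked here.
    $traceId = bin2hex(random_bytes(16)); // 32 lowercase hex characters
    $spanId = bin2hex(random_bytes(8));   // 16 lowercase hex characters

    return sprintf('00-%s-%s-%s', $traceId, $spanId, $sampled ? '01' : '00');
}
```

A consumer continuing the trace would keep the trace id, generate a new span id for its own work, and treat the received span id as the parent.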
MessageStore (audit log / event-sourcing seed)
use Bschmitt\Amqp\Support\InMemoryMessageStore;

$amqp->setMessageStore(new InMemoryMessageStore());

Amqp::publish('orders.created', '{}');

$entries = $amqp->messageStore()->all(['direction' => 'published']);
Implement MessageStoreInterface to back it with Eloquent / Redis / files for durable replay.
Async Laravel Events
use Bschmitt\Amqp\Contracts\ShouldPublishToAmqpInterface;

class OrderCreated implements ShouldPublishToAmqpInterface
{
    public function __construct(public string $orderId) {}
}

// config/amqp.php
return [
    // ...
    'broadcast_laravel_events' => true,
];

event(new OrderCreated('o-1'));
// auto-published to RabbitMQ with routing key `order_created`
Override amqpRouting(), amqpExchange(), or amqpPayload() on the event to customise routing.
Testing
The package ships with unit and integration test suites:
# Run all tests
php vendor/bin/phpunit

# Run unit tests only
php vendor/bin/phpunit test/Unit/

# Run integration tests only
php vendor/bin/phpunit test/Integration/
Test Requirements:
- RabbitMQ server running (for integration tests)
- Docker: docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
See Testing Guide for more information.
Version 3.1.0+ Highlights
New Methods
RPC:
- $amqp->rpc($routingKey, $request, $properties, $timeout) - Make RPC calls (use $amqp = app('Amqp'))
- Consumer::reply($message, $response, $properties) - Send RPC responses
- $amqp->listen($routingKeys, $callback, $properties) - Auto-create queues with multiple bindings (use $amqp = app('Amqp'))
Management:
- $amqp->queuePurge($queue, $properties) - Purge queue (use $amqp = app('Amqp'))
- $amqp->queueDelete($queue, $ifUnused, $ifEmpty, $properties) - Delete queue
- $amqp->queueUnbind(...) - Unbind queue
- $amqp->exchangeDelete(...) - Delete exchange
- $amqp->exchangeUnbind(...) - Unbind exchange
Management API:
- $amqp->getQueueStats($queue, $vhost, $properties) - Queue statistics (use $amqp = app('Amqp'))
- $amqp->getConnections($connectionName, $properties) - List connections
- $amqp->getChannels($channelName, $properties) - List channels
- $amqp->getNodes($nodeName, $properties) - Cluster nodes
- $amqp->getPolicies($properties) - List policies
- $amqp->createPolicy(...) - Create policy
- $amqp->updatePolicy(...) - Update policy
- $amqp->deletePolicy(...) - Delete policy
- $amqp->listFeatureFlags($properties) - List feature flags
- $amqp->getFeatureFlag($name, $properties) - Get feature flag
Helpers:
- $amqp->getConnectionConfig($connectionName) - Get connection config (use $amqp = app('Amqp'))
Note: For consume(), listen(), rpc(), and all management methods, you must resolve the Amqp instance from the container using $amqp = app('Amqp') or $amqp = resolve('Amqp'). The static facade Amqp:: works for publish() but not for consume() and other instance methods.
Kubernetes & Cloud Native
Liveness / readiness probes
Two complementary surfaces — HTTP routes for sidecars and a CLI for exec probes — both backed by the same HealthState + HealthCheck pair.
1. HTTP routes
Enable in config/amqp.php (or via AMQP_PROBES_ENABLED=true):
'probes' => [
    'enabled' => true,
    'prefix' => 'amqp/health',   // GET /amqp/health/live, /ready, /
    'middleware' => [],          // optional middleware (e.g. ['api'])
    'state_file' => storage_path('framework/amqp-health.json'),
    'heartbeat_age' => 60,       // seconds before liveness flips to 503
    'queues' => ['orders', 'orders.dlq'],
    'max_backlog' => 5000,
],
The service provider registers:
| Method | Path | Response |
|---|---|---|
| GET | /amqp/health/live | 200 alive / 503 dead |
| GET | /amqp/health/ready | 200 ready / 503 not ready |
| GET | /amqp/health/ | combined snapshot |
Wire it from your consumer:
use Bschmitt\Amqp\Support\ConsumerLifecycle;
use Bschmitt\Amqp\Support\HealthState;

$lifecycle = (new ConsumerLifecycle())
    ->withHealth(HealthState::instance(storage_path('framework/amqp-health.json')))
    ->registerSignalHandlers();

Amqp::consumeWithLifecycle('orders', $handler, $lifecycle);
2. CLI exec probe (sidecar / livenessProbe.exec.command)
# Readiness (default)
php artisan amqp:health
php artisan amqp:health --queue=orders --backlog=1000

# Liveness
php artisan amqp:health --probe=live --heartbeat-age=30

# Combined snapshot
php artisan amqp:health --all --state-file=/var/run/amqp-health.json
Exit codes: 0 = healthy, 1 = unhealthy — exactly what livenessProbe.exec / readinessProbe.exec expect.
Consumer autoscaling recommendations
AutoscalingAdvisor is a pure function that turns a QueueMetrics snapshot into a recommended replica count and a ready-to-paste KEDA trigger:
use Bschmitt\Amqp\Support\AutoscalingAdvisor;

$metrics = Amqp::queueMetrics('orders');

$advice = (new AutoscalingAdvisor())
    ->messagesPerConsumer(100)
    ->maxLagSeconds(15.0)
    ->minReplicas(1)
    ->maxReplicas(20)
    ->advise($metrics);

// $advice['desired_consumers'] => 4
// $advice['action'] => 'scale_up'
// $advice['reasons'] => ['depth 350 / 100 ...', 'lag 20s > 15s -> +1 ...']
// $advice['keda'] => KEDA RabbitMQ trigger spec
CLI form:
php artisan amqp:scale orders orders.priority \
--per-consumer=100 --max=20 --lag-seconds=15
php artisan amqp:scale orders --keda # emit only the KEDA trigger
php artisan amqp:scale orders --json --fail-on-scale-up # CI-friendly
The --keda output drops straight into a ScaledObject manifest under spec.triggers.
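The depth-based part of such a recommendation can be sketched as a simple calculation: divide the queue depth by the per-consumer target, round up, and clamp to the replica bounds. This is only an illustration under assumptions; `desiredConsumers` is a hypothetical function, the advisor's real rounding and lag handling are not documented here, and the lag adjustment is deliberately left out.

```php
<?php

/**
 * Hypothetical depth-only recommendation (NOT AutoscalingAdvisor itself):
 * one consumer per $perConsumer ready messages, clamped to [$min, $max].
 * Lag-based adjustments are intentionally omitted from this sketch.
 */
function desiredConsumers(int $depth, int $perConsumer, int $min, int $max): int
{
    if ($perConsumer < 1) {
        throw new \InvalidArgumentException('perConsumer must be >= 1');
    }

    $needed = (int) ceil($depth / $perConsumer);

    return max($min, min($max, $needed));
}
```

Clamping last guarantees that an empty queue still keeps the minimum replica count and a very deep queue never exceeds the maximum.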
Laravel Cloud / managed hosting compatibility
LaravelCloud detects managed environments (Laravel Cloud, Forge, Vapor, Render, Fly.io) and, when amqp.cloud.auto_hydrate is true (default), parses an AMQP_URL / CLOUDAMQP_URL / RABBITMQ_URL DSN into the active connection block on register() — without overwriting explicit config:
AMQP_URL=amqps://app:secret@rabbit.cloudamqp.com/%2Fprod
Explicit AMQP_HOST / AMQP_USER / etc. still win. You can also call the detector directly:
use Bschmitt\Amqp\Support\LaravelCloud;

if (LaravelCloud::isHosted()) {
    logger()->info('amqp hosted env', LaravelCloud::summary());
}

$props = LaravelCloud::parseDsn(env('AMQP_URL'));
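To show what parsing such a DSN involves, here is a stand-alone sketch built on PHP's `parse_url()`. It is not `LaravelCloud::parseDsn()`: the function name `parseAmqpDsn`, the returned key names, and the `guest` fallbacks are assumptions, and treating a missing path as the `/` vhost is a simplification.

```php
<?php

/**
 * Hypothetical DSN parser (NOT LaravelCloud::parseDsn itself).
 * Uses the conventional default ports 5672 (amqp) and 5671 (amqps).
 */
function parseAmqpDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || !isset($parts['scheme'], $parts['host'])) {
        throw new \InvalidArgumentException('Invalid AMQP DSN');
    }

    $scheme = strtolower($parts['scheme']);
    if (!in_array($scheme, ['amqp', 'amqps'], true)) {
        throw new \InvalidArgumentException('Unsupported scheme: ' . $scheme);
    }
    $ssl = $scheme === 'amqps';

    // The vhost is the URL path without its leading slash, percent-decoded.
    // Simplification: an empty or missing path falls back to "/".
    $path = $parts['path'] ?? '';
    $vhost = strlen($path) > 1 ? rawurldecode(substr($path, 1)) : '/';

    return [
        'host' => $parts['host'],
        'port' => $parts['port'] ?? ($ssl ? 5671 : 5672),
        'username' => rawurldecode($parts['user'] ?? 'guest'),
        'password' => rawurldecode($parts['pass'] ?? 'guest'),
        'vhost' => $vhost,
        'ssl' => $ssl,
    ];
}
```

The `%2F` in the example URL matters: `parse_url()` does not decode the path, so the vhost has to be percent-decoded explicitly to recover `/prod`.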
Multi-region deployment support
Configure region-scoped connection keys, then resolve / fail over with locality preference:
// config/amqp.php
'regions' => [
    'enabled' => true,
    'connections' => ['production-us', 'production-eu', 'production-apac'],
    'primary' => null, // null = match LARAVEL_CLOUD_REGION/AWS_REGION
    'cooldown_seconds' => 30,
],
use Bschmitt\Amqp\Support\MultiRegionConnection;

$resolver = app(MultiRegionConnection::class);

// Single attempt with locality preference
$connectionKey = $resolver->pick(); // 'production-us'

// Run a publish across regions until one succeeds
// ($payload must be imported into the closure with `use`)
$resolver->withFailover(function ($region) use ($payload) {
    Amqp::publish('orders.created', $payload, ['use' => $region]);
});

// Fan-out to every region (e.g. announcements)
foreach ($resolver->each() as $region) {
    Amqp::publish('events.maintenance', $payload, ['use' => $region]);
}
Failed regions cool down for the configured window before re-entering rotation.
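The cooldown behaviour can be pictured with a small stand-alone class. This is a sketch, not MultiRegionConnection: the class name `RegionFailover`, its `run()` method, and the injectable `$now` timestamp (used to keep the example deterministic) are all assumptions.

```php
<?php

/**
 * Hypothetical failover helper (NOT MultiRegionConnection itself).
 * Tries regions in order, skipping any that failed less than
 * $cooldownSeconds ago, and returns the region that succeeded.
 */
final class RegionFailover
{
    /** @var array<string,int> region => unix time of its last failure */
    private $failedAt = [];
    /** @var string[] */
    private $regions;
    /** @var int */
    private $cooldownSeconds;

    public function __construct(array $regions, int $cooldownSeconds = 30)
    {
        $this->regions = $regions;
        $this->cooldownSeconds = $cooldownSeconds;
    }

    public function run(callable $operation, ?int $now = null): string
    {
        $now = $now ?? time();
        $lastError = null;

        foreach ($this->regions as $region) {
            $failed = $this->failedAt[$region] ?? null;
            if ($failed !== null && $now - $failed < $this->cooldownSeconds) {
                continue; // still cooling down, do not retry yet
            }

            try {
                $operation($region);
                unset($this->failedAt[$region]); // healthy again

                return $region;
            } catch (\Throwable $e) {
                $this->failedAt[$region] = $now;
                $lastError = $e;
            }
        }

        throw new \RuntimeException('No region available', 0, $lastError);
    }
}
```

Skipping a recently failed region avoids paying its connection timeout on every publish while it is down, and the region is retried automatically once the window has passed.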
Backward Compatibility
Version 3.4.0 is fully backward compatible with previous versions. All existing code will continue to work without modifications.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Credits
- Some concepts were used from mookofe/tail
- Built and tested with the rabbitmq:3-management Docker image
License
This package is open-sourced software licensed under the MIT license.
Support
For issues, questions, or contributions:
- GitHub Issues: https://github.com/bschmitt/laravel-amqp/issues
- Documentation: See the docs/ directory
- FAQ: docs/laravel-amqp.wiki/FAQ.md
Version: 3.4.0
Status: Ready