bschmitt / laravel-amqp
AMQP wrapper for Laravel and Lumen to publish and consume messages
Requires
- php: ^7.3|^8.0
- illuminate/support: >=6.20.0
- php-amqplib/php-amqplib: ^3.0
Requires (Dev)
- illuminate/config: >=5.5
- mockery/mockery: ^1.2
- phpoption/phpoption: 1.7.x-dev
- phpunit/phpunit: >=6.0
- squizlabs/php_codesniffer: ^3.0@dev
- vlucas/phpdotenv: ^5.3
README
A detailed AMQP wrapper for Laravel and Lumen to publish and consume messages, especially from RabbitMQ. This package provides full support for RabbitMQ features including RPC patterns, management operations, message properties, and more.
Features
Core Features
- Advanced queue configuration
- Easy message publishing to queues
- Flexible queue consumption with useful options
- Support for all RabbitMQ exchange types (topic, direct, fanout, headers)
- Full AMQP message properties support
Version 3.1.0 New Features
- RPC Pattern Support - Built-in request-response patterns with rpc() and reply() methods
- Queue Management - Programmatic control (purge, delete, unbind)
- Management HTTP API - Full integration with RabbitMQ Management API
- Policy Management - Create, update, and delete policies programmatically
- Feature Flags - Query RabbitMQ feature flags
- Enhanced Message Properties - Full support for priority, correlation_id, headers, etc.
- Listen Method - Auto-create queues and bind to multiple routing keys
- Connection Configuration Helper - Easy access to connection configs
Advanced Features
- Publisher Confirms - Guaranteed message delivery
- Consumer Prefetch (QoS) - Rate limiting and flow control
- Queue Types - Classic, Quorum, and Stream queues
- Dead Letter Exchanges - Message routing for failed messages
- Message Priority - Priority-based message processing
- TTL Support - Message and queue expiration
- Lazy Queues - Disk-based message storage
- Alternate Exchange - Unroutable message handling
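As a rough illustration of how several of these features are usually switched on, the sketch below builds a set of queue arguments for a dead letter exchange, a message TTL, and a priority limit. The argument names (x-dead-letter-exchange, x-message-ttl, x-max-priority) are standard RabbitMQ queue arguments. The typed [type, value] form mirrors the queue_properties entry in the package's sample configuration. The helper name exampleQueueProperties and the exchange name dlx.exchange are hypothetical, and it is an assumption that the result can be passed as queue_properties without further changes.

```php
<?php
// Hypothetical helper, not part of the package: builds queue arguments
// in the typed [type, value] form ('S' = string, 'I' = integer).
function exampleQueueProperties(string $deadLetterExchange, int $ttlMs, int $maxPriority): array
{
    return [
        // Rejected or expired messages are re-routed to this exchange.
        'x-dead-letter-exchange' => ['S', $deadLetterExchange],
        // Per-queue message TTL in milliseconds.
        'x-message-ttl' => ['I', $ttlMs],
        // Highest priority value the queue will honour.
        'x-max-priority' => ['I', $maxPriority],
    ];
}

$properties = exampleQueueProperties('dlx.exchange', 60000, 10);
print_r($properties);
```

The resulting array would then be set as the queue_properties value of a connection, or passed in the properties array of a single call, assuming the package forwards it unchanged when declaring the queue.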
Requirements
- PHP 8.1 or higher
- Laravel 8.x / 9.x / 10.x / 11.x or Lumen 8.x / 9.x / 10.x
- RabbitMQ 3.x (tested with the rabbitmq:3-management Docker image)
Installation
Composer
composer require bschmitt/laravel-amqp
For Laravel 5.5+:
"bschmitt/laravel-amqp": "^3.1"
For Laravel < 5.5:
"bschmitt/laravel-amqp": "^2.0"
Quick Start
Publishing Messages
use Bschmitt\Amqp\Facades\Amqp;

// Basic publish
Amqp::publish('routing-key', 'message');

// Publish with queue creation
Amqp::publish('routing-key', 'message', ['queue' => 'queue-name']);

// Publish with message properties
Amqp::publish('routing-key', 'message', [
    'priority' => 10,
    'correlation_id' => 'unique-id',
    'reply_to' => 'reply-queue',
    'application_headers' => [
        'X-Custom-Header' => 'value',
    ],
]);
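The examples above publish plain strings. Structured data has to be serialized first, and a small wrapper can keep that in one place. The following is a minimal sketch, not part of the package: publishJson is a hypothetical helper that takes any callable with the same (routing key, message, properties) shape as publish(), and the content_type property is assumed to be passed through like the other message properties. A recording closure stands in for the real publisher so the snippet runs on its own.

```php
<?php
// Hypothetical helper: JSON-encodes $data and hands it to a publisher callable.
// Caller-supplied properties take precedence over the default content_type.
function publishJson(callable $publish, string $routingKey, array $data, array $properties = []): string
{
    $body = json_encode($data, JSON_THROW_ON_ERROR);
    $publish($routingKey, $body, $properties + ['content_type' => 'application/json']);

    return $body;
}

// Stand-in publisher that only records what it was asked to send.
$sent = [];
$recorder = function (string $routingKey, string $body, array $properties) use (&$sent): void {
    $sent[] = [$routingKey, $body, $properties];
};

$body = publishJson($recorder, 'orders.created', ['id' => 42, 'status' => 'new']);
echo $body, PHP_EOL; // prints {"id":42,"status":"new"}
```

Inside a Laravel application the recorder could be swapped for a closure that forwards to the facade, for example fn ($key, $message, $props) => Amqp::publish($key, $message, $props).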
Consuming Messages
use Bschmitt\Amqp\Facades\Amqp;

// Consume and acknowledge (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
    echo $message->body;
    $resolver->acknowledge($message);
    $resolver->stopWhenProcessed();
});

// Consume forever
$amqp = app('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
    processMessage($message->body);
    $resolver->acknowledge($message);
}, ['persistent' => true]);

// Alternative: Using resolve() helper
$amqp = resolve('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
    processMessage($message->body);
    $resolver->acknowledge($message);
});
RPC Pattern
// Client side - Make RPC call (using dynamic call)
$amqp = app('Amqp');
$response = $amqp->rpc('rpc-queue', 'request-data', [], 30);

// Server side - Process and reply (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('rpc-queue', function ($message, $resolver) {
    $result = processRequest($message->body);
    $resolver->reply($message, $result);
    $resolver->acknowledge($message);
});
Listen to Multiple Routing Keys
$amqp = app('Amqp');
$amqp->listen(['key1', 'key2', 'key3'], function ($message, $resolver) {
    processMessage($message->body);
    $resolver->acknowledge($message);
});
Configuration
Laravel
Publish the configuration file:
php artisan vendor:publish --provider="Bschmitt\Amqp\Providers\AmqpServiceProvider"
Or manually copy vendor/bschmitt/laravel-amqp/config/amqp.php to config/amqp.php.
Lumen
Create a config folder in your Lumen root and copy the configuration file:
mkdir config
cp vendor/bschmitt/laravel-amqp/config/amqp.php config/amqp.php
Register the service provider in bootstrap/app.php:
$app->configure('amqp');
$app->register(Bschmitt\Amqp\Providers\LumenServiceProvider::class);

// For Lumen 5.2+, enable facades
$app->withFacades(true, [
    'Bschmitt\Amqp\Facades\Amqp' => 'Amqp',
]);
Configuration Example
return [
    'use' => 'production',

    'properties' => [
        'production' => [
            'host' => env('AMQP_HOST', 'localhost'),
            'port' => env('AMQP_PORT', 5672),
            'username' => env('AMQP_USER', 'guest'),
            'password' => env('AMQP_PASSWORD', 'guest'),
            'vhost' => env('AMQP_VHOST', '/'),
            'exchange' => env('AMQP_EXCHANGE', 'amq.topic'),
            'exchange_type' => env('AMQP_EXCHANGE_TYPE', 'topic'),
            'consumer_tag' => 'consumer',
            'ssl_options' => [],
            'connect_options' => [],
            'queue_properties' => ['x-ha-policy' => ['S', 'all']],
            'exchange_properties' => [],
            'timeout' => 0,

            // Management API (optional)
            'management_api_url' => env('AMQP_MANAGEMENT_URL', 'http://localhost:15672'),
            'management_api_user' => env('AMQP_MANAGEMENT_USER', 'guest'),
            'management_api_password' => env('AMQP_MANAGEMENT_PASSWORD', 'guest'),
        ],
    ],
];
Documentation
Comprehensive Guides
- User Manual - Complete usage guide
- Release Notes - Version 3.1.0 changelog
- FAQ - Common questions and answers
Wiki Documentation
- Getting Started - Installation and first steps
- Configuration - Configuration guide
- Publishing Messages - Publishing guide
- Consuming Messages - Consumption guide
- RPC Pattern - Request-response patterns
- Queue Management - Queue operations
- Management API - HTTP API integration
- Message Properties - Message properties
- Advanced Features - Advanced usage
- Architecture - Package architecture
- Testing - Testing guide
Module Documentation
See docs/modules/ for detailed module documentation:
- RPC Module
- Management Operations
- Management API
- Message Properties
- Consumer Prefetch
- And more...
Examples
Fanout Exchange
// Publishing
Amqp::publish('', 'message', [
    'exchange_type' => 'fanout',
    'exchange' => 'amq.fanout',
]);

// Consuming (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('', function ($message, $resolver) {
    echo $message->body;
    $resolver->acknowledge($message);
}, [
    'routing' => '',
    'exchange' => 'amq.fanout',
    'exchange_type' => 'fanout',
    'queue_force_declare' => true,
    'queue_exclusive' => true,
    'persistent' => true,
]);
Queue Management
// Get Amqp instance
$amqp = app('Amqp');

// Purge queue
$amqp->queuePurge('my-queue', ['queue' => 'my-queue']);

// Delete queue
$amqp->queueDelete('my-queue', ['queue' => 'my-queue']);

// Get queue statistics
$stats = $amqp->getQueueStats('my-queue', '/');
Management API
// Get Amqp instance
$amqp = app('Amqp');

// Get queue statistics
$stats = $amqp->getQueueStats('my-queue', '/');

// List connections
$connections = $amqp->getConnections();

// Create policy
$amqp->createPolicy('my-policy', [
    'pattern' => '^my-queue$',
    'definition' => ['max-length' => 1000],
], '/');
Testing
The package ships with unit and integration tests:
# Run all tests
php vendor/bin/phpunit

# Run unit tests only
php vendor/bin/phpunit test/Unit/

# Run integration tests only
php vendor/bin/phpunit test/Integration/
Test Requirements:
- RabbitMQ server running (for integration tests)
- Docker:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
See Testing Guide for more information.
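Since the integration tests need a running broker, it can help to check that the AMQP port answers before starting them. The snippet below is a minimal sketch using only PHP's built-in fsockopen(). The function name isBrokerReachable is hypothetical and not part of the package. Reading AMQP_HOST and AMQP_PORT from the environment follows the variable names in the sample configuration, with the same defaults assumed.

```php
<?php
// Hypothetical helper: returns true if a TCP connection to host:port
// can be opened within the timeout, false otherwise.
function isBrokerReachable(string $host, int $port, float $timeoutSeconds = 1.0): bool
{
    $errorCode = 0;
    $errorMessage = '';
    // @ suppresses the warning PHP raises when the connection is refused.
    $socket = @fsockopen($host, $port, $errorCode, $errorMessage, $timeoutSeconds);
    if ($socket === false) {
        return false;
    }
    fclose($socket);

    return true;
}

$host = getenv('AMQP_HOST') ?: 'localhost';
$port = (int) (getenv('AMQP_PORT') ?: 5672);

echo isBrokerReachable($host, $port)
    ? "RabbitMQ port is open\n"
    : "RabbitMQ port is closed; integration tests will likely fail\n";
```

An open port only shows that something is listening. It does not confirm that the credentials or the virtual host are valid.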
Version 3.1.0 Highlights
New Methods
RPC:
- $amqp->rpc($routingKey, $request, $properties, $timeout) - Make RPC calls (use $amqp = app('Amqp'))
- Consumer::reply($message, $response, $properties) - Send RPC responses
- $amqp->listen($routingKeys, $callback, $properties) - Auto-create queues with multiple bindings (use $amqp = app('Amqp'))
Management:
- $amqp->queuePurge($queue, $properties) - Purge queue (use $amqp = app('Amqp'))
- $amqp->queueDelete($queue, $ifUnused, $ifEmpty, $properties) - Delete queue
- $amqp->queueUnbind(...) - Unbind queue
- $amqp->exchangeDelete(...) - Delete exchange
- $amqp->exchangeUnbind(...) - Unbind exchange
Management API:
- $amqp->getQueueStats($queue, $vhost, $properties) - Queue statistics (use $amqp = app('Amqp'))
- $amqp->getConnections($connectionName, $properties) - List connections
- $amqp->getChannels($channelName, $properties) - List channels
- $amqp->getNodes($nodeName, $properties) - Cluster nodes
- $amqp->getPolicies($properties) - List policies
- $amqp->createPolicy(...) - Create policy
- $amqp->updatePolicy(...) - Update policy
- $amqp->deletePolicy(...) - Delete policy
- $amqp->listFeatureFlags($properties) - List feature flags
- $amqp->getFeatureFlag($name, $properties) - Get feature flag
Helpers:
- $amqp->getConnectionConfig($connectionName) - Get connection config (use $amqp = app('Amqp'))
Note: For consume(), listen(), rpc(), and all management methods, you must resolve the Amqp instance from the container using $amqp = app('Amqp') or $amqp = resolve('Amqp'). The static facade Amqp:: works for publish() but not for consume() and other instance methods.
Backward Compatibility
Version 3.1.0 is fully backward compatible with previous versions. All existing code will continue to work without modifications.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Credits
- Some concepts were used from mookofe/tail
- Built and tested with the rabbitmq:3-management Docker image
License
This package is open-sourced software licensed under the MIT license.
Support
For issues, questions, or contributions:
- GitHub Issues: https://github.com/bschmitt/laravel-amqp/issues
- Documentation: See docs/ directory
- FAQ: docs/laravel-amqp.wiki/FAQ.md
Version: 3.1.0
Status: Ready