avto-dev / amqp-rabbit-laravel-queue
RabbitMQ laravel queue driver
Requires
- php: ^8.1
- ext-amqp: *
- ext-json: *
- avto-dev/amqp-rabbit-manager: ^2.10
- illuminate/container: ^10.0 || ^11.0
- illuminate/contracts: ^10.0 || ^11.0
- illuminate/queue: ^10.0 || ^11.0
- illuminate/support: ^10.0 || ^11.0
- symfony/console: ^6.0 || ^7.0
Requires (Dev)
- laravel/laravel: ^10.0 || ^11.0
- mockery/mockery: ^1.6
- nesbot/carbon: ^2.66
- phpstan/phpstan: ^1.10
- phpunit/phpunit: ^10.5
- symfony/process: ^6.0 || ^7.0
README
RabbitMQ-based Laravel queue driver
This package lets you use RabbitMQ queues for queued (and prioritized) Laravel jobs. It is fully configurable.
The PHP extension ext-amqp must be installed. Installation steps can be found in the Dockerfile.
To delay jobs, you must also install the rabbitmq-delayed-message-exchange
plugin for RabbitMQ. Job delaying is an optional feature.
Install
Important: Before using this package, you should install
avto-dev/amqp-rabbit-manager
into your application. Its installation steps are described in that package's documentation.
Require this package with composer using the following command:
$ composer require avto-dev/amqp-rabbit-laravel-queue "^2.0"
Composer must be installed (see the Composer installation guide). You should also pin the major version of the package.
After that you should modify your configuration files:
./config/rabbitmq.php
RabbitMQ queues and exchanges configuration:
<?php

use Interop\Amqp\AmqpQueue;
use Interop\Amqp\AmqpTopic;

return [

    // ...

    'queues' => [

        'jobs' => [
            'name'         => env('JOBS_QUEUE_NAME', 'jobs'),
            'flags'        => AmqpQueue::FLAG_DURABLE, // Remain queue active when a server restarts
            'arguments'    => [
                'x-max-priority' => 255, // @link https://www.rabbitmq.com/priority.html
            ],
            'consumer_tag' => null,
        ],

        'failed' => [
            'name'         => env('FAILED_JOBS_QUEUE_NAME', 'failed-jobs'),
            'flags'        => AmqpQueue::FLAG_DURABLE,
            'arguments'    => [
                'x-message-ttl' => 604800000, // 7 days, @link https://www.rabbitmq.com/ttl.html
                'x-queue-mode'  => 'lazy', // @link https://www.rabbitmq.com/lazy-queues.html
            ],
            'consumer_tag' => null,
        ],

    ],

    // ...

    'exchanges' => [

        // RabbitMQ Delayed Message Plugin is required (@link: https://git.io/fj4SE)
        'delayed-jobs' => [
            'name'      => env('DELAYED_JOBS_EXCHANGE_NAME', 'jobs.delayed'),
            'type'      => 'x-delayed-message',
            'flags'     => AmqpTopic::FLAG_DURABLE, // Remain active when a server restarts
            'arguments' => [
                'x-delayed-type' => AmqpTopic::TYPE_DIRECT,
            ],
        ],

    ],

    // ...

    'setup' => [
        'rabbit-default' => [
            'queues' => [
                'jobs',
                'failed',
            ],
            'exchanges' => [
                'delayed-jobs',
            ],
        ],
    ],
];
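The x-message-ttl argument in the configuration above is expressed in milliseconds, which is easy to get wrong when written as a bare number. A minimal sketch of the arithmetic, assuming a hypothetical helper named daysToMilliseconds (it is not part of this package):

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (NOT part of the package): converts whole days
 * to milliseconds, the unit RabbitMQ expects for 'x-message-ttl'.
 */
function daysToMilliseconds(int $days): int
{
    // days -> hours -> minutes -> seconds -> milliseconds
    return $days * 24 * 60 * 60 * 1000;
}

// 7 days, the TTL used for the 'failed' queue
echo daysToMilliseconds(7), PHP_EOL; // prints 604800000
```

Writing the value as a computed expression (or a named constant) in your own configuration may make the intended retention period easier to review.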
./config/queue.php
Laravel queue settings:
<?php

use AvtoDev\AmqpRabbitLaravelQueue\Connector;

return [

    // ...

    'default' => env('QUEUE_DRIVER', 'rabbitmq'),

    // ...

    'connections' => [

        // ...

        'rabbitmq' => [
            'driver'              => Connector::NAME,
            'connection'          => 'rabbit-default',
            'queue_id'            => 'jobs',
            'delayed_exchange_id' => 'delayed-jobs',
            'timeout'             => (int) env('QUEUE_TIMEOUT', 0), // The timeout is in milliseconds
            'resume'              => (bool) env('QUEUE_RESUME', false), // Resume consuming when timeout is over
        ],
    ],

    // ...

    'failed' => [
        'connection' => 'rabbit-default',
        'queue_id'   => 'failed',
    ],
];
The resume option can be used with a non-zero timeout value to reload the connection periodically. For example, with 'timeout' => 30000 and 'resume' => true, the queue worker unsubscribes from the queue and subscribes again every 30 seconds, without the process exiting.
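The periodic reload described above can be sketched as the following fragment of ./config/queue.php. It reuses only the keys and values already shown in this README, with the 30-second example hard-coded instead of read from the environment; other sections of the file are omitted.

```php
<?php

use AvtoDev\AmqpRabbitLaravelQueue\Connector;

return [
    'connections' => [
        'rabbitmq' => [
            'driver'              => Connector::NAME,
            'connection'          => 'rabbit-default',
            'queue_id'            => 'jobs',
            'delayed_exchange_id' => 'delayed-jobs',
            'timeout'             => 30000, // milliseconds, i.e. 30 seconds
            'resume'              => true,  // re-subscribe after each timeout instead of exiting
        ],
    ],
];
```

In a real application you would probably keep the env('QUEUE_TIMEOUT') and env('QUEUE_RESUME') lookups and set these values per environment.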
To disable the delayed jobs feature, remove delayed_exchange_id.
Finally, don't forget to run the command php ./artisan rabbit:setup.
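How does job delaying work?
Very simple: delayed jobs are published to the delayed exchange (type x-delayed-message) instead of directly to the queue. The RabbitMQ plugin holds each message for the requested delay and then routes it to the bound jobs queue.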
Usage
You can dispatch your jobs as usual (dispatch(new Job) or dispatch(new Job)->delay(10)). Commands such as queue:work, queue:failed, queue:retry and others work as expected.
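As an illustration of dispatching as usual, here is a minimal sketch of an ordinary Laravel queued job and the two dispatch forms mentioned above. The class name SendReport and its constructor argument are hypothetical; the traits and the dispatch() helper are standard Laravel.

```php
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;

// Hypothetical job class, for illustration only
class SendReport implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable;

    public function __construct(private int $reportId)
    {
    }

    public function handle(): void
    {
        // ... build and send the report identified by $this->reportId
    }
}

// --- Elsewhere in the application (controller, console command, etc.) ---

// Pushed to the queue immediately
dispatch(new SendReport(42));

// Delayed by 10 seconds; needs 'delayed_exchange_id' to be configured
dispatch(new SendReport(42))->delay(10);
```

Nothing in the job itself is specific to RabbitMQ; the driver is selected by the queue configuration.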
Additional features:
- Job delaying (the rabbitmq_delayed_message_exchange plugin for the RabbitMQ server is required);
- Job priority (the job should implement the PrioritizedJobInterface interface);
- Automatic bindings for delayed message exchanges (only if you use the rabbit:setup command to create queues and exchanges);
- The ability to store job state.
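The job priority feature from the list above might look like the sketch below. Note the assumptions: this README does not show the interface, so both its namespace (the package root) and its single method priority(): int are assumed here and should be checked against the package source. The class name SendUrgentAlert and the value 200 are hypothetical.

```php
<?php

namespace App\Jobs;

// ASSUMPTION: the interface lives in the package root namespace
use AvtoDev\AmqpRabbitLaravelQueue\PrioritizedJobInterface;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;

// Hypothetical job class, for illustration only
class SendUrgentAlert implements ShouldQueue, PrioritizedJobInterface
{
    use Dispatchable, InteractsWithQueue, Queueable;

    /**
     * ASSUMPTION: the interface declares this method with this signature.
     * Higher values are expected to be consumed first.
     */
    public function priority(): int
    {
        // Should not exceed the queue's 'x-max-priority' (255 in the sample config)
        return 200;
    }

    public function handle(): void
    {
        // ... deliver the alert
    }
}
```

Priorities only take effect on queues declared with the x-max-priority argument, as in the jobs queue of the sample ./config/rabbitmq.php.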
State storing
With this package you can store any variables (except resources and callable entities) between job restarts: just use the WithJobStateTrait trait in your job class. Keep in mind that the state is available only inside the job's handle method.
Consumer custom tag prefix
Every consumer has an identifier that client libraries use to determine which handler to invoke for a given delivery. The name of this identifier varies from protocol to protocol; "consumer tag" and "subscription ID" are the two most commonly used terms.
To add a custom prefix to the consumer tag, pass it as an additional argument to the AvtoDev\AmqpRabbitLaravelQueue\Worker::__construct method.
⚠️ Warning
Be careful with the queue:failed and queue:retry commands. If something goes wrong during command execution (a lost connection, etc.), you may lose all failed jobs!
Avoid using the following methods (the broker does not guarantee the order of operations, so their results may be wrong):
- \AvtoDev\AmqpRabbitLaravelQueue\Queue::size()
- \AvtoDev\AmqpRabbitLaravelQueue\Failed\RabbitQueueFailedJobProvider::count()
Testing
For package testing we use the phpunit framework, with docker-ce + docker-compose as the development environment. After cloning the repository, run the following in your terminal:
$ make build
$ make latest # or 'make lowest'
$ make test
Changes log
Changes log can be found here.
Support
If you find any errors in the package, please open an issue in the current repository.
License
This is open-sourced software licensed under the MIT License.