avto-dev / amqp-rabbit-manager
RabbitMQ manager
Requires
- php: ^8.1
- ext-amqp: *
- enqueue/amqp-ext: ^0.10.19
- illuminate/console: ~10.0 || ~11.0
- illuminate/events: ~10.0 || ~11.0
- illuminate/support: ~10.0 || ~11.0
- queue-interop/queue-interop: ^0.8
- symfony/console: ~6.0 || ~7.0
Requires (Dev)
- laravel/laravel: ~10.0 || ~11.0
- mockery/mockery: ^1.6
- phpstan/phpstan: ^1.10
- phpunit/phpunit: ^10.5
README
RabbitMQ manager for Laravel applications
This package provides easy access to RabbitMQ entities such as connections and queues.
The PHP extension ext-amqp must be installed. Installation steps can be found in the Dockerfile.
Install
Require this package with composer using the following command:
$ composer require avto-dev/amqp-rabbit-manager "^2.0"
Composer must be installed (how to install composer).
You should pin the major version of the package.
After that, "publish" the package configuration file using the following command:
$ php ./artisan vendor:publish --provider='AvtoDev\AmqpRabbitManager\ServiceProvider'
Then configure it in the file ./config/rabbitmq.php.
Usage
First, run the rabbit:setup command to create all queues and exchanges on the RabbitMQ server.
Then, in any part of your application, you can resolve the connection or queue/exchange factories. For example, in an artisan command:

    <?php

    namespace App\Console\Commands;

    use AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface;

    class SomeCommand extends \Illuminate\Console\Command
    {
        /**
         * The console command name.
         *
         * @var string
         */
        protected $name = 'some:command';

        /**
         * Execute the console command.
         *
         * @param ConnectionsFactoryInterface $connections
         *
         * @return void
         */
        public function handle(ConnectionsFactoryInterface $connections): void
        {
            $connections->default(); // Get the default RabbitMQ connection instance
        }
    }

Create queue manually
The declare-queue operation creates a queue on the broker side (prefer the rabbit:setup command instead of doing this manually):

    <?php

    /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
    /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

    // Declares the queue on the broker (the call does not return an exchange)
    $connections
        ->default()
        ->declareQueue($queues->make('some-queue-id'));

Create exchange manually
The declare-exchange operation creates a topic (exchange) on the broker side (prefer the rabbit:setup command instead of doing this manually):

    <?php

    /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
    /** @var \AvtoDev\AmqpRabbitManager\ExchangesFactoryInterface $exchanges */

    // Declares the exchange on the broker (the call returns nothing)
    $connections
        ->default()
        ->declareTopic($exchanges->make('some-exchange-id'));

Bind queue to exchange
Binding connects a queue to an exchange, so messages published to that exchange arrive in the queue and can be processed (prefer the rabbit:setup command events \AvtoDev\AmqpRabbitManager\Commands\Events\* instead of doing this manually):

    <?php

    /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
    /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */
    /** @var \AvtoDev\AmqpRabbitManager\ExchangesFactoryInterface $exchanges */

    $connections
        ->default()
        ->bind(new \Interop\Amqp\Impl\AmqpBind(
            $exchanges->make('some-exchange-id'),
            $queues->make('some-queue-id')
        ));

Send message to exchange
Create a message and send it to the exchange:

    <?php

    /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
    /** @var \AvtoDev\AmqpRabbitManager\ExchangesFactoryInterface $exchanges */

    $connection = $connections->default();
    $message    = $connection->createMessage('Hello world!');

    $connection
        ->createProducer()
        ->send($exchanges->make('some-exchange-id'), $message);

Send message to queue
Create a message and send it to the queue:

    <?php

    /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
    /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

    $connection = $connections->default();
    $message    = $connection->createMessage('Hello world!');

    $connection
        ->createProducer()
        ->send($queues->make('some-queue-id'), $message);

Send priority message
Message priority is used to order messages within a queue:

    <?php

    /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
    /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

    $connection = $connections->default();
    $message    = $connection->createMessage('Hello world!');

    $connection
        ->createProducer()
        ->setPriority(10)
        // ...
        ->send($queues->make('some-queue-id'), $message);

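Keep in mind that RabbitMQ only honors message priorities on queues that were declared with the x-max-priority argument. The following is a minimal sketch of building that argument. The helper name priorityQueueArguments is hypothetical and not part of this package, and how queue arguments reach the broker in your setup (for example through ./config/rabbitmq.php) is an assumption you should verify:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of this package): builds the queue arguments
 * that make RabbitMQ treat a queue as a priority queue.
 *
 * @return array<string, int>
 */
function priorityQueueArguments(int $maxPriority = 10): array
{
    // RabbitMQ accepts x-max-priority values from 1 to 255
    if ($maxPriority < 1 || $maxPriority > 255) {
        throw new \InvalidArgumentException('Max priority must be between 1 and 255');
    }

    return ['x-max-priority' => $maxPriority];
}
```

The resulting array can be set on the queue object (an \Interop\Amqp\AmqpQueue exposes setArguments()) before the queue is declared. Changing the arguments of an already declared queue is rejected by the broker.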
Send expiration message
Also known as message TTL:

    <?php

    /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
    /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

    $connection = $connections->default();
    $message    = $connection->createMessage('Hello world!');

    $connection
        ->createProducer()
        ->setTimeToLive(60000) // 60 sec
        // ...
        ->send($queues->make('some-queue-id'), $message);

Send delayed message
Avoid the enqueue/amqp-tools delay strategies if you can. If you implement the delay manually, you have full control over it.
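One common way to implement a delay manually (an assumption here, since the package does not prescribe one) is a "waiting" queue without consumers: messages expire there after a TTL and RabbitMQ dead-letters them into the exchange that feeds the real queue. A minimal sketch of the queue arguments for such a waiting queue follows; the helper name delayQueueArguments is hypothetical:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of this package): builds the arguments for a
 * "waiting" queue that holds messages for $delayMs milliseconds and then
 * hands them over to $targetExchange via dead-lettering.
 *
 * @return array<string, int|string>
 */
function delayQueueArguments(int $delayMs, string $targetExchange, ?string $routingKey = null): array
{
    if ($delayMs < 0) {
        throw new \InvalidArgumentException('Delay must not be negative');
    }

    $arguments = [
        'x-message-ttl'          => $delayMs,        // messages expire after this time
        'x-dead-letter-exchange' => $targetExchange, // expired messages are republished here
    ];

    if ($routingKey !== null) {
        // Optional: override the routing key used when dead-lettering
        $arguments['x-dead-letter-routing-key'] = $routingKey;
    }

    return $arguments;
}
```

Messages sent to a queue declared with these arguments are not consumed directly. They reappear in the target exchange once the TTL has elapsed, and consumers read them from the queue bound to that exchange.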
Get (consume) single message
Get one message and continue script execution:

    <?php

    /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
    /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

    $consumer = $connections->default()->createConsumer($queues->make('some-queue-id'));
    $message  = $consumer->receive();

    try {
        // .. process a message ..
        $consumer->acknowledge($message);
    } catch (\Exception $e) {
        // .. process exception ..
        $consumer->reject($message);
    }

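When a timeout is passed to receive(), the call may return null if no message arrives in time, so the result should be checked before it is acknowledged. A minimal sketch of that pattern follows. The helper processOne is hypothetical, and the consumer is left untyped (it only needs to behave like \Interop\Queue\Consumer) so that the sketch runs without a broker:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of this package): receives at most one
 * message and processes it.
 *
 * $consumer is expected to provide receive(), acknowledge() and reject(),
 * like \Interop\Queue\Consumer does.
 *
 * @return bool|null true = acknowledged, false = rejected,
 *                   null = no message within the timeout
 */
function processOne(object $consumer, callable $process, int $timeoutMs = 5000): ?bool
{
    $message = $consumer->receive($timeoutMs);

    if ($message === null) {
        return null; // nothing arrived in time
    }

    try {
        $process($message);
        $consumer->acknowledge($message);

        return true;
    } catch (\Throwable $e) {
        $consumer->reject($message);

        return false;
    }
}
```

With a real connection this could be called as processOne($connections->default()->createConsumer($queues->make('some-queue-id')), $callback).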
Subscription consumer
Start a (nearly) infinite loop for message processing (you can run more than one consumer at the same time):

    <?php

    /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
    /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

    $connection = $connections->default();
    $queue      = $queues->make('some-queue-id');
    $consumer   = $connection->createConsumer($queue);
    $subscriber = $connection->createSubscriptionConsumer();

    $subscriber->subscribe(
        $consumer,
        function (\Interop\Amqp\AmqpMessage $message, \Enqueue\AmqpExt\AmqpConsumer $consumer): bool {
            try {
                // .. process a message ..
                $consumer->acknowledge($message);
            } catch (\Exception $e) {
                // .. process exception ..
                $consumer->reject($message);

                return false; // Subscription will be cancelled
            }

            return true; // Subscription will be continued
        }
    );

    $subscriber->consume(); // You can pass timeout in milliseconds

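Because returning false from the callback cancels the subscription, the same mechanism can be used to stop a worker after a fixed number of messages, which is handy for periodically restarted workers. A minimal sketch follows. The factory limitedHandler is hypothetical, and the message and consumer arguments are left untyped so the sketch runs without a broker:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of this package): wraps a processing callback
 * into a subscription callback that stops after $maxMessages messages or on
 * the first failure.
 */
function limitedHandler(callable $process, int $maxMessages): \Closure
{
    $handled = 0;

    return function (object $message, object $consumer) use ($process, $maxMessages, &$handled): bool {
        try {
            $process($message);
            $consumer->acknowledge($message);
        } catch (\Throwable $e) {
            $consumer->reject($message);

            return false; // cancel the subscription on failure
        }

        // Continue only while the limit has not been reached
        return ++$handled < $maxMessages;
    };
}
```

The returned closure can be passed as the second argument of $subscriber->subscribe().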
Purge queue messages
Remove all messages from the queue:

    <?php

    /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
    /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

    $connection = $connections->default();

    $connection->purgeQueue($queues->make('some-queue-id'));

Testing
For package testing we use the phpunit framework, with docker-ce + docker-compose as the development environment. After cloning the repository, run the following in your terminal:
$ make build
$ make latest # or 'make lowest'
$ make test
Changes log
Changes log can be found here.
Support
If you find any errors in the package, please open an issue in the current repository.
License
This is open-sourced software licensed under the MIT License.