avto-dev/amqp-rabbit-manager

RabbitMQ manager

v2.10.0 2024-04-25 11:14 UTC

This package is auto-updated.

Last update: 2024-12-25 12:40:18 UTC


RabbitMQ manager for Laravel applications

This package provides easy access to RabbitMQ entities such as connections and queues.

The PHP extension ext-amqp must be installed. Installation steps can be found in the Dockerfile.

Install

Require this package with composer using the following command:

$ composer require avto-dev/amqp-rabbit-manager "^2.0"

Composer must be installed (see the Composer installation guide).

Pin the major version of the package, as shown above.

After that, publish the package configuration file with the following command:

$ php ./artisan vendor:publish --provider='AvtoDev\AmqpRabbitManager\ServiceProvider'

Then configure the package in ./config/rabbitmq.php.

Usage

First, run the rabbit:setup artisan command (php ./artisan rabbit:setup) to create all queues and exchanges on the RabbitMQ server.

After that, you can resolve the connection, queue, or exchange factories anywhere in your application, for example in an artisan command:

<?php

namespace App\Console\Commands;

use AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface;

class SomeCommand extends \Illuminate\Console\Command
{
    /**
     * The console command name.
     *
     * @var string
     */
    protected $name = 'some:command';

    /**
     * Execute the console command.
     *
     * @param ConnectionsFactoryInterface $connections
     *
     * @return void
     */
    public function handle(ConnectionsFactoryInterface $connections): void
    {
        $connections->default(); // Get the default RabbitMQ connection instance
    }
}

Create queue manually

Declaring a queue creates it on the broker side (prefer the rabbit:setup command over doing this manually):

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

$connections
    ->default()
    ->declareQueue($queues->make('some-queue-id'));

Create exchange manually

Declaring an exchange creates a topic on the broker side (prefer the rabbit:setup command over doing this manually):

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\ExchangesFactoryInterface $exchanges */

$connections
    ->default()
    ->declareTopic($exchanges->make('some-exchange-id'));

Bind queue to exchange

Binding connects a queue to an exchange, so messages published to that topic arrive in the queue and can be processed (prefer listening to the rabbit:setup command events, \AvtoDev\AmqpRabbitManager\Commands\Events\*, over doing this manually):

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */
/** @var \AvtoDev\AmqpRabbitManager\ExchangesFactoryInterface $exchanges */

$connections
    ->default()
    ->bind(new \Interop\Amqp\Impl\AmqpBind(
        $exchanges->make('some-exchange-id'),
        $queues->make('some-queue-id')
    ));

Send message to exchange

Create a message and send it to the exchange:

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\ExchangesFactoryInterface $exchanges */

$connection = $connections->default();
$message    = $connection->createMessage('Hello world!');

$connection
    ->createProducer()
    ->send($exchanges->make('some-exchange-id'), $message);

Send message to queue

Create a message and send it to the queue:

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

$connection = $connections->default();
$message    = $connection->createMessage('Hello world!');

$connection
    ->createProducer()
    ->send($queues->make('some-queue-id'), $message);

Send priority message

Message priority is used to order messages (RabbitMQ honors it only on queues declared with the x-max-priority argument):

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

$connection = $connections->default();
$message    = $connection->createMessage('Hello world!');

$connection
    ->createProducer()
    ->setPriority(10)
    // ...
    ->send($queues->make('some-queue-id'), $message);

Send expiration message

Also known as message TTL:

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

$connection = $connections->default();
$message    = $connection->createMessage('Hello world!');

$connection
    ->createProducer()
    ->setTimeToLive(60000) // 60 sec
    // ...
    ->send($queues->make('some-queue-id'), $message);

Send delayed message

Avoid the enqueue/amqp-tools delay strategies if you can. Implementing the delay manually gives you full control over it.
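One common way to implement a delay manually is a holding queue with a message TTL and a dead-letter route back to the target queue. The sketch below is only an illustration of that technique, not the package's own method: the function names delayQueueArguments and sendDelayed and the holding queue naming scheme are hypothetical, and it assumes the connection is an \Interop\Amqp\AmqpContext and the target is an \Interop\Amqp\AmqpQueue.

```php
<?php

use Interop\Amqp\AmqpContext;
use Interop\Amqp\AmqpMessage;
use Interop\Amqp\AmqpQueue;

/**
 * Build the arguments of a holding queue (hypothetical helper).
 *
 * Messages stay in the holding queue for $delay_ms milliseconds, then RabbitMQ
 * dead-letters them through the default exchange ('') to the target queue.
 */
function delayQueueArguments(int $delay_ms, string $target_queue_name): array
{
    return [
        'x-message-ttl'             => $delay_ms,
        'x-dead-letter-exchange'    => '',
        'x-dead-letter-routing-key' => $target_queue_name,
    ];
}

/**
 * Send a message so that it reaches $target after roughly $delay_ms milliseconds.
 */
function sendDelayed(AmqpContext $context, AmqpQueue $target, AmqpMessage $message, int $delay_ms): void
{
    // One holding queue per delay value, because the TTL is fixed per queue
    // (the naming scheme is an assumption of this sketch)
    $holding = $context->createQueue($target->getQueueName() . '.delay.' . $delay_ms);
    $holding->addFlag(AmqpQueue::FLAG_DURABLE);
    $holding->setArguments(delayQueueArguments($delay_ms, $target->getQueueName()));

    $context->declareQueue($holding);
    $context->createProducer()->send($holding, $message);
}
```

Under those assumptions, a call could look like sendDelayed($connections->default(), $queues->make('some-queue-id'), $message, 5000). Nothing should consume from the holding queue, otherwise messages are delivered before the delay expires.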

Get (consume) single message

Get one message and continue script execution:

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

$consumer = $connections->default()->createConsumer($queues->make('some-queue-id'));

$message = $consumer->receive();

try {
    // .. process a message ..

    $consumer->acknowledge($message);
} catch (\Exception $e) {
    // .. process exception ..

    $consumer->reject($message);
}
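Called without arguments, receive() waits until a message arrives. If the script should continue when the queue stays empty, a timeout can be passed, and the result may then be null. A minimal sketch of that variant follows; the helper name processOne is hypothetical, and $consumer is assumed to behave like an \Interop\Queue\Consumer.

```php
<?php

/**
 * Receive at most one message, waiting no longer than $timeout_ms milliseconds.
 *
 * $consumer is expected to be an \Interop\Queue\Consumer (for example, the
 * object returned by createConsumer()); the native type hint is omitted so
 * that the sketch can be tried with a stub.
 *
 * @return bool True if a message was received and acknowledged
 */
function processOne($consumer, callable $handler, int $timeout_ms): bool
{
    $message = $consumer->receive($timeout_ms); // Null when the timeout expires

    if ($message === null) {
        return false; // The queue stayed empty
    }

    try {
        $handler($message);
        $consumer->acknowledge($message);

        return true;
    } catch (\Throwable $e) {
        $consumer->reject($message); // Pass true as the second argument to requeue

        return false;
    }
}
```

For example, processOne($consumer, $handler, 1000) waits up to one second for a message before returning.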

Subscription consumer

Start a (nearly) infinite loop for message processing (you can run more than one consumer at a time; just call subscribe() for each of them):

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

$connection = $connections->default();
$queue      = $queues->make('some-queue-id');
$consumer   = $connection->createConsumer($queue);
$subscriber = $connection->createSubscriptionConsumer();

$subscriber->subscribe(
    $consumer,
    function(\Interop\Amqp\AmqpMessage $message, \Enqueue\AmqpExt\AmqpConsumer $consumer): bool {
        try {
            // .. process a message ..

            $consumer->acknowledge($message);
        } catch (\Exception $e) {
            // .. process exception ..

            $consumer->reject($message);

            return false; // Subscription will be cancelled
        }

        return true; // Subscription will be continued
    }
);

$subscriber->consume(); // You can pass timeout in milliseconds
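To serve several queues from one loop, each consumer can be registered on the same subscription before consume() is called. The sketch below illustrates this under assumptions: the helper name subscribeAll is hypothetical, $subscriber is assumed to behave like an \Interop\Queue\SubscriptionConsumer, and each consumer like an \Interop\Queue\Consumer.

```php
<?php

/**
 * Subscribe one handler to several consumers on the same subscription.
 *
 * Native type hints are omitted so that the sketch can be tried with stubs.
 */
function subscribeAll($subscriber, array $consumers, callable $handler): void
{
    foreach ($consumers as $consumer) {
        $subscriber->subscribe($consumer, function ($message, $consumer) use ($handler): bool {
            try {
                $handler($message);
                $consumer->acknowledge($message);
            } catch (\Throwable $e) {
                $consumer->reject($message);
            }

            return true; // Keep the subscription running, even after a failure
        });
    }
}
```

After subscribeAll($subscriber, [$first_consumer, $second_consumer], $handler), a single $subscriber->consume() call processes messages from both queues. Unlike the example above, this sketch keeps the subscription alive when a handler throws; return false from the callback instead if a failure should stop the loop.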

Purge queue messages

Remove all messages from the queue:

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

$connection = $connections->default();

$connection->purgeQueue($queues->make('some-queue-id'));

Testing

We use the PHPUnit framework for package testing and docker-ce + docker-compose as the development environment. After cloning the repository, run the following in your terminal:

$ make build
$ make latest # or 'make lowest'
$ make test

Changes log

The changes log can be found in the package repository.

Support

If you find any errors in the package, please open an issue in the repository.

License

This is open-sourced software licensed under the MIT License.