lamoda / queue-bundle
Symfony queue bundle
Requires
- php: >=7.2.0
- ext-json: *
- ext-pdo: *
- doctrine/doctrine-bundle: ^1.9 || ^2.0
- doctrine/orm: ~2.3
- jms/serializer-bundle: ^2.4 || ^3.0
- php-amqplib/php-amqplib: ~3.0
- php-amqplib/rabbitmq-bundle: ~2.9.0
- symfony/config: ^4.1 || ^5.0
- symfony/console: ^4.2 || ^5.0
- symfony/dependency-injection: ^4.2 || ^5.0
- symfony/event-dispatcher: ^4.3 || ^5.0
- symfony/framework-bundle: ^4.3
- symfony/monolog-bundle: ^3.3
Requires (Dev)
- codeception/codeception: ^4.1
- codeception/module-asserts: ^1.3
- friendsofphp/php-cs-fixer: ^2.13
- phpunit/phpunit: ^8.0
This package is auto-updated.
Last update: 2023-06-29 01:22:25 UTC
README
A Symfony bundle for working with queues. It currently supports RabbitMQ.
Installation
- Install bundle
    composer require lamoda/queue-bundle
- Extend Lamoda\QueueBundle\Entity\QueueEntityMappedSuperclass
    use Doctrine\ORM\Mapping as ORM;
    use Lamoda\QueueBundle\Entity\QueueEntityMappedSuperclass;

    /**
     * @ORM\Entity(repositoryClass="Lamoda\QueueBundle\Entity\QueueRepository")
     */
    class Queue extends QueueEntityMappedSuperclass
    {
    }
- Configure bundle parameters
    lamoda_queue:
        ## required
        entity_class: App\Entity\Queue
        max_attempts: 5
        batch_size_per_requeue: 5
        batch_size_per_republish: 5
        ## optional (used by the default geometric progression delay strategy)
        strategy_delay_geometric_progression_start_interval_sec: 60
        strategy_delay_geometric_progression_multiplier: 2
- Register bundle
    class AppKernel extends Kernel
    {
        // ...

        public function registerBundles()
        {
            $bundles = [
                // ...
                new Lamoda\QueueBundle\LamodaQueueBundle(),
                // ...
            ];

            return $bundles;
        }

        // ...
    }
  or add to config/bundles.php
    return [
        // ...
        Lamoda\QueueBundle\LamodaQueueBundle::class => ['all' => true],
        // ...
    ];
- Migrate schema
  - doctrine:migrations:diff to create a migration for the queue table
  - doctrine:migrations:migrate to apply the migration
Setup
Create new exchange
- Define new exchange constant
    namespace App\Constant;

    class Exchanges
    {
        public const DEFAULT = 'default';
    }
- Add a new node to old_sound_rabbit_mq.producers using the previously defined constant name, for example:
    old_sound_rabbit_mq:
        producers:
            default:
                connection: default
                exchange_options:
                    name: !php/const App\Constant\Exchanges::DEFAULT
                    type: "direct"
Create new queue
- Define new queue constant
    namespace App\Constant;

    class Queues
    {
        public const NOTIFICATION = 'notification';
    }
- Register a consumer for the queue in old_sound_rabbit_mq.consumers using the previously defined constant name, for example:
    old_sound_rabbit_mq:
        consumers:
            notification:
                connection: default
                exchange_options:
                    name: !php/const App\Constant\Exchanges::DEFAULT
                    type: "direct"
                queue_options:
                    name: !php/const App\Constant\Queues::NOTIFICATION
                    routing_keys:
                        - !php/const App\Constant\Queues::NOTIFICATION
                callback: "lamoda_queue.consumer"
- Create a job class that extends AbstractJob, for example:
    namespace App\Job;

    use App\Constant\Exchanges;
    use App\Constant\Queues;
    use JMS\Serializer\Annotation as JMS;
    use Lamoda\QueueBundle\Job\AbstractJob;

    class SendNotificationJob extends AbstractJob
    {
        /**
         * @var string
         *
         * @JMS\Type("string")
         */
        private $message;

        public function __construct(string $message)
        {
            $this->message = $message;
        }

        public function getDefaultQueue(): string
        {
            return Queues::NOTIFICATION;
        }

        public function getDefaultExchange(): string
        {
            return Exchanges::DEFAULT;
        }
    }
- Create a job handler that implements HandlerInterface, for example:
    namespace App\Handler;

    use Lamoda\QueueBundle\Handler\HandlerInterface;
    use Lamoda\QueueBundle\QueueInterface;

    class SendNotificationHandler implements HandlerInterface
    {
        public function handle(QueueInterface $job): void
        {
            // implement service logic here
        }
    }
- Tag the handler in the service container
    services:
        App\Handler\SendNotificationHandler:
            public: true
            tags:
                - { name: queue.handler, handle: App\Job\SendNotificationJob }
- Configure delay strategy parameters (optional)
    lamoda_queue:
        ...
        queues:
            queue_one: 'delay_arithmetic_progression'
            queue_two: 'delay_special_geometric_progression'

    # Settings of special behaviors for Delay strategies (optional)
    services:
        lamoda_queue.strategy.delay.arithmetic_progression:
            class: Lamoda\QueueBundle\Strategy\Delay\ArithmeticProgressionStrategy
            tags:
                - { name: 'lamoda_queue_strategy', key: 'delay_arithmetic_progression' }
            arguments:
                - 60 # start_interval_sec parameter
                - 1700 # multiplier parameter

        lamoda_queue.strategy.delay.geometric_progression:
            class: Lamoda\QueueBundle\Strategy\Delay\GeometricProgressionStrategy
            tags:
                - { name: 'lamoda_queue_strategy', key: 'delay_special_geometric_progression' }
            arguments:
                - 70 # start_interval_sec parameter
                - 4 # multiplier parameter
In this block you can configure a dedicated delay behavior for each queue. To do so, register new services that use one of the base strategies (ArithmeticProgressionStrategy, GeometricProgressionStrategy) or your own. A custom strategy is a service class that implements DelayStrategyInterface.
Each strategy service must carry a tag named lamoda_queue_strategy with a unique key. You can then use these keys to match strategies to queues in the lamoda_queue.queues section.
By default, GeometricProgressionStrategy is used with the following parameters, which you can customize in the lamoda_queue config section:
    strategy_delay_geometric_progression_start_interval_sec: 60
    strategy_delay_geometric_progression_multiplier: 2
- Add the queue name to "codeception.yml" at modules.config.AMQP.queues
- Execute the ./bin/console queue:init command
Usage
Init exchange and queues
./bin/console queue:init
Add job to queue
$job = new SendNotificationJob($id);
$container->get(Lamoda\QueueBundle\Factory\PublisherFactory::class)->publish($job);
Run queued job
./bin/console queue:consume notification
Requeue failed queues
./bin/console queue:requeue
Advanced usage
You can queue any plain class; it only needs to implement QueueInterface:
namespace App\Process;

use Lamoda\QueueBundle\Entity\QueueInterface;

class MyProcess implements QueueInterface
{
    // implement interface functions
}

services:
    App\Handler\MyProcessHandler:
        public: true
        tags:
            - { name: queue.handler, handle: App\Process\MyProcess }

$process = new MyProcess();
$container->get('queue.publisher')->publish($process);
How to rerun queues
To rerun a queue, throw Lamoda\QueueBundle\Exception\RuntimeException.
To mark a queue as failed, throw any other kind of exception.
namespace App\Handler;

use Lamoda\QueueBundle\Exception\RuntimeException;
use Lamoda\QueueBundle\Handler\HandlerInterface;
use Lamoda\QueueBundle\QueueInterface;

class SendNotificationHandler implements HandlerInterface
{
    public function handle(QueueInterface $job): void
    {
        // implement service logic here; $rerun and $failed stand for
        // whatever outcome flags that logic produces

        // Rerun queue (the class is imported above so the name resolves
        // correctly inside the App\Handler namespace)
        if ($rerun === true) {
            throw new RuntimeException('Error message');
        }

        // Mark queue as failed
        if ($failed === true) {
            throw new \Exception();
        }
    }
}
By default, the delay before a rerun grows geometrically (exponentially). You can adjust it through configuration:
lamoda_queue:
    ## required
    ## ...
    max_attempts: 5
    ## optional
    strategy_delay_geometric_progression_start_interval_sec: 60
    strategy_delay_geometric_progression_multiplier: 2
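To get a feel for what these values mean, here is a minimal sketch of a geometric delay schedule. It is not the bundle's code: geometricDelaySec is a hypothetical helper, and the formula (delay = start interval × multiplier^(attempt − 1), with attempts counted from 1) is an assumption about how such a progression is usually defined. The bundle's GeometricProgressionStrategy may count attempts differently.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper, not part of lamoda/queue-bundle.
 *
 * Returns the delay in seconds before attempt number $attempt (1-based),
 * assuming delay = start * multiplier^(attempt - 1).
 */
function geometricDelaySec(int $startIntervalSec, float $multiplier, int $attempt): int
{
    if ($attempt < 1) {
        throw new InvalidArgumentException('Attempt numbers start at 1.');
    }

    // ** binds tighter than *, so this is start * (multiplier ^ (attempt - 1)).
    return (int) round($startIntervalSec * $multiplier ** ($attempt - 1));
}

// Values taken from the configuration example: start 60 s, multiplier 2, 5 attempts.
$maxAttempts = 5;
$schedule = [];
for ($attempt = 1; $attempt <= $maxAttempts; ++$attempt) {
    $schedule[$attempt] = geometricDelaySec(60, 2.0, $attempt);
}

foreach ($schedule as $attempt => $delaySec) {
    printf("attempt %d: wait %d s\n", $attempt, $delaySec);
}
```

Under this assumption the waits are 60, 120, 240, 480 and 960 seconds, so a larger multiplier stretches the later attempts much more than a larger start interval does.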
Events
Lamoda\QueueBundle\Event\QueueAttemptsReachedEvent
Raised when a consumer tries to execute a queue that has already reached its maximum number of attempts.
Properties:
- Queue entity: QueueAttemptsReachedEvent::getQueue()
Development
PHP Coding Standards Fixer
make php-cs-check
make php-cs-fix
Tests
Unit
make test-unit