yiisoft / yii-queue
Queue Extension which supported DB, Redis, RabbitMQ, Beanstalk, SQS and Gearman
Fund package maintenance!
Open Collective
yiisoft
Requires
- php: ^8.0
- psr/container: ^1.0|^2.0
- psr/log: ^2.0|^3.0
- symfony/console: ^5.4|^6.0
- yiisoft/definitions: ^1.0|^2.0|^3.0
- yiisoft/friendly-exception: ^1.0
- yiisoft/injector: ^1.0
Requires (Dev)
- maglnet/composer-require-checker: ^4.2
- phpunit/phpunit: ^9.5
- rector/rector: ^0.18.10
- roave/infection-static-analysis-plugin: ^1.16
- spatie/phpunit-watcher: ^1.23
- vimeo/psalm: ^4.30|^5.8
- yiisoft/test-support: ^3.0
- yiisoft/yii-debug: dev-master
Suggests
- ext-pcntl: Need for process signals
This package is auto-updated.
Last update: 2024-01-09 05:25:09 UTC
README
Yii Queue Extension
An extension for running tasks asynchronously via queues.
Documentation is at docs/guide/README.md.
Installation
The preferred way to install this extension is through composer.
Either run
composer require yiisoft/queue
or add
"yiisoft/queue": "~3.0"
to the require
section of your composer.json
file.
Ready for yiisoft/config
If you are using yiisoft/config, you'll find out this package has some defaults
in the common
and params
configurations saving your time. Things you should
change to start working with the queue:
- Optionally: define default
\Yiisoft\Queue\Adapter\AdapterInterface
implementation. - And/or define channel-specific
AdapterInterface
implementations in thechannel-definitions
params key to be used with the queue factory. - Define message handlers in the
handlers
params key to be used with theQueueWorker
. - Resolve other
\Yiisoft\Queue\Queue
dependencies (psr-compliant event dispatcher).
Differences to yii2-queue
If you have experience with yiisoft/yii2-queue
, you will find out that this package is similar.
Though, there are some key differences which are described in the "migrating from yii2-queue" article.
Basic Usage
Each queue task consists of two parts:
- A message is a class implementing
MessageInterface
. For simple cases you can use the default implementation,Yiisoft\Queue\Message\Message
. For more complex cases you should implement the interface by your own. - A message handler is a callable called by a
Yiisoft\Queue\Worker\Worker
. The handler handles each queue message.
For example, if you need to download and save a file, your message may look like the following:
$data = [ 'url' => $url, 'destinationFile' => $filename, ]; $message = new \Yiisoft\Queue\Message\Message('file-download', $data);
Then you should push it to the queue:
$queue->push($message);
Its handler may look like the following:
class FileDownloader { private string $absolutePath; public function __construct(string $absolutePath) { $this->absolutePath = $absolutePath; } public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): void { $fileName = $downloadMessage->getData()['destinationFile']; $path = "$this->absolutePath/$fileName"; file_put_contents($path, file_get_contents($downloadMessage->getData()['url'])); } }
The last thing we should do is to create a configuration for the Yiisoft\Queue\Worker\Worker
:
$handlers = ['file-download' => [new FileDownloader('/path/to/save/files'), 'handle']]; $worker = new \Yiisoft\Queue\Worker\Worker( $handlers, // Here it is $logger, $injector, $container );
There is the way to run all the messages that are already in the queue, and then exit:
$queue->run(); // this will execute all the existing messages $queue->run(10); // while this will execute only 10 messages as a maximum before exit
If you don't want your script to exit immediately, you can use the listen
method:
$queue->listen();
You can also check the status of a pushed message (the queue adapter you are using must support this feature):
$queue->push($message); $id = $message->getId(); // Get status of the job $status = $queue->status($id); // Check whether the job is waiting for execution. $status->isWaiting(); // Check whether a worker got the job from the queue and executes it. $status->isReserved(); // Check whether a worker has executed the job. $status->isDone();
Different queue channels
Often we need to push to different queue channels with an only application. There is the QueueFactory
class to make
different Queue
objects creation for different channels. With this factory channel-specific Queue
creation is as
simple as
$queue = $factory->get('channel-name');
The main usage strategy is with explicit definition of channel-specific adapters. Definitions are passed in
the $definitions
constructor parameter of the factory, where keys are channel names and values are definitions
for the Yiisoft\Factory\Factory
. Below are some examples:
use Yiisoft\Queue\Adapter\SynchronousAdapter; [ 'channel1' => new SynchronousAdapter(), 'channel2' => static fn(SynchronousAdapter $adapter) => $adapter->withChannel('channel2'), 'channel3' => [ 'class' => SynchronousAdapter::class, '__constructor' => ['channel' => 'channel3'], ], ]
For more information about a definition formats available see the factory documentation.
Another queue factory usage strategy is implicit adapter creation via withChannel()
method call. To use this approach
you should pass some specific constructor parameters:
true
to the$enableRuntimeChannelDefinition
- a default
AdapterInterface
implementation to the$defaultAdapter
.
In this case $factory->get('channel-name')
call will be converted
to $this->queue->withAdapter($this->defaultAdapter->withChannel($channel))
, when there is no explicit adapter definition
in the $definitions
.
Warning: This strategy is not recommended as it does not give you any protection against typos and mistakes in channel names.
Console execution
The exact way of task execution depends on the adapter used. Most adapters can be run using console commands, which the component automatically registers in your application.
The following command obtains and executes tasks in a loop until the queue is empty:
yii queue:run
The following command launches a daemon which infinitely queries the queue:
yii queue:listen
See the documentation for more details about adapter specific console commands and their options.
The component also has the ability to track the status of a job which was pushed into queue.
For more details see the guide.
Middleware pipelines
Any message pushed to a queue or consumed from it passes through two different middleware pipelines: one pipeline
on message push and another - on message consume. The process is the same as for the HTTP request, but it is executed
twice for a queue message. That means you can add extra functionality on message pushing and consuming with configuration
of the two classes: PushMiddlewareDispatcher
and ConsumeMiddlewareDispatcher
respectively.
You can use any of these formats to define a middleware:
- A ready-to-use middleware object:
new FooMiddleware()
. It must implementMiddlewarePushInterface
,MiddlewareConsumeInterface
orMiddlewareFailureInterface
depending on the place you use it. - An array in the format of yiisoft/definitions. Only if you use yiisoft/definitions and yiisoft/di.
- A
callable
:fn() => // do stuff
,$object->foo(...)
, etc. It will be executed through the yiisoft/injector, so all the dependencies of your callable will be resolved. - A string for your DI container to resolve the middleware, e.g.
FooMiddleware::class
Middleware will be executed forwards in the same order they are defined. If you define it like the following:
[$middleware1, $midleware2]
, the execution will look like this:
graph LR StartPush((Start)) --> PushMiddleware1[$middleware1] --> PushMiddleware2[$middleware2] --> Push(Push to a queue) -.-> PushMiddleware2[$middleware2] -.-> PushMiddleware1[$middleware1] PushMiddleware1[$middleware1] -.-> EndPush((End)) StartConsume((Start)) --> ConsumeMiddleware1[$middleware1] --> ConsumeMiddleware2[$middleware2] --> Consume(Consume / handle) -.-> ConsumeMiddleware2[$middleware2] -.-> ConsumeMiddleware1[$middleware1] ConsumeMiddleware1[$middleware1] -.-> EndConsume((End))
Push pipeline
When you push a message, you can use middlewares to modify both message and queue adapter.
With message modification you can add extra data, obfuscate data, collect metrics, etc.
With queue adapter modification you can redirect message to another queue, delay message consuming, and so on.
To use this feature you have to create a middleware class, which implements MiddlewarePushInterface
, and
return a modified PushRequest
object from the processPush
method:
return $pushRequest->withMessage($newMessage)->withAdapter($newAdapter);
With push middlewares you can define an adapter object at the runtime, not in the Queue
constructor.
There is a restriction: by the time all middlewares are executed in the forward order, the adapter must be specified
in the PushRequest
object. You will get a AdapterNotConfiguredException
, if it isn't.
You have three places to define push middlewares:
PushMiddlewareDispatcher
. You can pass it either to the constructor, or to thewithMiddlewares()
method, which
creates a completely new dispatcher object with only those middlewares, which are passed as arguments. If you use yiisoft/config, you can add middleware to themiddlewares-push
key of theyiisoft/queue
array in theparams
.- Pass middlewares to either
Queue::withMiddlewares()
orQueue::withMiddlewaresAdded()
methods. The difference is that the former will completely replace an existing middleware stack, while the latter will add passed middlewares to the end of the existing stack. These middlewares will be executed after the common ones, passed directly to thePushMiddlewareDispatcher
. It's useful when defining a queue channel. Both methods return a new instance of theQueue
class. - Put middlewares into the
Queue::push()
method like this:$queue->push($message, ...$middlewares)
. These middlewares have the lowest priority and will be executed after those which are in thePushMiddlewareDispatcher
and the ones passed to theQueue::withMiddlewares()
andQueue::withMiddlewaresAdded()
and only for the message passed along with them.
Consume pipeline
You can set a middleware pipeline for a message when it will be consumed from a queue server. This is useful to collect metrics, modify message data, etc. In pair with a Push middleware you can deduplicate messages in the queue, calculate time from push to consume, handle errors (push to a queue again, redirect failed message to another queue, send a notification, etc.). Unless Push pipeline, you have only one place to define the middleware stack: in the ConsumeMiddlewareDispatcher
, either in the constructor, or in the withMiddlewares()
method. If you use yiisoft/config, you can add middleware to the middlewares-consume
key of the yiisoft/queue
array in the params
.
Error handling pipeline
Often when some job is failing, we want to retry its execution a couple more times or redirect it to another queue channel. This can be done in yiisoft/queue
with Failure middleware pipeline. They are triggered each time message processing via the Consume middleware pipeline is interrupted with any Throwable
. The key differences from the previous two pipelines:
- You should set up the middleware pipeline separately for each queue channel. That means, the format should be
['channel-name' => [FooMiddleware::class]]
instead of[FooMiddleware::class]
, like for the other two pipelines. There is also a default key, which will be used for those channels without their own one:FailureMiddlewareDispatcher::DEFAULT_PIPELINE
. - The last middleware will throw the exception, which will come with the
FailureHandlingRequest
object. If you don't want the exception to be thrown, your middlewares shouldreturn
a request without calling$handler->handleFailure()
.
You can declare error handling middleware pipeline in the FailureMiddlewareDispatcher
, either in the constructor, or in the withMiddlewares()
method. If you use yiisoft/config, you can add middleware to the middlewares-fail
key of the yiisoft/queue
array in the params
.
See error handling docs for details.
Extra
Unit testing
The package is tested with PHPUnit. To run tests:
./vendor/bin/phpunit
Mutation testing
The package tests are checked with Infection mutation framework. To run it:
./vendor/bin/infection
Static analysis
The code is statically analyzed with Psalm. To run static analysis:
./vendor/bin/psalm
Support the project
Follow updates
License
The Yii Queue Extension is free software. It is released under the terms of the BSD License.
Please see LICENSE
for more information.
Maintained by Yii Software.