bricev / jobqueue
JobQueue is a PHP component that queues `tasks` defining `jobs` which can be executed by asynchronous and/or distributed `workers`
Requires
- php: ^7.1
- http-interop/response-sender: ^1.0
- middlewares/error-handler: ^1.0
- middlewares/fast-route: ^1.0
- middlewares/payload: ^1.0
- middlewares/request-handler: ^1.0
- predis/predis: ^1.0
- psr/log: ^1.0
- ramsey/uuid: ^3.0
- relay/relay: ^2.0
- symfony/config: ^4.0
- symfony/console: ^4.0
- symfony/dependency-injection: ^4.0
- symfony/event-dispatcher: ^4.0
- symfony/yaml: ^4.0
- vlucas/phpdotenv: ^2.0
- zendframework/zend-diactoros: ^1.0
Requires (Dev)
- guzzlehttp/guzzle: ^6.1
- phpunit/phpunit: ^6
- symfony/process: ^4.0
README
Simple Job/Queue PHP component to help applications distribute Tasks through multiple workers.
Install
This package is installable and auto-loadable via Composer:
$ composer require bricev/jobqueue
The application provides a web service and two CLIs. Read below for more documentation.
Configuration
- PHP 7.1 must be installed
- Redis must be installed (for queue data persistence)
- the `JOBQUEUE_ENV` environment variable may be set as `dev`, `prod` (or any string, default if not set: `dev`)
- the `JOBQUEUE_REDIS_DSN` environment variable must define the Redis DSN (e.g. 'tcp://127.0.0.1:6379')
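As an illustration of the two rules above, the following sketch resolves both variables the way a bootstrap script might. The `resolveJobQueueConfig()` helper is hypothetical and not part of the package; it only mirrors the documented default (`dev`) and the mandatory DSN.

```php
<?php

/**
 * Hypothetical helper (not part of JobQueue): resolves the two documented
 * environment variables from an array of environment values.
 */
function resolveJobQueueConfig(array $env): array
{
    // JOBQUEUE_ENV is optional and falls back to "dev" when missing or empty.
    $environment = isset($env['JOBQUEUE_ENV']) && $env['JOBQUEUE_ENV'] !== ''
        ? (string) $env['JOBQUEUE_ENV']
        : 'dev';

    // JOBQUEUE_REDIS_DSN is mandatory: fail early with an explicit message.
    if (empty($env['JOBQUEUE_REDIS_DSN'])) {
        throw new RuntimeException('JOBQUEUE_REDIS_DSN must define the Redis DSN');
    }

    return [
        'env'       => $environment,
        'redis_dsn' => (string) $env['JOBQUEUE_REDIS_DSN'],
    ];
}

$config = resolveJobQueueConfig([
    'JOBQUEUE_ENV'       => 'prod',
    'JOBQUEUE_REDIS_DSN' => 'tcp://127.0.0.1:6379',
]);
```

In a real script the array could come from `getenv()` called without arguments (available since PHP 7.1).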
Usage
Defining a job
A `job` holds the code that will be executed for tasks by a worker.
Each job must be defined in a PHP class that implements the `ExecutableJob` interface:
<?php

use JobQueue\Domain\Job\ExecutableJob;
use JobQueue\Domain\Task\Task;
use Psr\Log\LoggerAwareTrait;

final class DummyJob implements ExecutableJob
{
    use LoggerAwareTrait;

    /**
     * @param Task $task
     */
    function setUp(Task $task)
    {
        // This is called before the `perform()` method
        /** @todo prepare the job execution */
    }

    /**
     * @param Task $task
     */
    function perform(Task $task)
    {
        /** @todo do the job! */
    }

    /**
     * @param Task $task
     */
    function tearDown(Task $task)
    {
        // This is called after the `perform()` method
        /** @todo clean up after the job execution */
    }
}
Note: the `ExecutableJob` interface extends the `LoggerAwareInterface`, which can be used to set a logger.
This documentation provides more information about PSR-3 Log interfaces.
Tip: Package `psr/log` (repo) can be added to a composer project by running this command:
composer require psr/log
The `\Psr\Log\LoggerAwareTrait` can be used to easily add a logger setter and be compliant with the `LoggerAwareInterface`.
Defining a task
Creating a `task` requires the following elements:
- a `profile`: it can be anything; it groups tasks into queue partitions, so that the JobQueue `worker` app can consume tasks from one (and only one) profile
- a `job`: it holds the code that has to be executed for the task
- optional `parameters` that can be used by the job during its execution
To define a task in PHP:
<?php

use JobQueue\Domain\Task\Profile;
use JobQueue\Domain\Task\Task;
use JobQueue\Tests\Domain\Job\DummyJob;

$task = new Task(
    new Profile('foobar'),
    new DummyJob,
    [
        'foo' => 'bar',
        // [...]
    ]
);
Adding tasks to the queue
First, the queue has to be instantiated.
This can be done manually:
<?php

use JobQueue\Infrastructure\RedisQueue;
use Predis\Client;

$predis = new Client('tcp://localhost:6379');
$queue = new RedisQueue($predis);
Or by using the ServiceContainer (this requires the proper configuration, see the Configuration section above):
<?php

use JobQueue\Infrastructure\ServiceContainer;

$queue = ServiceContainer::getInstance()->queue;
Then, tasks can be enqueued easily:
<?php

/** @var \JobQueue\Domain\Task\Queue $queue */
/** @var \JobQueue\Domain\Task\Task $task */
$queue->add($task);
The task's job will be executed as soon as a worker starts consuming the task's profile.
This component embeds a PHP executable worker.
See the CLI section to get more details about its usage.
Worker events
The worker emits some events that can be listened to:
To intercept an event, you can use the `EventDispatcher` from the service container:
<?php

use JobQueue\Infrastructure\ServiceContainer;

$dispatcher = ServiceContainer::getInstance()->dispatcher;
$dispatcher->addListener('task.failed', function ($event) {
    /** @var \JobQueue\Domain\Task\TaskHasFailed $event */
    $task = $event->getTask();
    // Do something...
});
CLI
These features require the proper configuration, see the Configuration section above.
The `manager` app can be used to perform CRUD operations on tasks.
Usage:
$ bin/manager list              # lists all commands
$ bin/manager {command} --help  # displays the command help
The `worker` app can be used to consume enqueued tasks.
Usage:
$ bin/worker --help
The `worker` app can be used as an OS service (e.g. upstart, systemd... on Unix) to run on servers.
Webservice
Configuration
A web server should be configured to serve `public/index.php` as a router script.
This feature requires the proper configuration, see the Configuration section above.
API
List all tasks:
GET /tasks
profile: string (a profile name that filters tasks)
status: waiting|running|finished|failed (a status that filters tasks)
order: date|profile|status (sort order, default: status)
Returns an array of tasks:
HTTP/1.1 200 Ok
Content-Type: application/json
[
{
"identifier": "47a5b21d-0a02-4e6e-b8c9-51dc1534cb68",
"status": "waiting",
"profile": "foobar",
"job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
"date": "Fri, 23 Feb 2018 13:45:22 +0000",
"parameters": {
"name_1": "value_1",
"name_2": "value_2"
}
}
]
Errors:
- 400 Bad Request if one of the parameters is wrong:
  - `Status "foo" does not exists` if status is not equal to waiting|running|finished|failed
  - `Profile name only allows lowercase alphanumerical, dash and underscore characters` if profile is malformed
  - `Impossible to order by "foobar"` if order is not equal to date|profile|status
- 500 Internal Server Error in case of a technical error
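The query parameters above can be validated client-side before the request is sent. The sketch below is only an illustration: `buildTaskListUrl()` and the `http://localhost:8000` base URL are hypothetical and not part of the package; the allowed values are the ones listed above.

```php
<?php

/**
 * Hypothetical helper (not part of JobQueue): builds the URL of a
 * "GET /tasks" request from the optional documented filters.
 */
function buildTaskListUrl(string $baseUrl, array $filters = []): string
{
    $allowedStatuses = ['waiting', 'running', 'finished', 'failed'];
    $allowedOrders   = ['date', 'profile', 'status'];

    // Reject values the API is documented to answer with 400 Bad Request.
    if (isset($filters['status']) && !in_array($filters['status'], $allowedStatuses, true)) {
        throw new InvalidArgumentException(sprintf('Unknown status "%s"', $filters['status']));
    }
    if (isset($filters['order']) && !in_array($filters['order'], $allowedOrders, true)) {
        throw new InvalidArgumentException(sprintf('Unknown order "%s"', $filters['order']));
    }

    // Keep only the three documented query parameters.
    $query = array_intersect_key($filters, array_flip(['profile', 'status', 'order']));

    $url = rtrim($baseUrl, '/') . '/tasks';

    return $query ? $url . '?' . http_build_query($query, '', '&') : $url;
}

// Assumed base URL, for illustration only.
$url = buildTaskListUrl('http://localhost:8000', ['profile' => 'foobar', 'status' => 'waiting']);
```

The resulting URL can then be passed to any HTTP client, for instance Guzzle, which the package already lists as a development dependency.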
Get a task's information:
GET /task/{identifier}
Returns the task definition:
HTTP/1.1 200 Ok
Content-Type: application/json
{
"identifier": "47a5b21d-0a02-4e6e-b8c9-51dc1534cb68",
"status": "waiting",
"profile": "foobar",
"job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
"date": "Fri, 23 Feb 2018 13:45:22 +0000",
"parameters": {
"name_1": "value_1",
"name_2": "value_2"
}
}
Errors:
- 404 Not Found if no task corresponds to the identifier
- 500 Internal Server Error in case of a technical error
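A client typically has to distinguish the three outcomes listed above. The following sketch shows one possible way to do so; `parseTaskResponse()` is a hypothetical helper, not part of the package, and it assumes the status code and body have already been obtained from an HTTP client.

```php
<?php

/**
 * Hypothetical helper (not part of JobQueue): turns the status code and body
 * of a "GET /task/{identifier}" response into a task array, or null on 404.
 */
function parseTaskResponse(int $statusCode, string $body): ?array
{
    if ($statusCode === 404) {
        return null; // no task corresponds to the identifier
    }
    if ($statusCode !== 200) {
        // Covers 500 Internal Server Error and any undocumented status.
        throw new RuntimeException(sprintf('Unexpected HTTP status %d', $statusCode));
    }

    $task = json_decode($body, true);
    if (!is_array($task)) {
        throw new RuntimeException('Response body is not a JSON object');
    }

    return $task;
}

$task = parseTaskResponse(
    200,
    '{"identifier":"47a5b21d-0a02-4e6e-b8c9-51dc1534cb68","status":"waiting"}'
);
```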
Create a new task:
POST /tasks
{
"profile": "foobar",
"job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
"parameters": {
"name_1": "value_1",
"name_2": "value_2"
}
}
Returns the task definition:
HTTP/1.1 201 Created
Content-Type: application/json
{
"identifier": "47a5b21d-0a02-4e6e-b8c9-51dc1534cb68",
"status": "waiting",
"profile": "foobar",
"job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
"date": "Fri, 23 Feb 2018 13:45:22 +0000",
"parameters": {
"name_1": "value_1",
"name_2": "value_2"
}
}
Errors:
- 400 Bad Request if the JSON body is malformed:
  - `Missing job` if the JSON object misses a job value
  - `Missing profile` if the JSON object misses a profile value
  - `Profile name only allows lowercase alphanumerical, dash and underscore characters` if profile is malformed
  - `Malformed parameters` if the parameters value from the JSON object is not a key/value array
- 500 Internal Server Error in case of a technical error
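The request body can be assembled and checked before it is posted. This is a minimal sketch under stated assumptions: `buildCreateTaskBody()` is a hypothetical helper, not part of the package, and the regular expression is only a guess at what "lowercase alphanumerical, dash and underscore characters" means server-side.

```php
<?php

/**
 * Hypothetical helper (not part of JobQueue): builds the JSON body of a
 * "POST /tasks" request and rejects input the API is documented to refuse.
 */
function buildCreateTaskBody(string $profile, string $job, array $parameters = []): string
{
    // Assumed pattern for "lowercase alphanumerical, dash and underscore".
    if (!preg_match('/^[a-z0-9_-]+$/', $profile)) {
        throw new InvalidArgumentException('Malformed profile name');
    }
    if ($job === '') {
        throw new InvalidArgumentException('Missing job');
    }
    // A sequential list would be encoded as a JSON array, not a key/value object.
    if ($parameters && array_values($parameters) === $parameters) {
        throw new InvalidArgumentException('Malformed parameters');
    }

    $payload = ['profile' => $profile, 'job' => $job];
    if ($parameters) {
        $payload['parameters'] = $parameters;
    }

    return json_encode($payload);
}

$body = buildCreateTaskBody(
    'foobar',
    'JobQueue\\Tests\\Domain\\Job\\DummyJob',
    ['name_1' => 'value_1', 'name_2' => 'value_2']
);
```

The resulting string can be sent as the body of `POST /tasks` with a `Content-Type: application/json` header.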
Tests
First, a Redis server must be running locally (127.0.0.1, port 6379).
Then, to run tests, use the following command:
$ php vendor/bin/phpunit
The MIT License (MIT). Please see LICENSE for more information.