salesrender / plugin-component-queue
SalesRender plugin queue abstract component
Installs: 1 014
Dependents: 3
Suggesters: 0
Security: 0
Stars: 0
Watchers: 2
Forks: 0
Open Issues: 0
pkg:composer/salesrender/plugin-component-queue
Requires
- php: >=7.4.0
- ext-json: *
- khill/php-duration: ^1.1
- salesrender/plugin-component-db: ^0.3.8
- symfony/console: ^5.3
- symfony/process: ^5.3
- xakepehok/path: ^0.2.1
README
Abstract queue component for the SalesRender plugin ecosystem. Provides a task queue with retry logic, built on top of Symfony Console and Symfony Process. Tasks are persisted in the database via plugin-component-db and executed as background child processes through CLI commands.
Installation
composer require salesrender/plugin-component-queue
Requirements
| Requirement | Version |
|---|---|
| PHP | >= 7.4.0 |
| ext-json | * |
| symfony/console | ^5.3 |
| symfony/process | ^5.3 |
| salesrender/plugin-component-db | ^0.3.8 |
| xakepehok/path | ^0.2.1 |
| khill/php-duration | ^1.1 |
Architecture overview
The queue operates as a two-command pattern:
- QueueCommand -- a long-running daemon that polls the database for pending tasks and spawns background child processes (one per task) via
symfony/process. - QueueHandleCommand -- invoked by the child process to load a single task by ID and execute the actual business logic.
Tasks are persisted as database models (extending Task) with built-in retry tracking (TaskAttempt). The queue command runs in a loop until memory usage exceeds the configured limit.
Key classes
TaskAttempt
Tracks retry state for a task.
Namespace: SalesRender\Plugin\Components\Queue\Models\Task
| Method | Signature | Description |
|---|---|---|
__construct |
__construct(int $limit, int $interval) |
Create attempt tracker with max retries and interval (seconds) between attempts |
getLastTime |
getLastTime(): ?int |
Unix timestamp of last attempt, or null if never attempted |
getNumber |
getNumber(): int |
Current attempt number (starts at 0) |
getLimit |
getLimit(): int |
Maximum number of attempts allowed |
setLimit |
setLimit(int $limit): void |
Override the attempt limit |
getInterval |
getInterval(): int |
Interval in seconds between retries |
setInterval |
setInterval(int $interval): void |
Override the retry interval |
getLog |
getLog(): string |
Log message from the last attempt |
attempt |
attempt(string $log): void |
Record an attempt: increments number, sets lastTime to time(), saves log |
isSpent |
isSpent(): bool |
Returns true when all attempts have been exhausted (number >= limit) |
Task (abstract)
Abstract database model representing a queued task. Extends Model from plugin-component-db.
Namespace: SalesRender\Plugin\Components\Queue\Models\Task
| Method | Signature | Description |
|---|---|---|
__construct |
__construct(TaskAttempt $attempt) |
Generates UUID, captures current PluginReference from Connector, sets createdAt |
getPluginReference |
getPluginReference(): ?PluginReference |
Returns the plugin reference (companyId, alias, pluginId) or null |
schema |
static schema(): array |
Database schema with fields: companyId, pluginAlias, pluginId, createdAt, attemptLastTime, attemptNumber, attemptLimit, attemptInterval, attemptLog |
Inherited from Model: save(), delete(), findById(), findByCondition(), freeUpMemory(), tableName().
QueueCommand (abstract)
Long-running daemon command that polls for tasks and spawns child processes.
Namespace: SalesRender\Plugin\Components\Queue\Commands
| Method | Signature | Description |
|---|---|---|
__construct |
__construct(string $name, int $limit, int $maxMemoryInMb = 25) |
Sets command name to {name}:queue, concurrency limit, and max memory |
findModels |
abstract findModels(): ModelInterface[] |
Must be implemented. Return array of task models ready to be processed |
handleQueue |
handleQueue(ModelInterface $model): bool |
Spawns a child process: php console.php {name}:handle {id} |
startedLog |
startedLog(ModelInterface $model, OutputInterface $output): void |
Logs "Process started" message; override for custom logging |
execute |
execute(InputInterface $input, OutputInterface $output): int |
Main loop: mutex lock, poll for models, spawn processes, until memory limit |
CLI option: --disable-mutex (-dm) -- disables file-based mutex that prevents duplicate instances.
Environment variable: LV_PLUGIN_PHP_BINARY -- path to the PHP binary used to spawn child processes.
QueueHandleCommand (abstract)
Command invoked by child processes to handle a single task.
Namespace: SalesRender\Plugin\Components\Queue\Commands
| Method | Signature | Description |
|---|---|---|
__construct |
__construct(string $name) |
Sets command name to {name}:handle |
The execute method must be implemented in subclasses. Receives id as a required argument ($input->getArgument('id')).
Usage
1. Define a Task model
Create a concrete task class that extends Task. Pass a TaskAttempt with the desired retry limit and interval.
From plugin-logistic-eushipments -- a simple task with 100 retries and 600-second interval:
use SalesRender\Plugin\Components\Queue\Models\Task\Task; use SalesRender\Plugin\Components\Queue\Models\Task\TaskAttempt; final class BindingSyncTask extends Task { public function __construct() { parent::__construct(new TaskAttempt(100, 600)); } public function getAttempt(): TaskAttempt { return $this->attempt; } }
From plugin-core-chat -- a task carrying additional data (Chat object), with custom serialization:
use SalesRender\Plugin\Components\Queue\Models\Task\Task; use SalesRender\Plugin\Components\Queue\Models\Task\TaskAttempt; class ChatSendTask extends Task { protected Chat $chat; public function __construct(Chat $chat) { parent::__construct(new TaskAttempt(100, 10)); $this->chat = $chat; } public function getChat(): Chat { return $this->chat; } public function getAttempt(): TaskAttempt { return $this->attempt; } protected static function beforeWrite(array $data): array { $data = parent::beforeWrite($data); $data['chat'] = json_encode($data['chat']); return $data; } protected static function afterRead(array $data): array { $data = parent::afterRead($data); $data['chat'] = Chat::parseFromArray(json_decode($data['chat'], true)); return $data; } public static function schema(): array { return array_merge(parent::schema(), [ 'chat' => ['TEXT', 'NOT NULL'], ]); } }
2. Implement QueueCommand
Override findModels() to query pending tasks from the database. Use Medoo conditions to filter by attemptLastTime and attemptInterval so that tasks are retried only after the interval has elapsed.
From plugin-core-chat:
use SalesRender\Plugin\Components\Queue\Commands\QueueCommand; use Medoo\Medoo; class ChatSendQueueCommand extends QueueCommand { public function __construct() { parent::__construct( 'chatSendQueue', $_ENV['LV_PLUGIN_CHAT_SEND_QUEUE_LIMIT'] ?? 100, 25 ); } protected function findModels(): array { ChatSendTask::freeUpMemory(); $condition = [ 'OR' => [ 'attemptLastTime' => null, 'attemptLastTime[<=]' => Medoo::raw('(:time - <attemptInterval>)', [':time' => time()]), ], "ORDER" => ["createdAt" => "ASC"], 'LIMIT' => $this->limit ]; $processes = array_keys($this->processes); if (!empty($processes)) { $condition['id[!]'] = $processes; } return ChatSendTask::findByCondition($condition); } }
3. Implement QueueHandleCommand
Override execute() to load the task by ID, perform the business logic, and manage retry/deletion.
From plugin-core-chat:
use SalesRender\Plugin\Components\Db\Components\Connector; use SalesRender\Plugin\Components\Queue\Commands\QueueHandleCommand; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; class ChatSendQueueHandleCommand extends QueueHandleCommand { public function __construct() { parent::__construct('chatSendQueue'); } protected function execute(InputInterface $input, OutputInterface $output) { /** @var ChatSendTask $task */ $task = ChatSendTask::findById($input->getArgument('id')); if (is_null($task)) { $output->writeln("<error>Task with passed id was not found</error>"); return Command::INVALID; } if ($task->getPluginReference()) { Connector::setReference($task->getPluginReference()); } try { // Execute business logic... $task->delete(); return Command::SUCCESS; } catch (Throwable $throwable) { $output->writeln("<error>{$throwable->getMessage()}</error>"); $task->getAttempt()->attempt($throwable->getMessage()); } if ($task->getAttempt()->isSpent()) { $task->delete(); } else { $task->save(); } return Command::FAILURE; } }
4. Register commands
Register both commands in your ConsoleAppFactory and add a cron task for the queue command.
From plugin-core-chat:
use Symfony\Component\Console\Application; use SalesRender\Plugin\Core\Commands\CronCommand; class ConsoleAppFactory extends \SalesRender\Plugin\Core\Factories\ConsoleAppFactory { public function build(): Application { $this->app->add(new ChatSendQueueCommand()); $this->app->add(new ChatSendQueueHandleCommand()); CronCommand::addTask( '* * * * * ' . PHP_BINARY . ' ' . Path::root()->down('console.php') . ' chatSendQueue:queue' ); return parent::build(); } }
5. Enqueue a task
Create a task instance and save it:
$task = new ChatSendTask($chat); $task->save();
Configuration
| Environment variable | Description |
|---|---|
LV_PLUGIN_PHP_BINARY |
Path to the PHP binary. Used by QueueCommand to spawn child processes via symfony/process. |
The queue concurrency limit and memory limit are set in the QueueCommand constructor. Common pattern is to read the limit from an environment variable with a fallback default.
Retry logic
The retry mechanism is built into the TaskAttempt class:
- When a handle command catches an error, it calls
$task->getAttempt()->attempt($errorMessage)-- this increments the attempt counter and records the timestamp. - On next poll cycle,
QueueCommand::findModels()checksattemptLastTime[<=] (now - attemptInterval)-- so the task is not retried until the interval has elapsed. - After processing,
$task->getAttempt()->isSpent()returnstruewhen the number of attempts reaches the limit. If spent, the task is deleted; otherwise it is saved for a future retry.
API reference
Task database schema
The base Task::schema() defines the following columns (in addition to id from Model):
| Column | Type | Notes |
|---|---|---|
companyId |
VARCHAR(255) |
Nullable; set from Connector::getReference() |
pluginAlias |
VARCHAR(255) |
Nullable; set from Connector::getReference() |
pluginId |
VARCHAR(255) |
Nullable; set from Connector::getReference() |
createdAt |
INT NOT NULL |
Unix timestamp of task creation |
attemptLastTime |
INT |
Nullable; unix timestamp of last attempt |
attemptNumber |
INT NOT NULL |
Current attempt number |
attemptLimit |
INT NOT NULL |
Maximum attempts allowed |
attemptInterval |
INT NOT NULL |
Seconds between retries |
attemptLog |
VARCHAR(500) |
Log message from last attempt |
Subclasses should override schema() with array_merge(parent::schema(), [...]) to add custom fields.
Command naming convention
For a queue named "foo":
- Queue daemon command:
foo:queue - Handle command:
foo:handle
Dependencies
| Package | Purpose |
|---|---|
| salesrender/plugin-component-db | Database ORM: Model, Connector, PluginReference, ReflectionHelper, UuidHelper |
| symfony/console | CLI command framework |
| symfony/process | Spawning child processes for task handling |
| xakepehok/path | Path helper for building file paths (Path::root()) |
| khill/php-duration | Human-readable duration formatting for uptime display |
See also
- salesrender/plugin-component-db -- database layer (Model, Connector)
- salesrender/plugin-component-batch -- batch processing built on top of this queue component
- salesrender/plugin-component-special-request -- HTTP special request dispatcher built on top of this queue component
- salesrender/plugin-core -- core plugin framework with
ConsoleAppFactory