solophp / task-queue
Enhanced PHP task queue using Solo Database with retry mechanism and task expiration support.
v2.0.0
2025-08-02 17:59 UTC
Requires (Dev)
- phpunit/phpunit: ^10.0
- squizlabs/php_codesniffer: ^3.7
README
A lightweight PHP task queue built on top of PDO.
Supports scheduled execution, retries, task expiration, indexed task types, automatic deletion of completed tasks, and optional process-level locking via LockGuard.
Installation
composer require solophp/task-queue
Requirements
- PHP: >= 8.2
- Extensions:
  - ext-json: for JSON payload handling
  - ext-pdo: for database operations
  - ext-posix: for LockGuard process locking (optional)
This package uses standard PHP extensions and has minimal external dependencies. No external database libraries required - works with any PDO-compatible database (MySQL, PostgreSQL, SQLite, etc.).
Setup

    use Solo\TaskQueue\TaskQueue;

    $pdo = new PDO('mysql:host=localhost;dbname=test', 'username', 'password');
    $queue = new TaskQueue($pdo, table: 'tasks', maxRetries: 5, deleteOnSuccess: true);
    $queue->install(); // creates the tasks table if it does not exist
Usage
Add a task:

    $taskId = $queue->addTask(
        'email_notification',
        ['type' => 'email_notification', 'user_id' => 123, 'template' => 'welcome'],
        new DateTimeImmutable('tomorrow') // optional, defaults to now
    );

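The addTask signature in the API Methods table also accepts an optional $expiresAt after $scheduledAt. The sketch below shows one way to derive both timestamps from a single reference time. The helper scheduleWindow and its parameter names are hypothetical and not part of the package; only DateTimeImmutable from the PHP standard library is used.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: compute when a task becomes runnable and when it
 * should be considered stale, relative to a given "now".
 *
 * @return array{0: DateTimeImmutable, 1: DateTimeImmutable}
 */
function scheduleWindow(DateTimeImmutable $now, string $delay, string $ttl): array
{
    $scheduledAt = $now->modify($delay);     // e.g. '+10 minutes'
    $expiresAt = $scheduledAt->modify($ttl); // e.g. '+1 hour' after the start
    return [$scheduledAt, $expiresAt];
}

// A fixed reference time keeps the example deterministic.
$now = new DateTimeImmutable('2025-01-01 12:00:00', new DateTimeZone('UTC'));
[$scheduledAt, $expiresAt] = scheduleWindow($now, '+10 minutes', '+1 hour');

echo $scheduledAt->format('Y-m-d H:i:s'), "\n"; // 2025-01-01 12:10:00
echo $expiresAt->format('Y-m-d H:i:s'), "\n";   // 2025-01-01 13:10:00
```

With a configured $queue, these values would be passed as the third and fourth arguments of addTask.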
Process all tasks:

    $queue->processPendingTasks(function (string $name, array $payload) {
        match ($name) {
            'email_notification' => sendEmail($payload),
            'push_notification' => sendPush($payload),
            default => throw new RuntimeException("Unknown task: $name")
        };
    });

Process only a specific type:

    $queue->processPendingTasks(function (string $name, array $payload) {
        sendEmail($payload);
    }, 20, 'email_notification'); // only tasks where the payload_type column = 'email_notification'

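As the number of task types grows, the match expression can be swapped for a lookup table of handlers. This is a minimal sketch that assumes only the callback signature shown above, (string $name, array $payload). The makeDispatcher helper and the stand-in handlers are hypothetical and not part of the package.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: build a callback with the
 * (string $name, array $payload) signature from a map of
 * task name => handler.
 *
 * @param array<string, callable(array): void> $handlers
 */
function makeDispatcher(array $handlers): Closure
{
    return function (string $name, array $payload) use ($handlers): void {
        if (!isset($handlers[$name])) {
            // Mirrors the default arm of the match example.
            throw new RuntimeException("Unknown task: $name");
        }
        $handlers[$name]($payload);
    };
}

// Stand-in handlers that only record what they were asked to do.
$log = [];
$dispatch = makeDispatcher([
    'email_notification' => function (array $payload) use (&$log): void {
        $log[] = 'email to user ' . $payload['user_id'];
    },
    'push_notification' => function (array $payload) use (&$log): void {
        $log[] = 'push to user ' . $payload['user_id'];
    },
]);

// Called directly here; in a worker it would be handed to the queue.
$dispatch('email_notification', ['user_id' => 123, 'template' => 'welcome']);
```

Because $dispatch is an ordinary callable, it could be passed as the first argument of processPendingTasks in place of the inline closure.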
Using LockGuard (optional)

    use Solo\TaskQueue\LockGuard;

    $lockFile = __DIR__ . '/storage/locks/my_worker.lock';
    $lock = new LockGuard($lockFile); // path to lock file

    if (!$lock->acquire()) {
        exit(0); // Another worker is already running
    }

    try {
        $queue->processPendingTasks(...);
    } finally {
        $lock->release(); // Optional, auto-released on shutdown
    }

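This README does not describe how LockGuard works internally. For readers new to the pattern, one common way to keep workers from overlapping in PHP is an advisory flock on a lock file. The sketch below illustrates that general technique only. It is not LockGuard's implementation, and tryLock is a hypothetical function.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: try to take an exclusive, non-blocking advisory
 * lock on a file.
 *
 * @return resource|null open handle on success (keep it alive while
 *                       working), or null if the lock is already held
 */
function tryLock(string $path)
{
    $handle = fopen($path, 'c'); // create if missing, do not truncate
    if ($handle === false) {
        throw new RuntimeException("Cannot open lock file: $path");
    }
    if (!flock($handle, LOCK_EX | LOCK_NB)) {
        fclose($handle);
        return null;
    }
    return $handle;
}

$lockFile = sys_get_temp_dir() . '/my_worker_example.lock';
$lock = tryLock($lockFile);

if ($lock === null) {
    exit(0); // another worker holds the lock
}

try {
    // ... process tasks here ...
} finally {
    flock($lock, LOCK_UN); // release explicitly; closing the handle also releases
    fclose($lock);
}
```

The non-blocking flag matters for cron-style workers: a second run exits immediately instead of queueing up behind the first.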
Features
- Task Retries: Configurable max retry attempts before marking as failed
- Task Expiration: Automatic expiration via expires_at timestamp
- Indexed Task Types: Fast filtering by payload_type
- Row-Level Locking: Prevents concurrent execution of the same task
- Transactional Safety: All task operations are executed within a transaction
- Optional Process Locking: Prevent overlapping workers using LockGuard
- Optional Deletion on Success: Set deleteOnSuccess: true to automatically delete tasks after success
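The retry and expiration rules in the list above can be pictured as two small decisions. The sketch below is illustrative only. The status names, the attempt counting, and the exact boundary (whether the limit counts total attempts or retries after the first) are assumptions, and neither function exists in the package.

```php
<?php
declare(strict_types=1);

/**
 * Decide what happens to a task after a failed attempt.
 * $attempts counts failures so far, including the current one.
 * The status strings are illustrative, not the package's stored values.
 */
function statusAfterFailure(int $attempts, int $maxRetries): string
{
    return $attempts >= $maxRetries ? 'failed' : 'pending';
}

/** A task without an expiry never expires; otherwise compare with "now". */
function isExpired(?DateTimeImmutable $expiresAt, DateTimeImmutable $now): bool
{
    return $expiresAt !== null && $expiresAt <= $now;
}

$now = new DateTimeImmutable('2025-01-01 12:00:00', new DateTimeZone('UTC'));

echo statusAfterFailure(2, 5), "\n"; // pending
echo statusAfterFailure(5, 5), "\n"; // failed

var_dump(isExpired(null, $now));                     // bool(false)
var_dump(isExpired($now->modify('-1 minute'), $now)); // bool(true)
```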
API Methods
Method | Description |
---|---|
install() | Create the tasks table |
addTask(string $name, array $payload, ?DateTimeImmutable $scheduledAt = null, ?DateTimeImmutable $expiresAt = null) | Add task to the queue (default schedule: now) |
getPendingTasks(int $limit = 10, ?string $onlyType = null) | Retrieve ready-to-run tasks, optionally filtered by type |
markCompleted(int $taskId) | Mark task as completed |
markFailed(int $taskId, string $error = '') | Mark task as failed with error message |
processPendingTasks(callable $callback, int $limit = 10, ?string $onlyType = null) | Process pending tasks with a custom handler |
Testing

    # Run tests
    composer test

    # Run code sniffer
    composer cs

    # Fix code style issues
    composer cs-fix

License
This project is open-sourced under the MIT license.