monkeyscloud / monkeyslegion-queue
A modular queue system featuring multiple storage drivers, scalable workers, job batching, chaining, rate limiting, events, and an intuitive CLI for managing jobs.
Package info
github.com/MonkeysCloud/MonkeysLegion-Queue
pkg:composer/monkeyscloud/monkeyslegion-queue
Requires
Requires (Dev)
- phpstan/phpstan: ^2.1
- phpunit/phpunit: ^12.2
- squizlabs/php_codesniffer: ^4.0
This package is auto-updated.
Last update: 2026-04-28 04:33:41 UTC
README
A production-grade, driver-agnostic queue system for PHP 8.4+ with automatic retries, job chaining & batching, rate limiting, lifecycle events, a built-in web dashboard, and a comprehensive CLI toolkit.
What's New in 2.0
| Area | v1 | v2 |
|---|---|---|
| Job contract | Plain classes with handle() |
DispatchableJobInterface — typed, explicit |
| Serialization | Manual payload arrays | JobSerializer trait — auto-extracts constructor params, freezes objects |
| Worker DI | All dependencies passed manually | ContainerAware trait — auto-resolves QueueEventDispatcher, RateLimiterInterface, BatchRepository, MonkeysLoggerInterface |
| Rate limiting | Concrete RateLimiter only |
RateLimiterInterface contract — pluggable implementations |
| Dashboard | ❌ | Built-in web UI with QueueDashboardProvider auto-discovery |
| Provider system | ❌ | #[Provider] attribute + composer.json auto-registration |
| Configuration | PHP array | .mlc config format (env-driven) |
| Dispatcher | Returns void |
Returns Batch from batch()->dispatch() for progress tracking |
Features
✨ Multiple Queue Drivers — Redis (production), Database (production), Null (testing)
🔄 Automatic Retries — Exponential backoff, configurable max attempts, failed job tracking
⏰ Delayed Jobs & Dispatching — Schedule future execution, priority queue support, clean dispatcher API
🔗 Job Batching & Chaining — Sequential chains, parallel batches with completion/failure/finally callbacks
⚡ Rate Limiting — Token-bucket RateLimiterInterface, per-queue or per-job-type throttling
🎯 Queue Events — JobProcessing, JobProcessed, JobFailed, BatchCompleted
📊 Web Dashboard — Real-time queue stats, failed job inspection, maintenance actions — auto-registered via QueueDashboardProvider
🛡️ Production Ready — Graceful SIGTERM/SIGINT shutdown, memory-limit protection, GC-aware polling, DI-driven worker
Installation
composer require monkeyscloud/monkeyslegion-queue
Requires PHP 8.4+, the
ext-redisextension (for the Redis driver), and the MonkeysLegion CLI package.
Configuration
MLC Format (config/queue.mlc)
queue {
# The default store to use (redis, database, null)
default = ${QUEUE_DEFAULT:-null}
# Core queue behavior
settings {
default_queue = ${QUEUE_DEFAULT_QUEUE:-default}
failed_queue = ${QUEUE_FAILED_QUEUE:-failed}
queue_prefix = ${QUEUE_PREFIX:-ml_queue}
retry_after = ${QUEUE_RETRY_AFTER:-90}
visibility_timeout = ${QUEUE_VISIBILITY_TIMEOUT:-300}
max_attempts = ${QUEUE_MAX_ATTEMPTS:-3}
path = ${QUEUE_VIEW_PATH:-ml-queue}
}
# Queue drivers
stores {
redis {
host = ${REDIS_HOST:-127.0.0.1}
port = ${REDIS_PORT:-6379}
username = ${REDIS_USERNAME:-null}
password = ${REDIS_PASSWORD:-null}
database = ${REDIS_DATABASE:-0}
timeout = ${REDIS_TIMEOUT:-2.0}
}
null {}
database {
table = ${QUEUE_DATABASE_TABLE:-jobs}
failed_table = ${QUEUE_DATABASE_FAILED_TABLE:-failed_jobs}
}
}
}
Environment Variables
# Queue QUEUE_DEFAULT=redis QUEUE_DEFAULT_QUEUE=default QUEUE_FAILED_QUEUE=failed QUEUE_PREFIX=ml_queue QUEUE_MAX_ATTEMPTS=3 # Redis REDIS_HOST=127.0.0.1 REDIS_PORT=6379 REDIS_PASSWORD= REDIS_DATABASE=0 REDIS_TIMEOUT=2.0 # Database driver QUEUE_DATABASE_TABLE=jobs QUEUE_DATABASE_FAILED_TABLE=failed_jobs
Quick Start
1 — Create a Queue Instance
use MonkeysLegion\Queue\Factory\QueueFactory; use MonkeysLegion\Database\Contracts\ConnectionInterface; $config = require 'config/queue.php'; // Null / Redis (no DB connection needed) $factory = new QueueFactory($config); $queue = $factory->make(); // default driver from config // Database driver requires a ConnectionInterface $factory = new QueueFactory($config, $dbConnection); $queue = $factory->driver('database');
2 — Define a Job
All dispatchable jobs must implement DispatchableJobInterface:
<?php namespace App\Jobs; use MonkeysLegion\Queue\Contracts\DispatchableJobInterface; class SendEmailJob implements DispatchableJobInterface { public function __construct( public string $email, public string $subject, public string $message, ) {} public function handle(): void { mail($this->email, $this->subject, $this->message); } }
Tip: Generate the boilerplate with
php console make:job SendEmailJob.
Marker Interfaces
| Interface | Behaviour |
|---|---|
DispatchableJobInterface |
Required. Provides the handle() contract. |
ShouldQueue |
Optional semantic marker. Explicitly signals "this class is async." All jobs are async by default. |
ShouldSync |
Causes the dispatcher to run handle() synchronously in the current process, skipping the queue entirely. |
3 — Dispatch
use MonkeysLegion\Queue\Dispatcher\QueueDispatcher; use App\Jobs\SendEmailJob; $dispatcher = new QueueDispatcher($queue); $job = new SendEmailJob('user@example.com', 'Welcome!', 'Thanks for joining'); // Immediate push $dispatcher->dispatch($job); // To a specific queue $dispatcher->dispatch($job, queue: 'emails'); // With delay (seconds) $dispatcher->dispatch($job, queue: 'emails', delay: 60); // At an absolute timestamp $dispatcher->dispatchAt($job, timestamp: time() + 3600, queue: 'emails');
The JobSerializer trait auto-extracts constructor parameters—including nested objects (serialised for safe JSON/DB storage)—so you never build payload arrays by hand.
4 — Run a Worker
php console queue:work --queue=emails --sleep=3 --tries=5 --memory=256 --timeout=120
| Flag | Default | Description |
|---|---|---|
--queue |
default |
Comma-separated list; first queue has highest priority |
--sleep |
3 |
Seconds to idle when the queue is empty |
--tries |
3 |
Max retry attempts per job |
--memory |
128 |
Memory limit in MB |
--timeout |
60 |
Per-job timeout in seconds |
Priority queues — --queue=high,default,low processes high first, then default, then low.
Worker output:
[09:45:12] • Worker started (queue=default)
[09:45:13] → Processing (class=App\Jobs\SendEmailJob, job_id=1a2b3c4d, attempts=1, queue=default)
[09:45:14] ✓ Completed (class=App\Jobs\SendEmailJob, job_id=1a2b3c4d, duration_ms=1250.45, queue=default)
[09:45:18] ⚠ Pushed to retry later (class=App\Jobs\SendEmailJob, job_id=5e6f7g8h, attempts=1, delay=1)
[09:45:22] ✗ Failed (class=App\Jobs\SendEmailJob, job_id=5e6f7g8h, attempts=3)
Graceful shutdown: kill -SIGTERM <pid> or Ctrl+C. The worker finishes the current job, then exits.
Advanced Usage
Job Chaining
Jobs run sequentially — each starts only after the previous succeeds. Chain metadata is embedded in the job payload; the Job wrapper dispatches the next job on completion.
$dispatcher->chain([ new DownloadFileJob($url), new ProcessFileJob($path), new NotifyUserJob($userId), ])->onQueue('files')->dispatch();
Important
Jobs inside a Chain or Batch are always queued, regardless of ShouldSync.
Job Batching
Group parallel jobs, track progress, and register callbacks:
$batch = $dispatcher->batch([ new ProcessImageJob($image1), new ProcessImageJob($image2), new ProcessImageJob($image3), ]) ->onQueue('images') ->then('App\\Callbacks\\BatchSuccess') ->catch('App\\Callbacks\\BatchFailed') ->finally('App\\Callbacks\\BatchComplete') ->dispatch(); echo $batch->progress() . '% complete'; echo $batch->getPendingJobs() . ' jobs remaining';
Batching requires a
BatchRepository(database-backed). Pass it toQueueDispatcher:$dispatcher = new QueueDispatcher($queue, new BatchRepository($dbConnection));
Rate Limiting
Throttle job throughput with the RateLimiterInterface contract. The built-in RateLimiter uses an in-memory token-bucket:
use MonkeysLegion\Queue\RateLimiter\RateLimiter; $rateLimiter = new RateLimiter( maxAttempts: 60, // 60 jobs … decaySeconds: 60, // … per minute );
The Worker resolves RateLimiterInterface from the DI container automatically. When rate-limited, jobs are released back with a delay.
Queue Events
use MonkeysLegion\Queue\Events\QueueEventDispatcher; use MonkeysLegion\Queue\Events\JobProcessed; use MonkeysLegion\Queue\Events\JobFailed; use MonkeysLegion\Queue\Events\JobProcessing; use MonkeysLegion\Queue\Events\BatchCompleted; $events = new QueueEventDispatcher(); $events->listen(JobProcessed::class, function (JobProcessed $e) { Log::info("Job {$e->job->getId()} done in {$e->processingTimeMs}ms"); }); $events->listen(JobFailed::class, function (JobFailed $e) { Log::error("Job failed: " . $e->exception->getMessage()); if (!$e->willRetry) { // Final failure — alert ops } });
The worker dispatches events automatically when QueueEventDispatcher is registered in the container.
Dashboard
The queue package ships a full web dashboard, auto-registered via the QueueDashboardProvider.
Setup
Add the provider to your framework's composer.json extra key (done automatically on install):
{
"extra": {
"monkeyslegion": {
"providers": [
"MonkeysLegion\\Queue\\Providers\\QueueDashboardProvider"
]
}
}
}
The provider registers template views, scans the DashboardController, and resolves the URL prefix from your queue config's path setting.
Routes
All routes live under /{path}/dashboard (default: /ml-queue/dashboard):
| Method | Route | Action |
|---|---|---|
GET |
/ |
Overview — queue list, stats, failed count |
GET |
/queues |
Browse jobs in a specific queue |
GET |
/delayed |
Inspect delayed/scheduled jobs |
GET |
/failed |
List failed jobs |
GET |
/maintenance |
Maintenance panel |
GET |
/job?id=…&queue=… |
Single job detail |
POST |
/job/delete |
Delete a job |
POST |
/failed/retry |
Retry failed jobs |
POST |
/failed/delete |
Delete specific failed jobs |
POST |
/failed/clear |
Purge all failed jobs |
POST |
/queue/clear |
Clear a queue |
POST |
/queue/purge |
Purge all queues |
POST |
/delayed/process |
Force-process delayed jobs now |
CLI Commands
Setup
# Create the jobs and failed_jobs tables interactively
php console queue:setup
Queue Management
php console queue:list # List all queues with statistics php console queue:stats default # View stats for a specific queue php console queue:clear default # Clear a queue
Failed Jobs
php console queue:failed --limit=20 # List failed jobs php console queue:retry --limit=100 # Retry failed → move back to original queue php console queue:flush # Permanently delete all failed jobs
Job Scaffolding
php console make:job ProcessOrderJob php console make:job Notifications/SendPushNotification
Programmatic Queue Operations
Push & Pop
// Direct push (raw array format) $queue->push([ 'job' => 'App\\Jobs\\SendEmailJob', 'payload' => ['user@example.com', 'Welcome!', 'Thanks'], ], 'emails'); // Delayed push $queue->later(60, [ 'job' => 'App\\Jobs\\SendReminderJob', 'payload' => ['user_id' => 123], ]); // Bulk push $queue->bulk([ ['job' => 'App\\Jobs\\SendEmailJob', 'payload' => ['a@b.com', 'Hi', 'Body']], ['job' => 'App\\Jobs\\SendEmailJob', 'payload' => ['c@d.com', 'Hi', 'Body']], ], 'emails');
Monitoring
$stats = $queue->getStats('default'); // ['ready' => 10, 'processing' => 2, 'delayed' => 5, 'failed' => 1] $count = $queue->count('emails'); $failedCount = $queue->countFailed(); $queues = $queue->getQueues();
Inspection
$jobs = $queue->listQueue('default', 10); $nextJob = $queue->peek('default'); $job = $queue->findJob('job_abc123', 'default');
Management
$queue->deleteJob('job_abc123', 'default'); $queue->moveJobToQueue('job_abc123', 'from_queue', 'to_queue'); $queue->clear('default'); $queue->purge();
Failed Jobs
$failedJobs = $queue->getFailed(20); $queue->retryFailed(100); $queue->removeFailedJobs(['job_123', 'job_456']); $queue->clearFailed();
Null Queue (Testing)
$factory = new QueueFactory([ 'default' => 'null', 'settings' => [], 'stores' => ['null' => []], ]); $queue = $factory->make(); $queue->push(['job' => 'TestJob', 'payload' => []]); // no-op $queue->pop(); // null $queue->count(); // 0
Job Retries — Exponential Backoff
| Attempt | Delay |
|---|---|
| 1 | 1 s |
| 2 | 2 s |
| 3 | 4 s |
| 4 | 8 s |
| 5 | 16 s |
| 6 | 32 s |
| 7+ | 60 s (cap) |
Architecture
src/
├── Abstract/
│ └── AbstractQueue.php # Base queue with config, encode/decode, default impls
├── Batch/
│ ├── Batch.php # Batch state container (progress, cancel, toArray)
│ ├── BatchRepository.php # Database-backed batch storage
│ └── PendingBatch.php # Fluent batch builder (then/catch/finally)
├── Chain/
│ └── PendingChain.php # Fluent chain builder (onQueue/dispatch)
├── Cli/
│ └── Command/
│ ├── MakeJobCommand.php
│ ├── QueueWorkCommand.php
│ ├── QueueListCommand.php
│ ├── QueueClearCommand.php
│ ├── QueueFailedCommand.php
│ ├── QueueRetryCommand.php
│ ├── QueueFlushCommand.php
│ ├── QueueStatsCommand.php
│ └── SetupDatabaseCommand.php # Interactive table creation
├── Contracts/
│ ├── DispatchableJobInterface.php # handle() contract for user jobs
│ ├── JobInterface.php # Internal job wrapper contract
│ ├── QueueDispatcherInterface.php # dispatch() / dispatchAt()
│ ├── QueueInterface.php # Queue driver contract (22 methods)
│ ├── ShouldQueue.php # Semantic async marker
│ ├── ShouldSync.php # Forces synchronous execution
│ └── WorkerInterface.php # work() / process() / stop()
├── Dashboard/
│ └── DashboardController.php # Web UI controller (attribute-routed)
├── Dispatcher/
│ └── QueueDispatcher.php # Dispatch, chain, batch entry point
├── Driver/
│ ├── DatabaseQueue.php # PDO / ConnectionInterface driver
│ ├── RedisQueue.php # ext-redis driver
│ └── NullQueue.php # No-op driver for tests
├── Events/
│ ├── QueueEventDispatcher.php # listen() / dispatch() / forget()
│ ├── JobProcessing.php # Before execution
│ ├── JobProcessed.php # After success (includes processingTimeMs)
│ ├── JobFailed.php # On failure (includes willRetry flag)
│ └── BatchCompleted.php # When batch finishes
├── Factory/
│ └── QueueFactory.php # make() / driver() factory
├── Helpers/
│ └── CliPrinter.php # Styled CLI output
├── Job/
│ └── Job.php # Internal wrapper — deserialise, handle, chain dispatch
├── Providers/
│ └── QueueDashboardProvider.php # Auto-registers dashboard views & routes
├── RateLimiter/
│ ├── RateLimiterInterface.php # attempt() / availableIn() / remaining() / reset()
│ └── RateLimiter.php # In-memory token-bucket implementation
├── Template/
│ └── views/dashboard/ # Dashboard Blade-like templates
├── Traits/
│ └── JobSerializer.php # serializeJob() / unserializeJob() — shared across dispatcher, chain, batch
└── Worker/
└── Worker.php # ContainerAware worker with signal handling & GC
Data Flow
┌─────────────────┐
│ Dispatcher │
│ dispatch(job) │
└────────┬────────┘
│ serialiseJob()
▼
┌─────────────────┐
│ Queue Driver │
│ (Redis / DB) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Worker │
│ pop → handle │
└────────┬────────┘
│
┌────┴────┐
▼ ▼
Success Failure
│ │
▼ ▼
ack() attempts < max?
├─ yes → release(delay)
└─ no → fail() → failed queue
Database Migrations
Three migration templates ship under migration/:
queue_jobs.sql— main jobs tablefailed_jobs.sql— failed jobs tablejob_batches.sql— batch tracking table
Run php console queue:setup for interactive table creation, or apply the SQL files directly.
Requirements
- PHP 8.4 or higher
- Redis extension (
ext-redis) for the Redis driver monkeyscloud/monkeyslegion-cli^2.0monkeyscloud/monkeyslegion-database^2.0 (for the Database driver & batch storage)
License
MIT License. See LICENSE for details.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Support
For issues, questions, or suggestions, please open an issue on GitHub.
Roadmap
- Priority queues
- Job batching
- Job chaining
- Rate limiting
- Queue events / hooks
- Dashboard UI
- Metrics & analytics
- Horizontal scaling helpers
Made with ❤️ by MonkeysLegion