ruudk / absurd-php-sdk
PHP SDK for Absurd: a Durable Execution Engine for Postgres
Requires
- php: ^8.4
- ext-pdo: *
- psr/event-dispatcher: ^1.0
- psr/log: ^3.0
Requires (Dev)
- ext-bcmath: *
- ext-curl: *
- ext-pcntl: *
- carthage-software/mago: 1.1.0
- ergebnis/composer-normalize: ^2.47
- monolog/monolog: ^3.10
- phpunit/phpunit: ^12.5
- shipmonk/composer-dependency-analyser: ^1.8
- symfony/console: ^7.4 || ^8.0
- symfony/dotenv: ^8.0
- symfony/event-dispatcher: ^8.0
- symfony/property-access: ^8.0
- symfony/serializer: ^7.4 || ^8.0
Suggests
- ext-pcntl: enables signal-based fatal timeout via SIGALRM in LeaseMonitor
- symfony/serializer: for typed task payloads
Last update: 2026-04-19 16:09:29 UTC
PHP SDK for Absurd: a PostgreSQL-based durable task execution system.
Absurd is the simplest durable execution workflow system you can think of. It's entirely based on Postgres and nothing else. It's almost as easy to use as a queue, but it handles scheduling and retries, and it does all of that without needing any other services to run in addition to Postgres.
Note: This PHP SDK is still in its early stages. Absurd itself has been running in production at Earendil since its initial release.
What is Durable Execution?
Durable execution (or durable workflows) is a way to run long-lived, reliable functions that can survive crashes, restarts, and network failures without losing state or duplicating work. Instead of running your logic in memory, a durable execution system decomposes a task into smaller pieces (step functions) and records every step and decision.
How It Works
This SDK uses PHP Fibers to provide a clean, synchronous-looking
API for durable workflows. When you call methods like $ctx->step(), $ctx->awaitEvent(), or $ctx->sleepFor(), the
Fiber suspends execution, allowing the SDK to checkpoint progress to the database. When the task resumes (after a crash,
timeout, or event), execution continues from exactly where it left off.
This means you can write workflow code that looks like normal sequential PHP code, while the SDK handles all the complexity of persistence, retries, and resumption behind the scenes.
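The suspend-and-replay idea can be illustrated with plain PHP Fibers. The following is a minimal sketch under stated assumptions, not the SDK's actual implementation: `SketchContext` is a hypothetical stand-in, checkpoints live in an in-memory array instead of Postgres, and a "crash" is simulated by simply abandoning the first Fiber.

```php
<?php
declare(strict_types=1);

// Hypothetical, simplified stand-in for a durable task context (not the SDK's class).
final class SketchContext
{
    /** @param array<string, mixed> $checkpoints results recorded by earlier runs */
    public function __construct(public array $checkpoints = []) {}

    public function step(string $name, callable $fn): mixed
    {
        if (array_key_exists($name, $this->checkpoints)) {
            return $this->checkpoints[$name]; // replay: return the recorded result, skip the work
        }

        $value = $fn();
        $this->checkpoints[$name] = $value;

        // Hand control back to the runner so it could persist the checkpoint.
        Fiber::suspend($name);

        return $value;
    }
}

$calls = 0;
$workflow = function (SketchContext $ctx) use (&$calls): int {
    $a = $ctx->step('a', function () use (&$calls) { $calls++; return 1; });
    $b = $ctx->step('b', function () use (&$calls) { $calls++; return 2; });

    return $a + $b;
};

// First run: step "a" executes, the Fiber suspends, then the process "crashes".
$ctx = new SketchContext();
$fiber = new Fiber($workflow);
$suspendedAt = $fiber->start($ctx);
$saved = $ctx->checkpoints; // pretend this was written to the database

// Second run: a fresh Fiber replays step "a" from the saved checkpoint and only executes "b".
$ctx2 = new SketchContext($saved);
$fiber2 = new Fiber($workflow);
$resumedAt = $fiber2->start($ctx2);
$fiber2->resume();
$result = $fiber2->getReturn();
```

Each step closure runs exactly once across both runs, which is why the workflow body can be written as ordinary sequential code.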
Installation
composer require ruudk/absurd-php-sdk
Quick Start

    <?php

    use Ruudk\Absurd\Absurd;
    use Ruudk\Absurd\Connection\PdoConnection;
    use Ruudk\Absurd\Task\Context as TaskContext;

    // Create PDO connection
    $pdo = new PDO('pgsql:host=localhost;dbname=absurd');
    $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

    // Create Absurd instance
    $absurd = new Absurd(new PdoConnection($pdo));

    // Register a task handler
    $absurd->registerTask('order-fulfillment', function (array $params, TaskContext $ctx): array {
        // Each step is checkpointed - if the process crashes, we resume from the last completed step
        $payment = $ctx->step('process-payment', fn() => processPayment($params['amount']));
        $inventory = $ctx->step('reserve-inventory', fn() => reserveItems($params['items']));

        // Wait for an event - the task suspends until the event arrives
        $shipment = $ctx->awaitEvent("shipment.packed:{$params['orderId']}");

        $ctx->step('send-notification', fn() => sendEmail($params['email'], $shipment));

        return [
            'orderId' => $payment['id'],
            'trackingNumber' => $shipment['trackingNumber'],
        ];
    });

    // Start a worker that pulls tasks from Postgres
    $worker = $absurd->startWorker();
    $worker->start();

Client Configuration
The Absurd constructor accepts a Connection instance instead of a raw PDO object. Wrap your PDO with
PdoConnection (or implement your own adapter, e.g. for Doctrine DBAL):

    use Ruudk\Absurd\Absurd;
    use Ruudk\Absurd\Connection\PdoConnection;
    use Symfony\Component\EventDispatcher\EventDispatcher;

    $pdo = new PDO('pgsql:host=localhost;dbname=absurd');
    $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

    $absurd = new Absurd(
        connection: new PdoConnection($pdo),
        defaultQueueName: 'default',            // Default queue for tasks
        defaultMaxAttempts: 5,                  // Default retry attempts
        eventDispatcher: new EventDispatcher(), // Optional: for hooks and error handling
    );

The serializer defaults to JsonSerializer (no extra dependencies). Pass SymfonySerializer if you need typed object
deserialization:

    use Ruudk\Absurd\Serialization\SymfonySerializer;

    $absurd = new Absurd(
        connection: new PdoConnection($pdo),
        serializer: new SymfonySerializer(),
    );

Custom Database Connection
Implement Connection\Connection to integrate with any database layer (e.g. Doctrine DBAL):

    use Ruudk\Absurd\Connection\Connection;
    use Ruudk\Absurd\Exception\QueryException;

    final readonly class DbalConnection implements Connection
    {
        public function __construct(private \Doctrine\DBAL\Connection $dbal) {}

        public function fetchAll(string $sql, array $params = []): array
        {
            return $this->dbal->fetchAllAssociative($sql, $params);
        }

        // ... implement fetch(), execute(), scalar()
    }

    $absurd = new Absurd(connection: new DbalConnection($dbal));

Queue Management

    // Create a queue (required before spawning tasks)
    $absurd->createQueue('my-queue');

    // List all queues
    $queues = $absurd->listQueues();
    // Returns: ['default', 'my-queue', ...]

    // Drop a queue and all its data
    $absurd->dropQueue('my-queue');

Task Registration

    use Ruudk\Absurd\Task\CancellationPolicy;
    use Ruudk\Absurd\Task\Context as TaskContext;
    use Ruudk\Absurd\Task\RegisterOptions;

    // Simple registration
    $absurd->registerTask('my-task', function (array $params, TaskContext $ctx): array {
        // Task logic here
        return ['result' => 'done'];
    });

    // Registration with options
    $absurd->registerTask(
        'order-processor',
        function (array $params, TaskContext $ctx): array {
            // Task logic
            return [];
        },
        new RegisterOptions(
            queue: 'orders',           // Override default queue
            defaultMaxAttempts: 3,     // Override default max attempts
            defaultCancellation: new CancellationPolicy(
                maxDuration: 3600,     // Cancel after 1 hour total
                maxDelay: 300,         // Cancel if delayed more than 5 minutes
            ),
        ),
    );

Spawning Tasks

    use Ruudk\Absurd\Task\SpawnOptions;
    use Ruudk\Absurd\Task\RetryStrategy;
    use Ruudk\Absurd\Task\CancellationPolicy;

    // Simple spawn
    $result = $absurd->spawn('order-fulfillment', [
        'orderId' => '42',
        'amount' => 9999,
        'items' => ['widget-1', 'gadget-2'],
        'email' => 'customer@example.com',
    ]);

    echo "Task ID: {$result->taskId}\n";
    echo "Run ID: {$result->runId}\n";
    echo "Attempt: {$result->attempt}\n";
    echo "Created: " . ($result->created ? 'yes' : 'no (from cache)') . "\n";

    // Spawn with options
    $result = $absurd->spawn(
        'order-fulfillment',
        $params,
        new SpawnOptions(
            maxAttempts: 5,
            retryStrategy: RetryStrategy::exponential(baseSeconds: 10, factor: 2.0, maxSeconds: 300),
            cancellation: new CancellationPolicy(maxDuration: 3600),
            headers: ['priority' => 'high', 'trace_id' => 'abc123'],
            idempotencyKey: 'order-42', // Prevents duplicate task creation
        ),
        queue: 'orders', // Override queue for unregistered tasks
    );

    // Check if task was newly created or returned from idempotency cache
    if ($result->created) {
        echo "New task created\n";
    } else {
        echo "Existing task returned (idempotency key matched)\n";
    }

Retry Strategies

    use Ruudk\Absurd\Task\RetryStrategy;

    // Exponential backoff: 10s, 20s, 40s, 80s... up to 300s
    RetryStrategy::exponential(baseSeconds: 10, factor: 2.0, maxSeconds: 300);

    // Linear backoff: 10s, 20s, 30s, 40s... up to 300s
    RetryStrategy::linear(baseSeconds: 10, maxSeconds: 300);

    // Fixed delay: always 30s between retries
    RetryStrategy::fixed(seconds: 30);

    // No delay: immediate retry
    RetryStrategy::none();

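To make the delay sequences in the comments above concrete, here is a small standalone sketch of the arithmetic. The formulas are an assumption inferred from the documented sequences (10s, 20s, 40s... and 10s, 20s, 30s...), and `exponentialDelay()` and `linearDelay()` are hypothetical helpers, not SDK functions.

```php
<?php
declare(strict_types=1);

// Assumed formula: base * factor^(attempt - 1), capped at max.
function exponentialDelay(int $attempt, float $base, float $factor, float $max): float
{
    return min($base * $factor ** ($attempt - 1), $max);
}

// Assumed formula: base * attempt, capped at max.
function linearDelay(int $attempt, float $base, float $max): float
{
    return min($base * $attempt, $max);
}

// Delays before retries 1 through 7 with base 10s, factor 2.0, cap 300s.
$exponential = array_map(
    fn (int $attempt): float => exponentialDelay($attempt, 10, 2.0, 300),
    range(1, 7),
);

// Delays before retries 1 through 4 with base 10s, cap 300s.
$linear = array_map(
    fn (int $attempt): float => linearDelay($attempt, 10, 300),
    range(1, 4),
);
```

Under these assumptions the exponential strategy reaches its 300-second cap on the sixth retry, while the linear strategy would take thirty retries to get there.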
Task Context Methods
Inside a task handler, you have access to TaskContext with these methods:

    use Ruudk\Absurd\Execution\AwaitEventOptions;
    use Ruudk\Absurd\Task\Context as TaskContext;

    $absurd->registerTask('workflow', function (array $params, TaskContext $ctx): array {
        // Access task metadata
        $taskId = $ctx->taskId;
        $runId = $ctx->runId;
        $attempt = $ctx->attempt;
        $headers = $ctx->headers; // Custom headers from spawn options

        // Checkpoint a step - cached on retry
        $result = $ctx->step('step-name', fn() => expensiveOperation());

        // Split step - when wrapping work in a closure isn't practical
        $handle = $ctx->beginStep('split-step');
        if (!$handle->done) {
            $computed = expensiveOperation();
        }
        // completeStep returns the cached state on replay, or the provided value on first run
        $result = $ctx->completeStep($handle, $computed ?? null);

        // Wait for an external event
        $eventPayload = $ctx->awaitEvent('payment-confirmed');

        // Wait for event with timeout (throws TimeoutError if not received)
        $payload = $ctx->awaitEvent('webhook-received', new AwaitEventOptions(
            stepName: 'wait-webhook', // Custom checkpoint name
            timeout: 300,             // 5 minute timeout
        ));

        // Sleep for a duration
        $ctx->sleepFor('delay', 60); // Sleep for 60 seconds

        // Sleep until a specific time
        $ctx->sleepUntil('scheduled', new DateTimeImmutable('tomorrow 9am'));

        // Emit an event (can wake other waiting tasks)
        $ctx->emitEvent('order-processed', ['orderId' => '123']);

        // Extend the lease for long-running operations
        $ctx->heartbeat(120); // Extend by 120 seconds
        $ctx->heartbeat();    // Extend by original claim timeout

        return ['done' => true];
    });

Emitting Events

    // Emit an event that a suspended task might be waiting for
    $absurd->emitEvent('shipment.packed:42', [
        'trackingNumber' => 'TRACK123',
    ]);

    // Emit to a specific queue
    $absurd->emitEvent('payment-confirmed', $payload, 'orders');

Cancelling Tasks

    // Cancel a task by ID
    $absurd->cancelTask($taskId);

    // Cancel in a specific queue
    $absurd->cancelTask($taskId, 'orders');

Running tasks will stop at their next checkpoint, heartbeat, or await event call.
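Cancellation of this kind is cooperative: the running code is only interrupted at well-defined points. The sketch below shows the general pattern with a hypothetical `CancellableContext` that checks a flag before each step. It is an illustration under that assumption, not the SDK's implementation, which would read the cancellation state from Postgres.

```php
<?php
declare(strict_types=1);

// Hypothetical exception type used to unwind a cancelled task.
final class TaskCancelled extends RuntimeException {}

// Hypothetical context: the flag stands in for cancellation state kept in the database.
final class CancellableContext
{
    public bool $cancelRequested = false;

    /** @var list<string> names of steps that ran to completion */
    public array $completed = [];

    public function step(string $name, callable $fn): mixed
    {
        // Cancellation is only observed here, at the checkpoint boundary.
        if ($this->cancelRequested) {
            throw new TaskCancelled("cancelled before step '{$name}'");
        }

        $value = $fn();
        $this->completed[] = $name;

        return $value;
    }
}

$ctx = new CancellableContext();
$outcome = 'completed';

try {
    $ctx->step('one', fn () => 1);
    $ctx->step('two', function () use ($ctx) {
        $ctx->cancelRequested = true; // simulate a cancel request arriving mid-step
        return 2;
    });
    $ctx->step('three', fn () => 3); // never runs
} catch (TaskCancelled $e) {
    $outcome = $e->getMessage();
}
```

Step two still finishes because the request arrives while it is running; the task stops only when it reaches the next checkpoint.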
Retrying Tasks
Force a retry of a failed or cancelled task:

    use Ruudk\Absurd\Task\RetryOptions;

    // Retry with default options
    $result = $absurd->retryTask($taskId);

    // Retry with a higher attempt limit or as a brand new task
    $result = $absurd->retryTask(
        $taskId,
        new RetryOptions(
            maxAttempts: 10,    // Override max attempts for this retry
            spawnNewTask: true, // Spawn as a new task instead of resuming the existing one
        ),
        queue: 'orders', // Optional: override queue
    );

Use the BeforeRetryEvent to modify retry options from a listener (same pattern as BeforeSpawnEvent).
Retrieving Task Info

    use Ruudk\Absurd\Task\TaskInfo;

    // Get task state and result
    $taskInfo = $absurd->getTask($taskId);

    if ($taskInfo === null) {
        echo "Task not found\n";
        exit(1); // Stop here: the checks below need a TaskInfo instance
    }

    // Check task state
    echo "State: {$taskInfo->state}\n"; // pending, running, sleeping, completed, failed, cancelled
    echo "Attempts: {$taskInfo->attempts}\n";

    // Check terminal states
    if ($taskInfo->isCompleted()) {
        $result = $taskInfo->completedPayload;
        echo "Result: " . json_encode($result) . "\n";
    }

    if ($taskInfo->isFailed()) {
        echo "Task failed after {$taskInfo->attempts} attempts\n";
    }

    if ($taskInfo->isCancelled()) {
        echo "Task was cancelled\n";
    }

    // Check if task reached any terminal state
    if ($taskInfo->isTerminal()) {
        echo "Task is done (completed, failed, or cancelled)\n";
    }

    // Get from a specific queue
    $taskInfo = $absurd->getTask($taskId, 'orders');

Worker Configuration

    use Ruudk\Absurd\Worker\WorkerOptions;
    use Psr\Log\NullLogger;

    $worker = $absurd->startWorker(new WorkerOptions(
        workerId: 'my-worker-1',   // Unique identifier (default: hostname:pid)
        claimTimeout: 120,         // Seconds before task lease expires (default: 120)
        batchSize: 5,              // Number of tasks to claim at once (default: 1)
        pollInterval: 0.25,        // Seconds between polls (default: 0.25)
        fatalOnLeaseTimeout: true, // Exit process if lease times out (default: true)
        logger: new NullLogger(),  // PSR-3 logger for worker output
    ));

    // Handle graceful shutdown
    pcntl_async_signals(true);
    pcntl_signal(SIGTERM, fn() => $worker->stop());
    pcntl_signal(SIGINT, fn() => $worker->stop());

    $worker->start();

Worker Concurrency Model
PHP workers process tasks sequentially within a single process. The batchSize option controls how many tasks are
claimed from the database in each poll, but they are still executed one at a time. This design ensures:
- Predictable resource usage per worker
- Simple error isolation (one task failure doesn't affect others)
- No need for complex thread-safety considerations
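The claim-then-run-sequentially behaviour can be pictured with a short standalone sketch. Here `runBatch()` is a hypothetical helper and the queue is a plain array rather than a Postgres table, so this only illustrates the model described above, not the worker's real code.

```php
<?php
declare(strict_types=1);

/**
 * Claims up to $batchSize tasks, then runs them one at a time.
 * A failing task is recorded and does not stop the rest of the batch.
 *
 * @param list<string> $queue pending task names (modified in place)
 * @return array<string, string> outcome per claimed task
 */
function runBatch(array &$queue, int $batchSize, callable $handler): array
{
    $claimed = array_splice($queue, 0, $batchSize); // one "poll": claim a batch
    $outcomes = [];

    foreach ($claimed as $task) { // sequential execution, no parallelism
        try {
            $outcomes[$task] = $handler($task);
        } catch (Throwable $e) {
            $outcomes[$task] = 'failed: ' . $e->getMessage();
        }
    }

    return $outcomes;
}

$queue = ['a', 'b', 'c', 'd'];

$outcomes = runBatch($queue, 3, function (string $task): string {
    if ($task === 'b') {
        throw new RuntimeException('boom');
    }

    return "ok: {$task}";
});
```

Task `b` fails, yet `c` still runs, and `d` stays in the queue for the next poll.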
For concurrent task processing, run multiple worker processes:

    # Run 4 worker processes in parallel
    for i in {1..4}; do
        php worker.php &
    done

Or use a process manager like Supervisor:

    [program:absurd-worker]
    command = php /path/to/worker.php
    numprocs = 4
    process_name = %(program_name)s_%(process_num)02d

Events
Use a PSR-14 EventDispatcherInterface for lifecycle hooks and error handling.
Available Events
| Event | Dispatched when |
|---|---|
| WorkerStartedEvent | Worker begins polling |
| WorkerStoppedEvent | Worker stops |
| WorkerRunningEvent | After each poll cycle (idle or after batch completion) |
| TaskStartedEvent | A task is picked up for execution |
| TaskCompletedEvent | A task finishes (includes suspended tasks) |
| TaskFailedEvent | A task throws an unhandled exception |
| TaskErrorEvent | Any error occurs (task or worker level) |
| BeforeSpawnEvent | Before a task is spawned (options are mutable) |
| BeforeRetryEvent | Before a task is retried (options are mutable) |
| TaskExecutionEvent | Wraps task execution (for context propagation) |

    use Ruudk\Absurd\Event\BeforeSpawnEvent;
    use Ruudk\Absurd\Event\TaskErrorEvent;
    use Ruudk\Absurd\Event\TaskExecutionEvent;
    use Ruudk\Absurd\Event\WorkerRunningEvent;
    use Symfony\Component\EventDispatcher\EventDispatcher;

    $dispatcher = new EventDispatcher();

    // Handle task errors
    $dispatcher->addListener(TaskErrorEvent::class, function (TaskErrorEvent $event) {
        $exception = $event->exception;
        $task = $event->task; // May be null for non-task errors

        error_log(sprintf(
            'Task error: %s (task: %s)',
            $exception->getMessage(),
            $task?->taskId ?? 'unknown',
        ));
    });

    // Stop the worker after 100 tasks (like Symfony Messenger's MaxMessagesListener)
    $dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) {
        static $handled = 0;
        $handled += $event->tasksHandled;

        if ($handled >= 100) {
            $event->worker->stop();
        }
    });

    // React to idle cycles (no tasks found)
    $dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) {
        if ($event->isIdle()) {
            // e.g. flush metrics, release resources between idle cycles
        }
    });

    // Modify spawn options before task creation (e.g., inject trace IDs)
    $dispatcher->addListener(BeforeSpawnEvent::class, function (BeforeSpawnEvent $event) {
        $event->options = $event->options->with(
            headers: array_merge($event->options->headers ?? [], [
                'trace_id' => getCurrentTraceId(),
            ]),
        );
    });

    // Wrap task execution (e.g., restore trace context)
    $dispatcher->addListener(TaskExecutionEvent::class, function (TaskExecutionEvent $event) {
        $event->wrapExecution(function (Closure $execute) use ($event) {
            $traceId = $event->context->headers['trace_id'] ?? null;
            $scope = TraceContext::restore($traceId);

            try {
                return $execute();
            } finally {
                $scope->detach();
            }
        });
    });

    $absurd = new Absurd(new PdoConnection($pdo), eventDispatcher: $dispatcher);

Typed Payloads
With a serializer that supports type hydration (like the shipped SymfonySerializer), you can use typed objects as task
parameters:

    readonly class OrderPayload
    {
        public function __construct(
            public string $orderId,
            public int $amount,
            public array $items,
        ) {}
    }

    $absurd->registerTask('process-order', function (OrderPayload $order, TaskContext $ctx): array {
        // $order is automatically deserialized to OrderPayload
        echo "Processing order: {$order->orderId}\n";

        return ['processed' => true];
    });

    // Spawn with typed payload
    $absurd->spawn('process-order', new OrderPayload(
        orderId: 'ord-123',
        amount: 9999,
        items: ['widget', 'gadget'],
    ));

Idempotency Keys
Use idempotency keys to prevent duplicate task creation:

    use Ruudk\Absurd\Task\SpawnOptions;

    // First call creates the task
    $result1 = $absurd->spawn('daily-report', $params, new SpawnOptions(
        idempotencyKey: 'daily-report-2024-01-15',
    ));
    var_dump($result1->created); // bool(true)

    // Second call with same key returns existing task
    $result2 = $absurd->spawn('daily-report', $differentParams, new SpawnOptions(
        idempotencyKey: 'daily-report-2024-01-15',
    ));
    var_dump($result2->created); // bool(false)
    var_dump($result2->taskId === $result1->taskId); // bool(true)

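Conceptually, an idempotency key acts as a lookup from key to task ID that is consulted before anything new is created. The following in-memory sketch shows that contract with a hypothetical `InMemorySpawner` class. It is an assumption-laden illustration only: the real SDK keeps this state in Postgres, and its return type is a SpawnResult rather than an array.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory model of idempotent spawning (not the SDK's implementation).
final class InMemorySpawner
{
    /** @var array<string, string> idempotency key => task ID */
    private array $taskIdByKey = [];

    private int $nextId = 1;

    /** @return array{taskId: string, created: bool} */
    public function spawn(string $taskName, array $params, ?string $idempotencyKey = null): array
    {
        // A known key short-circuits: the existing task is returned and $params are ignored.
        if ($idempotencyKey !== null && isset($this->taskIdByKey[$idempotencyKey])) {
            return ['taskId' => $this->taskIdByKey[$idempotencyKey], 'created' => false];
        }

        $taskId = 'task-' . $this->nextId++;

        if ($idempotencyKey !== null) {
            $this->taskIdByKey[$idempotencyKey] = $taskId;
        }

        return ['taskId' => $taskId, 'created' => true];
    }
}

$spawner = new InMemorySpawner();

$first = $spawner->spawn('daily-report', ['day' => 'mon'], 'daily-report-2024-01-15');
$second = $spawner->spawn('daily-report', ['day' => 'tue'], 'daily-report-2024-01-15');
$unkeyed = $spawner->spawn('daily-report', ['day' => 'wed']);
```

The second call carries different parameters but the same key, so it resolves to the first task; the call without a key always creates a new one.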
Use the task ID to derive idempotency keys for external APIs:

    // $stripe is assumed to be a \Stripe\StripeClient created elsewhere
    $absurd->registerTask('payment-task', function (array $params, TaskContext $ctx) use ($stripe): array {
        $payment = $ctx->step('charge-card', function () use ($params, $ctx, $stripe) {
            // Use taskId to create idempotency key for Stripe
            $idempotencyKey = "{$ctx->taskId}:payment";

            // stripe-php takes the idempotency key as a per-request option (second argument)
            return $stripe->charges->create(
                ['amount' => $params['amount']],
                ['idempotency_key' => $idempotencyKey],
            );
        });

        return $payment;
    });

Local Development
Prerequisites
- Docker and Docker Compose
- GitHub CLI (gh) - for downloading Absurd binaries
Quick Start

    # Start PostgreSQL, initialize Absurd schema, and launch Habitat UI
    make up

    # Stop everything
    make down

    # Clean up (removes binaries and database volumes)
    make clean

After running make up:
- PostgreSQL is available at localhost:54329
- Habitat UI (task dashboard) is available at http://localhost:7890
Running the Examples
The SDK includes two comprehensive examples:
E-commerce Order Fulfillment
Demonstrates checkpoints, events, sub-tasks, and trace propagation in a realistic order processing scenario.

    # Terminal 1: Start the worker
    php examples/Ecommerce/console consume

    # Terminal 2: Create an order and interact with the workflow
    php examples/Ecommerce/console produce

AI Agent with Tool Calling
Demonstrates a durable AI agent workflow based on this blog post. The agent loops through conversation steps, calling tools as needed, with each iteration durably checkpointed.

    # Set your OpenAI API key
    export OPENAI_KEY=your-api-key

    # Terminal 1: Start the worker
    php examples/Agent/console consume

    # Terminal 2: Ask questions
    php examples/Agent/console ask

Features:
- Loop-based agent with automatic checkpointing per iteration
- OpenAI integration with tool calling (get_weather, search_web, calculate)
- Automatic recovery from crashes - resumes from last checkpoint
- Durable conversation state stored in PostgreSQL
Make Commands
| Command | Description |
|---|---|
| make help | Show available commands |
| make setup | Download absurdctl binary |
| make up | Start PostgreSQL, initialize Absurd, and run Habitat |
| make down | Stop containers |
| make clean | Remove binaries and database volumes |
Production Setup
For production, initialize Absurd in your PostgreSQL database:

    # Install absurdctl from https://github.com/earendil-works/absurd/releases
    absurdctl init -d your-database-name
    absurdctl create-queue -d your-database-name default

API Reference
Absurd Class
| Method | Description |
|---|---|
| registerTask(name, handler, options?) | Register a task handler |
| spawn(taskName, params, options?, queue?) | Spawn a new task |
| retryTask(taskId, options?, queue?) | Retry a failed or cancelled task |
| getTask(taskId, queueName?) | Get task info by ID |
| emitEvent(eventName, payload?, queueName?) | Emit an event |
| cancelTask(taskId, queueName?) | Cancel a running task |
| claimTasks(options?) | Claim tasks for processing |
| startWorker(options?) | Start a worker |
| createQueue(queueName?) | Create a queue |
| dropQueue(queueName?) | Drop a queue |
| listQueues() | List all queues |
| executeTask(task, claimTimeout, ...) | Execute a claimed task |
TaskContext Class
| Method | Description |
|---|---|
| step(name, value) | Execute a checkpointed step |
| beginStep(name) | Begin a split step, returns a StepHandle |
| completeStep(handle, value) | Complete a split step, returns the value (cached on replay) |
| awaitEvent(eventName, options?) | Wait for an event |
| sleepFor(stepName, duration) | Sleep for a duration (seconds) |
| sleepUntil(stepName, wakeAt) | Sleep until a specific time |
| emitEvent(eventName, payload?) | Emit an event from within a task |
| heartbeat(seconds?) | Extend the task lease |
SpawnResult Class
| Property | Type | Description |
|---|---|---|
| taskId | string | Unique task identifier |
| runId | string | Current run identifier |
| attempt | int | Current attempt number |
| created | bool | True if newly created, false if from idempotency cache |