ruudk/absurd-php-sdk

PHP SDK for Absurd: a Durable Execution Engine for Postgres

pkg:composer/ruudk/absurd-php-sdk

0.1.0 2026-01-18 12:00 UTC

Last update: 2026-01-18 12:03:01 UTC


PHP SDK for Absurd: a PostgreSQL-based durable task execution system.

Absurd is the simplest durable execution workflow system you can think of. It's entirely based on Postgres and nothing else. It's almost as easy to use as a queue, but it handles scheduling and retries, and it does all of that without needing any other services to run in addition to Postgres.

Warning: This is an early experiment and should not be used in production.

What is Durable Execution?

Durable execution (or durable workflows) is a way to run long-lived, reliable functions that can survive crashes, restarts, and network failures without losing state or duplicating work. Instead of keeping progress only in memory, a durable execution system decomposes a task into smaller pieces (step functions) and records every step and decision, so a restarted task can reuse the recorded results rather than redo the work.
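
To make the idea concrete, here is a minimal sketch of step recording and replay. It does not use the SDK: CheckpointStore is a hypothetical in-memory stand-in for the records a real engine would keep in Postgres, and the crash is simulated with an exception.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory checkpoint store; a real engine persists this in Postgres.
final class CheckpointStore
{
    /** @var array<string, mixed> */
    private array $results = [];

    public function step(string $name, callable $fn): mixed
    {
        // Replay: a step that already completed returns its recorded result.
        if (array_key_exists($name, $this->results)) {
            return $this->results[$name];
        }

        return $this->results[$name] = $fn();
    }
}

$store = new CheckpointStore();
$charges = 0;

$workflow = function (CheckpointStore $store, bool $crash) use (&$charges): string {
    $store->step('charge', function () use (&$charges): string {
        $charges++; // Side effect we do not want to repeat

        return 'charged';
    });

    if ($crash) {
        throw new RuntimeException('simulated crash after the first step');
    }

    return $store->step('ship', fn (): string => 'shipped');
};

try {
    $workflow($store, true); // First attempt dies after "charge" is recorded
} catch (RuntimeException) {
    // A worker would schedule a retry here
}

$outcome = $workflow($store, false); // Retry: "charge" is replayed, not re-run
```

After the retry, the charge closure has run exactly once even though the workflow function ran twice.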

How It Works

This SDK uses PHP Fibers to provide a clean, synchronous-looking API for durable workflows. When you call methods like $ctx->step(), $ctx->awaitEvent(), or $ctx->sleepFor(), the Fiber suspends execution, allowing the SDK to checkpoint progress to the database. When the task resumes (after a crash, timeout, or event), execution continues from exactly where it left off.

This means you can write workflow code that looks like normal sequential PHP code, while the SDK handles all the complexity of persistence, retries, and resumption behind the scenes.
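
The sketch below illustrates the general Fiber mechanism using only PHP's built-in Fiber class. It is an assumption about the overall shape, not the SDK's actual internals: the request arrays and the $checkpoints variable are made up for illustration.

```php
<?php

declare(strict_types=1);

// Hypothetical driver state: results recorded between suspensions.
$checkpoints = [];

$fiber = new Fiber(function (): string {
    // Fiber::suspend() hands control to the driver and returns whatever
    // the driver later passes to resume().
    $payment = Fiber::suspend(['step' => 'process-payment', 'run' => fn (): string => 'paid']);
    $stock = Fiber::suspend(['step' => 'reserve-inventory', 'run' => fn (): string => 'reserved']);

    return "{$payment}, {$stock}";
});

// start() runs the workflow until its first suspension and returns the request.
$request = $fiber->start();

while (!$fiber->isTerminated()) {
    // Run the step outside the fiber, record it, then continue the workflow.
    $result = $request['run']();
    $checkpoints[$request['step']] = $result;
    $request = $fiber->resume($result);
}

$final = $fiber->getReturn();
```

The workflow body reads top to bottom like ordinary code, while every suspension gives the driver a point at which it could write the result to the database before continuing.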

Requirements

  • PHP 8.4+
  • PDO extension with PostgreSQL driver
  • PCNTL extension (for signal handling in workers)

Installation

composer require ruudk/absurd-php-sdk

Quick Start

<?php

use Ruudk\Absurd\Absurd;
use Ruudk\Absurd\Serialization\SymfonySerializer;
use Ruudk\Absurd\Task\Context as TaskContext;

// Create PDO connection
$pdo = new PDO('pgsql:host=localhost;dbname=absurd');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

// Create Absurd instance
$absurd = new Absurd($pdo, new SymfonySerializer());

// Register a task handler
$absurd->registerTask('order-fulfillment', function (array $params, TaskContext $ctx): array {
    // Each step is checkpointed - if the process crashes, we resume from the last completed step
    $payment = $ctx->step('process-payment', fn() => processPayment($params['amount']));

    $inventory = $ctx->step('reserve-inventory', fn() => reserveItems($params['items']));

    // Wait for an event - the task suspends until the event arrives
    $shipment = $ctx->awaitEvent("shipment.packed:{$params['orderId']}");

    $ctx->step('send-notification', fn() => sendEmail($params['email'], $shipment));

    return [
        'orderId' => $payment['id'],
        'trackingNumber' => $shipment['trackingNumber'],
    ];
});

// Start a worker that pulls tasks from Postgres
$worker = $absurd->startWorker();
$worker->start();

Client Configuration

use Ruudk\Absurd\Absurd;
use Ruudk\Absurd\Serialization\SymfonySerializer;
use Symfony\Component\EventDispatcher\EventDispatcher;

$pdo = new PDO('pgsql:host=localhost;dbname=absurd');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

$absurd = new Absurd(
    pdo: $pdo,
    serializer: new SymfonySerializer(),
    defaultQueueName: 'default',      // Default queue for tasks
    defaultMaxAttempts: 5,            // Default retry attempts
    eventDispatcher: new EventDispatcher(), // Optional: for hooks and error handling
);

Queue Management

// Create a queue (required before spawning tasks)
$absurd->createQueue('my-queue');

// List all queues
$queues = $absurd->listQueues();
// Returns: ['default', 'my-queue', ...]

// Drop a queue and all its data
$absurd->dropQueue('my-queue');

Task Registration

use Ruudk\Absurd\Task\CancellationPolicy;
use Ruudk\Absurd\Task\Context as TaskContext;
use Ruudk\Absurd\Task\RegisterOptions;

// Simple registration
$absurd->registerTask('my-task', function (array $params, TaskContext $ctx): array {
    // Task logic here
    return ['result' => 'done'];
});

// Registration with options
$absurd->registerTask(
    'order-processor',
    function (array $params, TaskContext $ctx): array {
        // Task logic
        return [];
    },
    new RegisterOptions(
        queue: 'orders',                // Override default queue
        defaultMaxAttempts: 3,          // Override default max attempts
        defaultCancellation: new CancellationPolicy(
            maxDuration: 3600,          // Cancel after 1 hour total
            maxDelay: 300,              // Cancel if delayed more than 5 minutes
        ),
    ),
);

Spawning Tasks

use Ruudk\Absurd\Task\SpawnOptions;
use Ruudk\Absurd\Task\RetryStrategy;
use Ruudk\Absurd\Task\CancellationPolicy;

// Simple spawn
$result = $absurd->spawn('order-fulfillment', [
    'orderId' => '42',
    'amount' => 9999,
    'items' => ['widget-1', 'gadget-2'],
    'email' => 'customer@example.com',
]);

echo "Task ID: {$result->taskId}\n";
echo "Run ID: {$result->runId}\n";
echo "Attempt: {$result->attempt}\n";
echo "Created: " . ($result->created ? 'yes' : 'no (from cache)') . "\n";

// Spawn with options
$result = $absurd->spawn(
    'order-fulfillment',
    $params,
    new SpawnOptions(
        maxAttempts: 5,
        retryStrategy: RetryStrategy::exponential(baseSeconds: 10, factor: 2.0, maxSeconds: 300),
        cancellation: new CancellationPolicy(maxDuration: 3600),
        headers: ['priority' => 'high', 'trace_id' => 'abc123'],
        idempotencyKey: 'order-42',     // Prevents duplicate task creation
    ),
    queue: 'orders',                    // Override queue for unregistered tasks
);

// Check if task was newly created or returned from idempotency cache
if ($result->created) {
    echo "New task created\n";
} else {
    echo "Existing task returned (idempotency key matched)\n";
}

Retry Strategies

use Ruudk\Absurd\Task\RetryStrategy;

// Exponential backoff: 10s, 20s, 40s, 80s... up to 300s
RetryStrategy::exponential(baseSeconds: 10, factor: 2.0, maxSeconds: 300);

// Linear backoff: 10s, 20s, 30s, 40s... up to 300s
RetryStrategy::linear(baseSeconds: 10, maxSeconds: 300);

// Fixed delay: always 30s between retries
RetryStrategy::fixed(seconds: 30);

// No delay: immediate retry
RetryStrategy::none();
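
The delay sequences in the comments above can be reproduced with a little arithmetic. The formulas below are assumptions chosen to match those sequences (base times factor to the power of attempt minus one, and base times attempt, both capped at the maximum); the function names are hypothetical and are not part of the SDK.

```php
<?php

declare(strict_types=1);

// Assumed formula: base * factor^(attempt - 1), capped at $max.
function exponentialDelay(int $attempt, float $base, float $factor, float $max): float
{
    return min($max, $base * $factor ** ($attempt - 1));
}

// Assumed formula: base * attempt, capped at $max.
function linearDelay(int $attempt, float $base, float $max): float
{
    return min($max, $base * $attempt);
}

// Delays in seconds before retry 1, 2, 3, ...
$exponential = array_map(fn (int $n): float => exponentialDelay($n, 10, 2.0, 300), range(1, 7));
$linear = array_map(fn (int $n): float => linearDelay($n, 10, 300), range(1, 4));
```

With these inputs, the exponential sequence is 10, 20, 40, 80, 160, 300, 300 seconds: the sixth delay would be 320 seconds uncapped, so the cap applies from there on.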

Task Context Methods

Inside a task handler, the TaskContext argument exposes task metadata as properties and provides these methods:

$absurd->registerTask('workflow', function (array $params, TaskContext $ctx): array {
    // Access task metadata
    $taskId = $ctx->taskId;
    $runId = $ctx->runId;
    $attempt = $ctx->attempt;
    $headers = $ctx->headers;    // Custom headers from spawn options

    // Checkpoint a step - cached on retry
    $result = $ctx->step('step-name', fn() => expensiveOperation());

    // Wait for an external event
    $eventPayload = $ctx->awaitEvent('payment-confirmed');

    // Wait for event with timeout (throws TimeoutError if not received)
    $payload = $ctx->awaitEvent('webhook-received', new \Ruudk\Absurd\Execution\AwaitEventOptions(
        stepName: 'wait-webhook',   // Custom checkpoint name
        timeout: 300,               // 5 minute timeout
    ));

    // Sleep for a duration
    $ctx->sleepFor('delay', 60);  // Sleep for 60 seconds

    // Sleep until a specific time
    $ctx->sleepUntil('scheduled', new DateTimeImmutable('tomorrow 9am'));

    // Emit an event (can wake other waiting tasks)
    $ctx->emitEvent('order-processed', ['orderId' => '123']);

    // Extend the lease for long-running operations
    $ctx->heartbeat(120);     // Extend by 120 seconds
    $ctx->heartbeat();        // Extend by original claim timeout

    return ['done' => true];
});

Emitting Events

// Emit an event that a suspended task might be waiting for
$absurd->emitEvent('shipment.packed:42', [
    'trackingNumber' => 'TRACK123',
]);

// Emit to a specific queue
$absurd->emitEvent('payment-confirmed', $payload, 'orders');

Cancelling Tasks

// Cancel a task by ID
$absurd->cancelTask($taskId);

// Cancel in a specific queue
$absurd->cancelTask($taskId, 'orders');

A running task is not interrupted immediately: it stops at its next checkpoint, heartbeat(), or awaitEvent() call.
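
The sketch below shows what stopping at a checkpoint boundary means in practice. It is self-contained and does not use the SDK: SketchContext, TaskCancelled, and the cancelRequested flag are hypothetical, and whether the SDK checks before or after running a step is not stated here.

```php
<?php

declare(strict_types=1);

final class TaskCancelled extends RuntimeException {}

// Hypothetical context: the flag stands in for a cancellation recorded in the database.
final class SketchContext
{
    public bool $cancelRequested = false;

    /** @var list<string> */
    public array $completedSteps = [];

    public function step(string $name, callable $fn): mixed
    {
        // Cancellation is only observed here, at the checkpoint boundary.
        if ($this->cancelRequested) {
            throw new TaskCancelled("cancelled before step '{$name}'");
        }

        $result = $fn();
        $this->completedSteps[] = $name;

        return $result;
    }
}

$ctx = new SketchContext();
$status = 'completed';

try {
    $ctx->step('first', fn (): int => 1);
    $ctx->cancelRequested = true;          // e.g. a cancel request from another process
    $ctx->step('second', fn (): int => 2); // Never runs
} catch (TaskCancelled) {
    $status = 'cancelled';
}
```

Code that runs for a long time between checkpoints would not notice the cancellation until it reaches the next one.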

Retrieving Task Info

use Ruudk\Absurd\Task\TaskInfo;

// Get task state and result
$taskInfo = $absurd->getTask($taskId);

if ($taskInfo === null) {
    echo "Task not found\n";
    return; // Stop here: the checks below need a task
}

// Check task state
echo "State: {$taskInfo->state}\n";  // pending, running, sleeping, completed, failed, cancelled
echo "Attempts: {$taskInfo->attempts}\n";

// Check terminal states
if ($taskInfo->isCompleted()) {
    $result = $taskInfo->completedPayload;
    echo "Result: " . json_encode($result) . "\n";
}

if ($taskInfo->isFailed()) {
    echo "Task failed after {$taskInfo->attempts} attempts\n";
}

if ($taskInfo->isCancelled()) {
    echo "Task was cancelled\n";
}

// Check if task reached any terminal state
if ($taskInfo->isTerminal()) {
    echo "Task is done (completed, failed, or cancelled)\n";
}

// Get from a specific queue
$taskInfo = $absurd->getTask($taskId, 'orders');

Worker Configuration

use Ruudk\Absurd\Worker\WorkerOptions;
use Psr\Log\NullLogger;

$worker = $absurd->startWorker(new WorkerOptions(
    workerId: 'my-worker-1',        // Unique identifier (default: hostname:pid)
    claimTimeout: 120,              // Seconds before task lease expires (default: 120)
    batchSize: 5,                   // Number of tasks to claim at once (default: 1)
    pollInterval: 0.25,             // Seconds between polls (default: 0.25)
    fatalOnLeaseTimeout: true,      // Exit process if lease times out (default: true)
    logger: new NullLogger(),       // PSR-3 logger for worker output
));

// Handle graceful shutdown
pcntl_async_signals(true);
pcntl_signal(SIGTERM, fn() => $worker->stop());
pcntl_signal(SIGINT, fn() => $worker->stop());

$worker->start();

Worker Concurrency Model

PHP workers process tasks sequentially within a single process. The batchSize option controls how many tasks are claimed from the database in each poll, but they are still executed one at a time. This design ensures:

  • Predictable resource usage per worker
  • Simple error isolation (one task failure doesn't affect others)
  • No need for complex thread-safety considerations
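
A rough sketch of that sequential model, with a hypothetical batch of three handlers standing in for tasks claimed from the database:

```php
<?php

declare(strict_types=1);

// Hypothetical claimed batch: task ID => handler.
$batch = [
    'task-1' => fn (): string => 'ok',
    'task-2' => fn (): string => throw new RuntimeException('boom'),
    'task-3' => fn (): string => 'ok',
];

$outcomes = [];

// Tasks run one at a time, in claim order.
foreach ($batch as $taskId => $handler) {
    try {
        $outcomes[$taskId] = $handler();
    } catch (Throwable $e) {
        // A failing task is recorded and the loop moves on to the next one.
        $outcomes[$taskId] = 'failed: ' . $e->getMessage();
    }
}
```

The failure of task-2 is contained in its own try block, so task-3 still runs.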

For concurrent task processing, run multiple worker processes:

# Run 4 worker processes in parallel
for i in {1..4}; do php worker.php & done

Or use a process manager like Supervisor:

[program:absurd-worker]
command=php /path/to/worker.php
numprocs=4
process_name=%(program_name)s_%(process_num)02d

Error Handling with Events

Use the PSR-14 EventDispatcher for error handling and lifecycle hooks:

use Ruudk\Absurd\Absurd;
use Ruudk\Absurd\Event\TaskErrorEvent;
use Ruudk\Absurd\Event\BeforeSpawnEvent;
use Ruudk\Absurd\Event\TaskExecutionEvent;
use Symfony\Component\EventDispatcher\EventDispatcher;

$dispatcher = new EventDispatcher();

// Handle task errors
$dispatcher->addListener(TaskErrorEvent::class, function (TaskErrorEvent $event) {
    $exception = $event->exception;
    $task = $event->task;  // May be null for non-task errors

    error_log(sprintf(
        'Task error: %s (task: %s)',
        $exception->getMessage(),
        $task?->taskId ?? 'unknown',
    ));
});

// Modify spawn options before task creation (e.g., inject trace IDs)
$dispatcher->addListener(BeforeSpawnEvent::class, function (BeforeSpawnEvent $event) {
    $event->options = $event->options->with(
        headers: array_merge($event->options->headers ?? [], [
            'trace_id' => getCurrentTraceId(),
        ]),
    );
});

// Wrap task execution (e.g., restore trace context)
$dispatcher->addListener(TaskExecutionEvent::class, function (TaskExecutionEvent $event) {
    $event->wrapExecution(function (Closure $execute) use ($event) {
        $traceId = $event->context->headers['trace_id'] ?? null;
        $scope = TraceContext::restore($traceId);
        try {
            return $execute();
        } finally {
            $scope->detach();
        }
    });
});

$absurd = new Absurd($pdo, $serializer, eventDispatcher: $dispatcher);
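
How execution wrappers might compose around a handler can be sketched in plain PHP. This is an illustration of the wrapping pattern only, not the SDK's implementation; $wrappers, $handler, and $pipeline are hypothetical names.

```php
<?php

declare(strict_types=1);

$log = [];

// Each wrapper receives the next layer as $execute and decides when to call it.
$wrappers = [
    function (Closure $execute) use (&$log): mixed {
        $log[] = 'trace:start';
        try {
            return $execute();
        } finally {
            $log[] = 'trace:end'; // Runs even if the handler throws
        }
    },
];

$handler = function () use (&$log): string {
    $log[] = 'handler';

    return 'done';
};

// Fold the wrappers around the handler, so the first wrapper ends up outermost.
$pipeline = array_reduce(
    array_reverse($wrappers),
    fn (Closure $next, Closure $wrapper): Closure => fn (): mixed => $wrapper($next),
    $handler,
);

$result = $pipeline();
```

The finally block mirrors the $scope->detach() call in the listener above: cleanup happens whether the handler returns or throws.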

Typed Payloads

You can use typed objects as task parameters:

readonly class OrderPayload
{
    public function __construct(
        public string $orderId,
        public int $amount,
        public array $items,
    ) {}
}

$absurd->registerTask('process-order', function (OrderPayload $order, TaskContext $ctx): array {
    // $order is automatically deserialized to OrderPayload
    echo "Processing order: {$order->orderId}\n";

    return ['processed' => true];
});

// Spawn with typed payload
$absurd->spawn('process-order', new OrderPayload(
    orderId: 'ord-123',
    amount: 9999,
    items: ['widget', 'gadget'],
));
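
One way such automatic deserialization can work is to inspect the handler's first parameter type with reflection. The sketch below is an assumption for illustration: hydrateFirstArgument is a hypothetical helper, and the SDK itself delegates to its serializer rather than to this code.

```php
<?php

declare(strict_types=1);

final class OrderPayload
{
    public function __construct(
        public readonly string $orderId,
        public readonly int $amount,
    ) {}
}

// Hypothetical helper: build the handler's first argument from stored data.
// Assumes the handler declares at least one parameter.
function hydrateFirstArgument(Closure $handler, array $data): mixed
{
    $type = (new ReflectionFunction($handler))->getParameters()[0]->getType();

    // Untyped, union, or built-in (e.g. array) parameters receive the raw data.
    if (!$type instanceof ReflectionNamedType || $type->isBuiltin()) {
        return $data;
    }

    // Spread the associative array as named constructor arguments.
    $class = $type->getName();

    return new $class(...$data);
}

$handler = fn (OrderPayload $order): string => "order {$order->orderId}";
$argument = hydrateFirstArgument($handler, ['orderId' => 'ord-123', 'amount' => 9999]);
$output = $handler($argument);
```

A handler declared with an array parameter would receive the raw data unchanged, which matches the array-based handlers shown earlier.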

Idempotency Keys

Use idempotency keys to prevent duplicate task creation:

// First call creates the task
$result1 = $absurd->spawn('daily-report', $params, new SpawnOptions(
    idempotencyKey: 'daily-report-2024-01-15',
));
var_dump($result1->created);  // bool(true)

// Second call with same key returns existing task
$result2 = $absurd->spawn('daily-report', $differentParams, new SpawnOptions(
    idempotencyKey: 'daily-report-2024-01-15',
));
var_dump($result2->created);  // bool(false)
var_dump($result2->taskId === $result1->taskId);  // bool(true)
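
The semantics can be modelled with a small in-memory lookup. SpawnRegistry is a hypothetical stand-in for the check the engine performs in Postgres, and the array it returns only loosely mirrors SpawnResult.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory model of idempotent spawning.
final class SpawnRegistry
{
    /** @var array<string, string> idempotency key => task ID */
    private array $byKey = [];

    /** @return array{taskId: string, created: bool} */
    public function spawn(string $taskName, ?string $idempotencyKey = null): array
    {
        // A known key returns the existing task instead of creating a new one.
        if ($idempotencyKey !== null && isset($this->byKey[$idempotencyKey])) {
            return ['taskId' => $this->byKey[$idempotencyKey], 'created' => false];
        }

        $taskId = bin2hex(random_bytes(8));

        if ($idempotencyKey !== null) {
            $this->byKey[$idempotencyKey] = $taskId;
        }

        return ['taskId' => $taskId, 'created' => true];
    }
}

$registry = new SpawnRegistry();
$first = $registry->spawn('daily-report', 'daily-report-2024-01-15');
$second = $registry->spawn('daily-report', 'daily-report-2024-01-15');
$unkeyed = $registry->spawn('daily-report');
```

In this model, as in the example above, the second call's parameters are ignored: the key alone decides whether a task is created.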

Use the task ID to derive idempotency keys for external APIs:

// $stripe is assumed to be a \Stripe\StripeClient created elsewhere
$absurd->registerTask('payment-task', function (array $params, TaskContext $ctx) use ($stripe): array {
    $payment = $ctx->step('charge-card', function () use ($params, $ctx, $stripe) {
        // Use taskId to create idempotency key for Stripe
        $idempotencyKey = "{$ctx->taskId}:payment";
        // stripe-php takes the idempotency key as a per-request option (second argument)
        return $stripe->charges->create(
            ['amount' => $params['amount']],
            ['idempotency_key' => $idempotencyKey],
        );
    });

    return $payment;
});

Local Development

Prerequisites

  • Docker and Docker Compose
  • GitHub CLI (gh) - for downloading Absurd binaries

Quick Start

# Start PostgreSQL, initialize Absurd schema, and launch Habitat UI
make up

# Stop everything
make down

# Clean up (removes binaries and database volumes)
make clean

After running make up:

  • PostgreSQL is available at localhost:54329
  • Habitat UI (task dashboard) is available at http://localhost:7890

Running the Examples

The SDK includes two comprehensive examples:

E-commerce Order Fulfillment

Demonstrates checkpoints, events, sub-tasks, and trace propagation in a realistic order processing scenario.

# Terminal 1: Start the worker
php examples/Ecommerce/console consume

# Terminal 2: Create an order and interact with the workflow
php examples/Ecommerce/console produce

AI Agent with Tool Calling

Demonstrates a durable AI agent workflow based on this blog post. The agent loops through conversation steps, calling tools as needed, with each iteration durably checkpointed.

# Set your OpenAI API key
export OPENAI_KEY=your-api-key

# Terminal 1: Start the worker
php examples/Agent/console consume

# Terminal 2: Ask questions
php examples/Agent/console ask

Features:

  • Loop-based agent with automatic checkpointing per iteration
  • OpenAI integration with tool calling (get_weather, search_web, calculate)
  • Automatic recovery from crashes - resumes from last checkpoint
  • Durable conversation state stored in PostgreSQL

Make Commands

Command      Description
make help    Show available commands
make setup   Download absurdctl binary
make up      Start PostgreSQL, initialize Absurd, and run Habitat
make down    Stop containers
make clean   Remove binaries and database volumes

Production Setup

For production, initialize Absurd in your PostgreSQL database:

# Install absurdctl from https://github.com/earendil-works/absurd/releases
absurdctl init -d your-database-name
absurdctl create-queue -d your-database-name default

API Reference

Absurd Class

Method                                      Description
registerTask(name, handler, options?)       Register a task handler
spawn(taskName, params, options?, queue?)   Spawn a new task
getTask(taskId, queueName?)                 Get task info by ID
emitEvent(eventName, payload?, queueName?)  Emit an event
cancelTask(taskId, queueName?)              Cancel a running task
claimTasks(options?)                        Claim tasks for processing
startWorker(options?)                       Start a worker
createQueue(queueName?)                     Create a queue
dropQueue(queueName?)                       Drop a queue
listQueues()                                List all queues
executeTask(task, claimTimeout, ...)        Execute a claimed task

TaskContext Class

Method                           Description
step(name, value)                Execute a checkpointed step
awaitEvent(eventName, options?)  Wait for an event
sleepFor(stepName, duration)     Sleep for a duration (seconds)
sleepUntil(stepName, wakeAt)     Sleep until a specific time
emitEvent(eventName, payload?)   Emit an event from within a task
heartbeat(seconds?)              Extend the task lease

SpawnResult Class

Property  Type    Description
taskId    string  Unique task identifier
runId     string  Current run identifier
attempt   int     Current attempt number
created   bool    True if newly created, false if from idempotency cache