cainydev / laragraph
Build stateful, multi-agent LLM workflows in Laravel. Adapts LangGraph’s cyclic graphs for PHP’s stateless architecture with built-in persistence.
Requires
- php: ^8.4
- illuminate/contracts: ^11.0||^12.0||^13.0
- prism-php/prism: ^0.99|^0.100
- spatie/laravel-package-tools: ^1.16
Requires (Dev)
- inertiajs/inertia-laravel: ^2.0
- larastan/larastan: ^3.0
- laravel/ai: ^0.3.2
- laravel/pint: ^1.14
- laravel/reverb: ^1.0
- nunomaduro/collision: ^8.8
- orchestra/testbench: ^10.11||^11.0
- pestphp/pest: ^4.0
- pestphp/pest-plugin-arch: ^4.0
- pestphp/pest-plugin-laravel: ^4.0
- phpstan/extension-installer: ^1.4
- phpstan/phpstan-deprecation-rules: ^2.0
- phpstan/phpstan-phpunit: ^2.0
Suggests
- laravel/ai: Required to use the Laragraph Laravel AI integration traits.
- prism-php/prism: Required to use PrismNode and ToolNode.
- dev-main
- v0.3.0
- v0.2.3
- v0.2.2
- v0.2.1
- v0.2.0
- v0.1.7
- v0.1.6
- v0.1.5
- v0.1.4
- v0.1.3
- v0.1.2
- v0.1.1
- v0.1.0
This package is auto-updated.
Last update: 2026-05-08 21:37:41 UTC
LaraGraph
Stateful, graph-based workflow engine for Laravel.
Build multi-step agent pipelines, human-in-the-loop processes, and parallel fan-out/fan-in tasks — all backed by your database and queue.
Inspired by LangGraph
Table of Contents
- Installation
- Core Concepts
- Building a Workflow
- Running a Workflow
- State
- Human-in-the-Loop
- Node Contracts
- Built-in Nodes
- Prism Integration
- Laravel AI Integration
- Sub-graph Workflows
- Recursion Limit
- Events
- Configuration
- Testing
Installation
composer require cainydev/laragraph
Publish and run the migration:
php artisan vendor:publish --tag="laragraph-migrations"
php artisan migrate
Publish the config file:
php artisan vendor:publish --tag="laragraph-config"
Core Concepts
LaraGraph models a workflow as a directed graph of nodes connected by edges. Each run of that graph is a WorkflowRun — a database record that tracks the current state, status, and active node pointers.
| Term | Meaning |
|---|---|
| Node | A unit of work. Receives the current state, returns a mutation. |
| Edge | A directed connection between two nodes, optionally conditional. |
| State | A plain PHP array that accumulates mutations as nodes execute. |
| Pointer | Tracks which nodes are currently in-flight for a run. |
| WorkflowRun | The persisted record for a single execution of a workflow. |
Execution is fully queue-driven. Each node runs as an independent ExecuteNode job, so parallel branches execute concurrently across your worker pool.
Building a Workflow
Workflows are classes that extend Workflow and define their graph in a definition() method:

    use Cainy\Laragraph\Builder\Workflow;

    class MyPipeline extends Workflow
    {
        public function definition(): void
        {
            $this->addNode('fetch', FetchNode::class)
                ->addNode('transform', TransformNode::class)
                ->addNode('store', StoreNode::class)
                ->transition(Workflow::START, 'fetch')
                ->transition('fetch', 'transform')
                ->transition('transform', 'store')
                ->transition('store', Workflow::END);
        }
    }

You can also call compile() directly on a Workflow instance if you prefer building inline, but the class-based approach is recommended since workflows are stored by class name.
Nodes
A node is any class implementing Cainy\Laragraph\Contracts\Node:

    use Cainy\Laragraph\Contracts\Node;
    use Cainy\Laragraph\Engine\NodeExecutionContext;

    class SummarizeNode implements Node
    {
        public function handle(NodeExecutionContext $context, array $state): array
        {
            $text = implode("\n", $state['paragraphs'] ?? []);

            return ['summary' => substr($text, 0, 200)];
        }
    }

handle() receives a typed NodeExecutionContext and the current full state. It returns an array of mutations — only the keys you want to change.
NodeExecutionContext

    $context->runId            // int: ID of the WorkflowRun
    $context->workflowKey      // string: class name of the workflow
    $context->nodeName         // string: name of this node in the graph
    $context->attempt          // int: current queue attempt (1-based)
    $context->maxAttempts      // int: maximum attempts configured
    $context->createdAt        // DateTimeImmutable
    $context->isolatedPayload  // ?array: payload injected by a Send (see Dynamic Fan-out)
    $context->parentRunId      // ?int: set when this run was dispatched as a child workflow
    $context->parentNodeName   // ?string: the sub-graph node name on the parent
    $context->routing          // array: read-only engine routing snapshot (counters, interrupt marker, etc.)

    // Helpers:
    $context->isSendExecution()         // bool: true when dispatched via a Send
    $context->payload('key', $default)  // mixed: read a value from the isolated payload
    $context->parentMetadata()          // ?array: lazy-loads the parent run's metadata (null at top-level)

User state vs engine state: $state only contains keys your nodes write. Engine
bookkeeping (spawn counters, interrupt markers, gate reasons, child-run ids,
error summaries) lives on a separate workflow_runs.routing column and is
never merged into $state. Read it via $context->routing when you need it.
Transitions
$this->transition(Workflow::START, 'fetch') ->transition('fetch', 'transform') ->transition('transform', Workflow::END);
Workflow::START and Workflow::END are reserved entry and exit pseudo-nodes.
Nodes can be registered as class strings (resolved via the container) or as pre-built instances.
Conditional Edges
Pass a Closure as the third argument to ->transition():

    ->transition('classify', 'approve', fn(array $state) => $state['score'] > 50)
    ->transition('classify', 'reject', fn(array $state) => $state['score'] <= 50)

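
To make the guard semantics concrete, here is a minimal standalone sketch of how guarded edges could be evaluated against a state array. It does not use LaraGraph itself: `nextTargets()` and the `[target, guard]` edge shape are hypothetical names chosen for illustration, and the engine's real evaluation logic may differ.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of LaraGraph): returns every target whose
 * guard passes for $state. Each edge is [target, ?Closure]; a null guard
 * means the edge is always followed.
 */
function nextTargets(array $edges, array $state): array
{
    $targets = [];

    foreach ($edges as [$target, $guard]) {
        if ($guard === null || $guard($state)) {
            $targets[] = $target;
        }
    }

    return $targets;
}

// The same two guards as in the conditional-edge example above.
$edges = [
    ['approve', fn (array $state): bool => $state['score'] > 50],
    ['reject',  fn (array $state): bool => $state['score'] <= 50],
];

$high = nextTargets($edges, ['score' => 80]); // only the 'approve' guard passes
$low  = nextTargets($edges, ['score' => 50]); // only the 'reject' guard passes
```

Writing the two guards as exact complements (`> 50` and `<= 50`) ensures that exactly one edge matches for every score, so the run neither stalls nor fans out by accident.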
Branch Edges
A branch edge uses a resolver to return one or more target node names dynamically at runtime:

    ->branch('router', function(array $state): string {
        return $state['approved'] ? 'publish' : 'revise';
    }, targets: ['publish', 'revise'])

The targets array is optional but recommended — it enables graph visualization without executing the resolver.
Parallel Branches
To execute multiple nodes in parallel from a single node, add multiple transitions from the same source:

    $this->addNode('split', SplitNode::class)
        ->addNode('branch-a', BranchANode::class)
        ->addNode('branch-b', BranchBNode::class)
        ->addNode('merge', MergeNode::class)
        ->transition(Workflow::START, 'split')
        ->transition('split', 'branch-a')
        ->transition('split', 'branch-b')
        ->transition('branch-a', 'merge')
        ->transition('branch-b', 'merge')
        ->transition('merge', Workflow::END);

branch-a and branch-b run as independent queue jobs. Use a BarrierNode as the merge node to wait for all branches before continuing.
Dynamic Fan-out with Send
To fan out over a dynamic list, return Send objects from a branch edge resolver:

    use Cainy\Laragraph\Routing\Send;

    ->branch('planner', function(array $state): array {
        return array_map(
            fn(string $query) => new Send('worker', ['query' => $query]),
            $state['queries']
        );
    }, targets: ['worker'])

Each Send dispatches an independent ExecuteNode job. The target node receives the payload via $context->isolatedPayload or the helper methods:

    public function handle(NodeExecutionContext $context, array $state): array
    {
        $query = $context->payload('query');
        // ...
    }

The same fan-out is available via the SendNode prebuilt (see Built-in Nodes).
Fan-out Patterns
Three canonical shapes cover almost every real workflow:
1. One-shot fan-out → barrier. Single worker node per item, then a
BarrierNode. Use for independent work with no per-item downstream steps.

    ->branch('split', fn($s) => array_map(fn($id) => new Send('worker', ['id' => $id]), $s['ids']), ['worker'])
    ->transition('worker', 'barrier')
    ->transition('barrier', 'aggregate')

2. Per-item pipeline → child workflow → barrier. When each item needs a
multi-step pipeline (enrich → qualify → classify → draft), make the pipeline
its own Workflow and dispatch it as a sub-graph via Send::toWorkflow().
The payload becomes the child's initial state (parent state is not
leaked), and each Send spawns its own isolated child run.

    class LeadPipeline extends Workflow
    {
        public function definition(): void
        {
            $this->addNode('enrich', EnrichNode::class)
                ->addNode('qualify', QualifyNode::class)
                ->addNode('classify', ClassifyNode::class)
                ->transition(Workflow::START, 'enrich')
                ->transition('enrich', 'qualify')
                ->transition('qualify', 'classify')
                ->transition('classify', Workflow::END);
        }
    }

    class FanoutPipeline extends Workflow
    {
        public function definition(): void
        {
            $this->addNode('per_lead', app(LeadPipeline::class))
                ->addNode('barrier', new BarrierNode())
                ->branch(Workflow::START, fn($s) => collect($s['lead_ids'])->map(
                    fn($id) => Send::toWorkflow('per_lead', ['lead_id' => $id])
                )->all(), ['per_lead'])
                ->transition('per_lead', 'barrier')
                ->transition('barrier', Workflow::END);
        }
    }

Why a sub-workflow rather than chaining sibling nodes in the parent? Send
payloads are per-job and don't propagate through subsequent static edges
(enrich → qualify would see payload = null). Inside a child workflow the
payload becomes state, so enrich writes to state and qualify sees it on
the next hop — no payload-threading needed.
3. Parent supervises long-running children. Same shape as (2), but disable
failure cascading on the child workflow (override shouldCascadeFailure() to
return false) when you want a map-reduce style aggregate where one child's
failure shouldn't fail the whole run. See Sub-graph Workflows below.
Running a Workflow
Starting a Run

    use Cainy\Laragraph\Facades\Laragraph;

    $run = Laragraph::run(MyPipeline::class, initialState: [
        'input' => 'Hello, world!',
    ]);

    echo $run->id;  // WorkflowRun ID
    $run->status;   // RunStatus::Running (an enum case, so it cannot be echoed directly)

Pass an optional metadata array as the third argument to attach correlation data that travels with the run without being visible to nodes:

    $run = Laragraph::run(MyPipeline::class,
        initialState: ['input' => 'Hello'],
        metadata: ['trace_id' => $traceId, 'user_id' => $userId],
    );

    $run->metadata; // ['trace_id' => ..., 'user_id' => ...]

The run is created synchronously. Node jobs are dispatched to your queue immediately after.
Controlling a Run

    // Pause a running workflow
    Laragraph::pause($run->id);

    // Resume a paused workflow, optionally merging additional state
    Laragraph::resume($run->id, ['approved' => true]);

    // Abort a workflow (sets status to Failed, clears all pointers)
    Laragraph::abort($run->id);

Lifecycle Hooks
Override any of these methods on your Workflow subclass to react to run lifecycle events. Hook exceptions are swallowed and never affect engine state.

    class MyPipeline extends Workflow
    {
        public function definition(): void { /* ... */ }

        public function onStarting(WorkflowRun $run): void
        {
            Log::info("Run {$run->id} starting");
        }

        public function onCompleted(WorkflowRun $run): void
        {
            Cache::forget("pipeline:{$run->metadata['trace_id']}");
        }

        public function onFailed(WorkflowRun $run, Throwable $exception): void
        {
            report($exception);
        }
    }

State
State is a plain PHP array that persists in the workflow_runs.state column. Every node receives the full current state and returns a mutation — a partial array of keys to update.
The reducer determines how mutations are merged into the existing state.
Reducers
LaraGraph ships with three reducers:
| Class | Behaviour |
|---|---|
| SmartReducer (default) | List arrays are appended. Scalars and associative arrays are overwritten. |
| MergeReducer | Deep recursive merge for all keys. |
| OverwriteReducer | Shallow array_merge; always overwrites. |
SmartReducer is the right default for most agent workflows: message histories accumulate naturally, while scalar values like status or score simply overwrite.
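
The append-versus-overwrite rule can be illustrated with a short standalone sketch. This is not the package's SmartReducer: `smartReduce()` is a hypothetical function that only mirrors the behaviour described in the table, and it assumes that "list array" means an array for which PHP's `array_is_list()` returns true.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical illustration of the SmartReducer rule described above:
 * list arrays are appended, everything else is overwritten.
 */
function smartReduce(array $state, array $mutation): array
{
    foreach ($mutation as $key => $value) {
        $current = $state[$key] ?? null;

        // Append only when both the existing and the incoming value are lists.
        $bothLists = is_array($value) && array_is_list($value)
            && is_array($current) && array_is_list($current);

        $state[$key] = $bothLists ? array_merge($current, $value) : $value;
    }

    return $state;
}

$state = smartReduce(
    ['messages' => ['hi'], 'status' => 'draft', 'meta' => ['a' => 1]],
    ['messages' => ['hello'], 'status' => 'done', 'meta' => ['b' => 2]],
);
// 'messages' grows to two entries, while 'status' and 'meta' are replaced.
```

With this rule a node that returns `['messages' => [$reply]]` extends the history instead of replacing it, which is why the default suits chat-style agents.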
Custom Reducer
Implement StateReducerInterface and bind it in your service provider, or attach it to a specific workflow:

    // Globally
    $this->app->bind(StateReducerInterface::class, MyReducer::class);

    // Per workflow
    $this->withReducer(MyReducer::class)

Human-in-the-Loop
LaraGraph has first-class support for pausing workflows and waiting for human input.
interrupt_before
Pause the run before a node executes. On resume, the node runs normally.
$this->addNode('review', ReviewNode::class) ->interruptBefore('review');
interrupt_after
Pause the run after a node executes but before its outgoing edges are evaluated.

    $this->addNode('drafter', DrafterNode::class)
        ->addNode('publish', PublishNode::class)
        ->transition(Workflow::START, 'drafter')
        ->transition('drafter', 'publish')
        ->transition('publish', Workflow::END)
        ->interruptAfter('drafter');

Resuming
Call Laragraph::resume() with any additional state to merge before the run continues:
Laragraph::resume($run->id, [ 'meta' => ['approved' => true], ]);
Dynamic Pause from a Node
Any node can pause the run at runtime by throwing NodePausedException:

    use Cainy\Laragraph\Exceptions\NodePausedException;

    class ConfidenceCheckNode implements Node
    {
        public function handle(NodeExecutionContext $context, array $state): array
        {
            if ($state['confidence'] < 0.7) {
                throw new NodePausedException($context->nodeName);
            }

            return ['status' => 'confident'];
        }
    }

You can also pass state mutations to persist before pausing, and surface a
human-readable gate reason that rides on the engine's routing column (not
$state):

    throw new NodePausedException(
        nodeName: $context->nodeName,
        stateMutation: ['draft_attempt' => ($state['draft_attempt'] ?? 0) + 1],
        gateReason: 'Score too low, human review required',
    );

The HumanInterventionRequired event fires with (runId, nodeName, gateReason).
Gate reason is also readable on the paused run via $run->routing['gate_reason'].
Node Contracts
Nodes can implement optional contracts to declare capabilities to the engine.
HasName
Give a node a stable identifier used in edge routing and graph visualization:

    use Cainy\Laragraph\Contracts\HasName;

    class ResearchAgentNode implements Node, HasName
    {
        public function name(): string
        {
            return 'research-agent';
        }
    }

HasTags
Emit metadata alongside each node execution — useful for tracking token usage, model names, cost centers, or tenant IDs. Tags are automatically persisted to the workflow_node_executions table and broadcast on the NodeCompleted event:

    use Cainy\Laragraph\Contracts\HasTags;

    class LLMNode implements Node, HasTags
    {
        private string $model = '';
        private int $tokens = 0;

        public function handle(NodeExecutionContext $context, array $state): array
        {
            // ... call LLM, populate $this->model and $this->tokens ...
            return ['response' => $result];
        }

        public function tags(): array
        {
            return [
                'model' => $this->model,
                'tokens' => $this->tokens,
                'cost_usd' => $this->tokens * 0.000003,
            ];
        }
    }

The engine calls tags() after handle() returns, so the node can accumulate values during execution and expose them at the end.
Querying execution history

    // All executions for a run
    $run->nodeExecutions;

    // Total cost for a run
    $run->nodeExecutions->sum(fn($e) => $e->tags['cost_usd'] ?? 0);

    // Per-node cost breakdown
    $run->nodeExecutions
        ->groupBy('node_name')
        ->map(fn($execs) => $execs->sum(fn($e) => $e->tags['cost_usd'] ?? 0));

    // Failed executions only
    $run->nodeExecutions->filter(fn($e) => $e->failed());

NodeExecution columns: run_id, node_name, attempt, tags (JSON),
executed_at, error_class, error_message, error_trace, failed_at.
When a node fails after exhausting retries, the engine writes exactly one row
with the error columns populated — no polluting state.error or parsing
Laravel logs.
HasRetryPolicy
Define per-node retry behaviour with exponential backoff and optional jitter:

    use Cainy\Laragraph\Contracts\HasRetryPolicy;
    use Cainy\Laragraph\Engine\RetryPolicy;

    class FlakyAPINode implements Node, HasRetryPolicy
    {
        public function retryPolicy(): RetryPolicy
        {
            return new RetryPolicy(
                initialInterval: 1.0,
                backoffFactor: 2.0,
                maxInterval: 30.0,
                maxAttempts: 5,
                jitter: true,
            );
        }
    }

Restrict retries to specific exception types:

    new RetryPolicy(
        maxAttempts: 3,
        retryOn: [RateLimitException::class, TimeoutException::class],
    )

    // Or with a Closure for full control:
    new RetryPolicy(
        maxAttempts: 3,
        retryOn: fn(Throwable $e) => $e->getCode() === 429,
    )

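
For intuition about what the interval settings imply, the sketch below computes a capped exponential backoff schedule. The README does not state the exact formula the engine uses, so `backoffDelay()` is a hypothetical helper based on the common definition (initial interval times factor to the power of attempt minus one, capped at the maximum), and jitter is left out to keep the output deterministic.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (an assumption, not LaraGraph's code): delay in seconds
 * before retrying after the given 1-based attempt, without jitter.
 */
function backoffDelay(
    int $attempt,
    float $initialInterval = 1.0,
    float $backoffFactor = 2.0,
    float $maxInterval = 30.0,
): float {
    $delay = $initialInterval * $backoffFactor ** ($attempt - 1);

    // Never wait longer than the configured ceiling.
    return min($maxInterval, $delay);
}

// Schedule for the FlakyAPINode settings: 1s, 2s, 4s, 8s, 16s.
$schedule = array_map(fn (int $attempt): float => backoffDelay($attempt), range(1, 5));
```

Under these assumptions the cap only matters from the sixth attempt on (32 seconds would be reduced to 30), so with `maxAttempts: 5` the `maxInterval` of 30 seconds is never reached.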
HasQueue
Route a node's job to a specific queue or connection:

    use Cainy\Laragraph\Contracts\HasQueue;

    class HeavyLLMNode implements Node, HasQueue
    {
        public function queue(): string
        {
            return 'llm';
        }

        public function connection(): ?string
        {
            return null; // use default connection
        }
    }

HasMiddleware
Attach Laravel job middleware to a node's execution job:

    use Cainy\Laragraph\Contracts\HasMiddleware;
    use Illuminate\Queue\Middleware\RateLimited;

    class AnthropicNode implements Node, HasMiddleware
    {
        public function middleware(): array
        {
            return [new RateLimited('anthropic')];
        }
    }

HasLoop
Declare that a node should loop — driving tool execution cycles, polling, or any other repeated sub-task. The compiler automatically injects the loop edges at compile time.

    use Cainy\Laragraph\Contracts\HasLoop;

    class PollingNode implements Node, HasLoop
    {
        public function loopNode(string $nodeName): Node
        {
            return new CheckStatusNode();
        }

        public function loopCondition(): \Closure
        {
            return fn(array $state) => $state['status'] !== 'done';
        }
    }

When compiled, the engine injects a {name}.__loop__ node and guards existing exit edges with the negated condition. Use Workflow::toolNode('name') to reference the synthetic loop node in interrupt points:
->interruptBefore(Workflow::toolNode('agent'))
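
The effect of guarding exit edges with the negated loop condition can be sketched in plain PHP. Nothing here is LaraGraph code: `negate()` and `routeAfter()` are hypothetical helpers, and the node name `poll` and exit target `report` are made up for the example.

```php
<?php
declare(strict_types=1);

/** Wraps a condition so that it returns the opposite result. */
function negate(Closure $condition): Closure
{
    return fn (array $state): bool => !$condition($state);
}

/**
 * Hypothetical router: the loop edge carries the loop condition and the
 * exit edge carries its negation, so exactly one of them matches.
 */
function routeAfter(string $nodeName, array $state, Closure $loopCondition, string $exitTarget): string
{
    $edges = [
        [$nodeName . '.__loop__', $loopCondition],
        [$exitTarget, negate($loopCondition)],
    ];

    foreach ($edges as [$target, $guard]) {
        if ($guard($state)) {
            return $target;
        }
    }

    throw new LogicException('No edge matched.');
}

// Same condition as PollingNode::loopCondition() above.
$loopCondition = fn (array $state): bool => $state['status'] !== 'done';

$whilePending = routeAfter('poll', ['status' => 'pending'], $loopCondition, 'report');
$whenDone     = routeAfter('poll', ['status' => 'done'], $loopCondition, 'report');
```

Because the two guards are complementary, the node keeps cycling through its loop node while the condition holds and leaves through its original exit edge as soon as it stops holding.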
IsFanInBarrier
Mark a node as a fan-in barrier. The engine tracks how many workers were dispatched into this node and how many have committed their results. It serialises concurrent arrivals under a database lock, and only the final arrival — the one that sees all predecessors complete — runs handle(). All earlier arrivals skip cleanly.

    use Cainy\Laragraph\Contracts\IsFanInBarrier;

    class MyBarrierNode implements Node, IsFanInBarrier
    {
        public function handle(NodeExecutionContext $context, array $state): array
        {
            // Only called once, after every predecessor has committed.
            return ['merged' => true];
        }
    }

BarrierNode implements IsFanInBarrier out of the box. Implement it on any custom node that acts as a convergence point for parallel branches.
Built-in Nodes
GateNode
Pauses the workflow unconditionally until manually resumed. Use as a static approval gate.

    use Cainy\Laragraph\Nodes\GateNode;

    $this->addNode('approve', new GateNode(reason: 'Manager approval required'))
        ->transition('draft', 'approve')
        ->transition('approve', 'publish');

When the gate triggers, a HumanInterventionRequired event fires with the
reason. The reason is also stored on $run->routing['gate_reason'] (engine
state — not merged into $state). Resume via Laragraph::resume($runId).
GateNode is a one-shot pause: its handle() unconditionally throws. For human-in-the-loop checkpoints where you still want the node's side effects to run first, use interruptAfter() on a regular node instead (see Human-in-the-Loop).
SendNode
Fan-out node — dispatches a Send for each item in a state list, sending each to the same target node with an isolated payload.

    use Cainy\Laragraph\Nodes\SendNode;

    $this->addNode('fanout', new SendNode(
            sourceKey: 'queries',
            targetNode: 'worker',
            payloadKey: 'query',
        ))
        ->addNode('worker', WorkerNode::class)
        ->transition(Workflow::START, 'fanout')
        ->transition('fanout', 'worker');

Inside WorkerNode, access the payload via $context->payload('query').
BarrierNode
Fan-in barrier — waits for all parallel workers to complete before allowing the downstream edge to fire. Zero configuration required.

    use Cainy\Laragraph\Nodes\BarrierNode;

    ->addNode('barrier', new BarrierNode())
    ->transition('worker', 'barrier')
    ->transition('barrier', 'aggregator')

The engine automatically tracks how many workers were dispatched into the barrier and how many have committed their results. Early arrivals skip cleanly (removing their pointer to maintain equilibrium). Only the final arrival — when all predecessors are fully complete — runs handle() and evaluates the downstream edges. The node body itself is a no-op; all logic lives in the engine.
Works with both transition fan-out and Send-based fan-out, including multiple sequential barriers in the same workflow.
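
The "only the final arrival runs" rule can be pictured with a small in-memory counter. This is a conceptual sketch only: `BarrierCounter` is a hypothetical class, and the real engine keeps its counters in the database and serialises arrivals under a lock rather than in a PHP object.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical in-memory stand-in for the barrier bookkeeping described
 * above. It knows how many workers were dispatched and reports which
 * arrival completes the set.
 */
final class BarrierCounter
{
    private int $arrived = 0;

    public function __construct(private readonly int $expected)
    {
    }

    /** Returns true only for the arrival that completes the set. */
    public function arrive(): bool
    {
        $this->arrived++;

        return $this->arrived === $this->expected;
    }
}

$barrier = new BarrierCounter(expected: 3);

$first  = $barrier->arrive(); // early arrival: skip
$second = $barrier->arrive(); // early arrival: skip
$third  = $barrier->arrive(); // final arrival: run handle() and continue
```

In a multi-worker deployment the increment and the comparison must happen atomically, which is the role the database lock plays in the engine.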
HttpNode
Makes an HTTP request and stores the response in state. The URL supports {state.key} interpolation.

    use Cainy\Laragraph\Nodes\HttpNode;

    ->addNode('fetch', new HttpNode(
        url: 'https://api.example.com/items/{state.item_id}',
        method: 'GET',
        headers: ['Authorization' => 'Bearer token'],
        responseKey: 'api_response',
    ))

The response is stored as ['status' => 200, 'body' => [...], 'ok' => true] under responseKey.
For POST/PUT/PATCH requests, set bodyKey to a state key whose value will be sent as the request body:
new HttpNode(url: '...', method: 'POST', bodyKey: 'payload', responseKey: 'result')
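
As an illustration of the {state.key} placeholder syntax, the following standalone sketch substitutes top-level state values into a template string. `interpolate()` is a hypothetical function, not the package's implementation; in particular, replacing a missing key with an empty string and supporting only top-level keys are assumptions made for this example.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: replaces every {state.key} placeholder with the
 * matching top-level value from $state (missing keys become '').
 */
function interpolate(string $template, array $state): string
{
    $result = preg_replace_callback(
        '/\{state\.([A-Za-z0-9_]+)\}/',
        fn (array $match): string => (string) ($state[$match[1]] ?? ''),
        $template,
    );

    // preg_replace_callback() returns null on a regex error; fall back to the input.
    return $result ?? $template;
}

$url = interpolate('https://api.example.com/items/{state.item_id}', ['item_id' => 42]);
// $url is 'https://api.example.com/items/42'
```

If state values can contain characters that are unsafe in a URL, encoding them (for example with `rawurlencode()`) before the request is sent would be a sensible precaution.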
DelayNode
Pauses execution for a given number of seconds, then continues.
use Cainy\Laragraph\Nodes\DelayNode; ->addNode('wait', new DelayNode(seconds: 300))
On first execution the node stores a resume-after timestamp, dispatches a delayed queue job, and pauses. The job automatically calls Laragraph::resume() when the delay elapses — no scheduled command or polling required.
CacheNode
Reads from or writes to the Laravel cache. The cache key supports {state.key} interpolation.

    use Cainy\Laragraph\Nodes\CacheNode;

    ->addNode('load', new CacheNode(operation: 'get', cacheKey: 'report:{state.user_id}', stateKey: 'cached_report'))
    ->addNode('store', new CacheNode(operation: 'put', cacheKey: 'report:{state.user_id}', stateKey: 'report', ttl: 3600))
    ->addNode('bust', new CacheNode(operation: 'forget', cacheKey: 'report:{state.user_id}', stateKey: 'report'))

NotifyNode
Dispatches a Laravel event with values from state as constructor arguments.

    use Cainy\Laragraph\Nodes\NotifyNode;

    ->addNode('notify', new NotifyNode(
        eventClass: ReportReady::class,
        dataKeys: ['user_id', 'report_url'],
    ))

Prism Integration
LaraGraph ships with first-class support for Prism via the Cainy\Laragraph\Integrations\Prism namespace.
composer require prism-php/prism
PrismNode
A general-purpose LLM node. Supports text generation and structured output —
no tool loop injected by default. Use PrismToolNode (below) when you
want tool-calling with automatic re-entry.

    use Cainy\Laragraph\Integrations\Prism\PrismNode;
    use Prism\Prism\Enums\Provider;

    $this->addNode('assistant', new PrismNode(
        provider: Provider::Anthropic,
        model: 'claude-sonnet-4-6',
        systemPrompt: 'You are a helpful assistant.',
        maxTokens: 1024,
    ));

By default the assistant's response is appended to state['messages'].
Overridable hooks
Subclass when you need dynamic behaviour or structured output:

    use Prism\Prism\Contracts\Schema;
    use Prism\Prism\Schema\ObjectSchema;
    use Prism\Prism\Schema\StringSchema;

    class ClassifierNode extends PrismNode
    {
        protected function systemPrompt(array $state): string
        {
            return 'Classify the input into category + confidence.';
        }

        protected function prompt(array $state): string
        {
            return "Input: {$state['text']}";
        }

        protected function schema(): ?Schema
        {
            return new ObjectSchema(
                name: 'classification',
                description: 'Result of the classification',
                properties: [
                    new StringSchema('category', 'category'),
                    new StringSchema('confidence', 'confidence 0..1'),
                ],
                requiredFields: ['category', 'confidence'],
            );
        }

        public function outputKey(): string
        {
            return 'classification'; // default: 'output'
        }
    }

When schema() returns non-null, the node calls Prism's structured-output path
and writes the result to state[outputKey()] instead of state['messages'].
Available overrides: provider(), model(), maxTokens(), temperature(),
topP(), systemPrompt($state), prompt($state), messages($state),
tools(), schema(), messagesKey(), outputKey(), applyProviderOptions().
PrismToolNode
Extends PrismNode and implements HasLoop — the compiler injects a
synthetic .__loop__ tool-executor so the node re-enters itself after each
tool call completes (classic ReAct loop).

    use Cainy\Laragraph\Integrations\Prism\PrismToolNode;
    use Prism\Prism\Tool;

    class WeatherAgent extends PrismToolNode
    {
        public function tools(): array
        {
            return [
                (new Tool)
                    ->as('get_weather')
                    ->for('Get weather for a city')
                    ->withStringParameter('city', 'City name')
                    ->using(fn(string $city): string => "Sunny, 22°C in {$city}"),
            ];
        }
    }

    $this->addNode('agent', new WeatherAgent(
        provider: Provider::Anthropic,
        model: 'claude-sonnet-4-6',
    ));

The injected graph:

    START → agent ──(tool_calls present)──→ agent.__loop__ → agent
                  ──(no tool_calls)──────→ END

To interrupt before tool execution runs:
->interruptBefore(Workflow::toolNode('agent'))
ToolNode (manual routing)
Abstract base for nodes that execute tool calls from state['messages']
without the automatic loop. Use when you want explicit edges around tool
execution (for conditional routing, logging, approval gates, etc.).

    use Cainy\Laragraph\Integrations\Prism\ToolNode;

    class WeatherToolNode extends ToolNode
    {
        protected function toolMap(): array
        {
            return [
                'get_weather' => fn(array $args): string => "Sunny, 22°C in " . ($args['city'] ?? 'unknown'),
            ];
        }
    }

Wire the edges yourself:

    $this->addNode('agent', MyAgentNode::class)
        ->addNode('tools', WeatherToolNode::class)
        ->transition(Workflow::START, 'agent')
        ->transition('agent', 'tools', fn($s) => ! empty(end($s['messages'])['tool_calls'] ?? []))
        ->transition('agent', Workflow::END, fn($s) => empty(end($s['messages'])['tool_calls'] ?? []))
        ->transition('tools', 'agent');

Laravel AI Integration
LaraGraph integrates with Laravel AI via the AsGraphNode trait.
composer require laravel/ai
AsGraphNode Trait
Add AsGraphNode to a standard Laravel AI agent to make it a Laragraph node:

    use Cainy\Laragraph\Contracts\Node;
    use Cainy\Laragraph\Integrations\LaravelAi\AsGraphNode;
    use Laravel\Ai\Contracts\Agent;
    use Laravel\Ai\Promptable;

    class ResearchAgent implements Agent, Node
    {
        use AsGraphNode, Promptable;

        public function instructions(): string
        {
            return 'You are a research assistant.';
        }

        protected function getAgentPrompt(): string
        {
            return 'Research: ' . ($this->state['topic'] ?? 'general');
        }
    }

Structured Output
If your agent implements HasStructuredOutput, the trait maps structured response keys directly to state mutation keys:

    use Laravel\Ai\Contracts\HasStructuredOutput;
    use Illuminate\Contracts\JsonSchema\JsonSchema;

    class ClassifierAgent implements Agent, Node, HasStructuredOutput
    {
        use AsGraphNode, Promptable;

        public function instructions(): string
        {
            return 'Classify the input into a category and confidence score.';
        }

        public function schema(JsonSchema $schema): array
        {
            return [
                'category' => $schema->string()->required(),
                'confidence' => $schema->number()->min(0)->max(1)->required(),
            ];
        }
    }

After execution, state['category'] and state['confidence'] are set directly.
Tool-Using Agents
Laravel AI agents implementing HasTools are automatically detected by the compiler, and tool loop injection works exactly as with PrismToolNode:

    use Laravel\Ai\Contracts\HasTools;

    class WeatherAgent implements Agent, Node, HasTools
    {
        use AsGraphNode, Promptable;

        public function tools(): array
        {
            return [new GetWeather];
        }
    }

Sub-graph Workflows
Any Workflow subclass implements Node and can be embedded inside another workflow. The sub-graph is identified by its class name — no snapshot serialization required.

    class ResearchSubgraph extends Workflow
    {
        public function definition(): void
        {
            $this->addNode('search', SearchNode::class)
                ->addNode('extract', ExtractNode::class)
                ->transition(Workflow::START, 'search')
                ->transition('search', 'extract')
                ->transition('extract', Workflow::END);
        }
    }

    class ParentPipeline extends Workflow
    {
        public function definition(): void
        {
            $this->addNode('research', ResearchSubgraph::class)
                ->addNode('write', WriteNode::class)
                ->transition(Workflow::START, 'research')
                ->transition('research', 'write')
                ->transition('write', Workflow::END);
        }
    }

When the engine executes a sub-graph node:
- A child WorkflowRun is created and linked via parent_run_id / parent_node_name.
- The child workflow starts normally; its nodes run as independent queue jobs.
- The parent run pauses at the sub-graph node.
- When the child completes, the engine resumes the parent automatically.
- The parent node returns the state delta from the child's final state as a mutation.

    $run->parent;   // ?WorkflowRun
    $run->children; // Collection<WorkflowRun>

Embedded vs Send-dispatched
Sub-graphs behave differently depending on how they're reached:
| Reached via | Child initial state | Delta merged back |
|---|---|---|
| Embedded (transition('sub', …)) | Parent's full state | Child's state diff vs parent's state |
| Send::toWorkflow('sub', $payload) | The Send payload only | Child's full final state |
Use Send::toWorkflow() for per-item pipelines where parent state should not
leak into the child. Use plain embedding when the child should see and operate
on parent state (e.g. a reusable "research this topic" sub-graph).
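
To show what a "state diff vs parent's state" could look like for the embedded case, here is a minimal standalone sketch. The README does not specify how the engine computes the delta, so `stateDelta()` is a hypothetical function that assumes a shallow, key-by-key comparison.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical shallow diff (an assumption, not LaraGraph's code): keeps the
 * keys of the child's final state that are new or differ from the parent.
 */
function stateDelta(array $parentState, array $childFinalState): array
{
    $delta = [];

    foreach ($childFinalState as $key => $value) {
        if (!array_key_exists($key, $parentState) || $parentState[$key] !== $value) {
            $delta[$key] = $value;
        }
    }

    return $delta;
}

$parent = ['topic' => 'queues', 'notes' => []];
$child  = ['topic' => 'queues', 'notes' => ['retry basics'], 'summary' => 'Short summary'];

$delta = stateDelta($parent, $child);
// Only 'notes' and 'summary' are returned; the unchanged 'topic' is dropped.
```

For a Send-dispatched child no such comparison is needed, because the table above says the child's full final state is merged back.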
Child Failure Cascade
By default, when a child workflow fails, the failure cascades to the parent:
the parent is marked Failed, its pointers are cleared, and WorkflowFailed
fires with the child's exception.
Opt out on a per-child-workflow basis by overriding shouldCascadeFailure():

    class ToleratingChildWorkflow extends Workflow
    {
        public function shouldCascadeFailure(): bool
        {
            return false; // parent stays Paused; caller can decide what to do
        }

        public function definition(): void { /* ... */ }
    }

Useful for map-reduce patterns where individual child failures should be aggregated rather than fatal.
Accessing parent metadata from a child
Inside a child node, use the context accessors:

    public function handle(NodeExecutionContext $context, array $state): array
    {
        $profileId = $context->parentMetadata()['profile_id'] ?? null;
        // ...
    }

$context->parentRunId, $context->parentNodeName, and $context->parentMetadata()
are all null at the top level.
Recursion Limit
The engine tracks total node executions per run and throws RecursionLimitExceeded if the limit is hit.
The default limit is config('laragraph.recursion_limit', 100). Override it per workflow:

    class MyPipeline extends Workflow
    {
        public function definition(): void
        {
            $this->withRecursionLimit(500);
            // ...
        }
    }

For legitimate fan-out workflows (e.g. 50 items, each running a 5-step
pipeline), raise this to items × steps + headroom. The exception message
includes a hint pointing at ->withRecursionLimit() when the limit is hit.
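
The sizing rule above is simple arithmetic, worked through here for the 50-item example. `suggestedRecursionLimit()` is a hypothetical helper, and the default headroom of 20 is an arbitrary illustrative value rather than a recommendation from the package.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: items × steps + headroom, as described above.
 * The headroom covers the nodes outside the per-item pipeline
 * (splitter, barrier, aggregator) plus a safety margin.
 */
function suggestedRecursionLimit(int $items, int $stepsPerItem, int $headroom = 20): int
{
    return $items * $stepsPerItem + $headroom;
}

$limit = suggestedRecursionLimit(items: 50, stepsPerItem: 5);
// 50 × 5 + 20 = 270, well above the default limit of 100.
```

The result would then be passed to withRecursionLimit() in the workflow's definition() method.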
Events
LaraGraph fires events throughout the workflow lifecycle. All events implement ShouldBroadcast and are broadcast on the workflow channel when broadcasting is enabled.
| Event | Payload |
|---|---|
| WorkflowStarted | runId, workflowKey |
| NodeExecuting | runId, nodeName |
| NodeCompleted | runId, nodeName, mutation, tags |
| NodeFailed | runId, nodeName, exception |
| WorkflowCompleted | runId, workflowKey |
| WorkflowFailed | runId, exception, workflowKey |
| WorkflowResumed | runId, workflowKey |
| HumanInterventionRequired | runId, nodeName, reason |
Broadcasting
Enable broadcasting in your .env:

    LARAGRAPH_BROADCASTING_ENABLED=true
    LARAGRAPH_CHANNEL_TYPE=private   # public | private | presence
    LARAGRAPH_CHANNEL_PREFIX=workflow.

Each run broadcasts on channel {prefix}{runId} (e.g. workflow.42). Authorize the channel in routes/channels.php as needed.
Configuration

    // config/laragraph.php
    return [
        // Queue name for ExecuteNode jobs (overridden per-node via HasQueue)
        'queue' => env('LARAGRAPH_QUEUE', 'default'),

        // Queue connection (null = default connection)
        'connection' => env('LARAGRAPH_QUEUE_CONNECTION'),

        // Hold jobs until the wrapping transaction commits (enable if you call
        // Laragraph::run() inside your own DB transactions)
        'after_commit' => env('LARAGRAPH_AFTER_COMMIT', false),

        // Default max attempts per node (overridden per-node via HasRetryPolicy)
        'max_node_attempts' => 3,

        // Default node timeout in seconds
        'node_timeout' => 60,

        // Maximum node executions per run before RecursionLimitExceeded is thrown
        'recursion_limit' => 100,

        // Prune completed/failed runs older than this many days
        'prunable_after_days' => 30,

        // Default retry backoff settings (overridden per-node via HasRetryPolicy)
        'retry' => [
            'initial_interval' => 0.5,
            'backoff_factor' => 2.0,
            'max_interval' => 128.0,
            'jitter' => true,
        ],

        'broadcasting' => [
            'enabled' => env('LARAGRAPH_BROADCASTING_ENABLED', false),
            'channel_type' => env('LARAGRAPH_CHANNEL_TYPE', 'private'),
            'channel_prefix' => env('LARAGRAPH_CHANNEL_PREFIX', 'workflow.'),
        ],
    ];

Testing
composer test
LaraGraph works with the sync queue driver in tests — set QUEUE_CONNECTION=sync in your phpunit.xml and runs execute synchronously, making assertions straightforward:

    use Cainy\Laragraph\Facades\Laragraph;
    use Cainy\Laragraph\Enums\RunStatus;

    it('completes the pipeline', function () {
        $run = Laragraph::run(MyPipeline::class, ['input' => 'hello']);

        expect($run->fresh())
            ->status->toBe(RunStatus::Completed)
            ->state->toHaveKey('output');
    });

For unit-testing individual nodes, use the makeContext() test helper:

    use function Cainy\Laragraph\Tests\makeContext;

    it('returns a summary mutation', function () {
        $node = new SummarizeNode();

        // SummarizeNode reads $state['paragraphs'], so the fixture supplies that key.
        $mutation = $node->handle(
            makeContext(nodeName: 'summarize'),
            ['paragraphs' => ['Long article...']],
        );

        expect($mutation)->toHaveKey('summary');
    });

License
The MIT License (MIT). Please see License File for more information.