philiprehberger/php-pipeline

Composable pipeline pattern for processing data through ordered stages

Package info

github.com/philiprehberger/php-pipeline

pkg:composer/philiprehberger/php-pipeline

v1.1.1 2026-03-17 20:06 UTC

This package is auto-updated.

Last update: 2026-03-17 20:07:15 UTC


README

Requirements

Dependency  Version
PHP         ^8.2

Installation

composer require philiprehberger/php-pipeline

Usage

Basic Pipeline

use PhilipRehberger\Pipeline\Pipeline;

$result = Pipeline::send('hello world')
    ->through([
        fn (string $value, \Closure $next) => $next(strtoupper($value)),
        fn (string $value, \Closure $next) => $next(str_replace(' ', '-', $value)),
    ])
    ->thenReturn();

// "HELLO-WORLD"
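
To make the role of `$next` concrete, here is a minimal, self-contained sketch of how stages with this signature can be chained. It does not use the package and is not its implementation; `runStages` is a hypothetical helper written only for illustration.

```php
<?php

/**
 * Hypothetical helper (not part of the package): runs $passable through
 * $stages in order, handing each stage a $next closure for the remainder.
 */
function runStages(mixed $passable, array $stages): mixed
{
    // Fold from the last stage backwards so the first stage ends up outermost.
    $chain = array_reduce(
        array_reverse($stages),
        fn (\Closure $next, callable $stage): \Closure =>
            fn (mixed $value): mixed => $stage($value, $next),
        fn (mixed $value): mixed => $value // terminal step: return the payload as-is
    );

    return $chain($passable);
}

$result = runStages('hello world', [
    fn (string $value, \Closure $next) => $next(strtoupper($value)),
    fn (string $value, \Closure $next) => $next(str_replace(' ', '-', $value)),
]);

echo $result, PHP_EOL; // HELLO-WORLD
```

Each stage decides what to pass on by calling `$next` with the transformed value, which is why every closure in the README example ends with a `$next(...)` call.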

Class-Based Stages

Implement the Stage contract for reusable, testable stages:

use PhilipRehberger\Pipeline\Contracts\Stage;
use PhilipRehberger\Pipeline\Pipeline;
use Closure;

class TrimStage implements Stage
{
    public function handle(mixed $passable, Closure $next): mixed
    {
        return $next(trim($passable));
    }
}

$result = Pipeline::send('  hello  ')
    ->pipe(TrimStage::class)
    ->thenReturn();

// "hello"
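
Because a stage only needs a payload and a `$next` closure, it can be exercised on its own in a test. The sketch below assumes nothing beyond the `handle` signature shown above; the `Stage` interface here is a local stand-in so the snippet runs without the package installed.

```php
<?php

// Stand-in for the package's Stage contract so this sketch runs on its own;
// the method signature mirrors the TrimStage example.
interface Stage
{
    public function handle(mixed $passable, \Closure $next): mixed;
}

final class TrimStage implements Stage
{
    public function handle(mixed $passable, \Closure $next): mixed
    {
        return $next(trim($passable));
    }
}

// An identity closure stands in for "the rest of the pipeline".
$identity = fn (mixed $value): mixed => $value;

$output = (new TrimStage())->handle('  hello  ', $identity);

var_dump($output === 'hello'); // bool(true)
```

In a real project the stand-in interface would be dropped in favour of the package's own contract, and the check would live in a PHPUnit test case.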

Conditional Stages

// $data and the three stage classes are placeholders for your own code.
$isAdmin = true;

$result = Pipeline::send($data)
    ->pipe(ValidateStage::class)
    ->when($isAdmin, AdminEnrichStage::class)
    ->unless($isAdmin, GuestFilterStage::class)
    ->process();
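
Going by the API table, `when` adds a stage only if the condition is true and `unless` only if it is false. The sketch below shows the equivalent logic with plain `if` statements and closures. It is an illustration under that assumption, not the package's code, and the three stages are hypothetical.

```php
<?php

$isAdmin = true;

// Hypothetical stages, written as closures so the sketch is self-contained.
$validate    = fn (array $user, \Closure $next) => $next($user + ['valid' => true]);
$adminEnrich = fn (array $user, \Closure $next) => $next($user + ['permissions' => ['*']]);
$guestFilter = fn (array $user, \Closure $next) => $next(array_diff_key($user, ['email' => true]));

$stages = [$validate];

if ($isAdmin) {       // corresponds to ->when($isAdmin, ...)
    $stages[] = $adminEnrich;
}

if (! $isAdmin) {     // corresponds to ->unless($isAdmin, ...)
    $stages[] = $guestFilter;
}

// Chain the selected stages; the innermost step returns the payload unchanged.
$run = array_reduce(
    array_reverse($stages),
    fn (\Closure $next, callable $stage): \Closure =>
        fn (array $value): array => $stage($value, $next),
    fn (array $value): array => $value
);

$result = $run(['name' => 'Ada', 'email' => 'ada@example.com']);

print_r($result);
```

With `$isAdmin` set to `true`, only the validation and admin stages run, so the email key is kept and a `permissions` key is added.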

Pipeline Context

Share state between stages using PipelineContext:

use PhilipRehberger\Pipeline\Pipeline;
use PhilipRehberger\Pipeline\PipelineContext;

$context = new PipelineContext();

$result = Pipeline::send($data)
    ->withContext($context)
    ->through([
        function (mixed $value, \Closure $next, PipelineContext $ctx) {
            $ctx->set('started_at', microtime(true));
            return $next($value);
        },
        function (mixed $value, \Closure $next, PipelineContext $ctx) {
            // Access values set by earlier stages
            $started = $ctx->get('started_at');
            return $next($value);
        },
    ])
    ->thenReturn();

// Read context after pipeline completes
$context->all();
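
The context works as shared state because PHP passes objects by handle: every stage receives the same instance, so a value written by one stage is visible to later stages and to the caller afterwards. The sketch below demonstrates that with a stand-in class, `SketchContext`, limited to the three methods used above (`set`, `get`, `all`). The package's real `PipelineContext` may offer more or behave differently.

```php
<?php

// Stand-in for PipelineContext; an assumption for illustration only.
final class SketchContext
{
    /** @var array<string, mixed> */
    private array $items = [];

    public function set(string $key, mixed $value): void
    {
        $this->items[$key] = $value;
    }

    public function get(string $key): mixed
    {
        return $this->items[$key] ?? null;
    }

    /** @return array<string, mixed> */
    public function all(): array
    {
        return $this->items;
    }
}

$ctx = new SketchContext();

// First step records a fact about the input, then transforms it.
$first = function (string $value, SketchContext $ctx): string {
    $ctx->set('original_length', strlen($value));
    return trim($value);
};

// Second step reads what the first step stored.
$second = function (string $value, SketchContext $ctx): string {
    $ctx->set('trimmed_by', $ctx->get('original_length') - strlen($value));
    return $value;
};

$result = $second($first('  hello  ', $ctx), $ctx);

print_r($ctx->all()); // original_length => 9, trimmed_by => 4
```

Keeping such bookkeeping in the context rather than in the payload lets stages stay focused on transforming the data itself.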

Tap

Add side-effect stages that observe the payload without modifying it:

$result = Pipeline::send('hello')
    ->pipe(fn (string $value, \Closure $next) => $next(strtoupper($value)))
    ->tap(function (string $value) {
        // error_log() is built into PHP; swap in your own logger if you have one.
        error_log('After uppercase: ' . $value);
    })
    ->thenReturn();

// "HELLO" — tap does not change the payload
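
One way to picture a tap is as an ordinary stage that calls the observer, ignores whatever it returns, and forwards the payload untouched. The following sketch shows that idea; `tapStage` is a hypothetical helper, not a function provided by the package.

```php
<?php

// Hypothetical helper: wraps an observer so it fits the ($value, $next) stage shape.
function tapStage(callable $observer): \Closure
{
    return function (mixed $value, \Closure $next) use ($observer): mixed {
        $observer($value);      // the observer's return value is ignored
        return $next($value);   // the payload is forwarded unchanged
    };
}

$seen = [];

$stage = tapStage(function (string $value) use (&$seen) {
    $seen[] = $value;
    return 'ignored';
});

$result = $stage('HELLO', fn (mixed $value): mixed => $value);

var_dump($result); // string(5) "HELLO"
```

Note that this only protects scalar payloads from change; if the payload is an object, an observer can still mutate it through its handle.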

Error Handling

$result = Pipeline::send($data)
    ->through([RiskyStage::class])
    ->onFailure(function (\Throwable $e, mixed $passable) {
        return $passable; // Return original data on failure
    })
    ->process();
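
Conceptually, a failure handler amounts to a `try`/`catch` around the pipeline run, with the handler's return value becoming the result. The sketch below illustrates this with a hypothetical `runWithFailureHandler` helper. It assumes the handler receives the original input, as the comment in the example above suggests; the README does not spell out further details of the package's behaviour.

```php
<?php

// Hypothetical helper: runs $pipeline and, if it throws, hands the exception
// and the original payload to $onFailure, whose return value becomes the result.
function runWithFailureHandler(mixed $passable, callable $pipeline, callable $onFailure): mixed
{
    try {
        return $pipeline($passable);
    } catch (\Throwable $e) {
        return $onFailure($e, $passable);
    }
}

// Hypothetical risky step that rejects negative totals.
$risky = function (array $order): array {
    if ($order['total'] < 0) {
        throw new \InvalidArgumentException('Total must not be negative.');
    }

    return $order + ['checked' => true];
};

// Fallback that returns the original data, as in the README example.
$fallback = fn (\Throwable $e, mixed $passable): mixed => $passable;

$ok  = runWithFailureHandler(['total' => 10], $risky, $fallback);
$bad = runWithFailureHandler(['total' => -1], $risky, $fallback);

print_r($ok);  // total => 10, checked => 1
print_r($bad); // total => -1
```

Returning the original payload hides the error from the caller, so a handler of this kind would usually also log `$e` or record it somewhere before falling back.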

API

Method                                             Description
Pipeline::send(mixed $passable)                    Create a new pipeline with the given data
->through(array $stages)                           Set the array of stages
->pipe(string|callable $stage)                     Append a single stage
->when(bool $condition, string|callable $stage)    Add stage if condition is true
->unless(bool $condition, string|callable $stage)  Add stage if condition is false
->withContext(PipelineContext $context)            Attach shared context passed to stages
->tap(callable $fn)                                Add a side-effect stage that does not modify the payload
->onFailure(callable $handler)                     Register a failure handler
->process()                                        Execute the pipeline and return the result
->thenReturn()                                     Alias for process()

Development

composer install
vendor/bin/phpunit
vendor/bin/pint --test
vendor/bin/phpstan analyse

License

MIT