simsoft / data-flow
A lightweight, composable ETL (Extract, Transform, Load) pipeline library for PHP with fluent API, spreadsheet support, and flow control.
Requires
- php: >=8.3
- league/flysystem: ^3
- openspout/openspout: ^4.0
- phpoffice/phpspreadsheet: ^5.7
- psr/log: ^3.0
- symfony/cache: ^7.4
Requires (Dev)
- phpmd/phpmd: ^2.15
- phpstan/phpstan: ^2.1
- phpunit/phpunit: ^11
- simsoft/fliq: ^1.0
Suggests
- simsoft/fliq: Required for ActiveQueryExtractor — high-performance Active Record / ORM
Last update: 2026-05-23 19:14:26 UTC
A lightweight, composable ETL pipeline library for PHP 8.3+
DataFlow helps you move data from one place to another — read from a source (database, CSV, API), transform it (filter, map, validate, enrich), and write it to a destination (database, spreadsheet, file). This pattern is called ETL (Extract, Transform, Load) and is the backbone of data migration, reporting, syncing, and batch processing.
With DataFlow, you describe your pipeline as a fluent chain:

    (new DataFlow())
        ->from($source)          // Extract: where data comes from
        ->transform($logic)      // Transform: reshape, filter, validate
        ->load($destination)     // Load: where data goes
        ->run();

No framework required. No external services. Just PHP.
Why This Library
- Fluent, composable API — chain extractors, transformers, and loaders in a single readable expression
- Built-in resilience — retry with exponential backoff + jitter, circuit breaker, and checkpoint/resume without external dependencies
- Zero-overhead opt-in — every resilience feature uses the null object pattern; disabled features cost nothing at runtime
- Generator-based streaming — constant memory footprint regardless of dataset size
- Per-stage error strategies — configure Skip, Retry, Throw, or LogAndContinue independently on each stage
- Crash recovery — checkpoint/resume enables long-running pipelines to recover from failures without reprocessing from scratch
- Circuit breaker — prevents cascading failures when downstream services degrade, a pattern common in microservices (Resilience4j, Polly) but unique among PHP ETL libraries
- Dead letter collection — failed and circuit-open rows are captured with full context for inspection or reprocessing
- Inline schema validation — validate row data with pipe-delimited rules (Laravel-style syntax) without leaving the pipeline
- Real-time metrics — pluggable MetricsExporter interface for emitting events to logging, StatsD, Prometheus, or custom systems
- Dry-run mode — validate entire pipelines without performing actual writes
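The generator-based streaming point above can be illustrated in plain PHP, independent of DataFlow. This is a minimal sketch, not the library's implementation: the function names `extractNumbers`, `doubleEach`, and `keepMultiplesOf` are hypothetical. It shows how chained generators pass each row through every stage before the next row is produced, so no stage ever holds the full dataset.

```php
<?php
declare(strict_types=1);

/** Hypothetical extractor: yields rows one at a time instead of building an array. */
function extractNumbers(int $count): Generator
{
    for ($i = 1; $i <= $count; $i++) {
        yield $i;
    }
}

/** Hypothetical transform stage: lazily doubles each incoming value. */
function doubleEach(iterable $rows): Generator
{
    foreach ($rows as $row) {
        yield $row * 2;
    }
}

/** Hypothetical filter stage: lazily keeps values divisible by $divisor. */
function keepMultiplesOf(iterable $rows, int $divisor): Generator
{
    foreach ($rows as $row) {
        if ($row % $divisor === 0) {
            yield $row;
        }
    }
}

// Nothing runs until the loop below pulls values through the chain.
$pipeline = keepMultiplesOf(doubleEach(extractNumbers(1_000_000)), 500_000);

$loaded = [];
foreach ($pipeline as $value) {
    $loaded[] = $value; // stand-in for a "load" step
}

echo implode(', ', $loaded) . PHP_EOL;
// Output:
// 500000, 1000000, 1500000, 2000000
```

Only the four surviving values are ever stored. The one million intermediate rows exist one at a time, which is the property the "constant memory footprint" claim relies on.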
Install
composer require simsoft/data-flow
Basic Usage
Example using extract, transform and load.

    require "vendor/autoload.php";

    use Simsoft\DataFlow\DataFlow;

    (new DataFlow())
        ->from([1, 2, 3])
        ->transform(function($num) {
            return $num * 2;
        })
        ->load(function($num) {
            echo $num . PHP_EOL;
        })
        ->run();

    // Output:
    // 2
    // 4
    // 6

Limit
Limit the number of rows the pipeline outputs.

    require "vendor/autoload.php";

    use Simsoft\DataFlow\DataFlow;

    (new DataFlow())
        ->from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        ->transform(function($num) {
            return $num * 2;
        })
        ->limit(5) // output only the first 5 rows
        ->load(function($num) {
            echo $num . PHP_EOL;
        })
        ->run();

    // Output:
    // 2
    // 4
    // 6
    // 8
    // 10

Filter
The filter() method keeps only the rows for which the callback returns true.

    require "vendor/autoload.php";

    use Simsoft\DataFlow\DataFlow;

    (new DataFlow())
        ->from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        ->filter(function($num) {
            // The callback should return a bool.
            // In this case, keep even numbers only.
            return $num % 2 === 0;
        })
        ->load(function($num) {
            echo $num . PHP_EOL;
        })
        ->run();

    // Output:
    // 2
    // 4
    // 6
    // 8
    // 10

Chunk
Split data into smaller, manageable parts of a fixed size.

    (new DataFlow())
        ->from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        ->chunk(3) // set chunk size
        ->load(function(array $chunk, $key) {
            echo $key . '=' . json_encode($chunk, JSON_THROW_ON_ERROR) . PHP_EOL;
        })
        ->run();

    // Output:
    // 0=[1,2,3]
    // 1=[4,5,6]
    // 2=[7,8,9]
    // 3=[10]

Mapping
The map() method converts each row to another format. Original keys are preserved; mapped keys are added or overwritten.

    (new DataFlow())
        ->from([
            ['First Name' => 'John', 'Last Name' => 'Doe', 'age' => 20],
            ['First Name' => 'Jane', 'Last Name' => 'Doe', 'age' => 30],
            ['First Name' => 'John', 'Last Name' => 'Smith', 'age' => 50],
            ['First Name' => 'Jane', 'Last Name' => 'Smith', 'age' => 60],
        ])
        ->map([
            // rename the key
            'first_name' => 'First Name',
            'last_name' => 'Last Name',

            // customise data via callback method.
            'full_name' => fn($data) => $data['first_name'] . ' ' . $data['last_name'],
            'senior' => fn($data) => $data['age'] > 30 ? 'Yes' : 'No',
        ])
        ->load(function($data) {
            echo $data['full_name'] . ' is ' . $data['age'] . ' years old. ' . $data['senior'] . PHP_EOL;
        })
        ->run();

    // Output:
    // John Doe is 20 years old. No
    // Jane Doe is 30 years old. No
    // John Smith is 50 years old. Yes
    // Jane Smith is 60 years old. Yes

Set New Map
setNewMap() converts source data into a completely new array containing
**only** the mapped keys. Unlike map(), which merges into the existing row,
setNewMap() discards all original keys.

    (new DataFlow())
        ->from([
            ['first_name' => 'John', 'last_name' => 'Doe', 'age' => 20, 'status' => 'active', 'internal_id' => 'x99'],
            ['first_name' => 'Jane', 'last_name' => 'Smith', 'age' => 30, 'status' => 'inactive', 'internal_id' => 'x42'],
        ])
        ->setNewMap([
            'name' => fn($row) => $row['first_name'] . ' ' . $row['last_name'],
            'age' => 'age',
        ])
        ->load(function($data) {
            // $data contains ONLY 'name' and 'age' (no 'status', 'internal_id', etc.)
            echo json_encode($data) . PHP_EOL;
        })
        ->run();

    // Output:
    // {"name":"John Doe","age":20}
    // {"name":"Jane Smith","age":30}

map() vs setNewMap()
| | map() | setNewMap() |
|---|---|---|
| Original keys | Preserved | Discarded |
| Result contains | All original keys + mapped keys | Only mapped keys |
| Use case | Add/rename columns while keeping the rest | Reshape into a new structure, drop unwanted fields |
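The difference in the table can be mimicked with plain arrays. The sketch below is not DataFlow's code: `resolveMapping`, `mergeMapped`, and `onlyMapped` are hypothetical helpers. It assumes, based on the examples above, that a string value names a source key and a closure computes a value from the row.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: resolve one mapping entry against a row.
 * A string names a source key; a closure computes the value from the row.
 */
function resolveMapping(array $row, string|Closure $source): mixed
{
    return is_string($source) ? ($row[$source] ?? null) : $source($row);
}

/** Merge-style mapping: original keys stay, mapped keys are added or overwritten. */
function mergeMapped(array $row, array $map): array
{
    foreach ($map as $target => $source) {
        // Entries are applied in order, so later closures can read earlier results.
        $row[$target] = resolveMapping($row, $source);
    }
    return $row;
}

/** Replace-style mapping: the result holds only the mapped keys. */
function onlyMapped(array $row, array $map): array
{
    $result = [];
    foreach ($map as $target => $source) {
        $result[$target] = resolveMapping($row, $source);
    }
    return $result;
}

$row = ['first_name' => 'Jane', 'last_name' => 'Smith', 'age' => 30, 'status' => 'inactive'];
$map = [
    'name' => fn(array $r): string => $r['first_name'] . ' ' . $r['last_name'],
    'age'  => 'age',
];

$merged   = mergeMapped($row, $map);
$replaced = onlyMapped($row, $map);

echo json_encode($merged) . PHP_EOL;
echo json_encode($replaced) . PHP_EOL;
// Output:
// {"first_name":"Jane","last_name":"Smith","age":30,"status":"inactive","name":"Jane Smith"}
// {"name":"Jane Smith","age":30}
```

The merge-style result still carries `status`, while the replace-style result drops everything that was not explicitly mapped.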
Preview
preview() is a debugging helper that limits the pipeline to N rows and dumps
each row's key and value. Use it to inspect the data structure at any point in
the pipeline.

    (new DataFlow())
        ->from([
            ['name' => 'John', 'email' => 'john@example.com'],
            ['name' => 'Jane', 'email' => 'jane@example.com'],
            ['name' => 'Bob', 'email' => 'bob@example.com'],
        ])
        ->map(['full_name' => fn($row) => strtoupper($row['name'])])
        ->preview(2) // show first 2 rows then stop
        ->run();

    // Output:
    // Key: int(0)
    // Value: array(3) { ["name"]=> "John", ["email"]=> "john@example.com", ["full_name"]=> "JOHN" }
    //
    // Key: int(1)
    // Value: array(3) { ["name"]=> "Jane", ["email"]=> "jane@example.com", ["full_name"]=> "JANE" }

Insert preview() at any point to understand the data shape before writing the
next stage.
Flow Continuation
Connect flows into a chain by passing one flow as the source of another.

    $flow1 = (new DataFlow())
        ->from([1, 2, 3])
        ->transform(function($num) {
            return $num * 2;
        });

    (new DataFlow())
        ->from($flow1) // connect flow1 to flow2.
        ->transform(function($num) {
            return $num * 3;
        })
        ->load(function($num) {
            echo $num . PHP_EOL;
        })
        ->run();

    // Output:
    // 6
    // 12
    // 18

Pipeline Result
Every run() call returns a PipelineResult with execution metadata.

    use Simsoft\DataFlow\DataFlow;

    $result = (new DataFlow())
        ->from([1, 2, 3, 4, 5])
        ->transform(fn($n) => $n * 2)
        ->load(fn($n) => $n)
        ->run();

    echo "Processed: {$result->getProcessedRows()} rows\n";
    echo "Duration: " . round($result->getDurationMs()) . "ms\n";
    echo "Peak memory: " . round($result->getPeakMemoryBytes() / 1024) . " KB\n";

Error Handling
Configure per-stage error strategies for production resilience.

    use Simsoft\DataFlow\DataFlow;
    use Simsoft\DataFlow\Enums\ErrorStrategy;

    // $records is your input data; MyTransformer stands for your own transformer class.
    $result = (new DataFlow())
        ->from($records)
        ->transform(
            (new MyTransformer())
                ->withErrorStrategy(ErrorStrategy::Skip) // skip failing rows
                ->withName('validator')
        )
        ->load(fn($row) => $row)
        ->run();

    echo "Processed: {$result->getProcessedRows()}\n";
    echo "Failed: {$result->getFailedRows()}\n";

Available strategies: Throw (default), Skip, Retry, LogAndContinue.
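To make the idea behind a retry strategy with exponential backoff and jitter concrete, here is a standalone sketch in plain PHP. It does not show how DataFlow implements Retry: the function `retryWithBackoff` and all of its parameters are hypothetical. The variant used is "full jitter", where each delay is drawn at random between zero and a capped exponential ceiling. The sleep function is injectable so the example runs without actually waiting.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: call $operation until it succeeds or attempts run out.
 * Before retry number N the delay is random in [0, min($maxDelayMs, $baseDelayMs * 2^(N-1))].
 */
function retryWithBackoff(
    callable $operation,
    int $maxAttempts = 5,
    int $baseDelayMs = 100,
    int $maxDelayMs = 5_000,
    ?callable $sleepMs = null,
): mixed {
    // Default: really sleep. Callers may inject a recorder or no-op instead.
    $sleepMs ??= static function (int $ms): void {
        usleep($ms * 1000);
    };

    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation();
        } catch (Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // out of attempts: surface the last failure
            }
            // Exponential ceiling, capped, then randomised so clients do not retry in lockstep.
            $ceiling = min($maxDelayMs, $baseDelayMs * (2 ** ($attempt - 1)));
            $sleepMs(random_int(0, $ceiling));
        }
    }
}

$calls  = 0;
$delays = [];

$result = retryWithBackoff(
    function () use (&$calls): string {
        $calls++;
        if ($calls < 3) {
            throw new RuntimeException("transient failure #{$calls}");
        }
        return 'ok';
    },
    sleepMs: function (int $ms) use (&$delays): void {
        $delays[] = $ms; // record the delay instead of sleeping
    },
);

echo "{$result} after {$calls} attempts" . PHP_EOL;
// Output:
// ok after 3 attempts
```

With the defaults, the first recorded delay is at most 100 ms and the second at most 200 ms. The exact values vary per run because of the jitter.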
Dry-Run Mode
Validate pipelines without performing actual writes.

    $result = (new DataFlow())
        ->from($records)
        ->transform(fn($row) => $row)
        ->load(new DatabaseLoader())
        ->dryRun()
        ->run();

    echo "Would process: {$result->getProcessedRows()} rows\n";
    // No data was actually written

Logging & Progress
Inject a PSR-3 logger and track progress on large datasets.

    use Simsoft\DataFlow\DataFlow;

    $result = (new DataFlow())
        ->from($largeDataset)
        ->withLogger($psrLogger)
        ->onProgress(function (int $count, float $elapsedMs) {
            echo "\r Processed {$count} rows...";
        }, interval: 1000)
        ->onError(function (\Throwable $e, mixed $row, string $stage) {
            error_log("[{$stage}] {$e->getMessage()}");
        })
        ->transform(fn($row) => $row)
        ->load(fn($row) => $row)
        ->run();

Advanced Usage
- Using Closure
- Useful Processors
- Customized ETL Processor
- Create Reusable Data Flow
- Using Payload
- Macro & Mixin
- Error Handling
- Observability & Metrics
- Dry-Run Mode
- Schema Validation
- Circuit Breaker
- Checkpoint & Resume
- Metrics Exporter
- Spreadsheet (PhpSpreadsheet)
License
Simsoft DataFlow is licensed under the MIT License. See the LICENSE file for details.