squop / pipeline-flow-core
Laravel package for lightweight pipeline flow tracing
Requires
- php: ^8.2
- illuminate/console: ^12.0
- illuminate/database: ^12.0
- illuminate/http: ^12.0
- illuminate/redis: ^12.0
- illuminate/routing: ^12.0
- illuminate/support: ^12.0
Requires (Dev)
- orchestra/testbench: ^10.0
- phpunit/phpunit: ^11.5
This package is auto-updated.
Last update: 2026-05-06 07:57:26 UTC
README
pipeline-flow-core — Laravel-пакет для записи и чтения pipeline-trace'ов.
Подходит для сценариев, где нужно:
- собирать шаги backend-процесса по
trace_id; - видеть статус сценария:
running,ok,fail; - получать последние запуски и детали запуска через HTTP API;
- хранить trace'ы в
redisилиdatabase.
Что входит
PipelineManagerдля записи шаговScenarioTraceServiceдля выдачи и прокидыванияtrace_id- встроенные драйверы хранения
redisиdatabase - read-only API для просмотра pipeline run'ов
- встроенный HTML dashboard для мониторинга pipeline run'ов
- artisan-команда очистки database-хранилища
Установка
1. Подключение через Composer
Обычный вариант:
composer require squop/pipeline-flow-core
Локальный пакет через path repository:
{
"repositories": [
{
"type": "path",
"url": "../packages/pipeline-flow-core",
"options": {
"symlink": true
}
}
],
"require": {
"squop/pipeline-flow-core": "^0.1.0"
}
}
После этого:
composer update squop/pipeline-flow-core
Провайдер пакета подключается через Laravel auto-discovery.
2. Опубликовать конфиг
php artisan vendor:publish --tag=pipeline-flow-config
Будет создан файл config/pipeline-flow.php.
3. Для database driver опубликовать миграции
php artisan vendor:publish --tag=pipeline-flow-migrations php artisan migrate
Если используете redis, этот шаг не нужен.
Конфиг
Минимальный пример:
<?php return [ 'enabled' => true, 'service_name' => 'backend', 'driver' => 'redis', 'run_ttl_seconds' => 3600, 'recent_limit' => 100, 'trace_header_name' => 'X-Scenario-Trace-Id', 'routes' => [ 'enabled' => true, 'prefix' => 'api/internal/pipelines', 'authorize' => App\Support\PipelineRouteAuthorizer::class, ], 'ui' => [ 'enabled' => true, 'prefix' => 'pipelines', 'title' => 'Pipeline Flow Monitor', 'refresh_interval_seconds' => 10, ], 'drivers' => [ 'redis' => [ 'connection' => 'default', 'prefix' => 'pipeline:', ], 'database' => [ 'connection' => null, 'table' => 'pipeline_runs', ], ], 'pipelines' => [ 'join_room' => [ 'active_timeout_seconds' => 30, 'required_steps' => [ 'room_join', 'livekit_token_issue', ], 'optional_steps' => [ 'reverb_endpoint_resolve', 'broadcast_auth', ], 'success_step' => 'livekit_token_issue', ], ], ];
Ключевые параметры
enabled— включает пакетservice_name— имя сервиса, которое записывается в stepsdriver—redisилиdatabaserun_ttl_seconds— TTL trace'аrecent_limit— сколько последних trace'ов хранить в спискеtrace_header_name— HTTP header для trace idroutes.enabled— подключать ли встроенные routesroutes.prefix— префикс HTTP APIroutes.authorize— авторизация встроенного APIui.enabled— подключать ли встроенный HTML dashboardui.prefix— URL dashboardui.title— заголовок dashboardui.refresh_interval_seconds— интервал автообновления UI
Авторизация встроенного API
routes.authorize поддерживает:
true— доступ разрешёнfalse— доступ запрещён- callable
- строку с классом
- строку вида
Class@method
Пример:
'routes' => [ 'enabled' => true, 'prefix' => 'api/internal/pipelines', 'authorize' => App\Support\PipelineRouteAuthorizer::class, ],
Авторизатор должен вернуть bool.
Тот же authorizer используется и для встроенного dashboard.
Использование
1. Выдать или прочитать trace id
use Squop\PipelineFlow\ScenarioTraceService; $trace_id = app(ScenarioTraceService::class)->issue();
$trace_id = app(ScenarioTraceService::class)->resolveFromRequest($request);
2. Записать успешный шаг
use Squop\PipelineFlow\PipelineManager; app(PipelineManager::class)->recordStepOk( pipeline_id: 'join_room', trace_id: $trace_id, step_name: 'room_join', duration_ms: 42, run_meta: [ 'user_id' => 10, 'room_id' => 15, ], step_meta: [ 'source' => 'api', ], );
3. Записать ошибку
app(PipelineManager::class)->recordStepFail( pipeline_id: 'join_room', trace_id: $trace_id, step_name: 'livekit_token_issue', duration_ms: 150, error: 'livekit_service_not_configured', run_meta: [ 'user_id' => 10, ], );
4. Вернуть trace id в ответе
use Squop\PipelineFlow\ScenarioTraceService; $response = response()->json([ 'message' => 'ok', ]); return app(ScenarioTraceService::class)->applyToJsonResponse($response, $trace_id);
Пакет:
- добавит header
X-Scenario-Trace-Id - добавит поле
scenario_trace_idв JSON
Встроенный HTTP API
Если routes.enabled = true, пакет поднимет:
GET /api/internal/pipelines/recentGET /api/internal/pipelines/{trace_id}
По умолчанию префикс берётся из routes.prefix.
GET /api/internal/pipelines/recent поддерживает query params:
pageper_pagestatus(ok,fail,running)search(поtrace_idиpipeline_id)
Ответ:
{
"runs": [],
"pagination": {
"page": 1,
"per_page": 12,
"total": 0,
"total_pages": 1,
"has_prev": false,
"has_next": false
}
}
Семантика листинга зависит от driver:
redis— debug-режим: API пагинирует и фильтрует только окно последних run'ов изrecent_limitdatabase— SQL-режим: API пагинирует и фильтрует весь неистёкший набор записей в таблице
Встроенный Dashboard UI
Если ui.enabled = true, пакет поднимет HTML dashboard по ui.prefix.
Пример:
GET /pipelines
Dashboard:
- показывает последние run'ы по всем pipeline'ам через серверную пагинацию;
- умеет поиск по
trace_idиpipeline_id; - фильтрует по статусу;
- показывает steps и meta по выбранному run;
- автообновляется через встроенный JSON API.
Рекомендуемый вариант для запуска:
- включить
enabled - включить
routes.enabled - включить
ui.enabled - указать
routes.authorizeкак свой authorizer - открыть
/{ui.prefix}
Логика статуса run
- если есть шаг со статусом
fail, весь run становитсяfail - если найден
success_stepсо статусомok, run становитсяok - иначе run остаётся
running - если run слишком долго находится в
running, он будет помечен как timeout при чтении
Драйверы хранения
Redis
Используйте, если нужен быстрый временный storage без отдельной таблицы.
Нужен настроенный Redis connection из Laravel config.
Для GET /api/internal/pipelines/recent Redis использует окно последних trace'ов из recent_limit.
Database
Используйте, если нужен более предсказуемый storage в SQL.
Для GET /api/internal/pipelines/recent database driver использует настоящую SQL-пагинацию по неистёкшим строкам.
Для очистки старых записей:
php artisan pipeline-flow:prune
Команда удаляет истёкшие записи из database driver.
Расширение своими драйверами
Можно зарегистрировать свой driver через PipelineStoreRegistry.
Пример:
use Squop\PipelineFlow\Config\PipelineConfig; use Squop\PipelineFlow\Stores\PipelineStoreRegistry; app(PipelineStoreRegistry::class)->extend( 'custom', function ($app, PipelineConfig $config) { return new CustomPipelineStore($config); } );
После этого в конфиге:
'driver' => 'custom',
Класс драйвера должен реализовать Squop\PipelineFlow\Contracts\PipelineStoreInterface.
Что пакет не делает
- не настраивает Docker, nginx, Redis и прочую инфраструктуру
- не встраивает trace-запись в ваши контроллеры автоматически
Инструментацию сценариев вы добавляете сами в коде приложения.