squop/pipeline-flow-core

Laravel package for lightweight pipeline flow tracing

Maintainers

Package info

github.com/Daniil-Solovyev/pipeline-flow-core

pkg:composer/squop/pipeline-flow-core

Statistics

Installs: 1

Dependents: 0

Suggesters: 0

Stars: 0

Open Issues: 0

1.0.0 2026-05-06 07:40 UTC

This package is auto-updated.

Last update: 2026-05-06 07:57:26 UTC


README

pipeline-flow-core — Laravel-пакет для записи и чтения pipeline-trace'ов.

Подходит для сценариев, где нужно:

  • собирать шаги backend-процесса по trace_id;
  • видеть статус сценария: running, ok, fail;
  • получать последние запуски и детали запуска через HTTP API;
  • хранить trace'ы в redis или database.

Что входит

  • PipelineManager для записи шагов
  • ScenarioTraceService для выдачи и прокидывания trace_id
  • встроенные драйверы хранения redis и database
  • read-only API для просмотра pipeline run'ов
  • встроенный HTML dashboard для мониторинга pipeline run'ов
  • artisan-команда очистки database-хранилища

Установка

1. Подключение через Composer

Обычный вариант:

composer require squop/pipeline-flow-core

Локальный пакет через path repository:

{
  "repositories": [
    {
      "type": "path",
      "url": "../packages/pipeline-flow-core",
      "options": {
        "symlink": true
      }
    }
  ],
  "require": {
    "squop/pipeline-flow-core": "^0.1.0"
  }
}

После этого:

composer update squop/pipeline-flow-core

Провайдер пакета подключается через Laravel auto-discovery.

2. Опубликовать конфиг

php artisan vendor:publish --tag=pipeline-flow-config

Будет создан файл config/pipeline-flow.php.

3. Для database driver опубликовать миграции

php artisan vendor:publish --tag=pipeline-flow-migrations
php artisan migrate

Если используете redis, этот шаг не нужен.

Конфиг

Минимальный пример:

<?php

return [
    'enabled' => true,
    'service_name' => 'backend',
    'driver' => 'redis',
    'run_ttl_seconds' => 3600,
    'recent_limit' => 100,
    'trace_header_name' => 'X-Scenario-Trace-Id',
    'routes' => [
        'enabled' => true,
        'prefix' => 'api/internal/pipelines',
        'authorize' => App\Support\PipelineRouteAuthorizer::class,
    ],
    'ui' => [
        'enabled' => true,
        'prefix' => 'pipelines',
        'title' => 'Pipeline Flow Monitor',
        'refresh_interval_seconds' => 10,
    ],
    'drivers' => [
        'redis' => [
            'connection' => 'default',
            'prefix' => 'pipeline:',
        ],
        'database' => [
            'connection' => null,
            'table' => 'pipeline_runs',
        ],
    ],
    'pipelines' => [
        'join_room' => [
            'active_timeout_seconds' => 30,
            'required_steps' => [
                'room_join',
                'livekit_token_issue',
            ],
            'optional_steps' => [
                'reverb_endpoint_resolve',
                'broadcast_auth',
            ],
            'success_step' => 'livekit_token_issue',
        ],
    ],
];

Ключевые параметры

  • enabled — включает пакет
  • service_name — имя сервиса, которое записывается в steps
  • driverredis или database
  • run_ttl_seconds — TTL trace'а
  • recent_limit — сколько последних trace'ов хранить в списке
  • trace_header_name — HTTP header для trace id
  • routes.enabled — подключать ли встроенные routes
  • routes.prefix — префикс HTTP API
  • routes.authorize — авторизация встроенного API
  • ui.enabled — подключать ли встроенный HTML dashboard
  • ui.prefix — URL dashboard
  • ui.title — заголовок dashboard
  • ui.refresh_interval_seconds — интервал автообновления UI

Авторизация встроенного API

routes.authorize поддерживает:

  • true — доступ разрешён
  • false — доступ запрещён
  • callable
  • строку с классом
  • строку вида Class@method

Пример:

'routes' => [
    'enabled' => true,
    'prefix' => 'api/internal/pipelines',
    'authorize' => App\Support\PipelineRouteAuthorizer::class,
],

Авторизатор должен вернуть bool.

Тот же authorizer используется и для встроенного dashboard.

Использование

1. Выдать или прочитать trace id

use Squop\PipelineFlow\ScenarioTraceService;

$trace_id = app(ScenarioTraceService::class)->issue();
$trace_id = app(ScenarioTraceService::class)->resolveFromRequest($request);

2. Записать успешный шаг

use Squop\PipelineFlow\PipelineManager;

app(PipelineManager::class)->recordStepOk(
    pipeline_id: 'join_room',
    trace_id: $trace_id,
    step_name: 'room_join',
    duration_ms: 42,
    run_meta: [
        'user_id' => 10,
        'room_id' => 15,
    ],
    step_meta: [
        'source' => 'api',
    ],
);

3. Записать ошибку

app(PipelineManager::class)->recordStepFail(
    pipeline_id: 'join_room',
    trace_id: $trace_id,
    step_name: 'livekit_token_issue',
    duration_ms: 150,
    error: 'livekit_service_not_configured',
    run_meta: [
        'user_id' => 10,
    ],
);

4. Вернуть trace id в ответе

use Squop\PipelineFlow\ScenarioTraceService;

$response = response()->json([
    'message' => 'ok',
]);

return app(ScenarioTraceService::class)->applyToJsonResponse($response, $trace_id);

Пакет:

  • добавит header X-Scenario-Trace-Id
  • добавит поле scenario_trace_id в JSON

Встроенный HTTP API

Если routes.enabled = true, пакет поднимет:

  • GET /api/internal/pipelines/recent
  • GET /api/internal/pipelines/{trace_id}

По умолчанию префикс берётся из routes.prefix.

GET /api/internal/pipelines/recent поддерживает query params:

  • page
  • per_page
  • status (ok, fail, running)
  • search (по trace_id и pipeline_id)

Ответ:

{
  "runs": [],
  "pagination": {
    "page": 1,
    "per_page": 12,
    "total": 0,
    "total_pages": 1,
    "has_prev": false,
    "has_next": false
  }
}

Семантика листинга зависит от driver:

  • redis — debug-режим: API пагинирует и фильтрует только окно последних run'ов из recent_limit
  • database — SQL-режим: API пагинирует и фильтрует весь неистёкший набор записей в таблице

Встроенный Dashboard UI

Если ui.enabled = true, пакет поднимет HTML dashboard по ui.prefix.

Пример:

  • GET /pipelines

Dashboard:

  • показывает последние run'ы по всем pipeline'ам через серверную пагинацию;
  • умеет поиск по trace_id и pipeline_id;
  • фильтрует по статусу;
  • показывает steps и meta по выбранному run;
  • автообновляется через встроенный JSON API.

Рекомендуемый вариант для запуска:

  1. включить enabled
  2. включить routes.enabled
  3. включить ui.enabled
  4. указать routes.authorize как свой authorizer
  5. открыть /{ui.prefix}

Логика статуса run

  • если есть шаг со статусом fail, весь run становится fail
  • если найден success_step со статусом ok, run становится ok
  • иначе run остаётся running
  • если run слишком долго находится в running, он будет помечен как timeout при чтении

Драйверы хранения

Redis

Используйте, если нужен быстрый временный storage без отдельной таблицы.

Нужен настроенный Redis connection из Laravel config.

Для GET /api/internal/pipelines/recent Redis использует окно последних trace'ов из recent_limit.

Database

Используйте, если нужен более предсказуемый storage в SQL.

Для GET /api/internal/pipelines/recent database driver использует настоящую SQL-пагинацию по неистёкшим строкам.

Для очистки старых записей:

php artisan pipeline-flow:prune

Команда удаляет истёкшие записи из database driver.

Расширение своими драйверами

Можно зарегистрировать свой driver через PipelineStoreRegistry.

Пример:

use Squop\PipelineFlow\Config\PipelineConfig;
use Squop\PipelineFlow\Stores\PipelineStoreRegistry;

app(PipelineStoreRegistry::class)->extend(
    'custom',
    function ($app, PipelineConfig $config) {
        return new CustomPipelineStore($config);
    }
);

После этого в конфиге:

'driver' => 'custom',

Класс драйвера должен реализовать Squop\PipelineFlow\Contracts\PipelineStoreInterface.

Что пакет не делает

  • не настраивает Docker, nginx, Redis и прочую инфраструктуру
  • не встраивает trace-запись в ваши контроллеры автоматически

Инструментацию сценариев вы добавляете сами в коде приложения.