sbooker / domain-events-persistence
Domain events storage
Installs: 3 348
Dependents: 1
Suggesters: 0
Security: 0
Stars: 0
Watchers: 1
Forks: 0
Open Issues: 1
Requires
- php: ^7.4 || ^8.0
- psr/log: ^1.0 || ^2.0 || ^3.0
- sbooker/domain-events: ^2.0
- sbooker/persistent-pointer: ^1.0
- sbooker/transaction-manager: ^2.3
- symfony/serializer: ^5.0 || ^6.0
Requires (Dev)
- gamez/ramsey-uuid-normalizer: ^2.0 || ^3.0
- phpunit/phpunit: ^9.0
- symfony/property-access: ^5.0 || ^6.0
- symfony/property-info: ^5.0 || ^6.0
- vimeo/psalm: ^4.7
Suggests
- gamez/ramsey-uuid-normalizer: ^2.0 || ^3.0
- symfony/property-access: ^5.0
- symfony/property-info: ^5.0
This package is auto-updated.
Last update: 2025-08-22 16:07:02 UTC
README
Domain Events Persistence Library (sbooker/domain-events-persistence
)
Готовая реализация паттерна Transactional Outbox для библиотеки sbooker/domain-events.
Назначение библиотеки
Эта библиотека решает проблему надежности в системах, управляемых событиями: как гарантировать, что доменное событие будет обработано, если оно было создано в рамках транзакции, которая успешно завершилась?
sbooker/domain-events-persistence
решает эту проблему, сохраняя ваши доменные события в постоянное хранилище (например, в ту же базу данных) внутри той же транзакции, что и ваши доменные сущности. Это достигается благодаря глубокой интеграции с sbooker/transaction-manager.
Затем отдельный фоновый процесс (консьюмер) считывает эти события и обрабатывает их, используя sbooker/persistent-pointer для отслеживания прогресса.
Ключевые особенности
- Атомарное сохранение: События сохраняются в той же транзакции, что и агрегаты. Гарантируется, что либо сохраняется всё, либо ничего.
- Полностью автоматическое созранение событий:
DomainEventPreCommitProcessor
автоматически извлекает события из ваших сущностей прямо перед коммитом транзакции. Больше не нужно вызывать$entity->dispatchEvents()
вручную! - Надежный консьюмер: Встроенный механизм
Consumer
используетpersistent-pointer
для отслеживания позиции последнего обработанного события, что гарантирует обработку "хотя бы один раз" (at-least-once). - Готовность к параллельной обработке: Архитектура консьюмеров позволяет запускать несколько воркеров для обработки событий без брокера сообщений.
- Гибкое именование событий: Поддерживаются разные стратегии именования событий (по имени класса или через карту
MapNameGiver
) для долгосрочной стабильности.
Установка
composer require sbooker/domain-events-persistence
Вам также понадобятся реализации для ваших фреймворков и ORM:
# Основные зависимости composer require sbooker/domain-events sbooker/transaction-manager sbooker/persistent-pointer # Реализация для Doctrine composer require sbooker/doctrine-transaction-handler
Быстрый старт
Шаг 1: Конечная цель: чистый код слоя приложения
Благодаря полной автоматизации, ваш код в слое проложения становится предельно простым и не знает ничего о событиях.
// src/UseCase/CreateProduct/Handler.php final class Handler { private TransactionManager $transactionManager; // ... public function handle(Command $command): void { $this->transactionManager->transactional(function () use ($command): void { $product = new Product(/* ... */); // Внутри создается событие $this->transactionManager->persist($product); // Никаких вызовов dispatchEvents()! // Процессор сделает это автоматически перед коммитом. }); } }
Шаг 2: Сборка зависимостей (Composition Root)
Чтобы достичь такой простоты, вам нужно один раз собрать все компоненты вместе в вашем DI-контейнере.
// bootstrap.php или ваш DI-контейнер // --- Предполагается, что у вас уже есть эти сервисы --- /** @var Sbooker\TransactionManager\TransactionHandler $transactionHandler */ /** @var Symfony\Component\Serializer\SerializerInterface $serializer */ /** @var App\Infrastructure\Security\MyActorStorage $actorStorage */ /** @var Psr\Log\LoggerInterface $logger */ /** @var App\Infrastructure\Persistence\DoctrineConsumeStorage $consumeStorage */ // 1. Выбираем стратегию именования событий $eventNameGiver = new Sbooker\DomainEvents\Persistence\ClassNameNameGiver(); // 2. Создаем Publisher, который будет сохранять события в БД $persistentPublisher = new Sbooker\DomainEvents\Persistence\PersistentPublisher($eventNameGiver, $serializer); // 3. Создаем декоратор, который добавляет Actor'а к событиям (опционально) $actorAwarePublisher = new Sbooker\DomainEvents\ActorAwarePublisher($persistentPublisher, $actorStorage); // 4. Создаем процессор, автоматизирующий сохранение событий $preCommitProcessor = new Sbooker\DomainEvents\Persistence\DomainEventPreCommitProcessor($actorAwarePublisher); // 5. Создаем TransactionManager и регистрируем в нем наш процессор. // TransactionManager автоматически вызовет setTransactionManager() на процессоре и паблишере. $transactionManager = new Sbooker\TransactionManager\TransactionManager( $transactionHandler, $preCommitProcessor ); // 6. Создаем фабрику для консьюмеров $consumerFactory = new Sbooker\DomainEvents\Persistence\ConsumerFactory( $consumeStorage, $transactionManager, $eventNameGiver, $serializer, // Serializer здесь выступает как Denormalizer $logger ); // Теперь все готово для использования! $handler = new Handler($transactionManager, /* ... */);
Шаг 3: Создание воркера-консьюмера
Создайте консольную команду или фоновый процесс (рекомендуется) на базе решений на event loop, например на sbooker/event-loop-worker, который будет в цикле обрабатывать события.
// src/Email/Infrastructure/ProcessProductEventsCommand.php use Sbooker\DomainEvents\Persistence\ConsumerFactory; use App\Subscribers\EmailNotifier; // Ваш подписчик на события final class ProcessProductEventsCommand extends Command { private ConsumerFactory $consumerFactory; private EmailNotifier $subscriber; // Ваш сервис-обработчик // ... constructor ... protected function execute(InputInterface $input, OutputInterface $output): int { // Создаем консьюмер для конкретного подписчика. // Имя 'email_notifier' будет использоваться для создания Pointer'а. $consumer = $this->consumerFactory->createBySubscriber( 'email_notifier', $this->subscriber ); $output->writeln('Starting event consumer...'); while (true) { // consume() атомарно находит событие, обрабатывает его и сохраняет новую позицию указателя. $processed = $consumer->consume(); if (!$processed) { // Если событий нет, ждем и повторяем sleep(5); } } } }
Продвинутое использование
Внешняя генерация позиций (PositionGenerator
)
По умолчанию, position
для PersistentEvent
предполагается автоинкрементным полем в базе данных, и библиотека не управляет его генерацией. Однако существуют сценарии, когда последовательность событий должна управляться извне:
- Интеграция с legacy-системой, у которой уже есть своя нумерация событий.
- Использование СУБД, которые не поддерживают
SEQUENCE
или имеют проблемы сAUTO_INCREMENT
в кластерных конфигурациях (например, старые версии MySQL).
Для этих случаев предназначен необязательный интерфейс PositionGenerator
.
Решение в рамках экосистемы: sbooker/persistent-sequences
Для решения этой задачи без привлечения внешней инфраструктуры (вроде Redis) была создана библиотека sbooker/persistent-sequences. Она реализует персистентную, конкурентно-безопасную последовательность средствами реляционной СУБД.
Шаг 1: Установите и настройте persistent-sequences
Сначала установите библиотеку:
composer require sbooker/persistent-sequences
Затем создайте адаптер, который будет связывать persistent-sequences
с интерфейсом PositionGenerator
.
// src/Infrastructure/Persistence/SequencePositionGenerator.php use Sbooker\DomainEvents\Persistence\PositionGenerator; use Sbooker\PersistentSequences\SequenceGenerator; use Sbooker\PersistentSequences\Algorithm; final class SequencePositionGenerator implements PositionGenerator { private const SEQUENCE_NAME = 'domain_events'; private SequenceGenerator $sequenceGenerator; private Algorithm $algorithm; public function __construct(SequenceGenerator $sequenceGenerator, Algorithm $algorithm) { $this->sequenceGenerator = $sequenceGenerator; } public function next(): int { // Получаем следующее значение из именованной последовательности return $this->sequenceGenerator->next(self::SEQUENCE_NAME, $this->algorithm); } }
Шаг 2: Передайте его в PersistentPublisher
Теперь при сборке зависимостей просто передайте ваш генератор третьим аргументом в конструктор PersistentPublisher
.
// bootstrap.php или ваш DI-контейнер /** @var Sbooker\TransactionManager\TransactionHandler $transactionHandler */ /** @var Symfony\Component\Serializer\SerializerInterface $serializer */ /** @var SequencePositionGenerator $positionGenerator */ // <-- Ваш генератор // 1. Создаем Publisher, передавая в него PositionGenerator $persistentPublisher = new Sbooker\DomainEvents\Persistence\PersistentPublisher( $eventNameGiver, $serializer, $positionGenerator // <-- Вот он! ); // ... остальная сборка ... $transactionManager = new Sbooker\TransactionManager\TransactionManager( $transactionHandler, $preCommitProcessor );
Теперь при сохранении события PersistentPublisher
будет вызывать ваш SequencePositionGenerator
для получения и установки позиции, обеспечивая единую, монотонно возрастающую последовательность для всех событий.
License
See LICENSE file.