menumbing / event-stream
Event stream component for Hyperf
Requires
- php: >=8.1
- hyperf/config: ^3.1
- hyperf/di: ^3.1
- hyperf/event: ^3.1
- hyperf/process: ^3.1
- hyperf/redis: ^3.1
- hyperf/signal: ^3.1
- menumbing/contracts: ^1.0
- menumbing/serializer: ^1.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.0
- mockery/mockery: ^1.0
- phpstan/phpstan: ^1.0
- phpunit/phpunit: ^10.0
- swoole/ide-helper: dev-master
Suggests
- swow/swow: Required to create swow components.
This package is auto-updated.
Last update: 2025-06-11 15:59:22 UTC
README
A powerful event streaming component for Hyperf applications, providing a simple way to produce and consume events using stream engine like Redis and Kafka.
Installation
You can install the package via composer:
composer require menumbing/event-stream
Requirements
- PHP >= 8.1
- Hyperf Framework >= 3.1
- Redis Server
Configuration
Publish the configuration file:
php bin/hyperf.php vendor:publish menumbing/event-stream
This will create a event_stream.php
configuration file in your config/autoload
directory.
Configuration Options
The configuration file contains the following options:
return [ // The name of the consumer group used for stream processing 'group_name' => env('APP_NAME', 'menumbing'), // Available stream drivers configuration 'drivers' => [ 'default' => [ 'driver' => RedisStream::class, 'id' => DefaultRedisId::class, 'options' => [ 'pool' => 'default', // Redis connection pool name 'read_from' => ReadMessageFrom::GROUP_CREATED, // Starting point for reading messages 'wait_time' => 100, // Wait time in milliseconds between read attempts 'retention_period' => 7, // Message retention period in days 'approx' => false, // Use approximate for deleting messages ], ], ], // Consumer configuration settings 'consumer' => [ 'processes' => [], // Array of process configurations for different streams 'block_for' => 1, // The number of seconds to sleep between processing batches 'retry_after' => 60, // The number of seconds to retry zombie message due to offline consumer or failed process ], // Serialization configuration for messages 'serialization' => [ 'serializer' => 'default', // The serializer service to use 'format' => 'json', // The format to serialize messages to ] ];
Usage
Producing Events
To produce events, you need to:
- Create an event class
- Annotate it with
#[ProducedEvent]
- Dispatch the event using the event dispatcher
<?php namespace App\Event; use Menumbing\EventStream\Annotation\ProducedEvent; #[ProducedEvent(stream: 'user-events', name: 'user.created', driver: 'default')] class UserCreated { public function __construct( public readonly int $userId, public readonly string $username, public readonly string $email ) { } }
When the event dispatched with EventDispatcher it will be produced to event stream with name stream-1
, event name user.created
and driver default
.
Here is how you dispatch the event:
<?php namespace App\Controller; use App\Event\UserCreated; use Hyperf\Di\Annotation\Inject; use Psr\EventDispatcher\EventDispatcherInterface; class UserController { #[Inject] protected EventDispatcherInterface $eventDispatcher; public function create() { // Create user logic... $userId = 123; $username = 'john_doe'; $email = 'john@example.com'; // Dispatch event $this->eventDispatcher->dispatch(new UserCreated($userId, $username, $email)); // ... } }
Consuming Events
To consume events, you need to:
- Create an event class on other service
- Annotate it with
#[ConsumedEvent]
- Configure it in the
event_stream.php
configuration file
Example consumer process:
<?php namespace App\Event; use Menumbing\EventStream\Annotation\ConsumedEvent; #[ConsumedEvent(stream: 'user-events', name: 'user.created', driver: 'default', retries: 0)] class ConsumeUserCreated { public function __construct( public readonly int $userId, public readonly string $username, public readonly string $email ) { } }
It will automatically create a consumer process for each stream. The option retries
indicate how many attempt should consumer process the message until it fire ConsumeFailed
event. Alternatively, you can configure consumers in the event_stream.php
configuration file:
'consumer' => [ 'processes' => [ 'user-events' => 2, // This will create 2 processes for 'user-events' stream ], 'block_for' => 1, // The number of seconds to sleep between processing batches of messages. 'retry_after' => 30, // The number of seconds to retry zombie message due to offline consumer or failed process. ],
Handling Consumed Events
To handle consumed events, create an event listeners for your event classes:
<?php namespace App\Listener; use App\Event\ConsumeUserCreated; use Hyperf\Event\Annotation\Listener; use Hyperf\Event\Contract\ListenerInterface; #[Listener] class UserCreatedListener implements ListenerInterface { public function listen(): array { return [ ConsumeUserCreated::class, ]; } public function process(object $event): void { // Handle the UserCreated event if ($event instanceof ConsumeUserCreated) { // Process the event echo "User created: {$event->username} ({$event->email})\n"; } } }
Debugging
The package includes a debug listener that logs various events when debugging is enabled. To enable debugging, set app_debug
to true
in your configuration.
Advanced Usage
Custom ID Providers
You can create custom ID providers by implementing the IdProviderInterface
:
<?php namespace App\EventStream; use Menumbing\Contract\EventStream\IdProviderInterface; class CustomIdProvider implements IdProviderInterface { public function newId(): string { // Generate and return a custom ID return uniqid('custom-', true); } }
Then configure it in your event_stream.php
:
'drivers' => [ 'custom' => [ 'driver' => RedisStream::class, 'id' => CustomIdProvider::class, 'options' => [ // ... ], ], ],
Custom Stream Drivers
You can create custom stream drivers by implementing the StreamInterface
:
<?php namespace App\EventStream; use Generator; use Menumbing\Contract\EventStream\StreamInterface; use Menumbing\Contract\EventStream\StreamMessage; class CustomStreamDriver implements StreamInterface { public function createGroup(string $name, string $stream): bool { // Implementation } public function publish(StreamMessage $message): string { // Implementation } public function subscribe(string $consumer, string $group, array $streams): Generator { // Implementation } public function ack(string $group, string $stream, array $ids): bool { // Implementation } }
Then configure it in your event_stream.php
:
'drivers' => [ 'custom' => [ 'driver' => CustomStreamDriver::class, 'options' => [ // Custom options ], ], ],
License
The MIT License (MIT). Please see License File for more information.