greensight / laravel-phprdkafka-consumer
Opinionated High Level consumer for laravel-phprdkafka
Requires
- php: ^8.0
- ext-rdkafka: *
- greensight/laravel-phprdkafka: ^0.1.4
- illuminate/support: ^7 || ^8
Requires (Dev)
- brianium/paratest: ^6.2
- nunomaduro/collision: ^5.3
- orchestra/testbench: ^6.15
- phpunit/phpunit: ^9.3
- spatie/laravel-ray: ^1.9
Last update: 2022-01-11 08:39:07 UTC
README
Deprecated, use https://github.com/ensi-platform/laravel-php-rdkafka-consumer instead
Opinionated High Level consumer for greensight/laravel-phprdkafka
Installation
Firstly, you have to install and configure greensight/laravel-phprdkafka
Then,
composer require greensight/laravel-phprdkafka-consumer
Publish the config file with:
php artisan vendor:publish --provider="Greensight\LaravelPhpRdKafkaConsumer\LaravelPhpRdKafkaConsumerServiceProvider" --tag="kafka-consumer-config"
Now go to config/kafka-consumer.php
and add processors there.
Usage
The package provides the php artisan kafka:consume {topic} {consumer=default}
command, which executes the first processor that matches the given topic and consumer name. The consumer name is taken from the greensight/laravel-phprdkafka config file.
Processors in config have the following configuration options:
[
    /*
    | Optional, defaults to `null`.
    | Here you may specify which topic should be handled by this processor.
    | Processor handles all topics by default.
    */
    'topic' => 'stage.crm.fact.registrations.1',

    /*
    | Optional, defaults to `null`.
    | Here you may specify which greensight/laravel-phprdkafka consumer should be handled by this processor.
    | Processor handles all consumers by default.
    */
    'consumer' => 'default',

    /*
    | Optional, defaults to `action`.
    | Here you may specify the processor's type.
    | Supported types:
    |  - `action` - a simple class with an execute method;
    |  - `job` - Laravel Queue Job. It will be dispatched using the `dispatch` or `dispatchSync` method;
    */
    'type' => 'action',

    /*
    | Required.
    | Fully qualified class name of a processor class.
    */
    'class' => \App\Domain\Communication\Actions\SendConfirmationEmailAction::class,

    /*
    | Optional, defaults to `false`.
    | Proxy messages to Laravel's queue.
    | Supported values:
    |  - `false` - do not stream the message. Execute the processor in synchronous mode;
    |  - `true` - stream the message to Laravel's default queue;
    |  - `<your-favorite-queue-name-as-string>` - stream the message to this queue;
    */
    'queue' => false,

    /*
    | Optional, defaults to 5000.
    | Kafka consume timeout in milliseconds.
    */
    'consume_timeout' => 5000,
]
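The matching rule described above (the first processor whose topic and consumer both match, with `null` meaning "any") can be illustrated with a small standalone sketch. This is not the package's actual implementation: the function name `findFirstMatchingProcessor` and the `CatchAllAction` class name are hypothetical, and class names are plain strings here so the snippet runs without Laravel or ext-rdkafka.

```php
<?php

declare(strict_types=1);

/**
 * Return the first processor config that matches the topic and consumer.
 * A missing or null 'topic' / 'consumer' key is treated as "matches anything".
 * Hypothetical helper, not part of the package's public API.
 */
function findFirstMatchingProcessor(array $processors, string $topic, string $consumer): ?array
{
    foreach ($processors as $processor) {
        $topicMatches = ($processor['topic'] ?? null) === null
            || $processor['topic'] === $topic;
        $consumerMatches = ($processor['consumer'] ?? null) === null
            || $processor['consumer'] === $consumer;

        if ($topicMatches && $consumerMatches) {
            return $processor;
        }
    }

    // No processor is configured for this topic/consumer pair.
    return null;
}

$processors = [
    [
        'topic' => 'stage.crm.fact.registrations.1',
        'consumer' => 'default',
        'class' => 'SendConfirmationEmailAction',
    ],
    // No topic or consumer given: this entry handles everything else.
    ['class' => 'CatchAllAction'],
];

$match = findFirstMatchingProcessor($processors, 'stage.crm.fact.registrations.1', 'default');
echo $match['class'], PHP_EOL;
```

Because the first match wins in this sketch, specific processors should be listed before catch-all ones.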
Synchronous processors
Most of the time all you need is a synchronous processor. A simple example of such a processor:
use RdKafka\Message;

class SendConfirmationEmailAction
{
    // Called by the consumer for every message received.
    public function execute(Message $message): void
    {
        // var_dump($message->payload);
    }
}
Queueable processors
If you want to stream messages to Laravel's own queue, you can use spatie/laravel-queueable-action.
If for some reason you don't want to rely on that package, you can switch to Laravel Jobs.
In both cases you also need to specify 'queue' => true
or 'queue' => 'my-favorite-queue'
in the package's config for a given processor.
Example of a processor using a Laravel Job:
use RdKafka\Message;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;

class ConsumeMessageJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable;

    public function __construct(protected Message $message)
    {
    }

    public function handle(): void
    {
        // var_dump($this->message->payload);
    }
}
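A processor entry for such a job might look like the fragment below. It uses only the options documented above; the `\App\Jobs` namespace is an assumption for illustration and should be replaced with wherever the job class lives in your application.

```php
[
    // Dispatch a Laravel Queue Job instead of calling an action's execute method.
    'type' => 'job',

    // Hypothetical namespace, adjust to your application.
    'class' => \App\Jobs\ConsumeMessageJob::class,

    // `true` streams to Laravel's default queue, a string streams to that named queue.
    'queue' => 'my-favorite-queue',
],
```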
Testing
composer test
Changelog
Please see CHANGELOG for more information on what has changed recently.
License
The MIT License (MIT). Please see License File for more information.