micromus / kafka-bus
This is my package kafka-bus
Micromus
Requires
- php: ^8.2
- ext-pcntl: *
- ext-rdkafka: *
- psr/log: ^3.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.64
- pestphp/pest: ^2.34
- pestphp/pest-plugin-arch: ^2.7
- phpstan/extension-installer: ^1.3
- phpstan/phpstan: ^1.12
- phpstan/phpstan-deprecation-rules: ^1.1
- phpstan/phpstan-phpunit: ^1.3
This package is auto-updated.
Last update: 2025-08-25 10:03:57 UTC
README
kafka-bus is a PHP message bus for Apache Kafka built on ext-rdkafka. You register topics, route producer messages to them, and run named listeners whose workers pass each consumed message to a handler.
Installation
You can install the package via composer:
composer require micromus/kafka-bus
Requirements
- PHP ^8.2
- ext-rdkafka and a running Kafka cluster
- Optional for consumers: ext-pcntl (to handle stop signals gracefully)
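The requirements above can be verified before the bus is wired up. The following is a minimal sketch using only PHP built-ins; the function name `kafkaBusEnvironmentProblems` and its `$needsConsumer` parameter are hypothetical and not part of the package. Note that the Composer metadata lists ext-pcntl under "Requires", so Composer itself may refuse to install the package without it even if you only produce messages.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of kafka-bus): returns a list of
 * human-readable problems. An empty list means the checks passed.
 *
 * @return list<string>
 */
function kafkaBusEnvironmentProblems(bool $needsConsumer = true): array
{
    $problems = [];

    // The package requires PHP ^8.2.
    if (PHP_VERSION_ID < 80200) {
        $problems[] = 'PHP 8.2 or newer is required, found ' . PHP_VERSION;
    }

    // ext-rdkafka is needed for both producing and consuming.
    if (!extension_loaded('rdkafka')) {
        $problems[] = 'ext-rdkafka is not loaded';
    }

    // ext-pcntl is only checked when the process will run a consumer.
    if ($needsConsumer && !extension_loaded('pcntl')) {
        $problems[] = 'ext-pcntl is not loaded (needed to handle stop signals)';
    }

    return $problems;
}

// Print every detected problem to STDERR.
foreach (kafkaBusEnvironmentProblems() as $problem) {
    fwrite(STDERR, $problem . PHP_EOL);
}
```

This check cannot tell whether a Kafka cluster is actually reachable; it only covers the PHP side of the requirements.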
Usage (via Composer)
Quick start: Bus with producer and consumer
Below is a minimal example of wiring the bus, registering a topic, adding a producer route, and running a listener that handles messages from the same topic.
<?php

use Micromus\KafkaBus\Bus;
use Micromus\KafkaBus\Connections\Config\KafkaConnectionConfig;
use Micromus\KafkaBus\Connections\Registry\ConnectionRegistry;
use Micromus\KafkaBus\Connections\Registry\DriverRegistry;
use Micromus\KafkaBus\Consumers\ConsumerStreamFactory;
use Micromus\KafkaBus\Consumers\Handlers\MessageHandler;
use Micromus\KafkaBus\Consumers\Handlers\MessageHandlerFactory;
use Micromus\KafkaBus\Consumers\Router\ConsumerRoutes;
use Micromus\KafkaBus\Consumers\Router\Route as ConsumerRoute;
use Micromus\KafkaBus\Producers\Messages\ProducerMessage;
use Micromus\KafkaBus\Producers\ProducerStreamFactory;
use Micromus\KafkaBus\Topics\Topic;
use Micromus\KafkaBus\Topics\TopicRegistry;

require __DIR__ . '/vendor/autoload.php';

// Define topics
$topicRegistry = (new TopicRegistry())
    ->add(new Topic('production.fact.products.1', 'products'));

// Create consumer worker (listener) that handles messages from the topic
$consumeOptions = [
    'group.id' => 'products-microservice',
    'auto.offset.reset' => 'earliest',
];

// Simple synchronous handler example
class PrintHandler implements MessageHandler
{
    public function handle(object $message): void
    {
        // $message is your domain message from pipeline
        fwrite(STDOUT, "Handled: " . json_encode($message) . PHP_EOL);
    }
}

$worker = new Bus\Listeners\Workers\Worker(
    'default-listener',
    (new ConsumerRoutes())
        ->add(new ConsumerRoute($topicRegistry->get('products'), new PrintHandler())),
    new Bus\Listeners\Workers\Options(additionalOptions: $consumeOptions)
);

$workerRegistry = (new Bus\Listeners\Workers\WorkerRegistry())
    ->add($worker);

// Configure how to route producer messages to topics
$routes = (new Bus\Publishers\Router\PublisherRoutes())
    ->add(new Bus\Publishers\Router\Route(ProducerMessage::class, $topicRegistry->get('products')));

// Kafka connection(s)
$connectionRegistry = new ConnectionRegistry(
    new DriverRegistry(),
    ['default' => new KafkaConnectionConfig('127.0.0.1:29092')]
);

// Create Bus
$publisherFactory = new Bus\Publishers\PublisherFactory(
    new ProducerStreamFactory(),
    $routes
);

$listenerFactory = new Bus\Listeners\ListenerFactory(
    new ConsumerStreamFactory(new MessageHandlerFactory()),
    $workerRegistry
);

$bus = new Bus(
    new Bus\ThreadRegistry(
        $connectionRegistry,
        new Bus\ThreadFactory($listenerFactory, $publisherFactory)
    ),
    'default' // default connection name
);

// Produce a message
$bus->publish(new ProducerMessage(payload: 'test-message', headers: ['foo' => 'bar']));

// Consume in the same process (or run it separately)
pcntl_async_signals(true);

$listener = $bus->listener('default-listener');

pcntl_signal(SIGINT, fn () => $listener->forceStop());

$listener->listen();
Producing only
If you only need to produce messages, configure the bus and call publish with ProducerMessage instances. You do not need to start a listener in that case.
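A producer-only setup might look like the sketch below. It reuses only the classes and constructor arguments from the quick start; the one assumption is that ThreadFactory still expects a ListenerFactory, so an empty WorkerRegistry is passed in. The broker address and topic names are the quick-start placeholders. Running it needs the installed package, ext-rdkafka, and a reachable broker; examples/producer.php in the repository is the authoritative version.

```php
<?php

use Micromus\KafkaBus\Bus;
use Micromus\KafkaBus\Connections\Config\KafkaConnectionConfig;
use Micromus\KafkaBus\Connections\Registry\ConnectionRegistry;
use Micromus\KafkaBus\Connections\Registry\DriverRegistry;
use Micromus\KafkaBus\Consumers\ConsumerStreamFactory;
use Micromus\KafkaBus\Consumers\Handlers\MessageHandlerFactory;
use Micromus\KafkaBus\Producers\Messages\ProducerMessage;
use Micromus\KafkaBus\Producers\ProducerStreamFactory;
use Micromus\KafkaBus\Topics\Topic;
use Micromus\KafkaBus\Topics\TopicRegistry;

require __DIR__ . '/vendor/autoload.php';

// Topic and producer route, exactly as in the quick start
$topicRegistry = (new TopicRegistry())
    ->add(new Topic('production.fact.products.1', 'products'));

$routes = (new Bus\Publishers\Router\PublisherRoutes())
    ->add(new Bus\Publishers\Router\Route(ProducerMessage::class, $topicRegistry->get('products')));

$connectionRegistry = new ConnectionRegistry(
    new DriverRegistry(),
    ['default' => new KafkaConnectionConfig('127.0.0.1:29092')]
);

// Assumption: no workers are registered because nothing is consumed here
$listenerFactory = new Bus\Listeners\ListenerFactory(
    new ConsumerStreamFactory(new MessageHandlerFactory()),
    new Bus\Listeners\Workers\WorkerRegistry()
);

$publisherFactory = new Bus\Publishers\PublisherFactory(
    new ProducerStreamFactory(),
    $routes
);

$bus = new Bus(
    new Bus\ThreadRegistry(
        $connectionRegistry,
        new Bus\ThreadFactory($listenerFactory, $publisherFactory)
    ),
    'default' // default connection name
);

// Publish and exit; no listener is started
$bus->publish(new ProducerMessage(payload: 'test-message', headers: ['foo' => 'bar']));
```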
Consuming only
If you only need to consume, configure the worker(s) and call listener('name')->listen(). Your MessageHandler implementation will be invoked for each message received.
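Because listen() blocks, a consumer process usually needs a way to stop cleanly. The sketch below illustrates the signal pattern from the quick start with a hypothetical stand-in class, so it runs without Kafka. Only the method names listen() and forceStop() come from the package; `StandInListener`, its loop body, and the self-sent signal are invented for the demo. Handling SIGTERM in addition to SIGINT is an addition to the quick start, and the demo assumes ext-pcntl and ext-posix are available.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the object returned by $bus->listener('name').
final class StandInListener
{
    private bool $running = false;

    public int $iterations = 0;

    public function listen(): void
    {
        $this->running = true;

        while ($this->running) {
            $this->iterations++;
            usleep(10_000); // pretend to poll Kafka for 10 ms

            // Demo only: send SIGTERM to this process on the third iteration.
            if ($this->iterations === 3) {
                posix_kill(posix_getpid(), SIGTERM);
            }
        }
    }

    public function forceStop(): void
    {
        $this->running = false;
    }
}

// Deliver signals without requiring declare(ticks=1) or manual dispatching.
pcntl_async_signals(true);

$listener = new StandInListener();

// Ctrl+C sends SIGINT; `docker stop` and most process managers send SIGTERM.
foreach ([SIGINT, SIGTERM] as $signal) {
    pcntl_signal($signal, static function () use ($listener): void {
        $listener->forceStop();
    });
}

$listener->listen();

fwrite(STDOUT, 'Stopped after ' . $listener->iterations . ' iterations' . PHP_EOL);
```

With the real package, the same foreach loop would register the handlers on the listener obtained from $bus->listener('default-listener') before calling listen().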
More examples
- Producer only: see examples/producer.php
- Consumer only: see examples/consumer.php
- Full setup with routing: see examples/bus.php
Testing
composer test
Changelog
Please see CHANGELOG for more information on what has changed recently.
Contributing
Please see CONTRIBUTING for details.
Security Vulnerabilities
Please review our security policy on how to report security vulnerabilities.
Credits
License
The MIT License (MIT). Please see License File for more information.