simpod / kafka-bundle
Kafka Symfony bundle.
Fund package maintenance!
simPod
Installs: 80 413
Dependents: 0
Suggesters: 0
Security: 0
Stars: 8
Watchers: 3
Forks: 7
Open Issues: 1
Requires
- php: ^8.2
- simpod/kafka: ^0.2.0
- symfony/config: ^6.4 || ^7.0
- symfony/console: ^6.4 || ^7.0
- symfony/contracts: ^3.0
- symfony/dependency-injection: ^6.4 || ^7.0
- symfony/framework-bundle: ^6.4 || ^7.0
- thecodingmachine/safe: ^2
Requires (Dev)
- doctrine/coding-standard: ^12.0
- infection/infection: ^0.27.0
- kwn/php-rdkafka-stubs: ^2.0
- phpstan/extension-installer: ^1.1
- phpstan/phpstan: ^1.0.0
- phpstan/phpstan-phpunit: ^1.0.0
- phpstan/phpstan-strict-rules: ^1.0.0
- phpunit/phpunit: ^10.3
- psalm/plugin-phpunit: ^0.18.4
- roave/infection-static-analysis-plugin: ^1.7
- symfony/yaml: ^7.0
- thecodingmachine/phpstan-safe-rule: ^1.0
- vimeo/psalm: ^5.8
Suggests
- kwn/php-rdkafka-stubs: Support and autocompletion for RDKafka in IDE | require as dev dependency
README
Installation
Add as Composer dependency:
composer require simpod/kafka-bundle
Then add KafkaBundle
to Symfony's bundles.php
:
use SimPod\KafkaBundle\SimPodKafkaBundle; return [ ... new SimPodKafkaBundle() ... ];
Usage
This package simply makes it easier to integrate https://github.com/arnaud-lb/php-rdkafka with Symfony. For more details how to work with Kafka in PHP, refer to its documentation.
Available console commands:
bin/console debug:kafka:consumers
to list all available consumer groupsbin/console kafka:consumer:run <consumer name>
to run consumer instance
Config:
You can create eg. kafka.yaml
file in your config directory with following content:
kafka: authentication: '%env(KAFKA_AUTHENTICATION)%' bootstrap_servers: '%env(KAFKA_BOOTSTRAP_SERVERS)%' client: id: 'your-application-name'
authentication
reads env varKAFKA_AUTHENTICATIOn
that contains authentication uri (sasl-plain://user:password
, or it might be just empty indicating no authentication).bootstrap_servers
reads env varKAFKA_BOOTSTRAP_SERVERS
that contains comma-separated list of bootstrap servers (broker-1.kafka:9092,broker-2.kafka:9092
).
If bootstrap_servers
isn't set, it defaults to 127.0.0.1:9092
Services
Following services are registered in container and can be DI injected.
Configuration
class: \SimPod\KafkaBundle\Kafka\Configuration
Configuration service allows easy access to all the configuration properties.
$config->set(ConsumerConfig::CLIENT_ID_CONFIG, $this->configuration->getIdWithHostname());
Consuming
There's interface NamedConsumer
available. When your consumer implements it, this bundle autoregisters it.
This is example of simple consumer, it can be then run via bin/console kafka:consumer:run consumer1
<?php declare(strict_types=1); namespace Your\AppNamespace; use SimPod\Kafka\Clients\Consumer\ConsumerConfig; use SimPod\Kafka\Clients\Consumer\KafkaConsumer; use SimPod\KafkaBundle\Kafka\Configuration; use SimPod\KafkaBundle\Kafka\Clients\Consumer\NamedConsumer; final class ExampleKafkaConsumer implements NamedConsumer { private Configuration $configuration; public function __construct(Configuration $configuration) { $this->configuration = $configuration; } public function run(): void { $kafkaConsumer = new KafkaConsumer($this->getConfig()); $kafkaConsumer->subscribe(['topic1']); while (true) { ... } } public function getName(): string { return 'consumer1'; } private function getConfig(): ConsumerConfig { $config = new ConsumerConfig(); $config->set(ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG, $this->configuration->getBootstrapServers()); $config->set(ConsumerConfig::ENABLE_AUTO_COMMIT_CONFIG, false); $config->set(ConsumerConfig::CLIENT_ID_CONFIG, $this->configuration->getClientIdWithHostname()); $config->set(ConsumerConfig::AUTO_OFFSET_RESET_CONFIG, 'earliest'); $config->set(ConsumerConfig::GROUP_ID_CONFIG, 'consumer_group'); return $config; } }
Development
There is kwn/php-rdkafka-stubs
listed as a dev dependency so it properly integrates php-rdkafka extension with IDE.