tereta/kafka

Maintainers

Package info

gitlab.com/tereta/library/kafka

Issues

pkg:composer/tereta/kafka

Statistics

Installs: 55

Dependents: 2

Suggesters: 0

Stars: 0

1.0.5 2026-04-27 23:20 UTC

This package is auto-updated.

Last update: 2026-04-29 05:43:26 UTC


README

🌐 Русский | English

Overview

Wrapper around the PHP rdkafka extension (Apache Kafka). Provides a registry of producers/consumers, a configuration builder, a safe shutdown-flush, and integration with Tereta\Logger for diagnostics.

Requirements

  • PHP 8.4+
  • ext-rdkafka extension
  • tereta/core, tereta/logger

Configuration

Registration is done in .config.php:

use Tereta\Kafka\Builders\Config as KafkaConfigBuilder;
use Tereta\Kafka\Services\Registry as KafkaRegistry;

KafkaRegistry::singleton()->register(
    KafkaConfigBuilder::factory()->create()
        ->set('metadata.broker.list', 'kafka:9092')
        ->set('group.id', 'workers')
);

Tereta\Kafka\Builders\Config::set() accepts any librdkafka option. When building the config for a producer/consumer, a separate whitelist of relevant options is applied.

Usage

Producer

use Tereta\Kafka\Services\Registry as KafkaRegistry;

$producer = KafkaRegistry::singleton()->getProducer();
$producer->newTopic('events')->produce(RD_KAFKA_PARTITION_UA, 0, 'payload');
$producer->flush();

flush() is synchronous and throws RuntimeException on failure. On process termination the destructor calls flush() itself so that pending messages aren't lost.

Consumer

use Tereta\Kafka\Services\Registry as KafkaRegistry;

$consumer = KafkaRegistry::singleton()->getConsumer();
$consumer->subscribeTopic('events');
$consumer->waitAssignment();

while (true) {
    $payload = $consumer->getMessage(maxWaitSeconds: 15, pollTimeoutMs: 1000);
    if ($payload === null) {
        continue;
    }
    // handle the message
    $consumer->commit();
}

Transient librdkafka errors are logged into the error channel and do not break the loop.

Author and License

Author: Tereta Alexander
Website: tereta.dev
License: Apache License 2.0. See LICENSE.

 www.β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—
     β•šβ•β•β–ˆβ–ˆβ•”β•β•β•β–ˆβ–ˆβ•”β•β•β•β•β•β–ˆβ–ˆβ•”β•β•β–ˆβ–ˆβ•—β–ˆβ–ˆβ•”β•β•β•β•β•β•šβ•β•β–ˆβ–ˆβ•”β•β•β•β–ˆβ–ˆβ•”β•β•β–ˆβ–ˆβ•—
        β–ˆβ–ˆβ•‘   β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—  β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•”β•β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—     β–ˆβ–ˆβ•‘   β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•‘
        β–ˆβ–ˆβ•‘   β–ˆβ–ˆβ•”β•β•β•  β–ˆβ–ˆβ•”β•β•β–ˆβ–ˆβ•—β–ˆβ–ˆβ•”β•β•β•     β–ˆβ–ˆβ•‘   β–ˆβ–ˆβ•”β•β•β–ˆβ–ˆβ•‘
        β–ˆβ–ˆβ•‘   β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ•‘  β–ˆβ–ˆβ•‘β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—   β–ˆβ–ˆβ•‘   β–ˆβ–ˆβ•‘  β–ˆβ–ˆβ•‘
        β•šβ•β•   β•šβ•β•β•β•β•β•β•β•šβ•β•  β•šβ•β•β•šβ•β•β•β•β•β•β•   β•šβ•β•   β•šβ•β•  β•šβ•β•
                                                      .dev

Copyright (c) 2025-2026 Tereta Alexander