tereta / queue
Kafka-backed message queue for the Tereta application: typed Producer/Consumer services, a queue:consumer CLI command and a Message interface for asynchronous task processing.
Requires
- php: >=8.4
- tereta/application: ^1.0
- tereta/cli: ^1.0
- tereta/core: ^1.0
- tereta/di: ^1.0
- tereta/kafka: ^1.0
- tereta/logger: ^1.0
README
Overview
Kafka-backed message queue for asynchronous task processing in the Tereta application. The module ships a Producer for dispatching typed messages, a long-running Consumer service with a queue:consumer CLI command, and a Message interface that callers implement to describe the unit of work.
The package wraps tereta/kafka for transport and tereta/logger for diagnostics. Messages are transported as PHP-native serialize() payloads and deserialized through a class whitelist auto-populated from every implementation of Tereta\Queue\Interfaces\Message discovered by tereta/di.
Caveat: side effects on deserialization.
unserialize() reconstructs the Message object on the consumer side without calling its __construct(). If your Message constructor has side effects (DB writes, network calls, registering listeners), those will not run on the consumer. Conversely, magic methods triggered during reconstruction (__wakeup, __unserialize) will run, so keep them free of side effects. Move any service-dependent work to the optional construct() method (see "Defining a Message" below) or to process().
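The behaviour described in this caveat can be checked with plain PHP, independent of the module. The following is a minimal sketch: DemoMessage and its counters are hypothetical names used only for illustration, and the whitelist is passed by hand rather than populated by tereta/di.

```php
<?php

// Hypothetical message class, used only for this illustration.
final class DemoMessage
{
    public static int $constructorCalls = 0;
    public static int $unserializeCalls = 0;

    public function __construct(public int $userId = 0)
    {
        self::$constructorCalls++; // runs on the producer side only
    }

    public function __serialize(): array
    {
        return ['userId' => $this->userId];
    }

    public function __unserialize(array $data): void
    {
        self::$unserializeCalls++; // runs during reconstruction
        $this->userId = $data['userId'];
    }
}

// Producer side: the constructor runs once here.
$payload = serialize(new DemoMessage(42));

// Consumer side: only whitelisted classes are instantiated,
// and __construct() is not called again.
$restored = unserialize($payload, ['allowed_classes' => [DemoMessage::class]]);

// A class missing from the whitelist comes back as __PHP_Incomplete_Class
// and its __unserialize() is never invoked.
$rejected = unserialize($payload, ['allowed_classes' => []]);
```

After this runs, the constructor counter is still 1 while $restored carries the original userId, which is why constructor side effects never reach the consumer.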
Requirements
- PHP 8.4+
- ext-rdkafka (provided via tereta/kafka)
- tereta/cli, tereta/logger, tereta/kafka, tereta/di, tereta/core, tereta/application
Configuration
Kafka itself is registered through tereta/kafka. Once that registry is available the queue module needs no additional setup:
use Tereta\Kafka\Builders\Config as KafkaConfigBuilder;
use Tereta\Kafka\Services\Registry as KafkaRegistry;
KafkaRegistry::singleton()->register(
    KafkaConfigBuilder::factory()->create()
        ->set('metadata.broker.list', 'kafka:9092')
        ->set('group.id', 'workers')
);
Tereta\Queue\Module::register() registers a queue log channel that writes to ROOT_DIRECTORY/var/logs/queue.log and pre-populates the consumer's unserialize() whitelist with every class implementing Message.
Make sure the queue and error log levels you care about are enabled (Tereta\Logger\Services\Channel\Config::set('info', true), etc.), because tereta/logger is silent by default.
Defining a Message
A queue message is any class implementing Tereta\Queue\Interfaces\Message. The contract has two methods: getTopic() returns the Kafka topic, and process() performs the work after the consumer deserializes the payload.
use Tereta\Queue\Interfaces\Message;
class SendWelcomeEmail implements Message
{
    public function __construct(
        public readonly int $userId,
    ) {
    }

    public function getTopic(): string
    {
        return 'welcome-email';
    }

    public function process(): void
    {
        // do the work
    }
}
If a message needs services injected at consume time (rather than at dispatch time), declare a construct() method. The consumer will call it with arguments resolved through tereta/di after deserialization:
public function construct(MyService $service): void
{
    $this->service = $service;
}
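To make the consume-time injection step concrete, here is a rough, self-contained sketch of how a consumer could call construct() after deserialization. It does not use tereta/di: callConstruct() and the class-name-to-instance map are hypothetical stand-ins for the container, and Mailer and WelcomeEmail are illustrative classes, not part of the package.

```php
<?php

// Hypothetical service and message, for illustration only.
final class Mailer
{
    public array $sent = [];

    public function send(int $userId): void
    {
        $this->sent[] = $userId;
    }
}

final class WelcomeEmail
{
    private ?Mailer $mailer = null;

    public function __construct(public readonly int $userId)
    {
    }

    public function construct(Mailer $mailer): void
    {
        $this->mailer = $mailer;
    }

    public function process(): void
    {
        $this->mailer->send($this->userId);
    }
}

/**
 * Stand-in for the container: resolves each typed parameter of construct()
 * from a map of class name => instance, then invokes the method.
 */
function callConstruct(object $message, array $services): void
{
    if (!method_exists($message, 'construct')) {
        return; // construct() is optional
    }
    $method = new ReflectionMethod($message, 'construct');
    $arguments = [];
    foreach ($method->getParameters() as $parameter) {
        $type = $parameter->getType();
        if (!$type instanceof ReflectionNamedType || !isset($services[$type->getName()])) {
            throw new RuntimeException('Cannot resolve parameter $' . $parameter->getName());
        }
        $arguments[] = $services[$type->getName()];
    }
    $method->invokeArgs($message, $arguments);
}

// Simulate the round trip: serialize on dispatch, rebuild on consume.
$mailer = new Mailer();
$payload = serialize(new WelcomeEmail(42));
$message = unserialize($payload, ['allowed_classes' => [WelcomeEmail::class]]);

callConstruct($message, [Mailer::class => $mailer]);
$message->process();
```

The point of the split is that the payload only carries data (userId), while services are attached on the consumer, where they actually exist.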
Usage
Producer
use Tereta\Queue\Services\Producer as QueueProducer;
QueueProducer::singleton()->dispatch(new SendWelcomeEmail(userId: 42));
dispatch() enqueues into the underlying Kafka producer and flushes automatically every 10 seconds (configurable via setFlushInterval()). Call flush() explicitly before short-lived processes exit.
dispatch() may throw if the broker is unreachable, the topic is misconfigured, or tereta/kafka raises an internal error. The producer logs the failure to the error channel with the topic, message class, exception class and stack trace, then rethrows. Wrap calls in try/catch if you need to retry, fall back, or persist the payload for later replay. A RuntimeException is also thrown when the message returns an empty topic from getTopic().
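One way to structure the try/catch mentioned above is a small bounded-retry helper. This is a sketch under assumptions: dispatchWithRetry() is a hypothetical function, not part of the package, and the callable is expected to wrap a call such as QueueProducer::singleton()->dispatch($message). A flaky closure stands in for the producer so the example runs on its own.

```php
<?php

/**
 * Hypothetical helper: runs $dispatch up to $maxAttempts times with a
 * linear backoff and returns the attempt number that succeeded.
 * The last exception is rethrown once attempts are exhausted.
 */
function dispatchWithRetry(callable $dispatch, int $maxAttempts = 3, int $delayMs = 100): int
{
    if ($maxAttempts < 1) {
        throw new InvalidArgumentException('maxAttempts must be at least 1');
    }
    for ($attempt = 1; ; $attempt++) {
        try {
            $dispatch();
            return $attempt;
        } catch (Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // give up; the caller can persist the payload for replay
            }
            usleep($delayMs * 1000 * $attempt); // wait longer after each failure
        }
    }
}

// Stand-in for a producer that fails twice before succeeding.
$calls = 0;
$flaky = function () use (&$calls): void {
    $calls++;
    if ($calls < 3) {
        throw new RuntimeException('broker unreachable');
    }
};

$attempts = dispatchWithRetry($flaky, maxAttempts: 5, delayMs: 0);
```

Retrying a dispatch can enqueue the same message more than once if an earlier attempt partially succeeded, which is another reason to keep process() idempotent.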
Consumer
Start a consumer for a given topic from the CLI:
./bin/tereta queue:consumer welcome-email
The CLI command runs a single iteration; configure persistent mode in code if you want a long-running worker:
use Tereta\Queue\Services\Consumer as QueueConsumer;
QueueConsumer::singleton()->setPersistent(true)->handle('welcome-email');
Run multiple consumer processes in parallel by sharing the same Kafka group.id; partition assignment is handled by the broker.
Delivery semantics
process() must be idempotent. The consumer commits the Kafka offset after process() returns, so the queue is at-least-once: if the worker is killed (SIGKILL, OOM, container eviction, host crash) between message processing and offset commit, the broker will redeliver the message after restart. Messages are also replayed when the consumer group rebalances. Design process() so that running it twice on the same payload is safe: use idempotency keys, conditional writes, or external de-duplication for operations with side effects (payments, emails, third-party API calls). The module deliberately does not install signal handlers; if you need a coordinated shutdown for non-idempotent workloads, wrap your worker in a supervisor that holds messages in your own pre-commit log.
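As an illustration of the idempotency-key approach, the sketch below guards a side effect with a de-duplication store. Everything here is hypothetical: ProcessedKeys is an in-memory stand-in for a database table or cache with an atomic "insert if absent" operation, and ChargeOrder mirrors the getTopic()/process() shape without depending on the real interface so that it runs on its own.

```php
<?php

// Hypothetical de-duplication store (in-memory for the demo).
final class ProcessedKeys
{
    private array $seen = [];

    /** Returns true only the first time a key is claimed. */
    public function claim(string $key): bool
    {
        if (isset($this->seen[$key])) {
            return false;
        }
        $this->seen[$key] = true;
        return true;
    }
}

// Hypothetical message with a non-idempotent side effect.
final class ChargeOrder
{
    public static int $charges = 0; // counts side effects for the demo
    private ?ProcessedKeys $keys = null;

    public function __construct(public readonly string $orderId)
    {
    }

    public function construct(ProcessedKeys $keys): void
    {
        $this->keys = $keys;
    }

    public function getTopic(): string
    {
        return 'charge-order';
    }

    public function process(): void
    {
        // The order id doubles as the idempotency key.
        if (!$this->keys->claim('charge:' . $this->orderId)) {
            return; // already handled by an earlier delivery
        }
        self::$charges++; // stand-in for the real side effect
    }
}

// Simulate the same payload being delivered twice.
$keys = new ProcessedKeys();
foreach ([1, 2] as $delivery) {
    $message = new ChargeOrder('order-1001');
    $message->construct($keys);
    $message->process();
}
```

Claiming the key before performing the side effect avoids duplicates but can lose the operation if the worker dies in between; where the store allows it, recording the key and the effect in one transaction is the safer design.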
Error handling
The consumer loop wraps processMessage() in try/catch. On any exception it logs the failure with the topic, exception class, message and stack trace into the error channel, then commits the offset so the consumer keeps making progress instead of looping on a poison message. The original payload remains in the Kafka log for the configured retention window; replay by resetting the consumer group offset if you need to reprocess.
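The log-then-commit behaviour can be pictured with a small simulation. This is not the module's consumer loop: consumeAll() is a hypothetical function that uses an array in place of a Kafka partition and an integer in place of the committed offset.

```php
<?php

/**
 * Hypothetical simulation: process payloads in order, record failures,
 * and advance the offset even when the handler throws.
 * Returns [committedOffset, errorLog].
 */
function consumeAll(array $payloads, callable $handler): array
{
    $committed = 0;
    $errors = [];
    foreach (array_values($payloads) as $offset => $payload) {
        try {
            $handler($payload);
        } catch (Throwable $e) {
            // Keep structured context so the failure can be correlated later.
            $errors[] = [
                'offset'    => $offset,
                'exception' => $e::class,
                'error'     => $e->getMessage(),
            ];
        }
        $committed = $offset + 1; // commit after success and after failure alike
    }
    return [$committed, $errors];
}

$handled = [];
[$committed, $errors] = consumeAll(
    ['ok-1', 'poison', 'ok-2'],
    function (string $payload) use (&$handled): void {
        if ($payload === 'poison') {
            throw new RuntimeException('cannot process');
        }
        $handled[] = $payload;
    }
);
```

The poison payload is logged once and skipped, and the payloads after it are still processed, which is the trade-off the module makes in favour of progress over automatic retries.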
If you need to drop everything currently in a topic (typically in tests):
QueueConsumer::singleton()->clearQueue('welcome-email', maxWaitSeconds: 5);
Logging
Two channels are written:
- queue: informational events (consumer started, message processing), written to the file ROOT_DIRECTORY/var/logs/queue.log.
- error: failed message processing, written to the default error channel from tereta/logger.
Each entry carries structured context (topic, class, exception, error, trace) for correlation.
Author and License
Author: Tereta Alexander
Website: tereta.dev
License: Apache License 2.0. See LICENSE.
Copyright (c) 2008-2026 Tereta Alexander