tereta/queue

Kafka-backed message queue for the Tereta application: typed Producer/Consumer services, a queue:consumer CLI command and a Message interface for asynchronous task processing.

Maintainers

Package info

gitlab.com/tereta/library/queue

Homepage

Issues

pkg:composer/tereta/queue

Statistics

Installs: 37

Dependents: 2

Suggesters: 0

Stars: 0

1.0.3 2026-04-28 22:42 UTC

This package is auto-updated.

Last update: 2026-04-29 05:43:26 UTC


README

🌐 Русский | English

Overview

Kafka-backed message queue for asynchronous task processing in the Tereta application. The module ships a Producer for dispatching typed messages, a long-running Consumer service with a queue:consumer CLI command, and a Message interface that callers implement to describe the unit of work.

The package wraps tereta/kafka for transport and tereta/logger for diagnostics. Messages are transported as PHP-native serialize() payloads and deserialized through a class whitelist auto-populated from every implementation of Tereta\Queue\Interfaces\Message discovered by tereta/di.

Caveat β€” side effects on deserialization. unserialize() reconstructs the Message object on the consumer side without calling its __construct(). If your Message constructor has side effects (DB writes, network calls, registering listeners), those will not run on the consumer. Conversely, magic methods triggered during reconstruction (__wakeup, __unserialize) will run β€” keep them free of side effects. Move any service-dependent work to the optional construct() method (see "Defining a Message" below) or to process().

Requirements

  • PHP 8.4+
  • ext-rdkafka (provided via tereta/kafka)
  • tereta/cli, tereta/logger, tereta/kafka, tereta/di, tereta/core, tereta/application

Configuration

Kafka itself is registered through tereta/kafka. Once that registry is available the queue module needs no additional setup:

use Tereta\Kafka\Builders\Config as KafkaConfigBuilder;
use Tereta\Kafka\Services\Registry as KafkaRegistry;

KafkaRegistry::singleton()->register(
    KafkaConfigBuilder::factory()->create()
        ->set('metadata.broker.list', 'kafka:9092')
        ->set('group.id', 'workers')
);

Tereta\Queue\Module::register() registers a queue log channel that writes to ROOT_DIRECTORY/var/logs/queue.log and pre-populates the consumer's unserialize() whitelist with every class implementing Message.

Make sure the queue and error log levels you care about are enabled (Tereta\Logger\Services\Channel\Config::set('info', true), etc.) β€” tereta/logger is silent by default.

Defining a Message

A queue message is any class implementing Tereta\Queue\Interfaces\Message. The contract has two methods: getTopic() returns the Kafka topic, and process() performs the work after the consumer deserializes the payload.

use Tereta\Queue\Interfaces\Message;

class SendWelcomeEmail implements Message
{
    public function __construct(
        public readonly int $userId,
    ) {
    }

    public function getTopic(): string
    {
        return 'welcome-email';
    }

    public function process(): void
    {
        // do the work
    }
}

If a message needs services injected at consume time (rather than at dispatch time), declare a construct() method β€” the consumer will call it with arguments resolved through tereta/di after deserialization:

public function construct(MyService $service): void
{
    $this->service = $service;
}

Usage

Producer

use Tereta\Queue\Services\Producer as QueueProducer;

QueueProducer::singleton()->dispatch(new SendWelcomeEmail(userId: 42));

dispatch() enqueues into the underlying Kafka producer and flushes automatically every 10 seconds (configurable via setFlushInterval()). Call flush() explicitly before short-lived processes exit.

dispatch() may throw if the broker is unreachable, the topic is misconfigured, or tereta/kafka raises an internal error. The producer logs the failure to the error channel with the topic, message class, exception class and stack trace, then rethrows β€” wrap calls in try/catch if you need to retry, fall back, or persist the payload for later replay. RuntimeException is also raised when the message returns an empty topic from getTopic().

Consumer

Start a consumer for a given topic from the CLI:

./bin/tereta queue:consumer welcome-email

The CLI command runs a single iteration; configure persistent mode in code if you want a long-running worker:

use Tereta\Queue\Services\Consumer as QueueConsumer;

QueueConsumer::singleton()->setPersistent(true)->handle('welcome-email');

Run multiple consumer processes in parallel by sharing the same Kafka group.id β€” partition assignment is handled by the broker.

Delivery semantics

process() must be idempotent. The consumer commits the Kafka offset after process() returns, so the queue is at-least-once: if the worker is killed (SIGKILL, OOM, container eviction, host crash) between message processing and offset commit, the broker will redeliver the message after restart. Messages are also replayed when the consumer group rebalances. Design process() so that running it twice on the same payload is safe β€” use idempotency keys, conditional writes, or external de-duplication for operations with side effects (payments, emails, third-party API calls). The module deliberately does not install signal handlers; if you need a coordinated shutdown for non-idempotent workloads, wrap your worker in a supervisor that holds messages in your own pre-commit log.

Error handling

The consumer loop wraps processMessage() in try/catch. On any exception it logs the failure with the topic, exception class, message and stack trace into the error channel, then commits the offset so the consumer keeps making progress instead of looping on a poison message. The original payload remains in the Kafka log for the configured retention window β€” replay by resetting the consumer group offset if you need to reprocess.

If you need to drop everything currently in a topic (typically in tests):

QueueConsumer::singleton()->clearQueue('welcome-email', maxWaitSeconds: 5);

Logging

Two channels are written:

  • queue β€” informational events (consumer started, message processing) at file ROOT_DIRECTORY/var/logs/queue.log.
  • error β€” failed message processing at the default error channel from tereta/logger.

Each entry carries structured context (topic, class, exception, error, trace) for correlation.

Author and License

Author: Tereta Alexander
Website: tereta.dev
License: Apache License 2.0. See LICENSE.

 www.β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—
     β•šβ•β•β–ˆβ–ˆβ•”β•β•β•β–ˆβ–ˆβ•”β•β•β•β•β•β–ˆβ–ˆβ•”β•β•β–ˆβ–ˆβ•—β–ˆβ–ˆβ•”β•β•β•β•β•β•šβ•β•β–ˆβ–ˆβ•”β•β•β•β–ˆβ–ˆβ•”β•β•β–ˆβ–ˆβ•—
        β–ˆβ–ˆβ•‘   β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—  β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•”β•β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—     β–ˆβ–ˆβ•‘   β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•‘
        β–ˆβ–ˆβ•‘   β–ˆβ–ˆβ•”β•β•β•  β–ˆβ–ˆβ•”β•β•β–ˆβ–ˆβ•—β–ˆβ–ˆβ•”β•β•β•     β–ˆβ–ˆβ•‘   β–ˆβ–ˆβ•”β•β•β–ˆβ–ˆβ•‘
        β–ˆβ–ˆβ•‘   β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ•‘  β–ˆβ–ˆβ•‘β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—   β–ˆβ–ˆβ•‘   β–ˆβ–ˆβ•‘  β–ˆβ–ˆβ•‘
        β•šβ•β•   β•šβ•β•β•β•β•β•β•β•šβ•β•  β•šβ•β•β•šβ•β•β•β•β•β•β•   β•šβ•β•   β•šβ•β•  β•šβ•β•
                                                      .dev

Copyright (c) 2008-2026 Tereta Alexander