micromus / kafka-bus-commiter
This is my package kafka-bus-repeater
Fund package maintenance!
Requires
- php: ^8.2
- micromus/kafka-bus: ^1.1.1
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.95
- phpstan/extension-installer: ^1.3
- phpstan/phpstan: ^2.1.56
- testo/testo: ^0.10.14
README
A middleware package for micromus/kafka-bus that provides idempotent Kafka message processing. It tracks which messages have already been handled, prevents duplicate processing, and allows limiting the maximum number of read attempts.
How It Works
ConsumerCommiterMiddleware is inserted into the consumer pipeline and handles four outcomes:
- Already committed — if the message was already successfully processed (
commitedAtis notnull), the middleware logs a warning and stops the pipeline. - Max attempts exceeded — if
maxAttemptis set and the attempt count has exceeded it, the middleware logs an error and stops the pipeline. - Successful processing — if both checks pass, the message continues down the pipeline; once handled,
commit()is called to record it as processed. - Handler error — if downstream processing throws an exception, the middleware calls
failed()and rethrows the exception.
Installation
composer require micromus/kafka-bus-commiter
Usage
Basic Example
Implement RepositorySourceInterface to persist message state (for example in a database or Redis), then pass the
middleware into your worker options:
use Micromus\KafkaBus\Bus; use Micromus\KafkaBus\Consumers\Router\ConsumerRoutesBuilder; use Micromus\KafkaBus\Consumers\Router\RouteInfo; use Micromus\KafkaBus\Topics\Topic; use Micromus\KafkaBus\Topics\TopicRegistry; use Micromus\KafkaBusCommiter\Middleware\ConsumerCommiterMiddleware; $topicRegistry = (new TopicRegistry()) ->add(new Topic('production.fact.products.1', 'products')); $consumerRoutes = ConsumerRoutesBuilder::make($topicRegistry) ->add(new RouteInfo('products', new YourMessageHandler())) ->build(); $workerRegistry = (new Bus\Listeners\Workers\MemoryWorkerRegistry()) ->add( new Bus\Listeners\Workers\Worker( name: 'default-listener', routes: $consumerRoutes, options: new Bus\Listeners\Workers\Options( middleware: [ new ConsumerCommiterMiddleware(new NativeMessageRepository(new DatabaseRepositorySource)) ] ) ) );
Middleware Options
new ConsumerCommiterMiddleware( repository: $repository, // required logger: $logger, // PSR-3 logger, defaults to NullLogger maxAttempt: 3, // max attempts, -1 = unlimited )
| Parameter | Type | Default | Description |
|---|---|---|---|
repository |
RepositorySourceInterface |
— | Storage for message state |
logger |
LoggerInterface |
NullLogger |
PSR-3 compatible logger |
maxAttempt |
int |
-1 |
Maximum number of processing attempts. -1 means unlimited |
Implementing the Repository
You need to provide your own implementation of RepositorySourceInterface:
use Micromus\KafkaBusCommiter\Attempt; use Micromus\KafkaBusCommiter\Interfaces\RepositorySourceInterface; class DatabaseRepositorySource implements RepositorySourceInterface { /** * Returns the current attempt for a given key. */ public function get(string $key): ?Attempt { // ... } /** * Increments the number of failed read attempts for a key. */ public function increment(string $key): void { // ... } /** * Marks the message key as successfully processed. */ public function commit(string $key): void { // ... } }
The middleware reads/writes processing state through RepositorySourceInterface.
If you need key derivation from message data, use IdempotencyMessageRepository as an adapter that maps ConsumerMessageInterface to repository keys.
Idempotency Keys
Out of the box the consumer uses Kafka's msgId() (a combination of topic, partition and offset) as the storage key.
That works as long as the same physical message is never replayed under a different offset. As soon as you have
retries, producer-side resends, or cross-cluster mirroring, the same logical event can arrive with a different
msgId() and slip past the duplicate check.
An idempotency key is a stable identifier that the producer attaches to a message so the consumer can recognize
duplicates regardless of where or how they arrive. This package transports the key through the
x-idempotency-key Kafka header (IdempotencyMessageRepository::HEADER_NAME).
Producing: HasIdempotency + ProducerIdempotencyMiddleware
On the producer side you mark your message class with the HasIdempotency interface and return the stable key.
The ProducerIdempotencyMiddleware reads that key and writes it into the x-idempotency-key header before the
message hits the broker.
use Micromus\KafkaBus\Interfaces\Producers\Messages\ProducerMessageInterface; use Micromus\KafkaBusCommiter\Interfaces\HasIdempotency; final readonly class ProductCreated implements ProducerMessageInterface, HasIdempotency { public function __construct( private string $productId, private string $payload, ) { } public function toPayload(): string { return $this->payload; } public function getIdempotencyKey(): string { return $this->productId; } }
Wire the middleware into the publisher route for that message class:
use Micromus\KafkaBus\Bus\Publishers\Router\Options; use Micromus\KafkaBus\Bus\Publishers\Router\PublisherRoutesBuilder; use Micromus\KafkaBusCommiter\Middleware\ProducerIdempotencyMiddleware; $publisherRoutes = PublisherRoutesBuilder::make($topicRegistry) ->add( ProductCreated::class, 'products', new Options(middleware: [new ProducerIdempotencyMiddleware()]) ) ->build();
Middleware is registered per publisher route, so you opt individual message classes into the header. Messages
that do not implement HasIdempotency pass through the middleware untouched — no header is added.
Consuming: IdempotencyMessageRepository
On the consumer side, plug IdempotencyMessageRepository into ConsumerCommiterMiddleware. It reads the
x-idempotency-key header and builds the storage key as "{header}-{topicName}", so the same idempotency key
in two different topics is still treated as two distinct events. If the header is missing, it falls back to
msgId() so legacy producers keep working.
use Micromus\KafkaBusCommiter\Middleware\ConsumerCommiterMiddleware; use Micromus\KafkaBusCommiter\Repositories\IdempotencyMessageRepository; $repository = new IdempotencyMessageRepository(new DatabaseRepositorySource()); new ConsumerCommiterMiddleware($repository, maxAttempt: 3);
Picking a key
Pick something that uniquely identifies the business event, not the transport. Good choices are an aggregate
id plus a version (order-42-v3), an outbox row id, or any value the upstream system already treats as unique.
Avoid values that change on retry (timestamps, random UUIDs generated per send attempt) — they defeat the whole
mechanism.
Testing
composer test
Changelog
Please see CHANGELOG for more information on what has changed recently.
Contributing
Please see CONTRIBUTING for details.
Security Vulnerabilities
Please review our security policy on how to report security vulnerabilities.
Credits
License
The MIT License (MIT). Please see License File for more information.