gurento / kafka-consumer
Laravel Kafka consumer module with topic mapping, retry tracking, and operational logs.
Requires
- php: ^8.2
- illuminate/console: ^11.0|^12.0
- illuminate/database: ^11.0|^12.0
- illuminate/support: ^11.0|^12.0
- mateusjunges/laravel-kafka: ^2.9
Requires (Dev)
- orchestra/testbench: ^9.0|^10.0
- phpunit/phpunit: ^10.0|^11.0
README
gurento/kafka-consumer is a Laravel package for consuming Kafka messages and writing them into Eloquent models using configurable topic mappings.
It is built for operations teams that need:
- declarative topic-to-model mapping
- safe `updateOrCreate` upserts
- failed-message tracking and re-consume flows
- operational counters and heartbeat metadata
- command-driven workflow for normal consume and replay
The package ships with a default engine based on mateusjunges/laravel-kafka, so it works out of the box.
Features
- Topic configuration in DB (`kafka_topics`)
- Message logs in DB (`kafka_consume_logs`)
- Payload key exclusion
- Payload-to-model field mapping
- Configurable upsert key
- Failed log retry scheduling metadata
- Re-consume command mode for failed messages
- Consume metadata tracking (partition, offset, key)
Requirements
- PHP 8.2+
- Laravel 11 or 12
- Kafka broker access
- `rdkafka` extension configured for your PHP runtime
Installation
composer require gurento/kafka-consumer
php artisan vendor:publish --tag=kafka-consumer-config
php artisan vendor:publish --tag=kafka-consumer-migrations
php artisan migrate
What Gets Installed
After migration:
- `kafka_topics` table: topic config + counters + health metadata
- `kafka_consume_logs` table: per-message processing logs
Quick Start
- Create a topic configuration.
- Start the consumer:
php artisan gurento:kafka-consume
- Inspect results in:
- `kafka_topics.messages_consumed`
- `kafka_consume_logs`
Topic Configuration Model
Each `kafka_topics` row defines how a Kafka topic maps into your model.
Core columns:
- `topic`: Kafka topic name
- `model_class`: fully-qualified model class (example: `App\Models\Office`)
- `upsert_key`: unique model column used by `updateOrCreate`
- `field_map`: JSON array of mapping pairs (`from` payload field, `to` model field)
- `exclude_keys`: optional payload keys to ignore
- `is_active`: whether the topic is consumed
Operational columns (auto-managed):
- `messages_consumed`, `messages_failed`, `messages_reconsumed`
- `last_consumed_at`, `consumer_last_heartbeat_at`
- `consumer_last_error`, `consumer_lag_seconds`
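Keeping topic configs in seeders (as recommended below) might look like the following. This is a minimal sketch against the documented column names; the seeder class name and the exact column types (JSON stored via `json_encode`) are assumptions, so check them against the published migration:

```php
<?php

namespace Database\Seeders;

use Illuminate\Database\Seeder;
use Illuminate\Support\Facades\DB;

// Hypothetical seeder: inserts one topic-mapping row using the
// columns documented above. Exact column types are an assumption.
class KafkaTopicSeeder extends Seeder
{
    public function run(): void
    {
        DB::table('kafka_topics')->updateOrInsert(
            ['topic' => 'HR_APP.LIVE.office'],
            [
                'model_class' => \App\Models\Office::class,
                'upsert_key'  => 'id',
                'field_map'   => json_encode([
                    ['from' => 'uuid', 'to' => 'id'],
                    ['from' => 'name', 'to' => 'name'],
                ]),
                'exclude_keys' => json_encode(['meta']),
                'is_active'    => true,
            ]
        );
    }
}
```

Run it with `php artisan db:seed --class=KafkaTopicSeeder`.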
Field Mapping Example
If the Kafka payload is:
{
"uuid": "off-001",
"name": "Accounting Office",
"meta": {"source":"hr"}
}
Use this `field_map`:
[
{"from": "uuid", "to": "id"},
{"from": "name", "to": "name"}
]
And `exclude_keys`:
["meta"]
The package upserts by `upsert_key` (for example `id`).
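Conceptually, the mapping above resolves to an `updateOrCreate` call like this. This is a sketch of the equivalent Eloquent call, not the package's internal code:

```php
use App\Models\Office;

// Payload after applying exclude_keys (["meta"] is dropped)
// and field_map (uuid -> id, name -> name):
$attributes = [
    'id'   => 'off-001',
    'name' => 'Accounting Office',
];

// upsert_key is "id", so it becomes the lookup column:
Office::updateOrCreate(
    ['id' => $attributes['id']],    // match on upsert_key
    ['name' => $attributes['name']] // remaining mapped fields
);
```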
Command Usage
Normal Consumption
php artisan gurento:kafka-consume
Options:
- `--topics=*`: consume only the selected topics
- `--limit=`: process only N messages, then stop
- `--from-beginning`: read from the earliest offsets (replay mode)
Examples:
php artisan gurento:kafka-consume --topics=HR_APP.LIVE.office
php artisan gurento:kafka-consume --topics=HR_APP.LIVE.office --limit=500
php artisan gurento:kafka-consume --from-beginning
Re-consume Failed Logs
Instead of polling Kafka, reprocess failed entries from `kafka_consume_logs`:
php artisan gurento:kafka-consume --reconsume-failed --reconsume-limit=100
Useful after fixing:
- a wrong `field_map`
- a wrong `model_class`
- temporary DB/infrastructure failures
How Replay Works
`--from-beginning` sets the offset reset to `earliest` and uses a unique consumer group (unless one is provided) to avoid corrupting your normal consumer offsets.
Recommended pattern:
- Run normal consumer with stable group for real-time operations.
- Use `--from-beginning` only for backfills/replays.
Failure and Retry Behavior
On processing errors, the package logs:
- `status=failed`
- the `error` message
- retry metadata (`attempt_count`, `next_retry_at`, `retryable`)
Re-consume mode updates `status` to `reconsumed_success` when replay succeeds.
Plug-and-Play Engine
By default, the package auto-binds `Gurento\KafkaConsumer\Contracts\ConsumerEngine` to `Gurento\KafkaConsumer\Engines\LaravelKafkaConsumerEngine`.
No host-app binding is required for standard usage.
Custom Engine (Optional)
If you need a different transport implementation, override binding in host app:
$this->app->singleton(
    \Gurento\KafkaConsumer\Contracts\ConsumerEngine::class,
    YourCustomEngine::class,
);
Your engine must implement:
public function consume(array $topics, callable $handler, array $options = []): void;
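A minimal custom engine might look like the following. Note that only the `consume` signature above is documented: the argument order passed to `$handler` (topic, payload, metadata) and the `fetchMessages` helper are assumptions standing in for your own transport:

```php
<?php

namespace App\Kafka;

use Gurento\KafkaConsumer\Contracts\ConsumerEngine;

class YourCustomEngine implements ConsumerEngine
{
    public function consume(array $topics, callable $handler, array $options = []): void
    {
        // However your transport receives messages, hand each one to
        // $handler with the topic, decoded payload, and consume
        // metadata (partition, offset, key).
        foreach ($this->fetchMessages($topics, $options) as $message) {
            $handler(
                $message['topic'],
                $message['payload'],
                [
                    'partition' => $message['partition'],
                    'offset'    => $message['offset'],
                    'key'       => $message['key'],
                ]
            );
        }
    }

    /** Hypothetical: yield messages from your own transport. */
    private function fetchMessages(array $topics, array $options): iterable
    {
        return [];
    }
}
```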
Programmatic Consumption Hook
If you already consume Kafka elsewhere, call the service directly:
app(\Gurento\KafkaConsumer\Services\KafkaConsumerService::class)
    ->handle($topicName, $payload, $metadata);
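For example, from inside an existing `rdkafka` consumer loop. The surrounding consumer setup is omitted, and the exact `$metadata` array shape is an assumption based on the partition/offset/key fields the package tracks:

```php
use Gurento\KafkaConsumer\Services\KafkaConsumerService;

// $message is an \RdKafka\Message obtained from your own consumer loop.
app(KafkaConsumerService::class)->handle(
    $message->topic_name,
    json_decode($message->payload, true),
    [
        'partition' => $message->partition,
        'offset'    => $message->offset,
        'key'       => $message->key,
    ]
);
```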
Production Recommendations
- Run consumer as a daemon (Supervisor/systemd)
- Monitor `messages_failed` and `consumer_last_error`
- Use separate consumer groups for replay/backfill jobs
- Keep `field_map` explicit and reviewed
- Keep topic configs in source-controlled seeders where possible
Troubleshooting
No active topics configured
Make sure `kafka_topics` has rows with `is_active = true`.
Command exists but not working as expected
Run:
php artisan optimize:clear
composer dump-autoload
Kafka connection/auth issues
Verify the host app's `config/kafka.php` and environment values (`KAFKA_BROKERS`, auth settings).
Replay not processing any failed logs
Check failed log rows:
- `status = failed`
- `retryable = true`
- `next_retry_at <= now()` (or `NULL`)
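Those conditions can be checked with a quick query, e.g. in `php artisan tinker`. This is a sketch against the documented columns:

```php
use Illuminate\Support\Facades\DB;

// Count failed logs currently eligible for re-consumption.
$eligible = DB::table('kafka_consume_logs')
    ->where('status', 'failed')
    ->where('retryable', true)
    ->where(function ($q) {
        $q->whereNull('next_retry_at')
          ->orWhere('next_retry_at', '<=', now());
    })
    ->count();
```

If `$eligible` is zero, replay mode has nothing to process.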
Filament UI Integration
Install the companion package for an admin UI:
gurento/kafka-consumer-filament
See its README for plugin registration and panel setup.
License
MIT