mateusjunges/laravel-kafka
Laravel Kafka makes it easy to produce and consume Kafka messages in Laravel with a clean, expressive API and improved testability. Build producers and consumers quickly, integrate with your app workflows, and avoid painful Kafka testing setups.
Installation:
composer require mateusjunges/laravel-kafka
Ensure the rdkafka PHP extension is installed (see the installation guide).
Configuration: Publish the config file:
php artisan vendor:publish --provider="Junges\Kafka\KafkaServiceProvider"
Update .env with your Kafka broker(s) and topic configurations.
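A malformed broker list is an easy mistake to make at this step. The following is a minimal sketch, assuming KAFKA_BROKERS holds a comma-separated list of host:port pairs; the helper name parseBrokers is hypothetical and not part of the package.

```php
<?php

/**
 * Hypothetical helper (not part of laravel-kafka): split a comma-separated
 * broker list into validated "host:port" entries.
 * This sketch requires an explicit port and does not handle IPv6 literals.
 */
function parseBrokers(string $value): array
{
    $brokers = [];

    foreach (explode(',', $value) as $entry) {
        $entry = trim($entry);

        if ($entry === '') {
            continue; // Tolerate trailing commas and stray whitespace.
        }

        if (!preg_match('/^[^\s:]+:\d{1,5}$/', $entry)) {
            throw new InvalidArgumentException("Invalid broker address: {$entry}");
        }

        $brokers[] = $entry;
    }

    if ($brokers === []) {
        throw new InvalidArgumentException('No Kafka brokers configured.');
    }

    return $brokers;
}
```

Calling parseBrokers() on the configured value during application boot would surface a typo early instead of at the first publish.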
First Producer Use Case:
use Junges\Kafka\Facades\Kafka;

Kafka::publish('broker')
    ->onTopic('user.created')
    ->withBody(['user_id' => 123])
    ->send();
First Consumer Use Case:
Kafka::consumer(['user.created'])
    ->withHandler(function ($message) {
        // Process message
        return 0; // Commit offset
    })
    ->build()
    ->consume();
Kafka::publish() and Kafka::consumer() are the primary entry points.
Use Kafka::fake() for unit tests (see Testing section).

Kafka::publish('broker')
    ->onTopic('orders.placed')
    ->withHeaders(['source' => 'web'])
    ->withBody(['order_id' => 456, 'amount' => 99.99])
    ->send();

Kafka::publish('broker')
    ->onTopic('logs')
    ->withBody(['level' => 'info', 'message' => 'User logged in'])
    ->async()
    ->send();

$producer = Kafka::publish('broker')->onTopic('notifications');
foreach ($users as $user) {
    $producer->withBody(['user_id' => $user->id, 'message' => 'Hello!'])
        ->send();
}

Kafka::consumer(['orders.placed'])
    ->withHandler(function ($message) {
        // Process order
        Order::find($message->getBody()['order_id'])->markAsProcessed();
        return 0; // Commit offset
    })
    ->build()
    ->consume();
Kafka::consumer(['orders.placed'])
    ->withConsumerGroupId('order-processors')
    ->withHandler(function ($message) {
        // Process order
        return 0;
    })
    ->build()
    ->consume();
$consumer = Kafka::consumer(['user.avro'])
    ->usingDeserializer(new AvroDeserializer($registry, $serializer))
    ->withHandler(function ($message) {
        // $message->getBody() is now a decoded Avro object
        return 0;
    })
    ->build();
Laravel Events: Bind Kafka producers to Laravel events for decoupled publishing:
Event::listen(UserCreated::class, function (UserCreated $event) {
    Kafka::publish('broker')
        ->onTopic('user.created')
        ->withBody($event->user->toArray())
        ->send();
});
Queue Workers: Run consumers as Laravel queue workers:
php artisan queue:work --queue=kafka
Configure in config/queue.php:
'connections' => [
    'kafka' => [
        'driver' => 'kafka',
        'broker' => 'broker',
        'topics' => ['orders.placed'],
    ],
],
Retry Logic: Implement exponential backoff for failed messages:
->withHandler(function ($message) {
    try {
        // Process logic
        return 0;
    } catch (\Exception $e) {
        return -1; // Requeue with delay
    }
})
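The handler above only signals failure; it does not compute a delay itself. A delay schedule for exponential backoff could look like the following minimal sketch. The helpers backoffDelayMs and retryWithBackoff, along with their default values, are hypothetical and not part of laravel-kafka.

```php
<?php

/**
 * Hypothetical helper (not part of laravel-kafka): delay before retry
 * number $attempt (0-based), doubling each time and capped at $maxMs.
 */
function backoffDelayMs(int $attempt, int $baseMs = 100, int $maxMs = 30000): int
{
    // Clamp the exponent so the shift cannot overflow a 64-bit integer.
    $exponent = max(0, min($attempt, 30));

    return min($maxMs, $baseMs * (1 << $exponent));
}

/**
 * Runs $task until it succeeds or $maxAttempts is reached, waiting the
 * backoff delay between attempts. $sleep is injectable to keep tests fast.
 */
function retryWithBackoff(callable $task, int $maxAttempts, ?callable $sleep = null): mixed
{
    $sleep ??= static function (int $ms): void {
        usleep($ms * 1000);
    };

    for ($attempt = 0; ; $attempt++) {
        try {
            return $task();
        } catch (\Throwable $e) {
            if ($attempt + 1 >= $maxAttempts) {
                throw $e; // Out of attempts: surface the last failure.
            }
            $sleep(backoffDelayMs($attempt));
        }
    }
}
```

Wrapping the processing logic in retryWithBackoff() inside the handler would retry in-process before giving up; whether that is preferable to requeueing depends on how long the consumer can afford to block.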
Monitoring: Use Kafka's built-in metrics or integrate with Laravel Horizon for consumer monitoring.
Connection Leaks:
Ensure send()/consume() are called in a try-finally block.
$producer = Kafka::publish('broker')->onTopic('topic');
try {
    $producer->send();
} finally {
    $producer->close(); // If supported
}
Offset Management:
Returning 0 commits the offset; returning -1 requeues the message. Incorrect values can lead to lost or duplicate messages.
Schema Registry Dependencies:
confluentinc/cp-schema-registry).
Async Producer Blocking:
Kafka::publish('broker')
    ->onTopic('logs')
    ->withBody(['message' => 'Test'])
    ->async()
    ->onError(function ($error) {
        Log::error("Kafka error: " . $error->getMessage());
    })
    ->send();
Testing Assumptions:
Kafka::fake() only mocks the producer, not the consumer. Consumers require manual message injection via shouldReceiveMessages().
Use Kafka::fake() for producer tests and shouldReceiveMessages() for consumer tests.
Enable Debug Logging:
Add to .env:
KAFKA_LOG_LEVEL=debug
Logs will appear in storage/logs/laravel.log.
Check Producer/Consumer State:
// Check if a producer is connected
if (Kafka::publish('broker')->isConnected()) {
    // Proceed
}
Inspect Messages:
$producer = Kafka::publish('broker')->onTopic('topic');
$producer->withBody(['debug' => true]);
$producer->send();
// Check Kafka UI (e.g., Kafka UI, Conduktor) for the message.
Common Errors:
Verify KAFKA_BROKERS in .env and that the rdkafka extension is loaded.
Check KAFKA_USERNAME/KAFKA_PASSWORD and the SASL/SSL configs.
Set auto.create.topics.enable=true in config.
Default Config Overrides: The package merges config from:
.env (e.g., KAFKA_BROKERS, KAFKA_GROUP_ID)
config/kafka.php (e.g., default_producer_options)
Options passed at call time (withConfigOptions()).
Example:
Kafka::publish('broker')
    ->withConfigOptions(['request.required.acks' => '1'])
    ->onTopic('topic');
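The layering described above can be pictured as successive array overrides. This is a minimal sketch only: the helper mergeKafkaOptions is hypothetical, and the precedence order (environment defaults, then the config file, then per-call options) is an assumption rather than confirmed package behavior.

```php
<?php

/**
 * Hypothetical illustration of layered configuration: later layers win.
 * The precedence order used below is an assumption, not confirmed
 * laravel-kafka behavior.
 */
function mergeKafkaOptions(array ...$layers): array
{
    // array_replace keeps string keys and lets later arrays override earlier ones.
    return array_replace([], ...$layers);
}

$envDefaults = [
    'metadata.broker.list' => 'localhost:9092',
    'request.required.acks' => '-1',
];
$fileConfig = ['compression.codec' => 'snappy'];
$perCall = ['request.required.acks' => '1'];

$effective = mergeKafkaOptions($envDefaults, $fileConfig, $perCall);
```

Here the per-call value for request.required.acks replaces the default, while keys set in only one layer pass through unchanged.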
SSL/TLS:
Configure in .env:
KAFKA_SSL_CA_LOCATION=/path/to/ca.pem
KAFKA_SSL_CERT_LOCATION=/path/to/client.crt
KAFKA_SSL_KEY_LOCATION=/path/to/client.key
Or via config:
'producers' => [
    'broker' => [
        'ssl_ca_location' => '/path/to/ca.pem',
    ],
],
SASL/SCRAM:
KAFKA_SASL_MECHANISM=SCRAM-S