mateusjunges/laravel-kafka
Laravel Kafka brings a clean Laravel-friendly API for producing and consuming Kafka messages, with an emphasis on developer experience and easier testing. Ideal for integrating Kafka streams and event-driven workflows into your Laravel applications.
Installation:
composer require mateusjunges/laravel-kafka
Publish the config file:
php artisan vendor:publish --tag="laravel-kafka-config"
Configure Kafka Connection:
Edit .env with your Kafka broker details:
KAFKA_BROKERS=localhost:9092
KAFKA_CONSUMER_GROUP_ID=your-group-name
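Before publishing anything, it can help to confirm that the addresses in KAFKA_BROKERS are reachable from the application host. The following is a minimal sketch in plain PHP; parseBrokers() and brokerReachable() are hypothetical helpers written for this guide and are not part of laravel-kafka. It only checks that a TCP connection can be opened, not that the endpoint is actually a Kafka broker.

```php
<?php

declare(strict_types=1);

/**
 * Split a comma-separated broker list such as "host1:9092,host2:9092"
 * into [host, port] pairs. Hypothetical helper, not part of laravel-kafka.
 */
function parseBrokers(string $brokers, int $defaultPort = 9092): array
{
    $parsed = [];
    foreach (explode(',', $brokers) as $entry) {
        $entry = trim($entry);
        if ($entry === '') {
            continue; // tolerate empty entries and trailing commas
        }
        // Split on the last colon so "host:port" yields both parts.
        $pos = strrpos($entry, ':');
        if ($pos === false) {
            $parsed[] = [$entry, $defaultPort];
        } else {
            $parsed[] = [substr($entry, 0, $pos), (int) substr($entry, $pos + 1)];
        }
    }

    return $parsed;
}

/** Return true if a TCP connection to the given host and port can be opened. */
function brokerReachable(string $host, int $port, float $timeout = 2.0): bool
{
    // The @ suppresses the warning fsockopen() emits on failure.
    $socket = @fsockopen($host, $port, $errno, $errstr, $timeout);
    if ($socket === false) {
        return false;
    }
    fclose($socket);

    return true;
}
```

One way to use it is to loop over parseBrokers(getenv('KAFKA_BROKERS') ?: 'localhost:9092') and call brokerReachable($host, $port) for each pair, for example from a health-check command.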
First Use Case: Publish a message:
use Junges\Kafka\Facades\Kafka;
Kafka::publish('broker')
->onTopic('user.created')
->withBody(['user_id' => 1])
->send();
Junges\Kafka\Facades\Kafka (primary entry point)
Basic Message Publishing:
Kafka::publish('broker')
->onTopic('events.user.created')
->withHeaders(['source' => 'api'])
->withBody(['user_id' => 123])
->send();
Async Publishing with Queues: Dispatch a job to publish messages:
KafkaPublishJob::dispatch('broker', 'topic', ['data' => 'value']);
Schema Registry Integration:
Kafka::publish('broker')
->onTopic('users')
->withSchema('UserSchema')
->withBody($user)
->send();
Basic Consumer Setup:
Kafka::consumer(['orders.placed'])
->withHandler(function ($message) {
// Process message
return 0; // Commit offset
})
->build()
->consume();
Consumer with Error Handling:
Kafka::consumer(['payments.processed'])
->withHandler(function ($message) {
try {
// Process logic
} catch (\Exception $e) {
return -1; // Requeue message
}
return 0;
})
->build()
->consume();
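A handler that fails on every attempt for the same message can end up being retried indefinitely. One possible way to bound this is to wrap the handler in a small retry decorator. This is a sketch in plain PHP under stated assumptions: withRetries() is a hypothetical helper, not a laravel-kafka API, and it is independent of how the library treats the handler's return value.

```php
<?php

declare(strict_types=1);

/**
 * Wrap a message handler so that it is retried a limited number of times.
 * Hypothetical helper for illustration, not part of laravel-kafka.
 */
function withRetries(callable $handler, int $maxAttempts = 3): callable
{
    return function ($message) use ($handler, $maxAttempts) {
        $attempt = 0;
        while (true) {
            try {
                // Success: pass the handler's result straight through.
                return $handler($message);
            } catch (\Throwable $e) {
                // Count the failed attempt; rethrow once the limit is reached
                // so the caller can log, requeue, or dead-letter the message.
                if (++$attempt >= $maxAttempts) {
                    throw $e;
                }
            }
        }
    };
}
```

The wrapped callable could then be passed wherever the original handler was used, for example withHandler(withRetries($handler, 3)).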
Consumer Groups:
Kafka::consumer(['notifications.sent'], 'notifications-group')
->withHandler(function ($message) {
// Handle the notification
})
->build()
->consume();
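Laravel Events: Publish to Kafka from a Laravel event listener:
use Illuminate\Support\Facades\Event;
// Register the listener, for example in EventServiceProvider::boot():
Event::listen(UserCreated::class, function (UserCreated $event) {
Kafka::publish('broker')
->onTopic('user.created')
->withBody($event->user)
->send();
});
// Dispatching the event then triggers the publish:
event(new UserCreated($user));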
Queue Workers: Run consumers as Laravel queue workers:
php artisan queue:work --queue=kafka
Middleware: Add Kafka-specific middleware for request/response handling.
Connection Issues:
Ensure KAFKA_BROKERS in .env matches your broker's advertised listeners, for example broker:29092 (internal) or localhost:9092 (external).
Serialization Errors:
Use withBody() with arrays or JSON strings:
->withBody(json_encode(['key' => 'value']))
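When the body is encoded by hand, json_encode() returns false on failure unless told otherwise, which can let a bad payload go unnoticed. The sketch below, in plain PHP, surfaces encoding problems before anything is sent. encodePayload() is a hypothetical helper written for this guide, not part of laravel-kafka.

```php
<?php

declare(strict_types=1);

/**
 * Encode a payload to JSON, failing loudly instead of returning false.
 * Hypothetical helper for illustration, not part of laravel-kafka.
 */
function encodePayload(array $payload): string
{
    // JSON_THROW_ON_ERROR (PHP 7.3+) raises JsonException on invalid input,
    // such as malformed UTF-8 strings.
    return json_encode($payload, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);
}

try {
    $body = encodePayload(['key' => 'value']);
} catch (JsonException $e) {
    // Handle or log the failure before the message reaches the producer.
    $body = null;
}
```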
Consumer Lag:
Return 0 to commit, -1 to requeue, or >0 to skip.
Schema Registry:
Enable Logging:
KAFKA_LOG_LEVEL=debug
Check Published Messages:
Kafka::fake();
Kafka::publish('broker')->onTopic('test')->withBody(['test' => true])->send();
Kafka::assertPublished(); // Verify message was sent
Consumer Debugging:
Use shouldReceiveMessages() in tests to simulate message flow:
Kafka::fake();
Kafka::shouldReceiveMessages([new ConsumedMessage(/* ... */)]);
Default Broker:
The publish() method accepts a broker name. Ensure your config/kafka.php defines it:
'brokers' => [
'broker' => [
'hosts' => env('KAFKA_BROKERS', 'localhost:9092'),
],
],
SSL/TLS:
Set in .env:
KAFKA_SSL_ENABLED=true
KAFKA_SSL_CA_FILE=/path/to/ca.pem
SASL Authentication:
KAFKA_SASL_ENABLED=true
KAFKA_SASL_MECHANISM=PLAIN
KAFKA_SASL_USERNAME=user
KAFKA_SASL_PASSWORD=pass
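Under the hood, Kafka clients built on librdkafka express these settings as configuration properties such as security.protocol, ssl.ca.location, and sasl.mechanisms. The sketch below shows how the .env values above could map onto those properties. It is an illustration only: buildSecurityConfig() is a hypothetical helper, the env keys are the ones used in this guide, and the package's own config file may name or wire them differently.

```php
<?php

declare(strict_types=1);

/**
 * Translate .env-style settings into librdkafka property names.
 * Hypothetical helper: the env keys follow this guide, while the array keys
 * are standard librdkafka configuration properties.
 */
function buildSecurityConfig(array $env): array
{
    $ssl  = filter_var($env['KAFKA_SSL_ENABLED'] ?? false, FILTER_VALIDATE_BOOLEAN);
    $sasl = filter_var($env['KAFKA_SASL_ENABLED'] ?? false, FILTER_VALIDATE_BOOLEAN);

    // security.protocol combines the transport (plaintext or ssl)
    // with whether SASL authentication is layered on top.
    $config = [
        'security.protocol' => ($sasl ? 'sasl_' : '') . ($ssl ? 'ssl' : 'plaintext'),
    ];

    if ($ssl && isset($env['KAFKA_SSL_CA_FILE'])) {
        $config['ssl.ca.location'] = $env['KAFKA_SSL_CA_FILE'];
    }

    if ($sasl) {
        $config['sasl.mechanisms'] = $env['KAFKA_SASL_MECHANISM'] ?? 'PLAIN';
        $config['sasl.username']   = $env['KAFKA_SASL_USERNAME'] ?? '';
        $config['sasl.password']   = $env['KAFKA_SASL_PASSWORD'] ?? '';
    }

    return $config;
}
```

With both SSL and SASL enabled, the resulting security.protocol is sasl_ssl; with neither, the function returns only security.protocol set to plaintext.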
Custom Producers/Consumers:
Implement the Junges\Kafka\Contracts\Producer or Junges\Kafka\Contracts\Consumer interfaces.
Middleware:
Kafka::extend('custom', function ($app) {
return new CustomKafkaProducer($app);
});
Testing Extensions:
Kafka::assertPublished(function ($message) {
return $message->getBody()['status'] === 'processed';
});