mateusjunges/laravel-kafka
Laravel Kafka brings a clean Laravel-friendly API for producing and consuming Kafka messages, with an emphasis on developer experience and easier testing. Ideal for integrating Kafka streams and event-driven workflows into your Laravel applications.
Now that you have created your kafka consumer, you must create a handler for the messages this consumer receives. By default, a consumer is any callable.
You can use an invokable class or a simple callback. Use the withHandler method to specify your handler:
$consumer = \Junges\Kafka\Facades\Kafka::consumer();
// Using callback:
$consumer->withHandler(function(\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
// Handle your message here
});
Or, using an invokable class:
class Handler
{
public function __invoke(\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
// Handle your message here
}
}
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->withHandler(new Handler)
The ConsumerMessage contract gives you some handy methods to get the message properties:
getKey(): Returns the Kafka Message KeygetTopicName(): Returns the topic where the message was publishedgetPartition(): Returns the kafka partition where the message was publishedgetHeaders(): Returns the kafka message headersgetBody(): Returns the body of the messagegetOffset(): Returns the offset where the message was publishedWhen using manual commit mode (withAutoCommit(false)), your handlers receive a $consumer parameter that provides commit methods. This allows you to control exactly when message offsets are committed:
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
->withManualCommit() // Enable manual commit mode
->withHandler(function(\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
try {
// Process your message
$data = json_decode($message->getBody(), true);
processBusinessLogic($data);
// Commit the message after successful processing
$consumer->commit($message);
} catch (ValidationException $e) {
// Don't commit invalid messages, send to DLQ or handle differently
Log::warning('Invalid message format', ['message' => $message->getBody()]);
} catch (Exception $e) {
Log::error('Processing failed', ['error' => $e->getMessage()]);
throw $e;
}
});
The $consumer parameter provides these commit methods:
Synchronous commits (blocking):
$consumer->commit() - Commit current assignment offsets$consumer->commit($message) - Commit specific message offsetAsynchronous commits (non-blocking, better performance):
$consumer->commitAsync() - Commit current assignment offsets$consumer->commitAsync($message) - Commit specific message offsetYou can also create dedicated handler classes by implementing the Handler interface. Handler classes receive both the message and consumer parameters, just like closure handlers:
use Junges\Kafka\Contracts\Handler;
use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Contracts\MessageConsumer;
class ProcessOrderHandler implements Handler
{
public function __invoke(ConsumerMessage $message, MessageConsumer $consumer): void
{
try {
$order = json_decode($message->getBody(), true);
// Process the order
$this->processOrder($order);
// Manual commit after successful processing
$consumer->commit($message);
} catch (ValidationException $e) {
// Don't commit invalid messages
Log::warning('Invalid order data', ['message' => $message->getBody()]);
} catch (Exception $e) {
// Don't commit on processing errors
Log::error('Order processing failed', ['error' => $e->getMessage()]);
throw $e; // Re-throw to trigger DLQ handling if configured
}
}
private function processOrder(array $order): void
{
// Your business logic here
}
}
Using Handler classes with the consumer:
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::consumer(['orders'])
->withManualCommit() // Enable manual commit mode
->withHandler(new ProcessOrderHandler())
->build();
$consumer->consume();
<x-sponsors.request-sponsor/>
How can I help you explore Laravel packages today?