mateusjunges/laravel-kafka
Laravel Kafka brings a clean Laravel-friendly API for producing and consuming Kafka messages, with an emphasis on developer experience and easier testing. Ideal for integrating Kafka streams and event-driven workflows into your Laravel applications.
Manual commit gives you complete control over when message offsets are committed to Kafka. This provides stronger processing guarantees and better error handling compared to auto-commit mode.
<x-sponsors.request-sponsor/>
By default, the package uses auto-commit mode where messages are automatically committed after your handler successfully processes them. With manual commit, you decide exactly when to commit messages, allowing for:
To enable manual commit, set withManualCommit() when creating your consumer:
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::consumer(['my-topic'])
->withManualCommit() // Disable auto-commit
->withHandler(function($message, $consumer) {
// Your handler with manual commit control
})
->build();
Your message handlers receive a $consumer parameter with these commit methods:
// Commit all current assignment offsets
$consumer->commit();
// Commit specific message offset
$consumer->commit($message);
// Commit specific partition offsets
$consumer->commit([$topicPartition1, $topicPartition2]);
// Commit all current assignment offsets asynchronously
$consumer->commitAsync();
// Commit specific message offset asynchronously
$consumer->commitAsync($message);
// Commit specific partition offsets asynchronously
$consumer->commitAsync([$topicPartition1, $topicPartition2]);
All commit methods accept these parameter types:
null (default): Commit offsets for current assignmentConsumerMessage: Commit offset for the specific messageRdKafka\Message: Commit offset for the underlying Kafka messageRdKafka\TopicPartition[]: Commit specific partition offsets$consumer = Kafka::consumer(['orders'])
->withManualCommit()
->withHandler(function($message, $consumer) {
try {
// Process the order
$order = json_decode($message->getBody(), true);
processOrder($order);
// Commit only after successful processing
$consumer->commit($message);
} catch (Exception $e) {
Log::error('Order processing failed', [
'error' => $e->getMessage(),
'order_id' => $order['id'] ?? 'unknown'
]);
}
});
$consumer->withHandler(function($message, $consumer) {
// Process message
processMessage($message);
// Use async commit for better throughput
$consumer->commitAsync($message);
// Handler can continue immediately without waiting for commit
});
$consumer->withHandler(function($message, $consumer) {
$maxRetries = 3;
$attempt = 0;
while ($attempt < $maxRetries) {
try {
processMessage($message);
$consumer->commit($message);
return; // Success
} catch (RetryableException $e) {
$attempt++;
Log::warning("Retry attempt {$attempt}", ['error' => $e->getMessage()]);
if ($attempt >= $maxRetries) {
// Send to DLQ or handle permanent failure
Log::error('Max retries exceeded', ['error' => $e->getMessage()]);
throw $e;
}
sleep(pow(2, $attempt));
} catch (Exception $e) {
// Non-retryable error
Log::error('Non-retryable error', ['error' => $e->getMessage()]);
throw $e;
}
}
});
Manual commit works seamlessly with DLQ functionality:
$consumer = Kafka::consumer(['orders'])
->withManualCommit()
->withDlq('orders-dlq') // Configure DLQ
->withHandler(function($message, $consumer) {
try {
processOrder($message);
$consumer->commit($message);
} catch (ValidationException $e) {
// Don't commit - let DLQ handling take over
Log::error('Invalid order format', ['error' => $e->getMessage()]);
throw $e; // This will trigger DLQ
}
});
// High-throughput scenario - use async
$consumer->commitAsync($message);
// Critical data - use sync for guarantee
$consumer->commit($message);
$consumer->withHandler(function($message, $consumer) {
static $messages = [];
// Collect messages
$messages[] = $message;
// Batch commit every 100 messages
if (count($messages) >= 100) {
// Process all messages
foreach ($messages as $msg) {
processMessage($msg);
}
// Commit the last message's offset (commits all previous)
$consumer->commitAsync(end($messages));
$messages = [];
}
});
To migrate existing auto-commit consumers to manual commit:
$consumer = Kafka::consumer(['topic'])
->withAutoCommit() // or omit, it's the default
->withHandler(function($message, $consumer) {
processMessage($message);
});
$consumer = Kafka::consumer(['topic'])
->withManualCommit() // Enable manual control
->withHandler(function($message, $consumer) {
try {
processMessage($message);
$consumer->commit($message); // Explicit commit
} catch (Exception $e) {
// Handle errors without committing
Log::error('Processing failed', ['error' => $e->getMessage()]);
}
});
commitAsync()Messages being reprocessed repeatedly:
commit() after successful processingPoor performance:
commitAsync() instead of commit() for better throughputOffset commit errors:
How can I help you explore Laravel packages today?