d1oxyde/kafka-bundle
Laravel-friendly Kafka integration bundle providing configuration, producer/consumer helpers, and streamlined setup for working with Apache Kafka in PHP applications. Suitable for event-driven apps needing message publishing and processing.
Install the Bundle
composer require d1oxyde/kafka-bundle
Ensure ext-rdkafka and enqueue/rdkafka are installed (PHP 7.1+).
Configure the Bundle
Add to config/bundles.php:
return [
// ...
D1oxyde\KafkaBundle\D1oxydeKafkaBundle::class => ['all' => true],
];
Basic Configuration
Define Kafka brokers in config/packages/d1oxyde_kafka.yaml:
d1oxyde_kafka:
clients:
default:
hosts: ['localhost:9092']
topic: 'test_topic'
First Use Case: Producing a Message
Inject the KafkaProducer service and publish:
use D1oxyde\KafkaBundle\Producer\KafkaProducerInterface;
class MyService
{
public function __construct(private KafkaProducerInterface $producer) {}
public function sendMessage(string $message): void
{
$this->producer->send('default', $message);
}
}
Synchronous Publishing
Use KafkaProducerInterface for fire-and-forget:
$this->producer->send('default', json_encode(['event' => 'user.created']));
Asynchronous Publishing with Callbacks
Leverage KafkaAsyncProducer for non-blocking operations:
$this->asyncProducer->send('default', $payload, function ($error, $result) {
if ($error) {
// Handle failure
}
});
Partitioning and Keys Control message routing via keys:
$this->producer->send('default', $payload, ['partition' => 0, 'key' => 'user:123']);
Basic Consumer Setup Define a consumer class:
use D1oxyde\KafkaBundle\Consumer\KafkaConsumerInterface;
class UserConsumer implements KafkaConsumerInterface
{
public function consume($message): void
{
$data = json_decode($message, true);
// Process $data
}
}
Registering Consumers
Configure in config/packages/d1oxyde_kafka.yaml:
d1oxyde_kafka:
consumers:
user_consumer:
class: App\Consumer\UserConsumer
topics: ['user_events']
group_id: 'user-group'
Handling Offsets Manually commit offsets if needed:
$this->consumer->commitOffsets();
Symfony Messenger Bridge
Use Enqueue\SymfonyBridge to integrate with Symfony Messenger:
# config/packages/messenger.yaml
messenger:
transports:
kafka: '%env(KAFKA_DSN)%'
routing:
'App\Message\UserCreated': kafka
Retry Logic Implement exponential backoff for transient failures:
$this->producer->send($topic, $payload, ['retry' => 3, 'retry_delay' => 1000]);
Schema Registry (Avro/Protobuf) Extend the bundle to support schema validation by wrapping payloads:
$this->producer->send('default', $serializer->serialize($payload, 'json'));
Missing ext-rdkafka
Ensure the PHP extension is installed and enabled. Verify with:
php -m | grep rdkafka
Topic Auto-Creation The bundle does not auto-create topics. Pre-create topics via:
kafka-topics --create --topic test_topic --bootstrap-server localhost:9092
Consumer Group Conflicts
Ensure group_id is unique per consumer instance to avoid offset conflicts.
Serialization Mismatches
Always serialize payloads explicitly (e.g., json_encode) to avoid binary corruption.
Enable Verbose Logging
Configure Monolog in config/packages/monolog.yaml:
handlers:
kafka:
type: stream
path: "%kernel.logs_dir%/kafka.log"
level: debug
Check Consumer Lag Monitor lag via Kafka CLI:
kafka-consumer-groups --bootstrap-server localhost:9092 --group user-group --describe
Test Locally with Docker
Use bitnami/kafka for local testing:
# docker-compose.yml
services:
kafka:
image: bitnami/kafka
ports:
- "9092:9092"
Custom Producers/Consumers
Implement KafkaProducerInterface or KafkaConsumerInterface for custom logic.
Middleware for Messages
Add preprocessing/validation via Symfony’s KernelEvents:
// src/EventListener/KafkaListener.php
public function onKernelRequest(GetResponseEvent $event)
{
$payload = $event->getRequest()->attributes->get('kafka_payload');
// Validate/modify $payload
}
Dynamic Topic Routing
Override D1oxyde\KafkaBundle\Producer\KafkaProducer to route messages dynamically:
public function send($topic, $message, array $options = [])
{
$topic = $this->resolveTopic($message);
// ... rest of logic
}
Health Checks Add a health endpoint to verify Kafka connectivity:
use D1oxyde\KafkaBundle\Producer\KafkaProducerInterface;
public function __invoke(Request $request, KafkaProducerInterface $producer)
{
try {
$producer->send('health', 'ping');
return new Response('OK');
} catch (\Exception $e) {
return new Response('FAIL', 500);
}
}
How can I help you explore Laravel packages today?