enqueue/rdkafka
Kafka transport for Enqueue using the RdKafka (librdkafka) extension. Implements Queue Interop so you can produce and consume messages via Kafka with Enqueue tooling. Links to docs, support channels, and issue tracker.
Install Dependencies:
composer require enqueue/rdkafka
pecl install rdkafka
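Ensure librdkafka is installed on the host or in your PHP container image before building the extension. A Kafka broker for local development can run in Docker (e.g., confluentinc/cp-kafka).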
Configure Laravel:
Add to config/queue.php (Laravel ships no kafka queue driver, so this assumes a connector package that registers one):
'connections' => [
    'kafka' => [
        'driver' => 'kafka',
        'hosts' => env('KAFKA_HOSTS', 'localhost:9092'),
        'topic' => env('KAFKA_TOPIC', 'laravel-jobs'),
        'group_id' => env('KAFKA_GROUP_ID', 'laravel-consumers'),
    ],
],
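The connection array above uses Laravel-style keys, while the transport's connection factory is configured with librdkafka property names grouped under `global` and `topic`. The following is a minimal sketch of that translation; the helper name `kafkaConnectionToRdKafkaConfig` and the `earliest` default are assumptions for illustration, not part of the package.

```php
<?php
// Hypothetical helper: translate the Laravel-style connection array into the
// 'global' / 'topic' option arrays used to configure the RdKafka transport.
function kafkaConnectionToRdKafkaConfig(array $connection): array
{
    return [
        'global' => [
            // librdkafka expects a comma-separated broker list.
            'metadata.broker.list' => $connection['hosts'] ?? 'localhost:9092',
            'group.id' => $connection['group_id'] ?? 'laravel-consumers',
        ],
        'topic' => [
            // Assumption: a new consumer group starts from the oldest message.
            'auto.offset.reset' => 'earliest',
        ],
    ];
}

$config = kafkaConnectionToRdKafkaConfig([
    'hosts' => 'kafka-1:9092,kafka-2:9092',
    'topic' => 'laravel-jobs',
    'group_id' => 'laravel-consumers',
]);

// With the package installed, the array could then be handed to the factory:
// $context = (new \Enqueue\RdKafka\RdKafkaConnectionFactory($config))->createContext();
```

Keeping the mapping in one function makes it easier to see which Laravel key ends up as which librdkafka property.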
Dispatch a Job:
use Illuminate\Support\Facades\Queue;
Queue::connection('kafka')->push(new ProcessPodcast);
Run Consumer:
php artisan queue:work kafka --queue=laravel-jobs
Or use Enqueue’s CLI:
enqueue:consume kafka --connection=default
Replace Laravel’s queue:work for notifications with a Kafka consumer:
// app/Listeners/SendNotification.php
public function handle(UserRegistered $event) {
    Queue::connection('kafka')->push(new SendEmail($event->user));
}
Consume with:
enqueue:consume kafka --connection=default --topic=laravel-jobs
Producer-Consumer Pattern:
Producer: use Laravel's Queue facade or Enqueue's Producer.
$producer = app(ProducerInterface::class);
// serialize() here so the consumer's unserialize() can rebuild the job object
$producer->send(new Message(serialize($job), ['topic' => 'orders']));
Consumer: use Enqueue's Consumer or Laravel's queue:work kafka.
$consumer = new KafkaConsumer($producer, ['group.id' => 'order-processors']);
$consumer->consume(function ($message) {
    $job = unserialize($message->getBody());
    $job->handle();
});
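For comparison, here is a sketch of the same producer/consumer round trip written against the Queue Interop method names (`createTopic`, `createMessage`, `createProducer`, `createConsumer`, `receive`, `acknowledge`, `reject`). The function names `publishJob` and `consumeOne` are hypothetical. Parameters are left untyped so the sketch can be run with simple fakes; in an application they would be `Interop\Queue\Context` instances obtained from the transport's connection factory.

```php
<?php
// Publish a JSON-encoded payload to a topic through a Queue Interop context.
function publishJob($context, string $topicName, array $payload): void
{
    $topic = $context->createTopic($topicName);
    $message = $context->createMessage(json_encode($payload, JSON_THROW_ON_ERROR));
    $context->createProducer()->send($topic, $message);
}

// Receive at most one message, pass the decoded payload to $handler, then
// acknowledge on success or reject on failure.
// Returns false when nothing arrived within the timeout.
function consumeOne($context, string $queueName, callable $handler, int $timeoutMs = 1000): bool
{
    $consumer = $context->createConsumer($context->createQueue($queueName));
    $message = $consumer->receive($timeoutMs);
    if ($message === null) {
        return false;
    }
    try {
        $handler(json_decode($message->getBody(), true, 512, JSON_THROW_ON_ERROR));
        $consumer->acknowledge($message);
    } catch (\Throwable $e) {
        $consumer->reject($message);
        throw $e;
    }
    return true;
}
```

Both sides use JSON here, so the payload survives the trip as a plain array rather than a PHP object.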
Hybrid Queue Strategy: Combine Kafka with Redis for local jobs:
$context = new Context();
$context->addTransport(new KafkaTransport($producer));
$context->addTransport(new RedisTransport($redis));
$context->createProducer()->send(new Message($job));
Partitioned Topics: Route jobs to specific partitions for ordered processing:
$producer->send(new Message($job), ['partition' => 0]);
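Hard-coding partition 0 orders everything but removes parallelism. A common alternative is to derive the partition from a business key so that all jobs for one entity stay in order. The sketch below assumes a hypothetical helper `partitionForKey`. It does not reproduce librdkafka's own partitioner, and the closing comment assumes the transport's message class exposes `setPartition()`.

```php
<?php
// Hypothetical helper: map a business key to a stable partition number.
function partitionForKey(string $key, int $partitionCount): int
{
    if ($partitionCount < 1) {
        throw new InvalidArgumentException('partitionCount must be at least 1');
    }
    // crc32() is deterministic; the mask keeps the value non-negative
    // even on 32-bit PHP builds.
    return (crc32($key) & 0x7fffffff) % $partitionCount;
}

$partition = partitionForKey('customer-42', 6);

// Assumed usage with the transport's message object:
// $message->setPartition($partition);
```

The same key always lands on the same partition as long as the partition count does not change.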
Laravel Job Serialization:
Use serialize()/unserialize() for job payloads or leverage Enqueue’s Serializer:
$serializer = new JsonSerializer();
$message = new Message($serializer->serialize($job));
Error Handling: Wrap consumers in retry logic:
$consumer->consume(function ($message) {
    try {
        // rebuild the job from the payload before running it
        $job = unserialize($message->getBody());
        $job->handle();
        $message->ack();
    } catch (\Throwable $e) {
        $message->nack();
        throw $e;
    }
});
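The handler above acks or nacks after a single attempt. A minimal sketch of the retry logic the heading refers to follows, assuming a hypothetical helper `handleWithRetry` that tells the caller whether to ack or nack. The attempt count and backoff values are illustrative.

```php
<?php
// Hypothetical helper: run $handler up to $maxAttempts times.
// Returns true when the caller should ack, false when it should nack.
function handleWithRetry(callable $handler, int $maxAttempts = 3, int $baseDelayMs = 0): bool
{
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $handler($attempt);
            return true;
        } catch (\Throwable $e) {
            if ($attempt < $maxAttempts && $baseDelayMs > 0) {
                // Exponential backoff: base, 2x base, 4x base, ...
                usleep($baseDelayMs * 1000 * (2 ** ($attempt - 1)));
            }
        }
    }
    return false;
}
```

Inside a consumer callback this could be used as `handleWithRetry(fn () => $job->handle()) ? $message->ack() : $message->nack();`, keeping the retry policy separate from the transport calls.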
Schema Registry: For structured data, integrate with Confluent Schema Registry:
$producer->setSchemaRegistry('http://schema-registry:8081');
$producer->send(new AvroMessage($job, 'UserRegistered'));
Docker Deployment: Use a sidecar container for Kafka consumers:
# docker-compose.yml
services:
  app:
    image: laravel-app
  consumer:
    # the image must include the rdkafka extension; stock php:8.1 does not ship it
    image: php:8.1
    volumes: [./:/app]
    command: enqueue:consume kafka --connection=default
Offset Management:
Set auto.offset.reset=earliest in the config and make sure every message is acknowledged or rejected on every code path, for example by calling ack()/nack() from a try/catch/finally.
Consumer Group Rebalances:
Tune session.timeout.ms and heartbeat.interval.ms in rdkafka.conf:
session.timeout.ms=30000
heartbeat.interval.ms=10000
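Kafka's consumer documentation says the heartbeat interval must be lower than the session timeout and is typically kept at or below one third of it. The values above sit exactly on that boundary. A small sketch of a sanity check follows; the function name `heartbeatWarnings` is hypothetical.

```php
<?php
// Hypothetical check for the two consumer-group timing settings.
// Returns a list of human-readable warnings; an empty list means the pair looks sane.
function heartbeatWarnings(int $sessionTimeoutMs, int $heartbeatIntervalMs): array
{
    $warnings = [];
    if ($heartbeatIntervalMs >= $sessionTimeoutMs) {
        $warnings[] = 'heartbeat.interval.ms must be lower than session.timeout.ms';
    } elseif ($heartbeatIntervalMs * 3 > $sessionTimeoutMs) {
        $warnings[] = 'heartbeat.interval.ms is usually kept at or below one third of session.timeout.ms';
    }
    return $warnings;
}

$warnings = heartbeatWarnings(30000, 10000);
```

A check like this could run at deploy time, before a misconfigured consumer starts triggering rebalances.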
Serialization Mismatches:
Use the same Serializer on both sides and attach version metadata:
$message = new Message($job, ['headers' => ['version' => '1.0']]);
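One way to make version mismatches fail loudly is to wrap every payload in a small envelope and validate it on the consumer. The sketch below assumes a hypothetical JSON envelope with `type`, `version` and `data` fields. The function names are illustrative and not part of Enqueue.

```php
<?php
// Hypothetical envelope format: {"type": ..., "version": ..., "data": ...}.
function encodeJob(string $type, array $data, string $version = '1.0'): string
{
    return json_encode(
        ['type' => $type, 'version' => $version, 'data' => $data],
        JSON_THROW_ON_ERROR
    );
}

// Decode an envelope and refuse versions this consumer does not understand.
function decodeJob(string $body, array $supportedVersions = ['1.0']): array
{
    $envelope = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($envelope) || !isset($envelope['type'], $envelope['version'], $envelope['data'])) {
        throw new UnexpectedValueException('Malformed job envelope');
    }
    if (!in_array($envelope['version'], $supportedVersions, true)) {
        throw new UnexpectedValueException('Unsupported job version: ' . $envelope['version']);
    }
    return $envelope;
}
```

Rejecting unknown versions up front lets the consumer nack the message instead of failing somewhere inside the job.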
PECL Extension Conflicts:
The ext-kafka and rdkafka PECL extensions may clash. Install only rdkafka (pecl install rdkafka) and ensure php.ini loads the correct extension:
extension=rdkafka.so
Laravel Middleware Gaps:
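Laravel's QueueMiddleware (e.g., AfterCommitMiddleware) doesn't apply to Kafka jobs. A custom job middleware can wrap the job in a transaction instead:
use Closure;
use Illuminate\Support\Facades\DB;
use Throwable;

class KafkaJobMiddleware {
    public function handle($job, Closure $next) {
        DB::beginTransaction();
        try {
            $result = $next($job);
            DB::commit();
            return $result;
        } catch (Throwable $e) {
            // roll back on any error, not only Exception subclasses
            DB::rollBack();
            throw $e;
        }
    }
}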
Enable RdKafka Logging:
Set in config/queue.php:
'debug' => env('KAFKA_DEBUG', 'consumer,cgrp,topic,fetch'), // comma-separated librdkafka debug contexts
Or via rdkafka.conf:
debug=all
metadata.broker.list=localhost:9092
Check Consumer Lag: Use Kafka’s CLI tools:
kafka-consumer-groups --bootstrap-server localhost:9092 --group laravel-consumers --describe
Validate Message Flow: Inspect topics with:
kafka-console-consumer --bootstrap-server localhost:9092 --topic laravel-jobs --from-beginning
Custom Transport:
Extend Enqueue\Kafka\KafkaTransport for topic routing:
class CustomKafkaTransport extends KafkaTransport {
    public function send(Message $message) {
        $topic = $this->getTopicForJob($message->getBody());
        return parent::send($message->withProperty('topic', $topic));
    }
}
Dynamic Topic Selection: Pass the target topic as the queue name when pushing a job (assuming the connector maps queue names to topics):
Queue::connection('kafka')->pushOn('dynamic-topic', new Job());
Schema Evolution:
Implement a SchemaRegistryClient interface for Avro/Protobuf:
interface SchemaRegistryClient {
    public function getSchema(string $subject): string;
}
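To show how the interface might be satisfied without a running registry, here is a sketch of an in-memory implementation for tests. The interface is repeated so the snippet runs on its own. The class name `InMemorySchemaRegistryClient` and the sample schema are assumptions for illustration.

```php
<?php
interface SchemaRegistryClient
{
    public function getSchema(string $subject): string;
}

// Hypothetical in-memory implementation, useful in tests where no registry runs.
final class InMemorySchemaRegistryClient implements SchemaRegistryClient
{
    /** @var array<string, string> subject => schema JSON */
    private $schemas;

    public function __construct(array $schemas)
    {
        $this->schemas = $schemas;
    }

    public function getSchema(string $subject): string
    {
        if (!isset($this->schemas[$subject])) {
            throw new OutOfBoundsException("No schema registered for subject: {$subject}");
        }
        return $this->schemas[$subject];
    }
}

$registry = new InMemorySchemaRegistryClient([
    'UserRegistered' => '{"type":"record","name":"UserRegistered","fields":[]}',
]);
```

An HTTP-backed client could implement the same interface, so consumers depend only on `getSchema()`.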
Monitoring Integration:
Add Prometheus metrics via Enqueue’s Metrics extension:
$producer->setMetrics(new PrometheusMetrics());