Rdkafka Laravel Package

enqueue/rdkafka

Kafka transport for Enqueue using the RdKafka (librdkafka) extension. Implements Queue Interop so you can produce and consume messages via Kafka with Enqueue tooling. Links to docs, support channels, and issue tracker.

Getting Started

Minimal Steps to First Use

  1. Install Dependencies:

    composer require enqueue/rdkafka
    pecl install rdkafka
    

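    Install librdkafka on the machine (or in the container image) that runs PHP before running pecl install; the extension is compiled against it. A Kafka broker for local testing can run in Docker (e.g., confluentinc/cp-kafka).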

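  2. Configure Laravel: Add a connection to config/queue.php. enqueue/rdkafka is a Queue Interop transport and does not register a Laravel queue driver by itself, so the kafka driver name below assumes a bridge package or custom connector that provides it: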

    'connections' => [
        'kafka' => [
            'driver' => 'kafka',
            'hosts' => env('KAFKA_HOSTS', 'localhost:9092'),
            'topic' => env('KAFKA_TOPIC', 'laravel-jobs'),
            'group_id' => env('KAFKA_GROUP_ID', 'laravel-consumers'),
        ],
    ],
    
  3. Dispatch a Job:

    use Illuminate\Support\Facades\Queue;
    
    Queue::connection('kafka')->push(new ProcessPodcast);
    
  4. Run Consumer:

    php artisan queue:work kafka
    

    Or use Enqueue’s CLI:

    enqueue:consume kafka --connection=default
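
The connection settings from step 2 eventually have to reach librdkafka as properties. A minimal sketch of that mapping follows, assuming the array shape accepted by Enqueue's RdKafkaConnectionFactory (a `global` section of librdkafka properties). The helper name `buildRdKafkaConfig` is hypothetical and not part of any package.

```php
<?php
declare(strict_types=1);

/**
 * Map a Laravel-style connection array to librdkafka properties.
 * buildRdKafkaConfig() is a hypothetical helper for illustration only.
 */
function buildRdKafkaConfig(array $connection): array
{
    return [
        'global' => [
            // Comma-separated broker list, e.g. "kafka1:9092,kafka2:9092".
            'metadata.broker.list' => $connection['hosts'] ?? 'localhost:9092',
            // Consumers sharing a group.id split the topic's partitions between them.
            'group.id' => $connection['group_id'] ?? 'laravel-consumers',
            // Commit offsets only when the application acknowledges a message.
            'enable.auto.commit' => 'false',
        ],
    ];
}

// The 'topic' entry is not a librdkafka property; the topic is chosen when sending.
$config = buildRdKafkaConfig([
    'hosts' => 'localhost:9092',
    'topic' => 'laravel-jobs',
    'group_id' => 'laravel-consumers',
]);
```

With enqueue/rdkafka and ext-rdkafka installed, the resulting array could be passed to `new RdKafkaConnectionFactory($config)`.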
    

First Use Case: Event-Driven Notifications

Replace Laravel’s queue:work for notifications with a Kafka consumer:

// app/Listeners/SendNotification.php
public function handle(UserRegistered $event) {
    Queue::connection('kafka')->push(new SendEmail($event->user));
}

Consume with:

enqueue:consume kafka --connection=default --topic=laravel-jobs

Implementation Patterns

Core Workflows

  1. Producer-Consumer Pattern:

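    • Produce: Dispatch jobs via Laravel’s Queue facade, or send directly through a Queue Interop producer. Here $context is the RdKafkaContext returned by RdKafkaConnectionFactory::createContext():
      $topic = $context->createTopic('orders');
      $context->createProducer()->send($topic, $context->createMessage(json_encode($job)));
      
    • Consume: Use a Queue Interop consumer, or Laravel’s queue:work with the kafka connection. The consumer group (group.id, e.g. order-processors) is set in the connection factory’s global config:
      $consumer = $context->createConsumer($context->createQueue('orders'));
      while ($message = $consumer->receive(10000)) { // wait up to 10 s per message
          $payload = json_decode($message->getBody(), true); // matches the producer's json_encode()
          processOrder($payload); // hypothetical application handler
          $consumer->acknowledge($message);
      }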
      
  2. Hybrid Queue Strategy: Keep Kafka and Redis as separate connections in config/queue.php and choose one per job, for example Kafka for cross-service events and Redis for local background work (the job classes here are placeholders):

    PublishOrderPlaced::dispatch($order)->onConnection('kafka');
    GenerateThumbnail::dispatch($image)->onConnection('redis');
    
  3. Partitioned Topics: Route jobs to specific partitions for ordered processing:

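    $message->setPartition(0); // $message is the RdKafkaMessage returned by $context->createMessage()
    $context->createProducer()->send($context->createTopic('orders'), $message);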
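
Ordered processing usually depends on messages with the same key landing on the same partition. The sketch below illustrates that idea with a simple hash-and-modulo scheme. It is only an illustration: librdkafka's built-in partitioners use their own hashing, and `partitionForKey` is a hypothetical name.

```php
<?php
declare(strict_types=1);

/**
 * Pick a partition for a message key. Illustrative only: librdkafka's
 * built-in partitioners use their own hashing, so real assignments differ.
 */
function partitionForKey(string $key, int $partitionCount): int
{
    if ($partitionCount < 1) {
        throw new InvalidArgumentException('partitionCount must be at least 1');
    }
    // crc32() returns a non-negative int on 64-bit PHP, so the modulo is safe.
    return crc32($key) % $partitionCount;
}

// The same key always maps to the same partition, which preserves per-key order.
$first = partitionForKey('order-1001', 6);
$second = partitionForKey('order-1001', 6);
```

Because ordering in Kafka is guaranteed only within a partition, keying by an entity ID (an order or user ID, say) keeps that entity's messages in sequence while still spreading load across partitions.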
    

Integration Tips

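  • Laravel Job Serialization: Convert the job to a string yourself (serialize()/unserialize(), or JSON for cross-language consumers) before putting it in the message body, and only unserialize() payloads from trusted producers. Enqueue’s JsonSerializer (Enqueue\RdKafka\JsonSerializer) encodes the message envelope (body, properties, headers), not the job object:

    $context->setSerializer(new JsonSerializer()); // envelope format on the wire
    $message = $context->createMessage(serialize($job));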
    
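  • Error Handling: Acknowledge a message only after the job succeeds, and reject it with requeue on failure so it can be retried:

    $message = $consumer->receive(10000);
    if ($message !== null) {
        try {
            unserialize($message->getBody())->handle();
            $consumer->acknowledge($message);
        } catch (Throwable $e) {
            $consumer->reject($message, true); // true = requeue for another attempt
            throw $e;
        }
    }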
    
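  • Schema Registry: enqueue/rdkafka only transports string bodies, so Avro encoding against Confluent Schema Registry has to happen before the message is created. In this sketch $avroEncoder is a hypothetical encoder backed by a registry client at http://schema-registry:8081:

    $body = $avroEncoder->encode($job, 'UserRegistered'); // hypothetical API
    $context->createProducer()->send($topic, $context->createMessage($body));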
    
  • Docker Deployment: Run Kafka consumers as a separate service built from the same image as the app, so the rdkafka extension and the application code are both available:

    # docker-compose.yml
    services:
      app:
        image: laravel-app
      consumer:
        image: laravel-app
        command: php artisan queue:work kafka
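
The retry logic mentioned under Error Handling can be sketched without any Kafka dependency. The helper below is a hypothetical `processWithRetry` function, not part of Enqueue or Laravel. It retries a handler a bounded number of times with exponential backoff and reports whether the caller should acknowledge or reject the message.

```php
<?php
declare(strict_types=1);

/**
 * Run $handler up to $maxAttempts times.
 * Returns true on success, false when every attempt threw.
 */
function processWithRetry(callable $handler, int $maxAttempts, int $baseDelayMs = 100): bool
{
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $handler($attempt);
            return true; // caller should acknowledge the message
        } catch (Throwable $e) {
            if ($attempt < $maxAttempts) {
                // Exponential backoff: with the default base, 100 ms, 200 ms, 400 ms, ...
                usleep($baseDelayMs * 1000 * (2 ** ($attempt - 1)));
            }
        }
    }
    return false; // caller should reject the message or route it to a dead-letter topic
}

// Example: the handler fails twice with a transient error, then succeeds.
$calls = 0;
$succeeded = processWithRetry(function (int $attempt) use (&$calls): void {
    $calls++;
    if ($attempt < 3) {
        throw new RuntimeException('transient failure');
    }
}, 5, 1);
```

Bounding the attempts matters with Kafka in particular: a message that always fails would otherwise block its partition indefinitely.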
    

Gotchas and Tips

Pitfalls

  1. Offset Management:

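    • Problem: Kafka delivers at least once, so a consumer that crashes or rebalances before committing an offset will see the same message again.
    • Fix: Set enable.auto.commit=false so offsets are committed only when the consumer acknowledges or rejects a message, call exactly one of the two per message after processing finishes, and make handlers idempotent. auto.offset.reset (e.g., earliest) only decides where a group with no committed offset starts reading.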
  2. Consumer Group Rebalances:

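    • Problem: Kafka reassigns partitions whenever consumers join or leave a group (for example during scaling). Consumption pauses during the rebalance, and messages whose offsets were not yet committed are redelivered.
    • Fix: Tune session.timeout.ms and heartbeat.interval.ms (keep the heartbeat at no more than a third of the session timeout). These are librdkafka properties and belong in the connection’s global configuration:
      session.timeout.ms=30000
      heartbeat.interval.ms=10000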
      
  3. Serialization Mismatches:

    • Problem: Jobs fail to deserialize due to version changes.
    • Fix: Use versioned payloads, carrying the version in a message header:
      $message = $context->createMessage($body, [], ['version' => '1.0']); // body, properties, headers
      
  4. PECL Extension Conflicts:

    • Problem: ext-kafka and rdkafka PECL extensions may clash.
    • Fix: Use pecl install rdkafka and ensure php.ini loads the correct extension:
      extension=rdkafka.so
      
  5. Laravel Middleware Gaps:

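    • Problem: Laravel’s job middleware (e.g., WithoutOverlapping, RateLimited) runs inside Laravel’s queue worker, so jobs consumed directly through an Enqueue consumer bypass it.
    • Fix: Create a custom middleware and invoke it from your consumer, for example to wrap each job in a database transaction:
      use Illuminate\Support\Facades\DB;

      class KafkaJobMiddleware {
          public function handle($job, Closure $next) {
              DB::beginTransaction();
              try {
                  $result = $next($job);
                  DB::commit();
                  return $result;
              } catch (Throwable $e) { // also catches Error, not only Exception
                  DB::rollBack();
                  throw $e;
              }
          }
      }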
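
Several of the pitfalls above end in the same symptom: a message is processed more than once. One common defence is an idempotent handler that remembers which message IDs it has already handled. The class below is a minimal sketch under stated assumptions: `IdempotentHandler` is a hypothetical name, and the in-memory array stands in for a shared store such as a database table.

```php
<?php
declare(strict_types=1);

/**
 * Skip messages whose ID was already handled. The in-memory array stands in
 * for a shared store (database table, Redis set) that survives restarts.
 */
final class IdempotentHandler
{
    /** @var array<string, true> */
    private array $seen = [];

    /** @var callable */
    private $handler;

    public function __construct(callable $handler)
    {
        $this->handler = $handler;
    }

    /** Returns true if the handler ran, false if the message was a duplicate. */
    public function handle(string $messageId, array $payload): bool
    {
        if (isset($this->seen[$messageId])) {
            return false;
        }
        ($this->handler)($payload);
        $this->seen[$messageId] = true; // record only after success
        return true;
    }
}

// Example: the same message delivered twice triggers the side effect once.
$sent = [];
$handler = new IdempotentHandler(function (array $payload) use (&$sent): void {
    $sent[] = $payload['email'];
});
$firstRun = $handler->handle('msg-1', ['email' => 'a@example.com']);
$secondRun = $handler->handle('msg-1', ['email' => 'a@example.com']);
```

Recording the ID only after the handler succeeds means a failed attempt can still be retried.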
      

Debugging Tips

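  • Enable RdKafka Logging: librdkafka’s debug property takes a comma-separated list of contexts such as broker, topic, msg, or all. If your queue driver forwards it, set it in config/queue.php:

    'debug' => env('KAFKA_DEBUG', 'broker,topic,msg'),
    

    Or pass it directly as librdkafka properties:

    debug=all
    metadata.broker.list=localhost:9092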
    
  • Check Consumer Lag: Use Kafka’s CLI tools:

    kafka-consumer-groups --bootstrap-server localhost:9092 --group laravel-consumers --describe
    
  • Validate Message Flow: Inspect topics with:

    kafka-console-consumer --bootstrap-server localhost:9092 --topic laravel-jobs --from-beginning
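
The lag reported by kafka-consumer-groups --describe is, per partition, the log end offset minus the group's committed offset. A small sketch of that arithmetic follows. The row keys (`partition`, `current`, `end`) are names chosen for this example, not the tool's output format.

```php
<?php
declare(strict_types=1);

/**
 * Sum per-partition lag: log end offset minus the group's committed offset.
 * The row keys (partition, current, end) are hypothetical names for this sketch.
 */
function totalLag(array $partitions): int
{
    $lag = 0;
    foreach ($partitions as $row) {
        // Lag should never be negative; clamp to guard against stale reads.
        $lag += max(0, $row['end'] - $row['current']);
    }
    return $lag;
}

$lag = totalLag([
    ['partition' => 0, 'current' => 120, 'end' => 150], // 30 messages behind
    ['partition' => 1, 'current' => 300, 'end' => 300], // fully caught up
]);
```

A total that keeps growing over time suggests consumers are slower than producers and that more consumers (up to the partition count) or faster handlers are needed.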
    

Extension Points

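  1. Custom Topic Routing: Wrap the producer in your own class rather than extending package internals. resolveTopic() (not shown) is a hypothetical application method that maps a message body to a topic name:

    class RoutingProducer {
        public function __construct(private Context $context) {} // Interop\Queue\Context
        public function send(string $body): void {
            $topic = $this->context->createTopic($this->resolveTopic($body));
            $this->context->createProducer()->send($topic, $this->context->createMessage($body));
        }
    }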
    
  2. Dynamic Topic Selection: Choose the queue name at dispatch time; whether it maps to a Kafka topic depends on the queue driver in use:

    Queue::connection('kafka')->push(new Job(), '', 'dynamic-topic'); // push($job, $data, $queue)
    
  3. Schema Evolution: Define a small SchemaRegistryClient interface in your application for Avro/Protobuf schema lookups (the package does not provide one):

    interface SchemaRegistryClient {
        public function getSchema(string $subject): string;
    }
    
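  4. Monitoring Integration: Consumer metrics are collected by the separate enqueue/monitoring package, attached as a consumer extension. Its documented storage backends include InfluxDB and Datadog; exporting to Prometheus would need a custom stats storage:

    $statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
    $extension = new ConsumerMonitoringExtension($statsStorage);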
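
The topic routing described in the first two extension points reduces to a lookup from job class to topic name. A minimal sketch of such a routing table is shown below. `topicForJob`, the job class names, and the topic names are all hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Resolve the topic for a job class from a routing table,
 * falling back to a default topic for unmapped jobs.
 */
function topicForJob(string $jobClass, array $routes, string $default = 'laravel-jobs'): string
{
    return $routes[$jobClass] ?? $default;
}

// Hypothetical routing table: job class name => Kafka topic.
$routes = [
    'App\\Jobs\\SendEmail' => 'notifications',
    'App\\Jobs\\ProcessOrder' => 'orders',
];

$emailTopic = topicForJob('App\\Jobs\\SendEmail', $routes);
$otherTopic = topicForJob('App\\Jobs\\ResizeImage', $routes);
```

Keeping the table in configuration rather than in code lets topics be renamed or split without touching the jobs themselves.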
    