Laravel Kafka Laravel Package

mateusjunges/laravel-kafka

Laravel Kafka makes it easy to produce and consume Kafka messages in Laravel with a clean, expressive API and improved testability. Build producers and consumers quickly, integrate with your app workflows, and avoid painful Kafka testing setups.

Getting Started

Minimal Setup

  1. Installation:

    composer require mateusjunges/laravel-kafka
    

    Ensure the rdkafka PHP extension is installed (see the installation guide).

  2. Configuration: Publish the config file:

    php artisan vendor:publish --tag=laravel-kafka-config
    

    Update .env with your Kafka broker(s) and topic configurations.

  3. First Producer Use Case:

    use Junges\Kafka\Facades\Kafka;
    
    Kafka::publish('broker')
        ->onTopic('user.created')
        ->withBody(['user_id' => 123])
        ->send();
    
  4. First Consumer Use Case:

    Kafka::consumer(['user.created'])
        ->withHandler(function ($message) {
            // Process message
            return 0; // Commit offset
        })
        ->build()
        ->consume();
    

Where to Look First

  • Documentation: laravelkafka.com (official docs)
  • Facade Methods: Kafka::publish() and Kafka::consumer() are the primary entry points.
  • Testing: Use Kafka::fake() for unit tests (see Testing Assumptions under Pitfalls).

Implementation Patterns

Producer Workflows

Synchronous Publishing

Kafka::publish('broker')
    ->onTopic('orders.placed')
    ->withHeaders(['source' => 'web'])
    ->withBody(['order_id' => 456, 'amount' => 99.99])
    ->send();
  • Use Case: Low-throughput systems (e.g., admin actions, critical events).
  • Note: Blocks until the message is delivered.

Asynchronous Publishing

Kafka::asyncPublish('broker')
    ->onTopic('logs')
    ->withBody(['level' => 'info', 'message' => 'User logged in'])
    ->send();
  • Use Case: High-throughput systems (e.g., logging, analytics).
  • Note: Non-blocking; messages are queued by the producer and flushed later rather than after each send().

Batch Publishing

$producer = Kafka::publish('broker')->onTopic('notifications');
foreach ($users as $user) {
    $producer->withBody(['user_id' => $user->id, 'message' => 'Hello!'])
             ->send();
}
  • Use Case: Bulk operations (e.g., sending notifications to multiple users).
  • Optimization: Reuse the producer instance to avoid connection overhead.

Consumer Workflows

Basic Consumer

Kafka::consumer(['orders.placed'])
    ->withHandler(function ($message) {
        // Process order
        // findOrFail() throws instead of returning null for an unknown order
        Order::findOrFail($message->getBody()['order_id'])->markAsProcessed();
        return 0; // Commit offset
    })
    ->build()
    ->consume();
  • Use Case: Simple event processing (e.g., order fulfillment).

Group-Based Consumer

Kafka::consumer(['orders.placed'])
    ->withConsumerGroupId('order-processors')
    ->withHandler(function ($message) {
        // Process order
        return 0;
    })
    ->build()
    ->consume();
  • Use Case: Scalable consumers with offset tracking (e.g., microservices).

Custom Deserialization

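// $registry and $serializer are assumed to be built elsewhere
// (a schema registry client and an Avro record serializer).
$consumer = Kafka::consumer(['user.avro'])
    ->usingCustomDeserializer(new AvroDeserializer($registry, $serializer))
    ->withHandler(function ($message) {
        // $message->getBody() is now the decoded Avro payload
        return 0;
    })
    ->build();

$consumer->consume();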
  • Use Case: Complex data formats (e.g., Avro, Protobuf).

Integration Tips

  1. Laravel Events: Bind Kafka producers to Laravel events for decoupled publishing:

    Event::listen(UserCreated::class, function (UserCreated $event) {
        Kafka::publish('broker')
            ->onTopic('user.created')
            ->withBody($event->user->toArray())
            ->send();
    });
    
  2. Queue Workers: Run consumers as Laravel queue workers:

    php artisan queue:work --queue=kafka
    

    Configure in config/queue.php:

    'connections' => [
        'kafka' => [
            'driver' => 'kafka',
            'broker' => 'broker',
            'topics' => ['orders.placed'],
        ],
    ],
    
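  3. Retry Logic: Catch failures in the handler and signal them so the message can be retried. Add a growing (exponential) delay between attempts so that a failing dependency is not hammered:

    ->withHandler(function ($message) {
        try {
            // Process logic
            return 0;
        } catch (\Exception $e) {
            return -1; // Requeue with delay
        }
    })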
    
  4. Monitoring: Use Kafka's built-in metrics (for example, consumer group lag) for consumer monitoring. Laravel Horizon only monitors Redis-backed queues, so it does not cover Kafka consumers.
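
The Retry Logic item mentions exponential backoff without showing the delay calculation. The sketch below is one way to express it in plain PHP. `retryWithBackoff()` and its parameters are hypothetical names, not part of laravel-kafka, and the sleep function is injectable only so the example can run without real delays.

```php
<?php

/**
 * Hypothetical helper (not part of laravel-kafka): runs $operation up to
 * $maxAttempts times, doubling the delay after each failed attempt.
 */
function retryWithBackoff(
    callable $operation,
    int $maxAttempts = 3,
    int $baseDelayMs = 100,
    ?callable $sleep = null
): mixed {
    // Default sleeper waits for real; callers may pass a recorder instead.
    $sleep ??= static fn (int $ms) => usleep($ms * 1000);

    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation($attempt);
        } catch (\Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // Out of attempts: let the caller decide what happens next.
            }
            // Delay grows as base, 2 * base, 4 * base, ...
            $sleep($baseDelayMs * (2 ** ($attempt - 1)));
        }
    }
}

// Usage: the operation fails twice, then succeeds on the third attempt.
$delays = [];
$result = retryWithBackoff(
    function (int $attempt) {
        if ($attempt < 3) {
            throw new RuntimeException('transient failure');
        }
        return 'processed';
    },
    5,
    100,
    function (int $ms) use (&$delays) {
        $delays[] = $ms; // Record the delay instead of sleeping.
    }
);
```

Inside a consumer handler, the processing logic would be passed as `$operation`. Capping the maximum delay and adding jitter are common refinements that this sketch leaves out.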


Gotchas and Tips

Pitfalls

  1. Connection Leaks:

    • Issue: Forgetting to close producers/consumers can exhaust connections.
    • Fix: Use dependency injection or ensure send()/consume() are called in a try-finally block.
    • Example:
      $producer = Kafka::publish('broker')->onTopic('topic');
      try {
          $producer->send();
      } finally {
          $producer->close(); // If supported
      }
      
  2. Offset Management:

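    • Issue: The handlers in this guide return 0 to commit the offset and -1 to requeue the message. Incorrect values, or a commit setting that does not match this assumption, can lead to lost or duplicate messages.
    • Tip: Document offset behavior in your consumer handlers, and confirm in the package docs how commits are triggered for the version you run.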
  3. Schema Registry Dependencies:

    • Issue: AVRO/Protobuf deserializers require a running schema registry.
    • Fix: Mock the registry in tests or use a local instance (e.g., confluentinc/cp-schema-registry).
  4. Async Producer Failures:

    • Issue: Async producers may silently fail if the broker is unreachable.
    • Fix: Implement error handling:
      Kafka::asyncPublish('broker')
          ->onTopic('logs')
          ->withBody(['message' => 'Test'])
          // onError() is assumed here; confirm the hook name in the package docs
          ->onError(function ($error) {
              Log::error('Kafka error: ' . $error->getMessage());
          })
          ->send();
      
  5. Testing Assumptions:

    • Issue: Kafka::fake() only mocks the producer, not the consumer. Consumers require manual message injection via shouldReceiveMessages().
    • Fix: Use Kafka::fake() for producer tests and shouldReceiveMessages() for consumer tests.
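
Because the Offset Management pitfall can cause the same message to be delivered twice, handlers are often written to be idempotent. The following is a minimal sketch of that idea in plain PHP. `IdempotentHandler` is a hypothetical class, not part of laravel-kafka, and the in-memory `$seen` array stands in for a persistent store such as a database table or cache.

```php
<?php

/**
 * Hypothetical wrapper (not part of laravel-kafka): skips messages whose ID
 * was already handled, so a redelivery does not repeat side effects.
 */
final class IdempotentHandler
{
    /** @var array<string, true> IDs already processed (in memory for this sketch). */
    private array $seen = [];

    public function __construct(private \Closure $inner)
    {
    }

    /** Returns true if the message was processed, false if it was a duplicate. */
    public function handle(string $messageId, array $body): bool
    {
        if (isset($this->seen[$messageId])) {
            return false;
        }

        ($this->inner)($body);

        // Mark the ID as seen only after the inner handler succeeded.
        $this->seen[$messageId] = true;

        return true;
    }
}

// Usage: the second call simulates a redelivered message.
$processed = [];
$handler = new IdempotentHandler(function (array $body) use (&$processed) {
    $processed[] = $body['order_id'];
});

$first = $handler->handle('orders.placed-0-42', ['order_id' => 456]);
$second = $handler->handle('orders.placed-0-42', ['order_id' => 456]);
```

The message ID used here (topic, partition, and offset joined into a string) is only one possible choice; a business key from the payload works as well when producers may publish the same event twice.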

Debugging Tips

  1. Enable Debug Logging: Add to .env:

    KAFKA_LOG_LEVEL=debug
    

    Logs will appear in storage/logs/laravel.log.

  2. Check Producer/Consumer State:

    // Check if a producer is connected
    if (Kafka::publish('broker')->isConnected()) {
        // Proceed
    }
    
  3. Inspect Messages:

    $producer = Kafka::publish('broker')->onTopic('topic');
    $producer->withBody(['debug' => true]);
    $producer->send();
    // Check Kafka UI (e.g., Kafka UI, Conduktor) for the message.
    
  4. Common Errors:

    • "No broker available": Verify KAFKA_BROKERS in .env and that the rdkafka extension is loaded.
    • "Authentication failed": Check KAFKA_USERNAME/KAFKA_PASSWORD and SASL/SSL configs.
    • "Topic does not exist": Ensure the topic exists, or set auto.create.topics.enable=true in the broker configuration.

Configuration Quirks

  1. Default Config Overrides: The package merges config from:

    • .env (e.g., KAFKA_BROKERS, KAFKA_GROUP_ID)
    • config/kafka.php (e.g., default_producer_options)
    • Per-producer/consumer overrides (e.g., withConfigOptions()).

    Example:

    Kafka::publish('broker')
        ->withConfigOptions(['request.required.acks' => '1'])
        ->onTopic('topic');
    
  2. SSL/TLS: Configure in .env:

    KAFKA_SSL_CA_LOCATION=/path/to/ca.pem
    KAFKA_SSL_CERT_LOCATION=/path/to/client.crt
    KAFKA_SSL_KEY_LOCATION=/path/to/client.key
    

    Or via config:

    'producers' => [
        'broker' => [
            'ssl_ca_location' => '/path/to/ca.pem',
        ],
    ],
    
  3. SASL/SCRAM:

    KAFKA_SASL_MECHANISM=SCRAM-S
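
To illustrate the layering listed under Default Config Overrides, here is a small sketch in plain PHP. It assumes that later layers win over earlier ones; the package's own merge logic may differ. `resolveKafkaOptions()` is a hypothetical function, and the option keys are librdkafka property names chosen for illustration.

```php
<?php

/**
 * Hypothetical illustration of layered configuration (not the package's code):
 * environment defaults, then file config, then per-call overrides.
 */
function resolveKafkaOptions(array $envDefaults, array $fileConfig, array $overrides): array
{
    // With string keys, array_merge() lets values from later arrays win.
    return array_merge($envDefaults, $fileConfig, $overrides);
}

$options = resolveKafkaOptions(
    ['metadata.broker.list' => 'localhost:9092', 'request.required.acks' => '-1'],
    ['compression.codec' => 'snappy'],
    ['request.required.acks' => '1']
);
```

Here the per-call value for `request.required.acks` replaces the default, while keys that appear in only one layer pass through unchanged.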
    