Weave Code
Code Weaver
Helps Laravel developers discover, compare, and choose open-source packages. See popularity, security, maintainers, and scores at a glance to make better decisions.
Feedback
Share your thoughts, report bugs, or suggest improvements.
Subject
Message

Laravel Kafka Laravel Package

mateusjunges/laravel-kafka

Laravel Kafka brings a clean Laravel-friendly API for producing and consuming Kafka messages, with an emphasis on developer experience and easier testing. Ideal for integrating Kafka streams and event-driven workflows into your Laravel applications.

View on GitHub
Deep Wiki
Context7

Getting Started

Minimal Setup

  1. Installation:

    composer require mateusjunges/laravel-kafka
    

    Publish the config file:

    php artisan vendor:publish --provider="Junges\Kafka\KafkaServiceProvider"
    
  2. Configure Kafka Connection: Edit .env with your Kafka broker details:

    KAFKA_BROKERS=localhost:9092
    KAFKA_CONSUMER_GROUP=your-group-name
    
  3. First Use Case: Publish a message:

    use Junges\Kafka\Facades\Kafka;
    
    Kafka::publish('broker')
        ->onTopic('user.created')
        ->withBody(['user_id' => 1])
        ->send();
    

Where to Look First

  • Documentation: laravelkafka.com
  • Facade: Junges\Kafka\Facades\Kafka (primary entry point)
  • Testing Helpers: Built-in fake assertions for unit testing

Implementation Patterns

Producer Workflows

  1. Basic Message Publishing:

    Kafka::publish('broker')
        ->onTopic('events.user.created')
        ->withHeaders(['source' => 'api'])
        ->withBody(['user_id' => 123])
        ->send();
    
  2. Async Publishing with Queues: Dispatch a job to publish messages:

    KafkaPublishJob::dispatch('broker', 'topic', ['data' => 'value']);
    
  3. Schema Registry Integration:

    Kafka::publish('broker')
        ->onTopic('users')
        ->withSchema('UserSchema')
        ->withBody($user)
        ->send();
    

Consumer Workflows

  1. Basic Consumer Setup:

    Kafka::consumer(['orders.placed'])
        ->withHandler(function ($message) {
            // Process message
            return 0; // Commit offset
        })
        ->build()
        ->consume();
    
  2. Consumer with Error Handling:

    Kafka::consumer(['payments.processed'])
        ->withHandler(function ($message) {
            try {
                // Process logic
            } catch (\Exception $e) {
                return -1; // Requeue message
            }
            return 0;
        })
        ->build()
        ->consume();
    
  3. Consumer Groups:

    Kafka::consumer(['notifications.sent'], 'notifications-group')
        ->withHandler(fn($message) => /* ... */)
        ->build()
        ->consume();
    

Integration Tips

  • Laravel Events: Bind Kafka publishing to Laravel events:

    event(new UserCreated($user));
    // In EventServiceProvider:
    UserCreated::dispatch(function ($event) {
        Kafka::publish('broker')
            ->onTopic('user.created')
            ->withBody($event->user)
            ->send();
    });
    
  • Queue Workers: Run consumers as Laravel queue workers:

    php artisan queue:work --queue=kafka
    
  • Middleware: Add Kafka-specific middleware for request/response handling.


Gotchas and Tips

Common Pitfalls

  1. Connection Issues:

    • Ensure KAFKA_BROKERS in .env matches your broker's advertised listeners.
    • For Docker setups, use broker:29092 (internal) or localhost:9092 (external).
  2. Serialization Errors:

    • Kafka expects binary data. Use withBody() with arrays or JSON strings:
      ->withBody(json_encode(['key' => 'value']))
      
  3. Consumer Lag:

    • Monitor offset commits. Return 0 to commit, -1 to requeue, or >0 to skip.
  4. Schema Registry:

    • If using Avro/Protobuf, ensure schemas are registered before publishing.

Debugging Tips

  1. Enable Logging:

    KAFKA_LOG_LEVEL=debug
    
  2. Check Published Messages:

    Kafka::fake();
    Kafka::publish('broker')->onTopic('test')->withBody(['test' => true])->send();
    Kafka::assertPublished(); // Verify message was sent
    
  3. Consumer Debugging:

    • Use shouldReceiveMessages() in tests to simulate message flow:
      Kafka::fake();
      Kafka::shouldReceiveMessages([new ConsumedMessage(/* ... */)]);
      

Configuration Quirks

  1. Default Broker:

    • The publish() method accepts a broker name. Ensure your config/kafka.php defines it:
      'brokers' => [
          'broker' => [
              'hosts' => env('KAFKA_BROKERS', 'localhost:9092'),
          ],
      ],
      
  2. SSL/TLS:

    • Configure in .env:
      KAFKA_SSL_ENABLED=true
      KAFKA_SSL_CA_FILE=/path/to/ca.pem
      
  3. SASL Authentication:

    KAFKA_SASL_ENABLED=true
    KAFKA_SASL_MECHANISM=PLAIN
    KAFKA_SASL_USERNAME=user
    KAFKA_SASL_PASSWORD=pass
    

Extension Points

  1. Custom Producers/Consumers:

    • Implement Junges\Kafka\Contracts\Producer or Junges\Kafka\Contracts\Consumer interfaces.
  2. Middleware:

    • Add middleware to the Kafka facade for pre/post-processing:
      Kafka::extend('custom', function ($app) {
          return new CustomKafkaProducer($app);
      });
      
  3. Testing Extensions:

    • Extend fake assertions for custom message validation:
      Kafka::assertPublished(function ($message) {
          return $message->getBody()['status'] === 'processed';
      });
      
Weaver

How can I help you explore Laravel packages today?

Conversation history is not saved when not logged in.
Prompt
Add packages to context
No packages found.
davejamesmiller/laravel-breadcrumbs
artisanry/parsedown
christhompsontldr/phpsdk
enqueue/dsn
bunny/bunny
enqueue/test
enqueue/null
enqueue/amqp-tools
milesj/emojibase
bower-asset/punycode
bower-asset/inputmask
bower-asset/jquery
bower-asset/yii2-pjax
laravel/nova
spatie/laravel-mailcoach
spatie/laravel-superseeder
laravel/liferaft
nst/json-test-suite
danielmiessler/sec-lists
jackalope/jackalope-transport