Kafka Bundle Laravel Package

d1oxyde/kafka-bundle

Laravel-friendly Kafka integration bundle providing configuration, producer/consumer helpers, and streamlined setup for working with Apache Kafka in PHP applications. Suitable for event-driven apps needing message publishing and processing.

Getting Started

Minimal Setup

  1. Install the Bundle

    composer require d1oxyde/kafka-bundle
    

    The bundle requires PHP 7.1+ with the ext-rdkafka extension enabled and the enqueue/rdkafka package installed.

  2. Configure the Bundle Add to config/bundles.php:

    return [
        // ...
        D1oxyde\KafkaBundle\D1oxydeKafkaBundle::class => ['all' => true],
    ];
    
  3. Basic Configuration Define Kafka brokers in config/packages/d1oxyde_kafka.yaml:

    d1oxyde_kafka:
        clients:
            default:
                hosts: ['localhost:9092']
                topic: 'test_topic'
    
  4. First Use Case: Producing a Message Inject the KafkaProducer service and publish:

    use D1oxyde\KafkaBundle\Producer\KafkaProducerInterface;
    
    class MyService
    {
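        // Constructor property promotion requires PHP 8.0+; on older versions,
        // declare the property and assign it in the constructor body.
        public function __construct(private KafkaProducerInterface $producer) {}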
    
        public function sendMessage(string $message): void
        {
            $this->producer->send('default', $message);
        }
    }
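
Because MyService depends on an interface rather than a concrete class, it can be exercised without a broker. The following is a minimal sketch of that idea. ProducerLike and InMemoryProducer are hypothetical names used only for illustration, and the send() signature is an assumption; the bundle's real interface may differ.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the bundle's producer interface (assumed signature).
interface ProducerLike
{
    public function send(string $target, string $message): void;
}

// In-memory fake that records messages instead of contacting a broker.
final class InMemoryProducer implements ProducerLike
{
    /** @var array<int, array{0: string, 1: string}> */
    public $sent = [];

    public function send(string $target, string $message): void
    {
        $this->sent[] = [$target, $message];
    }
}

// Same shape as MyService above, typed against the stand-in interface.
final class MyService
{
    private $producer;

    public function __construct(ProducerLike $producer)
    {
        $this->producer = $producer;
    }

    public function sendMessage(string $message): void
    {
        $this->producer->send('default', $message);
    }
}

$fake = new InMemoryProducer();
(new MyService($fake))->sendMessage('hello');
// $fake->sent now holds the single recorded call.
```

In a real test, the fake would implement the bundle's own interface instead of the stand-in.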
    

Implementation Patterns

Producer Workflows

  1. Synchronous Publishing Use KafkaProducerInterface for a plain send with no delivery callback:

    $this->producer->send('default', json_encode(['event' => 'user.created']));
    
  2. Asynchronous Publishing with Callbacks Leverage KafkaAsyncProducer for non-blocking operations:

    $this->asyncProducer->send('default', $payload, function ($error, $result) {
        if ($error) {
            // Handle failure
        }
    });
    
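  3. Partitioning and Keys Control message routing with a message key or an explicit partition. In Kafka, messages with the same key go to the same partition, and an explicit partition (as in the example) takes precedence over the key: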

    $this->producer->send('default', $payload, ['partition' => 0, 'key' => 'user:123']);
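
To illustrate why a key gives stable routing, here is a minimal sketch of key-based partitioning: hash the key and take it modulo the partition count. The function partitionForKey is hypothetical and not part of the bundle. The real partitioner lives in librdkafka and uses its own hashing, so actual partition numbers will differ.

```php
<?php
declare(strict_types=1);

/**
 * Illustrative key-to-partition mapping (not the partitioner Kafka uses).
 */
function partitionForKey(string $key, int $partitionCount): int
{
    if ($partitionCount < 1) {
        throw new InvalidArgumentException('partitionCount must be >= 1');
    }

    // Mask to 31 bits so the hash is non-negative on 32-bit builds too.
    return (crc32($key) & 0x7fffffff) % $partitionCount;
}

// The same key always maps to the same partition for a fixed partition count.
$first  = partitionForKey('user:123', 6);
$second = partitionForKey('user:123', 6);
```

The mapping only stays stable while the partition count is unchanged, which is one reason to size topics before relying on per-key ordering.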
    

Consumer Workflows

  1. Basic Consumer Setup Define a consumer class:

    use D1oxyde\KafkaBundle\Consumer\KafkaConsumerInterface;
    
    class UserConsumer implements KafkaConsumerInterface
    {
        public function consume($message): void
        {
            $data = json_decode($message, true);
            if (!is_array($data)) {
                return; // Skip payloads that are not valid JSON objects/arrays
            }
            // Process $data
        }
    }
    
  2. Registering Consumers Configure in config/packages/d1oxyde_kafka.yaml:

    d1oxyde_kafka:
        consumers:
            user_consumer:
                class: App\Consumer\UserConsumer
                topics: ['user_events']
                group_id: 'user-group'
    
  3. Handling Offsets Manually commit offsets if needed:

    $this->consumer->commitOffsets();
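
When committing manually, the order of processing and committing matters: committing only after the handler succeeds gives at-least-once behaviour. The sketch below shows that ordering with plain callables. decodePayload and handleMessage are hypothetical helpers, and $commit stands in for whatever commit call the consumer exposes.

```php
<?php
declare(strict_types=1);

/**
 * Decode a JSON body into an array, or return null when the body is not a
 * JSON object/array, so the caller can skip, log, or dead-letter it.
 */
function decodePayload(string $body): ?array
{
    $data = json_decode($body, true);

    return is_array($data) ? $data : null;
}

/**
 * Process one message and commit only after the handler succeeds.
 */
function handleMessage(string $body, callable $handler, callable $commit): bool
{
    $data = decodePayload($body);
    if ($data === null) {
        return false; // Malformed payload: not handled, not committed here
    }

    $handler($data); // An exception here skips the commit below
    $commit();

    return true;
}

$commits = 0;
$seen = [];
$handler = function (array $data) use (&$seen): void {
    $seen[] = $data['event'];
};
$commit = function () use (&$commits): void {
    $commits++;
};

$ok  = handleMessage('{"event":"user.created"}', $handler, $commit);
$bad = handleMessage('not json', $handler, $commit);
```

Whether malformed messages should be committed (to avoid reprocessing them forever) is an application decision that this sketch leaves open.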
    

Integration Tips

  • Symfony Messenger Bridge Use Enqueue\SymfonyBridge to integrate with Symfony Messenger:

    # config/packages/messenger.yaml
    messenger:
        transports:
            kafka: '%env(KAFKA_DSN)%'
        routing:
            'App\Message\UserCreated': kafka
    
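  • Retry Logic Retry transient failures. The options below pass a retry count and a single retry_delay value rather than an exponentially growing delay, so exponential backoff has to be layered on top if it is needed: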

    $this->producer->send($topic, $payload, ['retry' => 3, 'retry_delay' => 1000]);
    
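  • Schema Registry (Avro/Protobuf) Extend the bundle to support schema validation by serializing payloads before sending. The example below uses a serializer with the 'json' format; Avro or Protobuf would need a serializer for that format instead: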

    $this->producer->send('default', $serializer->serialize($payload, 'json'));
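
Exponential backoff for the retry tip above can be sketched as a small wrapper around any send call. The helpers backoffDelayMs and retryWithBackoff are hypothetical and not part of the bundle, and the sketch assumes the producer signals failure by throwing.

```php
<?php
declare(strict_types=1);

/**
 * Delay before retry number $attempt (1-based): base * 2^(attempt - 1),
 * capped at $maxMs.
 */
function backoffDelayMs(int $attempt, int $baseMs = 100, int $maxMs = 5000): int
{
    return (int) min($maxMs, $baseMs * (2 ** ($attempt - 1)));
}

/**
 * Call $operation until it succeeds or $maxAttempts is reached.
 * $sleep is injectable so tests do not actually wait.
 */
function retryWithBackoff(callable $operation, int $maxAttempts = 3, ?callable $sleep = null)
{
    $sleep = $sleep ?? function (int $ms): void {
        usleep($ms * 1000);
    };

    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation();
        } catch (\Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // Out of attempts: surface the last failure
            }
            $sleep(backoffDelayMs($attempt));
        }
    }
}

// Example: fails twice, then succeeds; delays are recorded instead of slept.
$delays = [];
$calls = 0;
$result = retryWithBackoff(function () use (&$calls) {
    if (++$calls < 3) {
        throw new \RuntimeException('transient');
    }
    return 'sent';
}, 5, function (int $ms) use (&$delays): void {
    $delays[] = $ms;
});
```

In application code the operation would be a closure around the producer call, for example `function () use ($topic, $payload) { $this->producer->send($topic, $payload); }`. Adding random jitter to each delay is a common refinement when many clients retry at once.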
    

Gotchas and Tips

Common Pitfalls

  1. Missing ext-rdkafka Ensure the PHP extension is installed and enabled. Verify with:

    php -m | grep rdkafka
    
  2. Topic Auto-Creation The bundle does not auto-create topics. Pre-create topics via:

    kafka-topics --create --topic test_topic --bootstrap-server localhost:9092
    
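  3. Consumer Group Conflicts Give each logical consumer its own group_id. Instances of the same consumer should share a group_id so Kafka balances partitions among them; unrelated consumers that share a group_id will split the messages between them, and an instance with its own group_id will receive every message.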

  4. Serialization Mismatches Kafka carries message bodies as raw bytes, so serialize payloads explicitly (e.g., json_encode) and decode them with the same format on the consumer side.
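
For the consumer group pitfall above, it can help to see how partitions are divided among the members of one group. The sketch below uses simple round-robin assignment. assignPartitions is a hypothetical helper, and real Kafka assignors (range, round-robin, cooperative-sticky) differ in detail.

```php
<?php
declare(strict_types=1);

/**
 * Round-robin assignment of partitions to the members of one consumer group.
 *
 * @param string[] $members
 * @return array<string, int[]> member => assigned partitions
 */
function assignPartitions(array $members, int $partitionCount): array
{
    $assignment = array_fill_keys($members, []);
    if ($members === []) {
        return $assignment;
    }

    $members = array_values($members);
    for ($p = 0; $p < $partitionCount; $p++) {
        $assignment[$members[$p % count($members)]][] = $p;
    }

    return $assignment;
}

// Two instances sharing one group_id split the four partitions.
$shared = assignPartitions(['worker-1', 'worker-2'], 4);

// A single member (e.g. an instance with its own group_id) gets all of them.
$alone = assignPartitions(['worker-1'], 4);
```

The second case is why giving every instance a different group_id leads to each instance processing every message.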

Debugging Tips

  • Enable Verbose Logging Configure Monolog in config/packages/monolog.yaml:

    monolog:
        handlers:
            kafka:
                type: stream
                path: "%kernel.logs_dir%/kafka.log"
                level: debug
    
  • Check Consumer Lag Monitor lag via Kafka CLI:

    kafka-consumer-groups --bootstrap-server localhost:9092 --group user-group --describe
    
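  • Test Locally with Docker Use bitnami/kafka for local testing. The fragment below is only a starting point; the image also needs the listener and broker settings described in its documentation: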

    # docker-compose.yml
    services:
        kafka:
            image: bitnami/kafka
            ports:
                - "9092:9092"
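
The lag reported by kafka-consumer-groups is, per partition, the log end offset minus the committed offset. A minimal sketch of that arithmetic follows, in case lag needs to be computed from offsets collected elsewhere. consumerLag is a hypothetical helper and the offsets are made-up sample values.

```php
<?php
declare(strict_types=1);

/**
 * Per-partition lag = log end offset - committed offset, floored at zero.
 *
 * @param array<int, int> $logEndOffsets    partition => latest offset in the log
 * @param array<int, int> $committedOffsets partition => last committed offset
 * @return array<int, int> partition => lag
 */
function consumerLag(array $logEndOffsets, array $committedOffsets): array
{
    $lag = [];
    foreach ($logEndOffsets as $partition => $end) {
        // A partition with no committed offset is treated as fully unread here.
        $committed = $committedOffsets[$partition] ?? 0;
        $lag[$partition] = max(0, $end - $committed);
    }

    return $lag;
}

$lag = consumerLag([0 => 120, 1 => 80], [0 => 100, 1 => 80]);
$total = array_sum($lag);
```

A total that keeps growing over time usually means consumers are slower than producers for that group.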
    

Extension Points

  1. Custom Producers/Consumers Implement KafkaProducerInterface or KafkaConsumerInterface for custom logic.

  2. Middleware for Messages Add preprocessing/validation via Symfony’s KernelEvents:

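    // src/EventListener/KafkaListener.php
    use Symfony\Component\HttpKernel\Event\RequestEvent;

    // RequestEvent replaces GetResponseEvent (deprecated in Symfony 4.3, removed in 5.0)
    public function onKernelRequest(RequestEvent $event): void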
    {
        $payload = $event->getRequest()->attributes->get('kafka_payload');
        // Validate/modify $payload
    }
    
  3. Dynamic Topic Routing Override D1oxyde\KafkaBundle\Producer\KafkaProducer to route messages dynamically:

    public function send($topic, $message, array $options = [])
    {
        $topic = $this->resolveTopic($message);
        // ... rest of logic
    }
    
  4. Health Checks Add a health endpoint to verify Kafka connectivity:

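    use D1oxyde\KafkaBundle\Producer\KafkaProducerInterface;
    use Symfony\Component\HttpFoundation\Request;
    use Symfony\Component\HttpFoundation\Response;
    
    public function __invoke(Request $request, KafkaProducerInterface $producer): Response
    {
        try {
            $producer->send('health', 'ping');
            return new Response('OK');
        } catch (\Throwable $e) {
            // Catch errors as well as exceptions so any failure reports as unhealthy
            return new Response('FAIL', 500);
        }
    }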
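
The middleware and dynamic routing ideas above can also be combined in a decorator that wraps a producer, which avoids subclassing a concrete class. This is a sketch under stated assumptions. MessageSender, RecordingSender, and RoutingSender are hypothetical names, the send() signature is assumed, and the routing rule (the prefix of an "event" field) is only an example.

```php
<?php
declare(strict_types=1);

// Hypothetical minimal sender contract; the bundle's real interface may differ.
interface MessageSender
{
    public function send(string $topic, string $message): void;
}

// Records sends so the example runs without a broker.
final class RecordingSender implements MessageSender
{
    public $sent = [];

    public function send(string $topic, string $message): void
    {
        $this->sent[] = [$topic, $message];
    }
}

// Decorator: validates the JSON payload, then picks a topic from its "event" field.
final class RoutingSender implements MessageSender
{
    private $inner;
    private $routes;

    public function __construct(MessageSender $inner, array $routes)
    {
        $this->inner = $inner;
        $this->routes = $routes;
    }

    public function send(string $topic, string $message): void
    {
        $data = json_decode($message, true);
        if (!is_array($data) || !isset($data['event'])) {
            throw new \InvalidArgumentException('payload must be JSON with an "event" field');
        }

        // Route on the prefix before the first dot; fall back to the caller's topic.
        $prefix = explode('.', (string) $data['event'])[0];
        $this->inner->send($this->routes[$prefix] ?? $topic, $message);
    }
}

$recorder = new RecordingSender();
$router = new RoutingSender($recorder, ['user' => 'user_events']);
$router->send('default', '{"event":"user.created"}');
$router->send('default', '{"event":"order.paid"}');
```

Here the first message is routed to user_events and the second keeps the caller's topic because no route matches its prefix.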
    