Messenger Kafka Laravel Package

akson/messenger-kafka

Getting Started

Minimal Setup

  1. Installation:

    composer require akson/messenger-kafka
    

    For non-Flex projects, add to config/bundles.php:

    return [
        Akson\MessengerKafka\AksonMessengerKafkaBundle::class => ['all' => true],
    ];
    
  2. Basic Configuration (config/packages/messenger.yaml):

    framework:
        messenger:
            transports:
                kafka_producer:
                    dsn: '%env(KAFKA_DSN)%'
                    options:
                        topic:
                            name: 'your_topic'
                        kafka_conf:
                            bootstrap.servers: '%env(KAFKA_BROKERS)%'
    
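  3. First Use Case: Dispatch a message via Kafka. Route the message class to kafka_producer under framework.messenger.routing so that the bus sends it to this transport:

    use Symfony\Component\Messenger\MessageBusInterface;
    
    class YourCommandHandler
    {
        public function __construct(private MessageBusInterface $bus) {}
    
        public function handle(YourCommand $command): void
        {
            // The bus hands YourMessage to whichever transport it is routed to.
            $this->bus->dispatch(new YourMessage());
        }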
    }
    

Implementation Patterns

Producer Workflow

  1. Topic Configuration:

    • Define topics in messenger.yaml under topic.name.
    • Use environment variables for dynamic topic names:
      topic:
          name: '%env(KAFKA_TOPIC)%'
      
  2. Serialization:

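    • Implement SerializerInterface for custom formats (e.g., JSON, Avro):
      use Symfony\Component\Messenger\Envelope;
      use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
      use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
      
      class JsonSerializer implements SerializerInterface
      {
          public function encode(Envelope $envelope): array
          {
              // json_encode() only sees public properties unless the message
              // implements JsonSerializable.
              return ['body' => json_encode($envelope->getMessage(), JSON_THROW_ON_ERROR)];
          }
      
          public function decode(array $encodedEnvelope): Envelope
          {
              $data = json_decode($encodedEnvelope['body'], true);
              if (!is_array($data)) {
                  // Signal an undecodable payload instead of building a broken message.
                  throw new MessageDecodingFailedException('Invalid JSON body.');
              }
              return new Envelope(new YourMessage($data));
          }
      }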
      
    • Register the serializer's service ID under the transport's serializer key, next to dsn and options.
  3. Error Handling:

    • Use flushRetries (number of retries) and flushTimeout (milliseconds per flush attempt) to retry sends that were not delivered in time:
      options:
          flushRetries: 3
          flushTimeout: 5000
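
To illustrate how flushRetries and flushTimeout might interact, here is a minimal sketch in plain PHP. It assumes the sender flushes once and then up to flushRetries more times, each attempt bounded by flushTimeout milliseconds. The function name flushWithRetries and the callable standing in for the producer are hypothetical, and the package's actual logic may differ.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch: call $flush until it reports success or the retry
 * budget is spent. $flush stands in for a producer flush call and returns
 * 0 on success (the value of RD_KAFKA_RESP_ERR_NO_ERROR) or an error code.
 *
 * @param callable(int): int $flush
 * @return int Number of flush attempts that were made.
 */
function flushWithRetries(callable $flush, int $flushRetries, int $flushTimeoutMs): int
{
    $attempts = 0;
    do {
        ++$attempts;
        $code = $flush($flushTimeoutMs);
        // Keep going only while the flush failed and retries remain.
    } while ($code !== 0 && $attempts <= $flushRetries);

    if ($code !== 0) {
        throw new RuntimeException(
            sprintf('Flush failed after %d attempts (code %d).', $attempts, $code)
        );
    }

    return $attempts;
}
```

With flushRetries: 3 and flushTimeout: 5000 this allows at most four attempts, so a send can block for roughly 20 seconds in the worst case before an exception surfaces.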
      

Consumer Workflow

  1. Consumer Setup:

    • Configure group.id and auto.offset.reset:
      consumer:
          dsn: '%env(KAFKA_DSN)%'
          options:
              kafka_conf:
                  group.id: 'your_consumer_group'
              topic_conf:
                  auto.offset.reset: 'earliest'
      
  2. Async Commit:

    • Enable commitAsync to improve performance:
      options:
          commitAsync: true
      
  3. Message Handling:

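    • Use middleware for logging/retries. Symfony already adds HandleMessageMiddleware to every bus; register extra middleware by service ID in messenger.yaml, since MessageBusInterface only exposes dispatch():
      framework:
          messenger:
              buses:
                  messenger.bus.default:
                      middleware:
                          - 'App\Middleware\YourLoggingMiddleware'  # hypothetical service ID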
      

Integration Tips

  • Environment Variables: Keep sensitive values (e.g., KAFKA_DSN, KAFKA_SASL_USERNAME) in environment variables; put real secrets in .env.local rather than the committed .env.
  • Retry Logic: Combine with Symfony’s RetryStrategy for transient failures.
  • Monitoring: Raise librdkafka's log verbosity via kafka_conf to trace polling and offset commits:
    kafka_conf:
        log_level: '7'  # syslog LOG_DEBUG (6 is LOG_INFO)
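
For the retry logic mentioned above, Symfony's default multiplier-based strategy grows the delay geometrically with each retry. The sketch below reproduces that calculation in plain PHP, ignoring any jitter. retryDelayMs is a hypothetical helper name; its parameters mirror the delay, multiplier, and max_delay keys of a transport's retry_strategy.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: delay in milliseconds before the next retry.
 * $retryCount is the number of retries already attempted (0 for the first).
 * A $maxDelayMs of 0 means "no upper bound".
 */
function retryDelayMs(
    int $retryCount,
    int $delayMs = 1000,
    float $multiplier = 2.0,
    int $maxDelayMs = 0
): int {
    // ** binds tighter than *, so this is delay * (multiplier ^ retryCount).
    $delay = (int) ceil($delayMs * $multiplier ** $retryCount);

    if ($maxDelayMs > 0 && $delay > $maxDelayMs) {
        return $maxDelayMs;
    }

    return $delay;
}
```

Calling retryDelayMs(0), retryDelayMs(1), and retryDelayMs(2) shows how quickly the wait grows, which helps when choosing a max_delay that stays well below max.poll.interval.ms.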
    

Gotchas and Tips

Pitfalls

  1. Offset Management:

    • Setting enable.auto.offset.store to false keeps an offset from being stored before its handler has finished, but offsets must then be stored or committed explicitly after handling.
    • Fix: Use commitAsync and ensure handlers throw exceptions for failed messages.
  2. SSL/SASL Configs:

    • Missing ssl.ca.location or sasl.mechanisms causes silent connection failures.
    • Debug: Check Kafka broker logs for rejected connections.
  3. Serialization Mismatches:

    • Decoding errors crash consumers. Validate serializers with:
      $this->bus->dispatch(new YourMessage(['invalid' => 'data']));
      
    • Tip: Use try-catch in decode() to log malformed messages.
  4. Consumer Lag:

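    • If a handler takes longer than max.poll.interval.ms (default: 300000 ms, i.e. 300 s) between polls, the consumer is considered failed and the group rebalances. Raise the value for slow handlers; lower it only to detect stalled consumers sooner:
      kafka_conf:
          max.poll.interval.ms: '60000'  # 1 minute; handlers must finish well within this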
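
The tip about guarding decode() against malformed payloads (pitfall 3) can be sketched in plain PHP as follows. decodeBody and MalformedMessageException are hypothetical names used for illustration; inside a real serializer you would more likely throw Symfony's MessageDecodingFailedException.

```php
<?php
declare(strict_types=1);

// Hypothetical exception type for payloads that cannot be decoded.
final class MalformedMessageException extends RuntimeException
{
}

/**
 * Decode a JSON message body, logging and rethrowing on malformed input
 * instead of letting a raw JsonException escape from the consumer.
 */
function decodeBody(string $body): array
{
    try {
        $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $e) {
        error_log('Malformed Kafka message: ' . $e->getMessage());
        throw new MalformedMessageException('Body is not valid JSON.', 0, $e);
    }

    // Valid JSON such as "42" or "null" is still not a usable message payload.
    if (!is_array($data)) {
        throw new MalformedMessageException('Body must decode to a JSON object or array.');
    }

    return $data;
}
```

Catching a dedicated exception type in the consumer makes it possible to skip or park a bad message rather than stopping the worker.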
      

Debugging

  • Enable Debug Logging:
    kafka_conf:
        debug: 'all'
    
  • Check Consumer Offsets: Use Kafka CLI:
    kafka-consumer-groups --bootstrap-server localhost:9092 --group your_consumer_group --describe
    

Extension Points

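  1. Custom Middleware: Add Kafka-specific logic (e.g., headers, dead-letter queues). Implement MiddlewareInterface and register the class under the bus's middleware key in messenger.yaml:

    use Symfony\Component\Messenger\Envelope;
    use Symfony\Component\Messenger\Exception\HandlerFailedException;
    use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
    use Symfony\Component\Messenger\Middleware\StackInterface;
    
    final class DeadLetterMiddleware implements MiddlewareInterface
    {
        public function handle(Envelope $envelope, StackInterface $stack): Envelope
        {
            try {
                // Pass the envelope on to the next middleware in the stack.
                return $stack->next()->handle($envelope, $stack);
            } catch (HandlerFailedException $e) {
                // Redirect to DLQ, then rethrow so retry/failure handling still runs.
                throw $e;
            }
        }
    }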
    
  2. Dynamic Topics: Override Transport::getTopic() to resolve topics dynamically (getType() below is a placeholder for an accessor on your own message class):

    public function getTopic(Envelope $envelope): string
    {
        return 'dynamic-topic-' . $envelope->getMessage()->getType();
    }
    
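  3. Avro Support: Implement SerializerInterface on top of flix-tech/avro-serde-php, whose RecordSerializer decodes schema-registry-framed Avro payloads:

    use FlixTech\AvroSerializer\Objects\RecordSerializer;
    use Symfony\Component\Messenger\Envelope;
    
    // Inside a SerializerInterface implementation that receives a configured
    // RecordSerializer (schema registry client included) via its constructor.
    public function decode(array $encodedEnvelope): Envelope
    {
        // decodeMessage() returns the decoded record data, not a message object.
        $data = $this->recordSerializer->decodeMessage($encodedEnvelope['body']);
        return new Envelope(new YourMessage($data));
    }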
    

Configuration Quirks

  • DSN Format: Ensure kafka:// or kafka+ssl:// is used. Missing schemes cause InvalidArgumentException.
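  • Topic Conf Overrides: auto.offset.reset in topic_conf only applies when the consumer group has no committed offset for a partition; once offsets are committed, the group resumes from them.
  • PHP Extensions: The kafka_conf options are passed to librdkafka, so the rdkafka PHP extension is needed alongside ext-json. SASL/SCRAM support depends on how librdkafka was built rather than on a separate PHP extension. Verify with:
    php -m | grep -E 'json|rdkafka'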
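
The DSN scheme rule and the extension check from the list above can be verified early, for example in a deploy script. A minimal sketch in plain PHP follows; hasKafkaScheme and missingExtensions are hypothetical helper names, not part of the package.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: true if the DSN starts with one of the two schemes
 * named above (kafka:// or kafka+ssl://).
 */
function hasKafkaScheme(string $dsn): bool
{
    return str_starts_with($dsn, 'kafka://') || str_starts_with($dsn, 'kafka+ssl://');
}

/**
 * Hypothetical helper: return the names from $required that are not loaded.
 *
 * @param list<string> $required
 * @return list<string>
 */
function missingExtensions(array $required): array
{
    return array_values(array_filter(
        $required,
        static fn (string $name): bool => !extension_loaded($name)
    ));
}
```

For example, hasKafkaScheme('localhost:9092') returns false because the scheme is missing, and missingExtensions(['json', 'rdkafka']) lists whichever of the two extensions the current PHP build lacks.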
    