
Event Driven Kafka Messenger Transport Laravel Package

alvarorosado/event-driven-kafka-messenger-transport


Getting Started

Minimal Setup

  1. Install the package:

    composer require alvarorosado/event-driven-kafka-messenger-transport
    
  2. Configure Kafka DSN in .env:

    KAFKA_DSN=ed+kafka://localhost:9092
    
  3. Basic Messenger configuration (config/packages/messenger.yaml):

    framework:
      messenger:
        transports:
          kafka_events:
            dsn: '%env(KAFKA_DSN)%'
            options:
              topics: ['user_events']
        routing:
          'App\Message\UserRegistered': kafka_events
    
  4. Create a message class:

    namespace App\Message;
    
    class UserRegistered
    {
        public function __construct(public string $userId) {}
    }
    
  5. Dispatch a message (e.g., in a controller):

    use App\Message\UserRegistered;
    use Symfony\Component\Messenger\MessageBusInterface;
    
    public function registerUser(MessageBusInterface $bus, string $userId)
    {
        $bus->dispatch(new UserRegistered($userId));
    }
    

First Use Case: Simple Event Publishing

  • Dispatch a message to Kafka with zero additional configuration.
  • The transport automatically serializes the message and sends it to the specified topic (user_events in this case).

Implementation Patterns

1. Basic Workflow

Produce → Consume Cycle:

# config/packages/messenger.yaml
framework:
  messenger:
    transports:
      kafka_events:
        dsn: '%env(KAFKA_DSN)%'
        options:
          topics: ['user_events']
    routing:
      'App\Message\UserRegistered': kafka_events
      'App\Message\UserUpdated': kafka_events

Consumer Setup (e.g., in config/packages/framework.yaml):

framework:
  messenger:
    buses:
      async_bus:
        middleware:
          - 'App\Middleware\HandleKafkaMessages'

Middleware Example:

namespace App\Middleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

class HandleKafkaMessages implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        // Inspect or modify the envelope here, then pass it to the next middleware.
        return $stack->next()->handle($envelope, $stack);
    }
}

2. Advanced JSON Serialization

Enable JSON mode and implement KafkaIdentifierStamp:

# config/packages/messenger.yaml
framework:
  messenger:
    transports:
      kafka_events:
        dsn: '%env(KAFKA_DSN)%'
        options:
          topics: ['user_events']
          json_serialization:
            enabled: true

Message Class with Stamp:

namespace App\Message;

use App\Transport\Stamp\KafkaIdentifierStamp;

class UserRegistered
{
    public function __construct(public string $userId) {}

    public function identifier(): string
    {
        return 'user_registered';
    }
}

Hook Implementation (required for JSON mode):

namespace App\Transport\Hook;

use App\Transport\Stamp\KafkaIdentifierStamp;
use Symfony\Component\Messenger\Envelope;
// Also import KafkaTransportHookInterface from the package's namespace.

class KafkaHook implements KafkaTransportHookInterface
{
    public function beforeProduce(Envelope $envelope): Envelope
    {
        $message = $envelope->getMessage();
        if (method_exists($message, 'identifier')) {
            // Envelope is immutable: with() returns a new instance, so reassign it.
            $envelope = $envelope->with(new KafkaIdentifierStamp($message->identifier()));
        }
        return $envelope;
    }

    // Implement other required methods...
}
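
A hook has to return the envelope produced by with() because Symfony's Envelope is immutable: with() clones the envelope and returns the copy, leaving the original unchanged. The following is a minimal sketch of that behavior only. TinyEnvelope is a hypothetical stand-in (not the real Symfony class), and ArrayObject is used as a placeholder stamp so the snippet runs without any dependencies.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for Symfony's Envelope, reduced to stamp handling.
final class TinyEnvelope
{
    /** @param list<object> $stamps */
    public function __construct(
        public readonly object $message,
        private array $stamps = [],
    ) {}

    // Mirrors the immutable style of Envelope::with(): clone, add, return the clone.
    public function with(object ...$stamps): self
    {
        $clone = clone $this;
        foreach ($stamps as $stamp) {
            $clone->stamps[] = $stamp;
        }
        return $clone;
    }

    /** @return list<object> */
    public function all(): array
    {
        return $this->stamps;
    }
}

$original = new TinyEnvelope(new stdClass());

// Return value discarded: $original still carries no stamps.
$original->with(new ArrayObject(['identifier' => 'user_registered']));

// Return value kept: $stamped carries the stamp.
$stamped = $original->with(new ArrayObject(['identifier' => 'user_registered']));
```

The same rule applies to every stamp added in a hook: assign the result of with() back to the variable that is returned.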

3. Selective Consumption

Filter messages by type in a multi-event topic:

framework:
  messenger:
    transports:
      kafka_events:
        dsn: '%env(KAFKA_DSN)%'
        options:
          topics: ['user_events']
          json_serialization:
            enabled: true
          consumer:
            routing:
              - name: 'user_registered'
                class: 'App\Message\UserRegistered'
              - name: 'user_updated'
                class: 'App\Message\UserUpdated'
              # 'user_deleted' is ignored

Consumer Logic:

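namespace App\MessageHandler;

use App\Message\UserRegistered;
use App\Message\UserUpdated;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

// Hypothetical handler class; #[AsMessageHandler] can be placed on individual methods.
class UserEventsHandler
{
    #[AsMessageHandler]
    public function handleUserRegistered(UserRegistered $message): void
    {
        // Process only registered users
    }

    #[AsMessageHandler]
    public function handleUserUpdated(UserUpdated $message): void
    {
        // Process only updated users
    }
}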
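
To make the filtering idea concrete, here is a rough sketch of how a name-to-class routing table like the one configured above could decide which messages are consumed. The function resolveMessageClass() is hypothetical and is not taken from the package's source; it only illustrates that an identifier without a routing entry (such as user_deleted) resolves to nothing and is therefore skipped.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: look up the message class for an incoming identifier.
 *
 * @param list<array{name: string, class: string}> $routing
 */
function resolveMessageClass(array $routing, string $identifier): ?string
{
    foreach ($routing as $route) {
        if ($route['name'] === $identifier) {
            return $route['class'];
        }
    }

    // No matching route: the caller would ignore this message.
    return null;
}

// Same entries as the consumer.routing configuration shown earlier.
$routing = [
    ['name' => 'user_registered', 'class' => 'App\Message\UserRegistered'],
    ['name' => 'user_updated', 'class' => 'App\Message\UserUpdated'],
];

$registered = resolveMessageClass($routing, 'user_registered');
$deleted = resolveMessageClass($routing, 'user_deleted');
```

Here $registered holds the UserRegistered class name, while $deleted is null because no route names user_deleted.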

4. Multi-Topic Configuration

Route different message types to different topics:

framework:
  messenger:
    transports:
      user_events:
        dsn: '%env(KAFKA_DSN)%'
        options:
          topics: ['user_events']
          json_serialization:
            enabled: true
      audit_events:
        dsn: '%env(KAFKA_DSN)%'
        options:
          topics: ['audit_events']
          json_serialization:
            enabled: true
    routing:
      'App\Message\UserRegistered': user_events
      'App\Message\AuditLog': audit_events

5. Partitioning with Keys

Ensure related messages land on the same partition:

namespace App\Message;

use App\Transport\Stamp\KafkaKeyStamp;

class UserRegistered
{
    public function __construct(public string $userId) {}

    public function identifier(): string
    {
        return 'user_registered';
    }

    public function key(): string
    {
        return $this->userId; // Partition by userId
    }
}

Hook Integration:

public function beforeProduce(Envelope $envelope): Envelope
{
    $message = $envelope->getMessage();
    if (method_exists($message, 'key')) {
        // with() returns a new Envelope; keep the returned instance.
        $envelope = $envelope->with(new KafkaKeyStamp($message->key()));
    }
    return $envelope;
}
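
Why a key keeps related messages together can be shown with a small sketch: a partitioner hashes the key and takes the result modulo the partition count, so an identical key always yields the same partition. The function partitionForKey() below is hypothetical and uses PHP's crc32() purely for illustration. The real hash and partitioner depend on the Kafka client and its configuration, so the partition numbers computed here are not expected to match a live cluster.

```php
<?php
declare(strict_types=1);

// Hypothetical illustration of key-based partitioning (not the client's actual algorithm).
function partitionForKey(string $key, int $partitionCount): int
{
    if ($partitionCount < 1) {
        throw new InvalidArgumentException('partitionCount must be at least 1');
    }

    // Mask to 31 bits so the value is non-negative on 32-bit and 64-bit builds.
    return (crc32($key) & 0x7fffffff) % $partitionCount;
}

$first = partitionForKey('user-42', 6);
$second = partitionForKey('user-42', 6);
```

Both calls return the same partition, which is what preserves per-user ordering. Note that changing the partition count changes the mapping, so ordering guarantees only hold while the count stays fixed.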

6. Custom Headers for Metadata

Attach metadata to messages:

namespace App\Message;

use App\Transport\Stamp\KafkaCustomHeadersStamp;

class UserRegistered
{
    public function __construct(
        public string $userId,
        public string $tenantId
    ) {}

    public function identifier(): string
    {
        return 'user_registered';
    }
}

Hook Implementation:

public function beforeProduce(Envelope $envelope): Envelope
{
    $message = $envelope->getMessage();
    if ($message instanceof UserRegistered) {
        // with() returns a new Envelope; keep the returned instance.
        $envelope = $envelope->with(
            new KafkaCustomHeadersStamp([
                'tenant_id' => $message->tenantId,
                'source' => 'web',
            ])
        );
    }
    return $envelope;
}

7. Consumer Group Isolation

Isolate consumer groups by environment:

framework:
  messenger:
    transports:
      kafka_events:
        dsn: '%env(KAFKA_DSN)%'
        options:
          topics: ['user_events']
          consumer:
            config:
              group.id: '%env(APP_ENV)%-user-service'

Gotchas and Tips

1. JSON Serialization Pitfalls

  • Missing identifier() method: If json_serialization.enabled: true but messages lack identifier(), the transport silently fails to deserialize. Fix: Ensure all Kafka messages implement identifier() or disable JSON mode.

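  • Circular references in JSON: Default Symfony serializer may fail on circular references. Fix: Use a custom serializer (e.g., ObjectNormalizer with the ignored_attributes context option):

    // use Symfony\Component\Serializer\Normalizer\AbstractNormalizer;
    // use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
    $normalizer = new ObjectNormalizer(defaultContext: [
        AbstractNormalizer::IGNORED_ATTRIBUTES => ['someCircularProperty'],
    ]);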
    
  • DateTime serialization: Default JSON mode may serialize DateTime as ISO strings. For custom formats:

    json_serialization:
      enabled: true
      custom_serializer: 'App\Serializer\CustomDateSerializer'
    

2. Consumer Configuration Quirks

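  • auto.offset.reset behavior (applies only when the consumer group has no valid committed offset, e.g. a new group.id):

    • earliest: Start from the oldest available message (default).
    • latest: Skip existing messages (only new ones).
    • Tip: A restarted consumer that has committed offsets resumes from them regardless of this setting. Use latest in production when a new consumer group should not process the topic's backlog.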
  • Commit strategy:

    • commit_async: true (default) improves performance but may lose messages on crashes.
    • Tip: Set commit_async: false for critical systems where message loss is unacceptable.
  • Group ID collisions:

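    • Reusing the same group.id across environments (e.g., dev and prod) that connect to the same cluster puts their consumers in one group, so the topic's partitions are split between them and each environment receives only part of the messages.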
    • Fix: Include environment in group IDs:
      group.id: '%env(APP_ENV)%-user-service'
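
The auto.offset.reset behavior listed above is easier to reason about as a decision rule: a valid committed offset always wins, and the reset policy is only a fallback. The sketch below models that rule in plain PHP. The function startingOffset() and its parameters are hypothetical, it does not call any Kafka client, and it covers only the earliest and latest values discussed here.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical model of where a consumer starts reading a partition.
 *
 * @param int|null $committed Offset committed by the group, or null if none exists
 * @param string   $reset     Value of auto.offset.reset ('earliest' or 'latest')
 * @param int      $earliest  Oldest offset still available in the partition
 * @param int      $latest    Offset the next produced message will receive
 */
function startingOffset(?int $committed, string $reset, int $earliest, int $latest): int
{
    // A committed offset inside the available range is always used.
    if ($committed !== null && $committed >= $earliest && $committed <= $latest) {
        return $committed;
    }

    // Otherwise (new group, or offset out of range) fall back to the reset policy.
    return match ($reset) {
        'earliest' => $earliest,
        'latest' => $latest,
        default => throw new InvalidArgumentException("Unsupported auto.offset.reset: $reset"),
    };
}

$restart = startingOffset(120, 'latest', 0, 500);      // existing group restarting
$newEarliest = startingOffset(null, 'earliest', 0, 500); // new group, read the backlog
$newLatest = startingOffset(null, 'latest', 0, 500);     // new group, skip the backlog
```

In this model the restarting consumer resumes at its committed offset, and the policy only matters for the two new-group cases.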
      

3. Hook System Caveats

  • Hook autodetection: The transport auto-discovers classes implementing KafkaTransportHookInterface but does not inject dependencies. Fix: Use Symfony’s autowiring or manually bind hooks:

    services:
      App\Transport\Hook\KafkaHook:
        tags: ['messenger.hook']
    