alvarorosado/event-driven-kafka-messenger-transport
Install the package:
composer require alvarorosado/event-driven-kafka-messenger-transport
Configure Kafka DSN in .env:
KAFKA_DSN=ed+kafka://localhost:9092
Basic Messenger configuration (config/packages/messenger.yaml):
framework:
    messenger:
        transports:
            kafka_events:
                dsn: '%env(KAFKA_DSN)%'
                options:
                    topics: ['user_events']
        routing:
            'App\Message\UserRegistered': kafka_events
Create a message class:
namespace App\Message;

class UserRegistered
{
    public function __construct(public string $userId) {}
}
Dispatch a message (e.g., in a controller):
use App\Message\UserRegistered;
use Symfony\Component\Messenger\MessageBusInterface;

public function registerUser(MessageBusInterface $bus, string $userId)
{
    $bus->dispatch(new UserRegistered($userId));
}
Dispatching publishes the message to the topic configured for the transport (user_events in this case).
Produce → Consume Cycle:
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            kafka_events:
                dsn: '%env(KAFKA_DSN)%'
                options:
                    topics: ['user_events']
        routing:
            'App\Message\UserRegistered': kafka_events
            'App\Message\UserUpdated': kafka_events
Consumer Setup (e.g., in config/packages/framework.yaml):
framework:
    messenger:
        buses:
            async_bus:
                middleware:
                    - 'App\Middleware\HandleKafkaMessages'
Middleware Example:
namespace App\Middleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

class HandleKafkaMessages implements MiddlewareInterface
{
    // Symfony passes the remaining middleware as a stack, so no constructor injection is needed.
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        // Add any Kafka-specific logic here, then delegate to the next middleware.
        return $stack->next()->handle($envelope, $stack);
    }
}
Enable JSON mode and implement KafkaIdentifierStamp:
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            kafka_events:
                dsn: '%env(KAFKA_DSN)%'
                options:
                    topics: ['user_events']
                    json_serialization:
                        enabled: true
Message Class with Stamp:
namespace App\Message;

use App\Transport\Stamp\KafkaIdentifierStamp;

class UserRegistered
{
    public function __construct(public string $userId) {}

    public function identifier(): string
    {
        return 'user_registered';
    }
}
Hook Implementation (required for JSON mode):
namespace App\Transport\Hook;

use App\Transport\Stamp\KafkaIdentifierStamp;
use Symfony\Component\Messenger\Envelope;

// KafkaTransportHookInterface also needs a use statement for the namespace it lives in.
class KafkaHook implements KafkaTransportHookInterface
{
    public function beforeProduce(Envelope $envelope): Envelope
    {
        $message = $envelope->getMessage();
        if (method_exists($message, 'identifier')) {
            // Envelope is immutable: with() returns a new instance, so reassign it.
            $envelope = $envelope->with(new KafkaIdentifierStamp($message->identifier()));
        }

        return $envelope;
    }

    // Implement other required methods...
}
Filter messages by type in a multi-event topic:
framework:
    messenger:
        transports:
            kafka_events:
                dsn: '%env(KAFKA_DSN)%'
                options:
                    topics: ['user_events']
                    json_serialization:
                        enabled: true
                    consumer:
                        routing:
                            - name: 'user_registered'
                              class: 'App\Message\UserRegistered'
                            - name: 'user_updated'
                              class: 'App\Message\UserUpdated'
                            # 'user_deleted' is ignored
Consumer Logic:
#[AsMessageHandler]
public function handleUserRegistered(UserRegistered $message)
{
    // Process only registered users
}

#[AsMessageHandler]
public function handleUserUpdated(UserUpdated $message)
{
    // Process only updated users
}
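As a rough illustration of the filtering described above, the following sketch maps an incoming identifier to a message class and skips anything without a routing entry. It is not the transport's actual implementation: the function name resolveMessageClass and the shape of the $routing array are assumptions chosen to mirror the consumer.routing configuration.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: returns the class mapped to an identifier,
 * or null when the identifier has no routing entry.
 *
 * @param list<array{name: string, class: string}> $routing
 */
function resolveMessageClass(array $routing, string $identifier): ?string
{
    foreach ($routing as $entry) {
        if ($entry['name'] === $identifier) {
            return $entry['class'];
        }
    }

    // No entry: the message would be skipped rather than handled.
    return null;
}

// Mirrors the consumer.routing list from the configuration.
$routing = [
    ['name' => 'user_registered', 'class' => 'App\Message\UserRegistered'],
    ['name' => 'user_updated', 'class' => 'App\Message\UserUpdated'],
];

var_dump(resolveMessageClass($routing, 'user_registered')); // the mapped class name
var_dump(resolveMessageClass($routing, 'user_deleted'));    // NULL: no routing entry
```

Returning null for unknown identifiers keeps the "ignore what is not listed" behavior explicit, instead of failing on every unmapped event type in a shared topic.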
Route different message types to different topics:
framework:
    messenger:
        transports:
            user_events:
                dsn: '%env(KAFKA_DSN)%'
                options:
                    topics: ['user_events']
                    json_serialization:
                        enabled: true
            audit_events:
                dsn: '%env(KAFKA_DSN)%'
                options:
                    topics: ['audit_events']
                    json_serialization:
                        enabled: true
        routing:
            'App\Message\UserRegistered': user_events
            'App\Message\AuditLog': audit_events
Ensure related messages land on the same partition:
namespace App\Message;

use App\Transport\Stamp\KafkaKeyStamp;

class UserRegistered
{
    public function __construct(public string $userId) {}

    public function identifier(): string
    {
        return 'user_registered';
    }

    public function key(): string
    {
        return $this->userId; // Partition by userId
    }
}
Hook Integration:
public function beforeProduce(Envelope $envelope): Envelope
{
    $message = $envelope->getMessage();
    if (method_exists($message, 'key')) {
        // with() returns a new Envelope, so the result must be reassigned.
        $envelope = $envelope->with(new KafkaKeyStamp($message->key()));
    }

    return $envelope;
}
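To see why a stable key keeps related messages together, here is a simplified illustration of key-based partitioning. It is not the partitioner used by the transport or by the underlying Kafka client: partitionForKey is a hypothetical helper, and the CRC32 hash and the partition count of 6 are assumptions made for the example.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: maps a message key to a partition index
 * by hashing the key and taking the remainder.
 */
function partitionForKey(string $key, int $partitionCount): int
{
    if ($partitionCount < 1) {
        throw new InvalidArgumentException('partitionCount must be at least 1.');
    }

    // Mask to 31 bits so the value is non-negative on every platform.
    return (crc32($key) & 0x7fffffff) % $partitionCount;
}

$first = partitionForKey('user-42', 6);
$second = partitionForKey('user-42', 6);

// The same key always maps to the same partition.
var_dump($first === $second);
```

Because the partition is derived only from the key and the partition count, every message keyed by the same userId is appended to the same partition, which is what preserves per-user ordering. Changing the number of partitions changes the mapping.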
Attach metadata to messages:
namespace App\Message;

use App\Transport\Stamp\KafkaCustomHeadersStamp;

class UserRegistered
{
    public function __construct(
        public string $userId,
        public string $tenantId
    ) {}

    public function identifier(): string
    {
        return 'user_registered';
    }
}
Hook Implementation:
public function beforeProduce(Envelope $envelope): Envelope
{
    $message = $envelope->getMessage();
    if ($message instanceof UserRegistered) {
        // with() returns a new Envelope, so the result must be reassigned.
        $envelope = $envelope->with(
            new KafkaCustomHeadersStamp([
                'tenant_id' => $message->tenantId,
                'source' => 'web',
            ])
        );
    }

    return $envelope;
}
Isolate consumer groups by environment:
framework:
    messenger:
        transports:
            kafka_events:
                dsn: '%env(KAFKA_DSN)%'
                options:
                    topics: ['user_events']
                    consumer:
                        config:
                            group.id: '%env(APP_ENV)%-user-service'
Missing identifier() method:
If json_serialization.enabled: true but messages lack identifier(), the transport silently fails to deserialize.
Fix: Ensure all Kafka messages implement identifier() or disable JSON mode.
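One way to surface this early is a small guard that fails loudly when a message has no usable identifier. This is a sketch only: assertHasIdentifier and ExampleUserRegistered are hypothetical names, not part of the transport, and where the guard is called (a test suite, or just before dispatching) is left open.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical guard: returns the message identifier, or throws
 * when the message cannot provide a non-empty one.
 */
function assertHasIdentifier(object $message): string
{
    if (!method_exists($message, 'identifier')) {
        throw new LogicException(sprintf(
            '%s must define identifier() when JSON mode is enabled.',
            get_class($message)
        ));
    }

    $identifier = $message->identifier();
    if (!is_string($identifier) || $identifier === '') {
        throw new LogicException(sprintf(
            '%s::identifier() must return a non-empty string.',
            get_class($message)
        ));
    }

    return $identifier;
}

// Example message used only for this illustration.
final class ExampleUserRegistered
{
    public function identifier(): string
    {
        return 'user_registered';
    }
}

echo assertHasIdentifier(new ExampleUserRegistered()), PHP_EOL;
```

Running a check like this over every message class routed to a Kafka transport turns a silent deserialization failure into an immediate, descriptive exception.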
Circular references in JSON:
The default Symfony serializer may fail on circular references.
Fix: Use a custom serializer (e.g., an ObjectNormalizer with ignored_attributes in its default context):
$normalizer = new ObjectNormalizer(defaultContext: [
    'ignored_attributes' => ['someCircularProperty'],
]);
DateTime serialization:
Default JSON mode may serialize DateTime as ISO strings. For custom formats:
json_serialization:
    enabled: true
    custom_serializer: 'App\Serializer\CustomDateSerializer'
auto.offset.reset behavior (applies only when the consumer group has no committed offset):
earliest: Reprocess all messages from the start (default).
latest: Skip existing messages (only new ones).
Use latest in production so that a consumer group starting without committed offsets does not reprocess old messages.
Commit strategy:
commit_async: true (default) improves performance but may lose messages on crashes.
Use commit_async: false for critical systems where message loss is unacceptable.
Group ID collisions:
Reusing the same group.id across environments (e.g., dev and prod) that share a cluster makes their consumers compete for the same partitions, so each environment receives only part of the messages.
Fix: group.id: '%env(APP_ENV)%-user-service'
Hook autodetection:
The transport auto-discovers classes implementing KafkaTransportHookInterface but does not inject dependencies.
Fix: Use Symfony’s autowiring or manually bind hooks:
services:
    App\Transport\Hook\KafkaHook:
        tags: ['messenger.hook']