Installation:
composer require akson/messenger-kafka
For non-Flex projects, add to config/bundles.php:
return [
    Akson\MessengerKafka\AksonMessengerKafkaBundle::class => ['all' => true],
];
Basic Configuration (config/packages/messenger.yaml):
framework:
    messenger:
        transports:
            kafka_producer:
                dsn: '%env(KAFKA_DSN)%'
                options:
                    topic:
                        name: 'your_topic'
                    kafka_conf:
                        bootstrap.servers: '%env(KAFKA_BROKERS)%'
First Use Case: Dispatch a message via Kafka:
use Symfony\Component\Messenger\MessageBusInterface;

class YourCommandHandler
{
    public function __construct(private MessageBusInterface $bus) {}

    public function handle(YourCommand $command): void
    {
        $this->bus->dispatch(new YourMessage());
    }
}
Topic Configuration:
Set the topic in messenger.yaml under topic.name:
topic:
    name: '%env(KAFKA_TOPIC)%'
Serialization:
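Implement SerializerInterface for custom formats (e.g., JSON, Avro):
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class JsonSerializer implements SerializerInterface
{
    public function encode(Envelope $envelope): array
    {
        // JSON_THROW_ON_ERROR raises a JsonException instead of returning false
        return ['body' => json_encode($envelope->getMessage(), JSON_THROW_ON_ERROR)];
    }

    public function decode(array $encodedEnvelope): Envelope
    {
        // Decode to an associative array; malformed JSON raises a JsonException
        $message = json_decode($encodedEnvelope['body'], true, 512, JSON_THROW_ON_ERROR);

        return new Envelope(new YourMessage($message));
    }
}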
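A decode() method will sooner or later receive a body that is not valid JSON. The following is a minimal sketch of a guard that logs and skips such bodies, written in plain PHP so it runs without the bundle. The helper name decodeJsonBody() is hypothetical and not part of the bundle or of Symfony Messenger.

```php
<?php
declare(strict_types=1);

/**
 * Decode a JSON message body, returning null (and logging) when it is malformed.
 * Hypothetical helper for illustration; not part of the bundle.
 */
function decodeJsonBody(string $body): ?array
{
    try {
        $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $e) {
        // Log instead of letting one bad message stop the consumer
        error_log('Malformed Kafka message body: ' . $e->getMessage());

        return null;
    }

    // Scalars and null are valid JSON but not a usable payload here
    return is_array($data) ? $data : null;
}
```

Inside a serializer, a null result could be turned into whatever failure signal the transport expects, for example an exception that routes the message to a failure transport.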
Register the serializer under options.serializer.
Error Handling:
Use flushRetries and flushTimeout to retry failed sends:
options:
    flushRetries: 3
    flushTimeout: 5000
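To make the retry idea concrete, here is a rough sketch of a bounded flush loop in plain PHP. It assumes that flushRetries counts retries after an initial attempt and that flushTimeout is passed to each attempt; the bundle's actual semantics may differ. flushWithRetries() and the $flush callable are hypothetical stand-ins for the producer's real flush call.

```php
<?php
declare(strict_types=1);

/**
 * Call $flush once, then up to $retries more times, until it reports success.
 * Hypothetical helper for illustration; not part of the bundle.
 *
 * @param callable(int): bool $flush receives the timeout and returns true on success
 */
function flushWithRetries(callable $flush, int $retries, int $timeout): bool
{
    for ($attempt = 0; $attempt <= $retries; $attempt++) {
        if ($flush($timeout)) {
            return true;
        }
    }

    // Every attempt failed; the caller decides whether to throw or log
    return false;
}
```

A bounded loop keeps a broker outage from blocking the dispatching process indefinitely, at the cost of surfacing a failure the caller has to handle.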
Consumer Setup:
Configure group.id and auto.offset.reset:
consumer:
    dsn: '%env(KAFKA_DSN)%'
    options:
        kafka_conf:
            group.id: 'your_consumer_group'
        topic_conf:
            auto.offset.reset: 'earliest'
Async Commit:
Enable commitAsync to improve performance:
options:
    commitAsync: true
Message Handling:
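use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
$bus = new MessageBus([new HandleMessageMiddleware(new HandlersLocator([YourMessage::class => [$handler]]))]);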
Use .env for sensitive configs (e.g., KAFKA_DSN, KAFKA_SASL_USERNAME).
Use a RetryStrategy for transient failures.
Log Kafka-specific details (e.g., poll.interval.ms, offset commits) via kafka_conf:
kafka_conf:
    log_level: '6' # syslog level 6 (info); use '7' for debug
Offset Management:
Disabling enable.auto.offset.store prevents premature commits but requires manual offset handling. Use commitAsync and ensure handlers throw exceptions for failed messages.
SSL/SASL Configs:
A missing ssl.ca.location or sasl.mechanisms causes silent connection failures.
Serialization Mismatches:
$this->bus->dispatch(new YourMessage(['invalid' => 'data']));
Use try-catch in decode() to log malformed messages.
Consumer Lag:
Exceeding max.poll.interval.ms (default: 300s) causes rebalances. Adjust based on workload:
kafka_conf:
    max.poll.interval.ms: '60000' # 1 minute
To enable librdkafka debug output:
kafka_conf:
    debug: 'all'
To inspect the consumer group's offsets and lag:
kafka-consumer-groups --bootstrap-server localhost:9092 --group your_consumer_group --describe
Custom Middleware: Add Kafka-specific logic (e.g., headers, dead-letter queues):
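use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

$dlqMiddleware = new class implements MiddlewareInterface {
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        try {
            // Pass the envelope on to the next middleware in the stack
            return $stack->next()->handle($envelope, $stack);
        } catch (HandlerFailedException $e) {
            // Redirect to DLQ here, then rethrow so the failure is still reported
            throw $e;
        }
    }
};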
Dynamic Topics:
Override Transport::getTopic() to resolve topics dynamically:
public function getTopic(Envelope $envelope): string
{
    return 'dynamic-topic-' . $envelope->getMessage()->getType();
}
Avro Support:
Implement SerializerInterface with flix-tech/avro-serde-php:
use FlixTech\Avro\AvroDecoder; // class name as given here; verify against the library's documentation

public function decode(array $encodedEnvelope): Envelope
{
    $avroDecoder = new AvroDecoder();
    $message = $avroDecoder->decode($encodedEnvelope['body'], YourMessage::class);

    return new Envelope($message);
}
Ensure the DSN uses kafka:// or kafka+ssl://. Missing schemes cause InvalidArgumentException.
topic_conf settings (e.g., auto.offset.reset) must align with consumer group expectations.
Requires ext-json and ext-sodium (for SASL/SCRAM). Verify with:
php -m | grep -E 'json|sodium'
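The DSN and extension checks above can also be run from PHP itself, for instance in a deploy-time preflight script. This is a minimal sketch; hasKafkaScheme() and missingExtensions() are hypothetical helper names, and the accepted schemes are simply the two named above.

```php
<?php
declare(strict_types=1);

/** True when the DSN starts with kafka:// or kafka+ssl://. */
function hasKafkaScheme(string $dsn): bool
{
    return str_starts_with($dsn, 'kafka://') || str_starts_with($dsn, 'kafka+ssl://');
}

/**
 * Return the names from $required that are not loaded in this PHP runtime.
 *
 * @param string[] $required
 * @return string[]
 */
function missingExtensions(array $required): array
{
    return array_values(array_filter(
        $required,
        static fn (string $name): bool => !extension_loaded($name)
    ));
}
```

For example, missingExtensions(['json', 'sodium']) returns the subset that still needs installing, and hasKafkaScheme((string) getenv('KAFKA_DSN')) flags a DSN without a scheme before the transport is built.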