arkemlar/kafka-symfony-transport
Installation
composer require arkemlar/kafka-symfony-transport
Ensure ext-rdkafka is enabled in your PHP environment.
Configuration
Add to config/packages/messenger.yaml:
framework:
messenger:
transports:
kafka:
dsn: '%env(KAFKA_DSN)%'
options:
topic: 'your_topic_name'
group_id: 'your_consumer_group'
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 2
First Use Case Dispatch a message:
use Symfony\Component\Messenger\MessageBusInterface;
$bus->dispatch(new YourMessage());
Start the worker:
php bin/console messenger:consume kafka -vv
Producer Workflow
MessageBus to dispatch messages to Kafka.$message = new YourMessage();
$message->setMetadata(['kafka_topic' => 'dynamic-topic']);
$bus->dispatch($message);
Consumer Workflow
messenger.yaml or dynamically:
transports:
kafka_dynamic:
dsn: '%env(KAFKA_DSN)%'
options:
topic: 'dynamic-topic'
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
public function __invoke(YourMessage $message) {
// Process logic
}
Error Handling
PSR-3 logger (injected into handlers).symfony/messenger’s batching middleware for high-throughput scenarios.Serializable or use Symfony’s Serializer component.symfony/messenger’s async transport for hybrid workflows.RDKafka Extension
ext-rdkafka is compiled with SSL/SASL support if using secure clusters.php -m | grep rdkafka
Topic Configuration
messenger.yaml limits flexibility. Prefer dynamic topic resolution via message metadata.Consumer Groups
group_id requires rebalancing. Use kafka-consumer-groups CLI tool to manage offsets:
kafka-consumer-groups --bootstrap-server localhost:9092 --group your_group --reset-offsets --to-earliest --execute
-vvv for detailed RDKafka logs.symfony/messenger’s debug:message command to inspect dispatched messages:
php bin/console debug:message YourMessage
Custom Transport Factory Override the default factory for advanced configurations:
services:
Symfony\Component\Messenger\Transport\Serialization\SerializerInterface: '@custom_serializer'
Symfony\Component\Messenger\Transport\TransportInterface: '@custom_kafka_transport'
Middleware Integration Add custom middleware to the Kafka transport:
transports:
kafka:
dsn: '%env(KAFKA_DSN)%'
middleware:
- 'your_custom_middleware'
Event Listeners
Subscribe to MessageBusEvents for pre/post-dispatch hooks:
use Symfony\Component\Messenger\Event\SentMessageEvent;
$eventDispatcher->addListener(MessageBusEvents::SENT, function (SentMessageEvent $event) {
if ($event->getEnvelope()->getTransportName() === 'kafka') {
// Custom logic
}
});
How can I help you explore Laravel packages today?