Install the package:
composer require sts-gaming-group/kafka-bundle
Ensure ext-rdkafka is installed and enabled in your PHP environment.
Configure Kafka:
Create config/packages/sts_gaming_group_kafka.yaml:
sts_gaming_group_kafka:
consumers:
instances:
App\Consumers\ExampleConsumer:
brokers: ['127.0.0.1:9092']
topics: ['example_topic']
group_id: 'example_group'
producers:
instances:
App\Producers\ExampleProducer:
brokers: ['127.0.0.1:9092']
Create a Consumer:
namespace App\Consumers;
use StsGamingGroup\KafkaBundle\Client\Contract\ConsumerInterface;
use StsGamingGroup\KafkaBundle\Client\Consumer\Message;
use StsGamingGroup\KafkaBundle\RdKafka\Context;
class ExampleConsumer implements ConsumerInterface {
public function consume(Message $message, Context $context): void {
$data = $message->getData();
// Process message
}
public function handleException(\Exception $exception, Context $context): void {}
public function getName(): string { return 'example_consumer'; }
}
Run the Consumer:
bin/console kafka:consumers:consume example_consumer
Message object to access decoded data (default: AvroDecoder).handleException to log or retry failed messages.Message Consumption:
ConsumerInterface for custom logic.Message object to access decoded data (e.g., $message->getData()).Context for retry metadata (e.g., $context->getRetryNo()).Message Production:
ProducerInterface into your service:
use StsGamingGroup\KafkaBundle\Client\Contract\ProducerInterface;
class MyService {
public function __construct(private ProducerInterface $producer) {}
public function sendMessage(string $topic, array $data): void {
$this->producer->produce($topic, $data);
}
}
services.yaml:
services:
App\Producers\ExampleProducer: ~
Event-Driven Architecture:
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use StsGamingGroup\KafkaBundle\Event\PostMessageConsumedEvent;
class KafkaEventSubscriber implements EventSubscriberInterface {
public static function getSubscribedEvents(): array {
return [
PostMessageConsumedEvent::getEventName('example_consumer') => 'onMessageConsumed',
];
}
public function onMessageConsumed(PostMessageConsumedEvent $event): void {
// Logic here
}
}
ConsumerInterface or ProducerInterface where needed.bin/console kafka:consumers:consume example_consumer --brokers localhost:9093
schema_registry in YAML for Avro decoding:
sts_gaming_group_kafka:
consumers:
instances:
App\Consumers\ExampleConsumer:
schema_registry: 'http://localhost:8081'
Offset Management:
enable.auto.commit: true) may lead to duplicate messages if the process crashes before committing.enable.auto.commit: false) are slower but more reliable. Use async commits for performance:
$this->commitOffset($context, true); // Async commit
Retry Logic:
RecoverableMessageException for retries:
throw new RecoverableMessageException('Failed to process', 0);
max_retries: 3
retry_delay: 500
Decoder/Validator Order:
type() in ValidatorInterface to POST_DENORMALIZE_TYPE for post-validation.Schema Registry:
schema_registry config with AvroDecoder will throw errors. Ensure the URL is correct and accessible.bin/console kafka:consumers:status
PlainDecoder to inspect raw Kafka payloads:
decoder: StsGamingGroup\KafkaBundle\Decoder\PlainDecoder
Custom Decoders:
Implement DecoderInterface for non-standard formats (e.g., Protobuf):
class ProtobufDecoder implements DecoderInterface {
public function decode(ResolvedConfiguration $config, string $message) {
return YourProtobufClass::decode($message);
}
}
Denormalizers: Transform decoded data into DTOs or domain objects:
class UserDenormalizer implements DenormalizerInterface {
public function denormalize($data): User {
return new User($data['id'], $data['name']);
}
}
Producers:
Extend ProducerInterface for custom serialization or batching:
class BatchProducer implements ProducerInterface {
public function produce(string $topic, $data): void {
// Batch logic here
}
}
kafka:consumers:consume commands.max_retries: 0 disables retries).How can I help you explore Laravel packages today?