Weave Code
Code Weaver
Helps Laravel developers discover, compare, and choose open-source packages. See popularity, security, maintainers, and scores at a glance to make better decisions.
Feedback
Share your thoughts, report bugs, or suggest improvements.
Subject
Message

Kafka Bundle Laravel Package

atcliff/kafka-bundle

View on GitHub
Deep Wiki
Context7

Getting Started

Minimal Setup

  1. Install the package:

    composer require sts-gaming-group/kafka-bundle
    

    Ensure ext-rdkafka is installed and enabled in your PHP environment.

  2. Configure Kafka: Create config/packages/sts_gaming_group_kafka.yaml:

    sts_gaming_group_kafka:
      consumers:
        instances:
          App\Consumers\ExampleConsumer:
            brokers: ['127.0.0.1:9092']
            topics: ['example_topic']
            group_id: 'example_group'
      producers:
        instances:
          App\Producers\ExampleProducer:
            brokers: ['127.0.0.1:9092']
    
  3. Create a Consumer:

    namespace App\Consumers;
    use StsGamingGroup\KafkaBundle\Client\Contract\ConsumerInterface;
    use StsGamingGroup\KafkaBundle\Client\Consumer\Message;
    use StsGamingGroup\KafkaBundle\RdKafka\Context;
    
    class ExampleConsumer implements ConsumerInterface {
        public function consume(Message $message, Context $context): void {
            $data = $message->getData();
            // Process message
        }
        public function handleException(\Exception $exception, Context $context): void {}
        public function getName(): string { return 'example_consumer'; }
    }
    
  4. Run the Consumer:

    bin/console kafka:consumers:consume example_consumer
    

First Use Case: Consuming Messages

  • Use the Message object to access decoded data (default: AvroDecoder).
  • Handle exceptions in handleException to log or retry failed messages.

Implementation Patterns

Workflows

  1. Message Consumption:

    • Implement ConsumerInterface for custom logic.
    • Use Message object to access decoded data (e.g., $message->getData()).
    • Leverage Context for retry metadata (e.g., $context->getRetryNo()).
  2. Message Production:

    • Inject ProducerInterface into your service:
      use StsGamingGroup\KafkaBundle\Client\Contract\ProducerInterface;
      class MyService {
          public function __construct(private ProducerInterface $producer) {}
          public function sendMessage(string $topic, array $data): void {
              $this->producer->produce($topic, $data);
          }
      }
      
    • Register the producer in services.yaml:
      services:
        App\Producers\ExampleProducer: ~
      
  3. Event-Driven Architecture:

    • Subscribe to events for pre/post-processing:
      use Symfony\Component\EventDispatcher\EventSubscriberInterface;
      use StsGamingGroup\KafkaBundle\Event\PostMessageConsumedEvent;
      
      class KafkaEventSubscriber implements EventSubscriberInterface {
          public static function getSubscribedEvents(): array {
              return [
                  PostMessageConsumedEvent::getEventName('example_consumer') => 'onMessageConsumed',
              ];
          }
          public function onMessageConsumed(PostMessageConsumedEvent $event): void {
              // Logic here
          }
      }
      

Integration Tips

  • Dependency Injection: Use Symfony’s DI to inject ConsumerInterface or ProducerInterface where needed.
  • Configuration Overrides: Override config via CLI for testing:
    bin/console kafka:consumers:consume example_consumer --brokers localhost:9093
    
  • Schema Registry: Configure schema_registry in YAML for Avro decoding:
    sts_gaming_group_kafka:
      consumers:
        instances:
          App\Consumers\ExampleConsumer:
            schema_registry: 'http://localhost:8081'
    

Gotchas and Tips

Pitfalls

  1. Offset Management:

    • Auto-commit (enable.auto.commit: true) may lead to duplicate messages if the process crashes before committing.
    • Manual commits (enable.auto.commit: false) are slower but more reliable. Use async commits for performance:
      $this->commitOffset($context, true); // Async commit
      
  2. Retry Logic:

    • Uncaught exceptions terminate the consumer. Use RecoverableMessageException for retries:
      throw new RecoverableMessageException('Failed to process', 0);
      
    • Configure retry delays in YAML:
      max_retries: 3
      retry_delay: 500
      
  3. Decoder/Validator Order:

    • Validators run before denormalization by default. Change type() in ValidatorInterface to POST_DENORMALIZE_TYPE for post-validation.
  4. Schema Registry:

    • Missing schema_registry config with AvroDecoder will throw errors. Ensure the URL is correct and accessible.

Debugging

  • Check Consumer Status:
    bin/console kafka:consumers:status
    
  • Log Raw Messages: Use PlainDecoder to inspect raw Kafka payloads:
    decoder: StsGamingGroup\KafkaBundle\Decoder\PlainDecoder
    

Extension Points

  1. Custom Decoders: Implement DecoderInterface for non-standard formats (e.g., Protobuf):

    class ProtobufDecoder implements DecoderInterface {
        public function decode(ResolvedConfiguration $config, string $message) {
            return YourProtobufClass::decode($message);
        }
    }
    
  2. Denormalizers: Transform decoded data into DTOs or domain objects:

    class UserDenormalizer implements DenormalizerInterface {
        public function denormalize($data): User {
            return new User($data['id'], $data['name']);
        }
    }
    
  3. Producers: Extend ProducerInterface for custom serialization or batching:

    class BatchProducer implements ProducerInterface {
        public function produce(string $topic, $data): void {
            // Batch logic here
        }
    }
    

Configuration Quirks

  • CLI Overrides: CLI args override YAML config only for kafka:consumers:consume commands.
  • Default Values: Missing config uses defaults (e.g., max_retries: 0 disables retries).
  • Topic Validation: Ensure topics exist in Kafka before running consumers.
Weaver

How can I help you explore Laravel packages today?

Conversation history is not saved when not logged in.
Prompt
Add packages to context
No packages found.
anousss007/vigilance
supportpal/eloquent-model
ardenexal/fhir-models
laravel-at/laravel-image-sanitize
romalytar/yammi-audit-log-laravel
ardenexal/fhir-validation
arshaviras/weather-widget
laravel-chronicle/core
sunchayn/nimbus
daikazu/eloquent-salesforce-objects
unseen-codes/chat
romalytar/yammi-jobs-monitoring-laravel
kisame76/filament-db-table-state
nqxcode/laravel-lucene-search
dpfx/laravel-livewire-wizards
workos/workos-php-laravel
sofa/laravel-global-scope
nawasara/auth-primitives
adhocrat-io/arkhe-main
make-dev/orca-harpoon