Weave Code
Code Weaver
Helps Laravel developers discover, compare, and choose open-source packages. See popularity, security, maintainers, and scores at a glance to make better decisions.
Feedback
Share your thoughts, report bugs, or suggest improvements.
Subject
Message

Php Rdkafka Ffi Laravel Package

idealo/php-rdkafka-ffi

Unmaintained Kafka client for PHP 7.4–8.0 using FFI bindings to librdkafka, compatible with php-rdkafka interfaces. Supports producer (transactions), consumer, admin client, mock cluster for tests, and callback-based error/log handling.

View on GitHub
Deep Wiki
Context7
## Getting Started

### Minimal Setup
1. **Installation**
   Update Composer dependency to target the new version:
   ```bash
   composer require idealo/php-rdkafka-ffi:^0.6.0

Ensure ext-ffi is enabled in your php.ini and librdkafka 2.1.0+ is installed system-wide:

extension=ffi

Verify compatibility with PHP 8.3/8.4 (newly supported).

  1. First Use Case: Producer (Updated)

    use Idealo\RdKafka\Producer;
    
    $producer = new Producer('localhost:9092', [
        'transactional.id' => 'my-transactional-producer',
    ]);
    $producer->initTransactions();
    $producer->beginTransaction();
    $producer->produce('test-topic', 'Hello Kafka!');
    $producer->commitTransaction(); // Fixed transaction lifecycle
    
  2. First Use Case: Consumer (Unchanged)

    use Idealo\RdKafka\Consumer;
    
    $consumer = new Consumer('localhost:9092', 'test-group');
    $consumer->subscribe(['test-topic']);
    while ($message = $consumer->consume()) {
        echo $message->value;
    }
    
  3. Key Files (Updated)

    • src/Producer.php and src/Consumer.php (now compatible with librdkafka 2.1.0+).
    • src/Config.php (updated for PHP 8.3/8.4 type safety).
    • src/Message.php (transactional metadata added).
    • New: src/Transaction.php (internal helper for fixed transaction lifecycle).

Implementation Patterns

Workflows

  1. Producer Workflow (Updated for Transactions)

    • Transactional Producer:
      $producer = new Producer('broker:9092', [
          'transactional.id' => 'laravel-producer',
          'queue.buffering.max.messages' => 100000,
      ]);
      $producer->initTransactions();
      $producer->beginTransaction();
      try {
          $producer->produce('orders', json_encode($order));
          $producer->produce('notifications', json_encode($notification));
          $producer->commitTransaction(); // Atomic commit
      } catch (\Exception $e) {
          $producer->abortTransaction(); // Fixed error handling
      }
      
    • Async Delivery with Callbacks:
      $producer->onDelivery(function ($report) {
          if ($report->err) {
              error_log("Delivery failed: " . $report->errstr);
              // Retry logic or dead-letter queue
          }
      });
      
  2. Consumer Workflow (Unchanged)

    • Assign partitions explicitly for ordered processing:
      $consumer->assign(['test-topic' => [0, 1]]);
      
    • Use consume() in a loop with error handling:
      while (($message = $consumer->consume(1000)) !== null) {
          if ($message->err) {
              break; // Rebalance or handle error
          }
          // Process $message->value
          $consumer->commit($message);
      }
      
  3. Integration with Laravel (Updated for PHP 8.3/8.4)

    • Service Provider Binding (Type-safe):
      $this->app->singleton(Producer::class, function ($app) {
          return new Producer(
              config('kafka.brokers'),
              array_merge(
                  config('kafka.producer'),
                  ['transactional.id' => 'laravel-' . Str::uuid()]
              )
          );
      });
      
    • Queue Worker: Use the consumer in a Laravel queue worker with PHP 8.4’s typed properties:
      public function handle(): void {
          while (($message = $this->consumer->consume()) !== null) {
              // Process message...
          }
      }
      
  4. Configuration Management (Updated)

    • Centralize configs in config/kafka.php with PHP 8.3/8.4 defaults:
      return [
          'brokers' => env('KAFKA_BROKERS', 'localhost:9092'),
          'producer' => [
              'queue.buffering.max.messages' => 100000,
              'transactional.id' => env('KAFKA_TRANSACTIONAL_ID', 'default'),
              'socket.timeout.ms' => 30000,
          ],
          'consumer' => [
              'group.id' => 'laravel-consumer',
              'auto.offset.reset' => 'earliest',
              'enable.auto.commit' => false,
          ],
      ];
      

Gotchas and Tips

Pitfalls

  1. FFI and librdkafka Version Mismatch

    • Error: FFI type error: Invalid memory access or librdkafka version not supported. Fix: Ensure system librdkafka2.1.0 and PHP FFI is recompiled:
      pecl install ffi && docker-php-ext-enable ffi
      
    • Tip: Use php-rdkafka-ffi:check-ffi CLI tool (if provided) to validate setup.
  2. Transactional Producer Lifecycle (Fixed in v0.6.0)

    • Error: KAFKA_RESP_ERR_TRANSACTIONALID_AUTHORIZATION_FAILED. Fix: Ensure transactional.id is unique and configured in broker ACLs. New: Use initTransactions() before beginTransaction():
      $producer->initTransactions(); // Required for librdkafka 2.1.0+
      $producer->beginTransaction();
      
  3. PHP 8.3/8.4 Deprecations

    • Error: Non-static method FFI::new() called statically. Fix: The package now uses instance methods internally (no user code changes needed).
    • Tip: Enable strict_types=1 in composer.json for PHP 8.3+ compatibility.
  4. Consumer Rebalancing (Unchanged but Critical)

    • Avoid long-running consume() loops without commits.
    • Error: Consumer group rebalance in progress. Fix: Implement onRebalance() callback or use assign() for static partitions.
  5. Message Serialization (Unchanged but Recommended)

    • The package expects raw strings. For JSON/other formats:
      $producer->produce('topic', json_encode($data, JSON_THROW_ON_ERROR));
      
    • Tip: Use JsonSerializable interface for complex objects.

Debugging

  1. Enable Logging (Updated for PHP 8.3/8.4) Add to config:

    'debug' => 'all',
    'log.callback' => function ($level, $facility, $message) {
        error_log(sprintf("[Kafka-%s] %s", $facility, $message));
    },
    

    Logs appear in stderr or via error_log.

  2. Common Errors (Updated)

    • KAFKA_RESP_ERR_TRANSACTIONALID_AUTHORIZATION_FAILED: Verify transactional.id uniqueness and broker ACLs.
    • KAFKA_RESP_ERR__ALL_BROKERS_DOWN: Check broker connectivity (telnet broker 9092).
    • KAFKA_RESP_ERR_INVALID_MSG: Validate message keys/values (e.g., null keys may fail in librdkafka 2.1.0+).
  3. Metrics (Unchanged but Useful) Use librdkafka metrics via FFI:

    $metrics = $producer->getMetrics();
    // Access via $metrics->['outgoing-byte-rate'] etc.
    

Extension Points

  1. Custom Serializers (Updated for PHP 8.3/8.4) Extend with typed properties:

    class JsonProducer extends Producer {
        public function __construct(string $brokers, array $config = []) {
            parent::__construct($brokers, $config);
        }
    
        public function produceJson(string $topic, array $data): void {
            parent::produce($topic, json_encode($data, JSON_THROW_ON_ERROR));
        }
    }
    
  2. Middleware for Messages (Unchanged) Intercept messages before/after processing:

    $consumer->onMessage(function (Message $message) {
        $message->value = gzdecode($message->value);
        // PHP 8.3+ typed property access:
        $message->headers['processed-at'] = now()->toIso8601String();
    });
    
  3. Schema Registry Integration (Unchanged but Recommended) Combine with confluent-php-client for Avro/Protobuf:

    $schema = (new SchemaRegistryClient
    
Weaver

How can I help you explore Laravel packages today?

Conversation history is not saved when not logged in.
Prompt
Add packages to context
No packages found.
emuniq/filament-browser-notifications
syriable/filament-translator
hungnm28/livewire-form
wenprise/eloquent
crudly/encrypted
fadion/bouncy
cuci/prototurk-sdk
gos/pubsub-router-bundle
cuci/prototurk-sdk-symfony
clementtalleu/easyadmin-markdown-bundle
codeflextech/permission-manager
karnoweb/livewire-datepicker
sayedenam/sayed-dashboard
milito/query-filter
apiboxsym/user-bundle
apiboxsym/health-check-bundle
jayeshmepani/jpl-moshier-ephemeris-php
elnasnato/laraliveui
labrodev/rest-sdk
sampaui/sampaui