idealo/php-rdkafka-ffi
Unmaintained Kafka client for PHP 7.4–8.0 using FFI bindings to librdkafka, compatible with php-rdkafka interfaces. Supports producer (transactions), consumer, admin client, mock cluster for tests, and callback-based error/log handling.
## Getting Started
### Minimal Setup
1. **Installation**
Update Composer dependency to target the new version:
```bash
composer require idealo/php-rdkafka-ffi:^0.6.0
Ensure ext-ffi is enabled in your php.ini and librdkafka 2.1.0+ is installed system-wide:
extension=ffi
Verify compatibility with PHP 8.3/8.4 (newly supported).
First Use Case: Producer (Updated)
use Idealo\RdKafka\Producer;
$producer = new Producer('localhost:9092', [
'transactional.id' => 'my-transactional-producer',
]);
$producer->initTransactions();
$producer->beginTransaction();
$producer->produce('test-topic', 'Hello Kafka!');
$producer->commitTransaction(); // Fixed transaction lifecycle
First Use Case: Consumer (Unchanged)
use Idealo\RdKafka\Consumer;
$consumer = new Consumer('localhost:9092', 'test-group');
$consumer->subscribe(['test-topic']);
while ($message = $consumer->consume()) {
echo $message->value;
}
Key Files (Updated)
src/Producer.php and src/Consumer.php (now compatible with librdkafka 2.1.0+).src/Config.php (updated for PHP 8.3/8.4 type safety).src/Message.php (transactional metadata added).src/Transaction.php (internal helper for fixed transaction lifecycle).Producer Workflow (Updated for Transactions)
$producer = new Producer('broker:9092', [
'transactional.id' => 'laravel-producer',
'queue.buffering.max.messages' => 100000,
]);
$producer->initTransactions();
$producer->beginTransaction();
try {
$producer->produce('orders', json_encode($order));
$producer->produce('notifications', json_encode($notification));
$producer->commitTransaction(); // Atomic commit
} catch (\Exception $e) {
$producer->abortTransaction(); // Fixed error handling
}
$producer->onDelivery(function ($report) {
if ($report->err) {
error_log("Delivery failed: " . $report->errstr);
// Retry logic or dead-letter queue
}
});
Consumer Workflow (Unchanged)
$consumer->assign(['test-topic' => [0, 1]]);
consume() in a loop with error handling:
while (($message = $consumer->consume(1000)) !== null) {
if ($message->err) {
break; // Rebalance or handle error
}
// Process $message->value
$consumer->commit($message);
}
Integration with Laravel (Updated for PHP 8.3/8.4)
$this->app->singleton(Producer::class, function ($app) {
return new Producer(
config('kafka.brokers'),
array_merge(
config('kafka.producer'),
['transactional.id' => 'laravel-' . Str::uuid()]
)
);
});
public function handle(): void {
while (($message = $this->consumer->consume()) !== null) {
// Process message...
}
}
Configuration Management (Updated)
config/kafka.php with PHP 8.3/8.4 defaults:
return [
'brokers' => env('KAFKA_BROKERS', 'localhost:9092'),
'producer' => [
'queue.buffering.max.messages' => 100000,
'transactional.id' => env('KAFKA_TRANSACTIONAL_ID', 'default'),
'socket.timeout.ms' => 30000,
],
'consumer' => [
'group.id' => 'laravel-consumer',
'auto.offset.reset' => 'earliest',
'enable.auto.commit' => false,
],
];
FFI and librdkafka Version Mismatch
FFI type error: Invalid memory access or librdkafka version not supported.
Fix: Ensure system librdkafka ≥ 2.1.0 and PHP FFI is recompiled:
pecl install ffi && docker-php-ext-enable ffi
php-rdkafka-ffi:check-ffi CLI tool (if provided) to validate setup.Transactional Producer Lifecycle (Fixed in v0.6.0)
KAFKA_RESP_ERR_TRANSACTIONALID_AUTHORIZATION_FAILED.
Fix: Ensure transactional.id is unique and configured in broker ACLs.
New: Use initTransactions() before beginTransaction():
$producer->initTransactions(); // Required for librdkafka 2.1.0+
$producer->beginTransaction();
PHP 8.3/8.4 Deprecations
Non-static method FFI::new() called statically.
Fix: The package now uses instance methods internally (no user code changes needed).strict_types=1 in composer.json for PHP 8.3+ compatibility.Consumer Rebalancing (Unchanged but Critical)
consume() loops without commits.Consumer group rebalance in progress.
Fix: Implement onRebalance() callback or use assign() for static partitions.Message Serialization (Unchanged but Recommended)
$producer->produce('topic', json_encode($data, JSON_THROW_ON_ERROR));
JsonSerializable interface for complex objects.Enable Logging (Updated for PHP 8.3/8.4) Add to config:
'debug' => 'all',
'log.callback' => function ($level, $facility, $message) {
error_log(sprintf("[Kafka-%s] %s", $facility, $message));
},
Logs appear in stderr or via error_log.
Common Errors (Updated)
KAFKA_RESP_ERR_TRANSACTIONALID_AUTHORIZATION_FAILED:
Verify transactional.id uniqueness and broker ACLs.KAFKA_RESP_ERR__ALL_BROKERS_DOWN:
Check broker connectivity (telnet broker 9092).KAFKA_RESP_ERR_INVALID_MSG:
Validate message keys/values (e.g., null keys may fail in librdkafka 2.1.0+).Metrics (Unchanged but Useful)
Use librdkafka metrics via FFI:
$metrics = $producer->getMetrics();
// Access via $metrics->['outgoing-byte-rate'] etc.
Custom Serializers (Updated for PHP 8.3/8.4) Extend with typed properties:
class JsonProducer extends Producer {
public function __construct(string $brokers, array $config = []) {
parent::__construct($brokers, $config);
}
public function produceJson(string $topic, array $data): void {
parent::produce($topic, json_encode($data, JSON_THROW_ON_ERROR));
}
}
Middleware for Messages (Unchanged) Intercept messages before/after processing:
$consumer->onMessage(function (Message $message) {
$message->value = gzdecode($message->value);
// PHP 8.3+ typed property access:
$message->headers['processed-at'] = now()->toIso8601String();
});
Schema Registry Integration (Unchanged but Recommended)
Combine with confluent-php-client for Avro/Protobuf:
$schema = (new SchemaRegistryClient
How can I help you explore Laravel packages today?