kwn/php-rdkafka-stubs
IDE helper stubs for php-rdkafka (librdkafka) extension. Adds classes, methods and constants for Kafka producers/consumers to improve autocompletion and static analysis in PHP projects without requiring the extension at dev time.
Installation:
composer require --dev kwn/php-rdkafka-stubs
Add to your composer.json under autoload-dev:
"autoload-dev": {
"files": ["vendor/kwn/php-rdkafka-stubs/stubs/rdkafka.php"]
}
Run composer dump-autoload.
Verify Compatibility:
Check your php-rdkafka version with:
php -m | grep rdkafka
Ensure the stubs match your installed version (check composer.json of the stubs package).
First Use Case: Use the stubs with a simple Kafka producer/consumer:
$conf = new \RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');
$producer = new \RdKafka\Producer($conf);
$topic = $producer->newTopic('test-topic');
$topic->produce(0, 0, "Hello Kafka!");
$producer->flush(10000);
Your IDE (PHPStorm/VSCode) should now provide autocompletion and type hints for \RdKafka\* classes.
Producer Workflow:
newTopic() with error handling:
try {
$topic = $producer->newTopic('events');
} catch (\RdKafka\Exception $e) {
log::error("Topic creation failed: " . $e->getMessage());
}
$producer->on('event.log', function (\RdKafka\Event $event) {
if ($event->err) {
error_log("Producer error: " . $event->errstr());
}
});
Consumer Workflow:
newConsumer() with consumer groups:
$conf->set('group.id', 'my-group');
$consumer = new \RdKafka\Consumer($conf);
$consumer->subscribe(['test-topic']);
while ($message = $consumer->consume(1000)) {
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR: /* Valid message */
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
$consumer->close();
break;
default:
throw new \Exception($message->errstr(), $message->err);
}
}
Error Handling:
$producer->on('error', function (\RdKafka\Event $event) {
Sentry\captureException(new \Exception($event->errstr()));
});
$this->app->singleton(\RdKafka\Producer::class, function ($app) {
$conf = new \RdKafka\Conf();
$conf->set('metadata.broker.list', config('kafka.brokers'));
return new \RdKafka\Producer($conf);
});
config/kafka.php and load them dynamically:
$conf = new \RdKafka\Conf();
foreach (config('kafka.producer') as $key => $value) {
$conf->set($key, $value);
}
php-rdkafka-mock for unit testing:
composer require --dev php-rdkafka-mock
Mock producers/consumers in tests:
$mockProducer = new \RdKafka\Mock\Producer();
$this->app->instance(\RdKafka\Producer::class, $mockProducer);
Stub Version Mismatch:
php-rdkafka version, causing IDE errors.php-rdkafka version in composer.json:
"kwn/php-rdkafka-stubs": "dev-main#0.0.0-20260401"
php -r "var_dump(\RdKafka\Producer::class);" to verify stubs are loaded.IDE-Specific Quirks:
Settings > PHP > Strict types for better stub utilization.php.intellisense.stubs is enabled in settings.json:
"php.intellisense.stubs": ["vendor/kwn/php-rdkafka-stubs/stubs/rdkafka.php"]
Memory Leaks:
close() or use __destruct():
public function __destruct() {
if ($this->producer) {
$this->producer->close();
}
}
Async Callback Delays:
while (true) {
$producer->flush(1000);
sleep(1);
}
$conf->set('debug', 'all');
$conf->set('log_level', LOG_DEBUG);
$producer->on('event', function (\RdKafka\Event $event) {
file_put_contents(
'kafka_debug.log',
sprintf("[%s] %s\n", $event->type, $event->str()),
FILE_APPEND
);
});
getMetadata() to diagnose connection issues:
$metadata = $producer->getMetadata();
if ($metadata->err) {
throw new \Exception("Metadata error: " . $metadata->errstr());
}
Custom Stubs:
Extend stubs for internal classes by creating a stubs/rdkafka-extended.php:
<?php
// Extend RdKafka\Producer with custom methods
class_alias(\RdKafka\Producer, \RdKafka\ProducerExtended);
class RdKafka\ProducerExtended extends \RdKafka\Producer {
public function customMethod() {
return "Extended!";
}
}
Add to autoload-dev in composer.json.
Type-Safe Config: Create a wrapper class for type-safe configuration:
class KafkaConfig {
public static function producer(array $overrides = []): \RdKafka\Conf {
$conf = new \RdKafka\Conf();
$defaults = [
'metadata.broker.list' => 'localhost:9092',
'queue.buffering.max.messages' => 100000,
];
foreach (array_merge($defaults, $overrides) as $key => $value) {
$conf->set($key, $value);
}
return $conf;
}
}
Retry Logic: Implement exponential backoff for transient failures:
function produceWithRetry(\RdKafka\Producer $producer, string $topic, string $message, int $maxRetries = 3) {
$retries = 0;
while ($retries < $maxRetries) {
try {
$producer->newTopic($topic)->produce(0, 0, $message);
return true;
} catch (\Exception $e) {
$retries++;
if ($retries === $maxRetries) throw $e;
usleep(100 * pow(2, $retries));
}
}
return false;
}
How can I help you explore Laravel packages today?