mateusjunges/confluent-schema-registry-api
PHP 7.4+ client for Confluent Schema Registry REST API. Provides low-level PSR-7 request builders and high-level synchronous/asynchronous abstractions, with optional caching support, to register, fetch, and manage Avro schemas.
Start by installing the package via Composer:
composer require "flix-tech/confluent-schema-registry-api=^9.0"
(Despite the GitHub user mateusjunges, the actual package remains flix-tech/confluent-schema-registry-api.)
Ensure PHP 8+ and Guzzle 7+ are available. The package requires flix-tech/avro-php for parsing Avro schemas—this is essential if you'll be registering or retrieving Avro schemas.
Begin with the synchronous BlockingRegistry, which is simplest for most day-to-day tasks:
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use GuzzleHttp\Client;
$registry = new BlockingRegistry(
new Client(['base_uri' => 'http://schema-registry:8081'])
);
$schema = AvroSchema::parse('{"type": "string"}');
$schemaId = $registry->register('my-topic-value', $schema);
First use case: registering a schema for a Kafka topic’s key/value, or verifying it already exists.
Use BlockingRegistry for most operations: It abstracts async complexity and returns scalar values (e.g., int schema ID) directly. Chain with PromisingRegistry only if you need async support (e.g., Laravel queues or custom async logic).
Leverage caching early: Wrap your registry in CachedRegistry with AvroObjectCacheAdapter to avoid repeated lookups. Perfect for read-heavy workloads (e.g., schema lookup in serializers/deserializers).
$cached = new CachedRegistry(
new BlockingRegistry($client),
new AvroObjectCacheAdapter()
);
Schema validation in producers/consumers: In your app’s Kafka client wrapper, validate schemas before publishing:
try {
$cached->schemaId($subject, $schema);
} catch (SchemaRegistryException $e) {
// Register if missing
$cached->register($subject, $schema);
}
Integrate with Laravel: Create a singleton binding in AppServiceProvider:
$this->app->singleton(\FlixTech\SchemaRegistryApi\Registry\BlockingRegistry::class, function () {
return new BlockingRegistry(
new \GuzzleHttp\Client(['base_uri' => config('kafka.schema_registry.uri')])
);
});
Version control & CI: Use schemaVersion() to compare schema versions across environments before deployment.
$schema must be AvroSchema instance: Passing plain strings or arrays will cause type errors—always parse with AvroSchema::parse().
Caching nuance: CachedRegistry caches by schema content, but subject+schema combinations are not automatically cached together. If you query .schemaId('a', $s) and later .schemaForSubjectAndVersion('b', $v), they may not share cache unless explicitly handled.
Subject naming matters: Kafka topic names like orders often imply subject names orders-key and orders-value. Confirm your naming convention—this package does not auto-suffix.
Exceptions don’t unwrap automatically: In async mode, the promise resolves with either an int or a SchemaRegistryException. Don’t forget to check the type before assuming success.
Version 9.x is a fork update: It updates guzzlehttp/promises to v2.0 and Docker tooling. If upgrading from older versions, test compatibility with your existing async code—especially custom wait() usage.
Debugging tip: Enable Guzzle’s debug option to trace raw API calls:
new Client(['base_uri' => '...', 'debug' => true])
Low-level API for edge cases: Use Requests\Functions::register() when you need to customize headers (e.g., auth tokens, custom Content-Type) not handled by the high-level API.
How can I help you explore Laravel packages today?