flix-tech/confluent-schema-registry-api
PHP 7.4+ client for Confluent Schema Registry REST API. Provides low-level PSR-7 request builders and higher-level synchronous/asynchronous APIs, with optional caching and Avro support via flix-tech/avro-php for easier schema management.
Install the package:
composer require flix-tech/confluent-schema-registry-api
Basic synchronous usage (most common for Laravel):
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use GuzzleHttp\Client;
use FlixTech\Avro\AvroSchema;
$client = new Client(['base_uri' => env('SCHEMA_REGISTRY_URL')]);
$registry = new BlockingRegistry(new PromisingRegistry($client));
$schema = AvroSchema::parse('{"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}]}');
$schemaId = $registry->register('user-subject', $schema);
First use case: Register a schema for a Kafka topic and retrieve its ID for serialization.
// Register schema (creates subject if missing)
$schemaId = $registry->register('user-subject', $schema);
// Get schema by ID
$schema = $registry->schemaForId($schemaId);
// Get latest version for a subject
$latestVersion = $registry->latestVersion('user-subject');
// Get schema by subject/version
$schema = $registry->schemaForSubjectAndVersion('user-subject', $latestVersion);
// Check if schema is compatible with existing subject
$isCompatible = $registry->isCompatible('user-subject', $schema);
// Cache schema IDs by schema hash (default: md5)
$cachedRegistry = new CachedRegistry(
$registry,
new AvroObjectCacheAdapter(),
fn(AvroSchema $schema) => sha1((string)$schema)
);
// Cache with Doctrine Cache
$cachedRegistry = new CachedRegistry(
$registry,
new DoctrineCacheAdapter(new FilesystemCache('/path/to/cache'))
);
// config/schema-registry.php
return [
'url' => env('SCHEMA_REGISTRY_URL'),
'cache' => env('SCHEMA_REGISTRY_CACHE', 'array'),
];
// app/Providers/SchemaRegistryServiceProvider.php
public function register()
{
$this->app->singleton(SchemaRegistry::class, function ($app) {
$client = new Client(['base_uri' => config('schema-registry.url')]);
$registry = new BlockingRegistry(new PromisingRegistry($client));
return match (config('schema-registry.cache')) {
'doctrine' => new CachedRegistry($registry, new DoctrineCacheAdapter($app['cache.store'])),
default => $registry,
};
});
}
// Listen to schema registration events
SchemaRegistry::register('user-subject', $schema)
->then(fn($schemaId) => event(new SchemaRegistered($schemaId, 'user-subject')))
->otherwise(fn(SchemaRegistryException $e) => report($e));
// app/Jobs/RegisterUserSchema.php
public function handle()
{
$schemaId = app(SchemaRegistry::class)->register(
'user-subject',
AvroSchema::parse($this->schemaJson)
);
// Use $schemaId in Kafka producer
}
Schema Hash Collisions
md5() hashing may cause collisions for large schemas.sha1() or custom hash functions:
$registry = new CachedRegistry($promisingRegistry, new AvroObjectCacheAdapter(), 'sha1');
Subject Not Found Errors
try {
$registry->schemaForSubject('user-subject');
} catch (SchemaNotFoundException $e) {
// Handle missing subject
}
Avro Schema Parsing
AvroException before reaching Schema Registry.$schema = AvroSchema::parse($json);
$schema->validate();
Enable Guzzle Debugging
$client = new Client([
'base_uri' => config('schema-registry.url'),
'debug' => true,
]);
Schema Registry API Logs
/subjects/{subject}/versions endpoint for version history:
$versions = $registry->schemaVersions('user-subject');
Caching Issues
app('cache')->forget("schema_registry_{$subject}");
Custom Cache Adapters
CacheAdapterInterface for Redis/Memcached:
class RedisCacheAdapter implements CacheAdapterInterface {
public function fetch($key) { /* ... */ }
public function save($key, $value, $ttl = null) { /* ... */ }
}
Schema Validation Middleware
$registry = new BlockingRegistry(new PromisingRegistry($client));
$registry->setValidator(function(AvroSchema $schema) {
if (str_contains($schema->toString(), 'deprecated')) {
throw new \RuntimeException('Deprecated fields not allowed');
}
});
Async Error Handling
otherwise() for promise rejections:
$promise->otherwise(function(SchemaRegistryException $e) {
Log::error('Schema registration failed', ['error' => $e->getMessage()]);
});
Batch Operations
$promise = $registry->register('user-subject', $schema1)
->then(fn($id1) => $registry->register('user-subject', $schema2));
Schema ID Caching
$cache->remember("schema_id_{$subject}_{$version}", 3600, fn() =>
$registry->schemaId($subject, $schema)
);
Connection Pooling
$client = new Client(['base_uri' => config('schema-registry.url'), 'timeout' => 5]);
How can I help you explore Laravel packages today?