mateusjunges/confluent-schema-registry-api
PHP 7.4+ client for Confluent Schema Registry REST API. Offers low-level PSR-7 request builders plus high-level async/sync abstractions, with optional caching support. Built on Guzzle and Avro PHP for schema handling.
composer require mateusjunges/confluent-schema-registry-api
use GuzzleHttp\Client;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
$client = new Client([
'base_uri' => config('services.schema_registry.url'),
'timeout' => 5.0,
]);
$registry = new BlockingRegistry(
new PromisingRegistry($client)
);
use AvroSchema; // global (non-namespaced) class from the Avro PHP package
$schema = AvroSchema::parse(file_get_contents('path/to/schema.avsc'));
$schemaId = $registry->register('user-events-value', $schema);
BlockingRegistry (synchronous, Laravel-friendly)
PromisingRegistry (asynchronous, for event-driven workflows)
CachedRegistry (for performance optimization)
AppServiceProvider:
$this->app->singleton('schema.registry', fn() => new BlockingRegistry(
new PromisingRegistry(new Client(['base_uri' => config('services.schema_registry.url')]))
));
public function __construct(private BlockingRegistry $registry) {}
Use BlockingRegistry in Laravel’s HTTP layer or service classes.
public function storeSchema(Request $request, BlockingRegistry $registry) {
$schema = AvroSchema::parse($request->input('schema'));
$schemaId = $registry->register('orders-value', $schema);
return response()->json(['schema_id' => $schemaId]);
}
FormRequest:
public function rules() {
return ['schema' => 'required|json'];
}
public function getSchemaForTopic(string $subject, BlockingRegistry $registry) {
return cache()->remember("schema:{$subject}", now()->addHours(1), function() use ($subject, $registry) {
return $registry->latestVersion($subject);
});
}
Use schemaCompatibility() to enforce backward/forward compatibility.
public function updateSchema(Request $request, BlockingRegistry $registry) {
$currentSchema = $registry->latestVersion('user-events-value');
$newSchema = AvroSchema::parse($request->input('schema'));
if (!$registry->schemaCompatibility('user-events-value', $currentSchema, $newSchema, 'BACKWARD')) {
throw new \RuntimeException('Schema is not backward compatible!');
}
return $registry->register('user-events-value', $newSchema);
}
Use PromisingRegistry in Laravel queues for non-blocking schema operations.
public function handle(RegisterSchemaJob $job, PromisingRegistry $registry) {
    $promise = $registry->register($job->subject, $job->schema);
    // The callbacks only run once the promise settles, so wait on the chained promise before the job returns
    $promise->then(
        fn($schemaId) => $job->resolve($schemaId),
        fn($exception) => $job->fail($exception)
    )->wait();
}
config/services.php:
'schema_registry' => [
'url' => env('SCHEMA_REGISTRY_URL', 'http://localhost:8081'),
'cache' => env('SCHEMA_REGISTRY_CACHE', 'array'), // 'array', 'doctrine', or 'simple_cache'
],
$this->app->bind('schema.registry.cache', function($app) {
$config = config('services.schema_registry.cache');
return match($config) {
'doctrine' => new DoctrineCacheAdapter($app->make(\Doctrine\Common\Cache\CacheProvider::class)),
'simple_cache' => new SimpleCacheAdapter($app->make(\Psr\SimpleCache\CacheInterface::class)),
default => new AvroObjectCacheAdapter(), // in-memory adapter, used for the 'array' setting
};
});
$response = $this->post('/schemas', ['schema' => '{"type": "string"}']);
$response->assertJson(['schema_id' => 1]);
The default md5 hash used for caching schemas may collide. Use a stronger hash (e.g., sha256) if schemas are large or complex:
$registry = new CachedRegistry($promisingRegistry, new AvroObjectCacheAdapter(), fn(AvroSchema $schema) => hash('sha256', (string)$schema));
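As a rough illustration of what such a hash callable returns, the sketch below hashes a plain schema string rather than an AvroSchema object. The `$schemaJson` value and the `$md5Key` / `$sha256Key` names are made up for this example and are not part of the library; the point is only the difference in digest size.

```php
<?php
// Illustrative schema JSON; any string behaves the same way.
$schemaJson = '{"type":"record","name":"User","fields":[{"name":"id","type":"long"}]}';

// Hypothetical stand-ins for the hash callable: string in, hex digest out.
$md5Key = static fn(string $schema): string => hash('md5', $schema);
$sha256Key = static fn(string $schema): string => hash('sha256', $schema);

echo strlen($md5Key($schemaJson)), PHP_EOL;    // 32 hex characters (128 bits)
echo strlen($sha256Key($schemaJson)), PHP_EOL; // 64 hex characters (256 bits)
```

The longer digest makes accidental key collisions far less likely, at the cost of slightly longer cache keys.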
BlockingRegistry uses wait() internally, which can block Laravel’s request lifecycle. Prefer PromisingRegistry for long-running operations (e.g., background jobs).
Invalid schemas throw AvroException. Validate schemas before registration:
try {
$schema = AvroSchema::parse($rawSchema);
} catch (\Exception $e) {
throw new \InvalidArgumentException('Invalid Avro schema', 0, $e);
}
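Before the Avro parser runs, it can help to reject input that is not even valid JSON. The following is a sketch using only PHP built-ins; `assertJsonSchemaString()` is a hypothetical helper name, not part of the library, and it checks JSON syntax only, not Avro semantics.

```php
<?php
// Hypothetical helper: rejects strings that are not syntactically valid JSON.
// It does not check Avro rules; AvroSchema::parse() still has to do that.
function assertJsonSchemaString(string $rawSchema): string
{
    try {
        json_decode($rawSchema, true, 512, JSON_THROW_ON_ERROR);
    } catch (\JsonException $e) {
        throw new \InvalidArgumentException('Schema is not valid JSON', 0, $e);
    }
    return $rawSchema;
}

$checked = assertJsonSchemaString('{"type": "string"}'); // returned unchanged
```

Running this check first gives callers a clearer error message for malformed input, while Avro-specific problems still surface from the parser.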
CachedRegistry does not auto-invalidate cache on schema updates. Manually clear cache when needed:
cache()->forget("schema:{$subject}");
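// Guzzle registers middleware on a HandlerStack, not through a 'middleware' option
$stack = \GuzzleHttp\HandlerStack::create();
$stack->push(\GuzzleHttp\Middleware::tap(function ($request) {
    \Log::debug('Schema Registry Request', ['url' => (string)$request->getUri(), 'method' => $request->getMethod()]);
}));
$client = new Client([
    'base_uri' => config('services.schema_registry.url'),
    'handler' => $stack,
]);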
Check the Schema Registry logs for 4xx/5xx errors (e.g., schema.registry.log in Docker setups).
Implement CacheAdapterInterface for Laravel’s cache:
use FlixTech\SchemaRegistryApi\Registry\Cache\CacheAdapterInterface;
use Illuminate\Contracts\Cache\Store;
class LaravelCacheAdapter implements CacheAdapterInterface {
public function __construct(private Store $cache) {}
public function fetch($key) { /* ... */ }
public function save($key, $value, $ttl) { /* ... */ }
public function delete($key) { /* ... */ }
}
public function handle($request, Closure $next) {
    try {
        // AvroSchema::parse() throws on an invalid schema, so a failed parse is the validity check
        AvroSchema::parse($request->input('schema'));
    } catch (\AvroException $e) {
        abort(422, 'Invalid schema');
    }
    return $next($request);
}
$registry->register($subject, $schema)->then(
fn($schemaId) => event(new SchemaRegistered($subject, $schemaId))
);
PromisingRegistry operations in queues may hit the queue worker timeout (default: 60s). Increase it on the job class, or pass --timeout=300 to queue:work, if needed:
public $timeout = 300; // 5 minutes
Bind BlockingRegistry to PromisingRegistry explicitly:
$this->app->bind(BlockingRegistry::class, fn($app) => new BlockingRegistry(
$app->make(PromisingRegistry::class)
));
$mockRegistry = Mockery::mock(BlockingRegistry::class);
$mockRegistry->shouldReceive('register')->andReturn