Weave Code
Code Weaver
Helps Laravel developers discover, compare, and choose open-source packages. See popularity, security, maintainers, and scores at a glance to make better decisions.
Feedback
Share your thoughts, report bugs, or suggest improvements.
Subject
Message

Confluent Schema Registry Api Laravel Package

flix-tech/confluent-schema-registry-api

PHP 7.4+ client for Confluent Schema Registry REST API. Provides low-level PSR-7 request builders and higher-level synchronous/asynchronous APIs, with optional caching and Avro support via flix-tech/avro-php for easier schema management.

View on GitHub
Deep Wiki
Context7

Getting Started

Minimal Setup

  1. Install the package:

    composer require flix-tech/confluent-schema-registry-api
    
  2. Basic synchronous usage (most common for Laravel):

    use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
    use GuzzleHttp\Client;
    use FlixTech\Avro\AvroSchema;
    
    $client = new Client(['base_uri' => env('SCHEMA_REGISTRY_URL')]);
    $registry = new BlockingRegistry(new PromisingRegistry($client));
    
    $schema = AvroSchema::parse('{"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}]}');
    $schemaId = $registry->register('user-subject', $schema);
    
  3. First use case: Register a schema for a Kafka topic and retrieve its ID for serialization.


Implementation Patterns

Core Workflows

Schema Registration & Retrieval

// Register schema (creates subject if missing)
$schemaId = $registry->register('user-subject', $schema);

// Get schema by ID
$schema = $registry->schemaForId($schemaId);

// Get latest version for a subject
$latestVersion = $registry->latestVersion('user-subject');

// Get schema by subject/version
$schema = $registry->schemaForSubjectAndVersion('user-subject', $latestVersion);

Schema Validation

// Check if schema is compatible with existing subject
$isCompatible = $registry->isCompatible('user-subject', $schema);

Caching Strategies

// Cache schema IDs by schema hash (default: md5)
$cachedRegistry = new CachedRegistry(
    $registry,
    new AvroObjectCacheAdapter(),
    fn(AvroSchema $schema) => sha1((string)$schema)
);

// Cache with Doctrine Cache
$cachedRegistry = new CachedRegistry(
    $registry,
    new DoctrineCacheAdapter(new FilesystemCache('/path/to/cache'))
);

Laravel Integration Patterns

Service Provider Setup

// config/schema-registry.php
return [
    'url' => env('SCHEMA_REGISTRY_URL'),
    'cache' => env('SCHEMA_REGISTRY_CACHE', 'array'),
];

// app/Providers/SchemaRegistryServiceProvider.php
public function register()
{
    $this->app->singleton(SchemaRegistry::class, function ($app) {
        $client = new Client(['base_uri' => config('schema-registry.url')]);

        $registry = new BlockingRegistry(new PromisingRegistry($client));

        return match (config('schema-registry.cache')) {
            'doctrine' => new CachedRegistry($registry, new DoctrineCacheAdapter($app['cache.store'])),
            default => $registry,
        };
    });
}

Event-Driven Schema Management

// Listen to schema registration events
SchemaRegistry::register('user-subject', $schema)
    ->then(fn($schemaId) => event(new SchemaRegistered($schemaId, 'user-subject')))
    ->otherwise(fn(SchemaRegistryException $e) => report($e));

Schema Registry in Jobs/Commands

// app/Jobs/RegisterUserSchema.php
public function handle()
{
    $schemaId = app(SchemaRegistry::class)->register(
        'user-subject',
        AvroSchema::parse($this->schemaJson)
    );

    // Use $schemaId in Kafka producer
}

Gotchas and Tips

Common Pitfalls

  1. Schema Hash Collisions

    • Default md5() hashing may cause collisions for large schemas.
    • Fix: Use sha1() or custom hash functions:
      $registry = new CachedRegistry($promisingRegistry, new AvroObjectCacheAdapter(), 'sha1');
      
  2. Subject Not Found Errors

    • Always check if a subject exists before operations:
      try {
          $registry->schemaForSubject('user-subject');
      } catch (SchemaNotFoundException $e) {
          // Handle missing subject
      }
      
  3. Avro Schema Parsing

    • Invalid Avro schemas throw AvroException before reaching Schema Registry.
    • Tip: Validate schemas locally first:
      $schema = AvroSchema::parse($json);
      $schema->validate();
      

Debugging Tips

  1. Enable Guzzle Debugging

    $client = new Client([
        'base_uri' => config('schema-registry.url'),
        'debug' => true,
    ]);
    
  2. Schema Registry API Logs

    • Check /subjects/{subject}/versions endpoint for version history:
      $versions = $registry->schemaVersions('user-subject');
      
  3. Caching Issues

    • Clear cache when schema definitions change:
      app('cache')->forget("schema_registry_{$subject}");
      

Extension Points

  1. Custom Cache Adapters

    • Implement CacheAdapterInterface for Redis/Memcached:
      class RedisCacheAdapter implements CacheAdapterInterface {
          public function fetch($key) { /* ... */ }
          public function save($key, $value, $ttl = null) { /* ... */ }
      }
      
  2. Schema Validation Middleware

    $registry = new BlockingRegistry(new PromisingRegistry($client));
    $registry->setValidator(function(AvroSchema $schema) {
        if (str_contains($schema->toString(), 'deprecated')) {
            throw new \RuntimeException('Deprecated fields not allowed');
        }
    });
    
  3. Async Error Handling

    • Use otherwise() for promise rejections:
      $promise->otherwise(function(SchemaRegistryException $e) {
          Log::error('Schema registration failed', ['error' => $e->getMessage()]);
      });
      

Performance Considerations

  1. Batch Operations

    • Register multiple schemas in a transaction:
      $promise = $registry->register('user-subject', $schema1)
          ->then(fn($id1) => $registry->register('user-subject', $schema2));
      
  2. Schema ID Caching

    • Cache schema IDs by subject+version for faster lookups:
      $cache->remember("schema_id_{$subject}_{$version}", 3600, fn() =>
          $registry->schemaId($subject, $schema)
      );
      
  3. Connection Pooling

    • Reuse Guzzle client instances:
      $client = new Client(['base_uri' => config('schema-registry.url'), 'timeout' => 5]);
      
Weaver

How can I help you explore Laravel packages today?

Conversation history is not saved when not logged in.
Prompt
Add packages to context
No packages found.
babenkoivan/elastic-client
innmind/static-analysis
innmind/coding-standard
datacore/hub-sdk
alengo/sulu-http-cache-bundle
develia/commons
cuci/prototurk-sdk
cuci/prototurk-sdk-symfony
develia/geo-bundle
dreamzy/livewire-charts
touchestate-sdk/php-sdk
22h/doctrine-garbage-collection-bundle
imbo/imbo-coding-standard
visualbuilder/filament-lottie
servicioslineaonce/starter-kit
atomcoder/laravel-reorderable
irajul/filament-shadcn-theme
agtp/agtp-php
agtp/mod-php
centraldesktop/protobuf-php