mateusjunges/laravel-kafka
Laravel Kafka brings a clean Laravel-friendly API for producing and consuming Kafka messages, with an emphasis on developer experience and easier testing. Ideal for integrating Kafka streams and event-driven workflows into your Laravel applications.
Serialization is the process of converting messages to bytes. Deserialization is the inverse process - converting a stream of bytes into and object. In a nutshell, it transforms the content into readable and interpretable information.
<x-sponsors.request-sponsor/>
Basically, in order to prepare the message for transmission from the producer we use serializers. This package supports three serializers out of the box:
If the default JsonSerializer does not fulfill your needs, you can make use of custom serializers.
To create a custom serializer, you need to create a class that implements the \Junges\Kafka\Contracts\MessageSerializer contract. This interface force you to declare the serialize method.
You can inform your producer which serializer should be used with the usingSerializer method:
$producer = \Junges\Kafka\Facades\Kafka::publish('broker')->onTopic('topic')->usingSerializer(new MyCustomSerializer());
To create a custom serializer, you need to create a class that implements the \Junges\Kafka\Contracts\MessageSerializer contract.
This interface force you to declare the serialize method.
To use the AVRO serializer, add the AVRO serializer:
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use GuzzleHttp\Client;
$cachedRegistry = new CachedRegistry(
new BlockingRegistry(
new PromisingRegistry(
new Client(['base_uri' => 'kafka-schema-registry:9081'])
)
),
new AvroObjectCacheAdapter()
);
$registry = new AvroSchemaRegistry($cachedRegistry);
$recordSerializer = new RecordSerializer($cachedRegistry);
//if no version is defined, latest version will be used
//if no schema definition is defined, the appropriate version will be fetched form the registry
$registry->addBodySchemaMappingForTopic(
'test-topic',
new \Junges\Kafka\Message\KafkaAvroSchema('bodySchemaName' /*, int $version, AvroSchema $definition */)
);
$registry->addKeySchemaMappingForTopic(
'test-topic',
new \Junges\Kafka\Message\KafkaAvroSchema('keySchemaName' /*, int $version, AvroSchema $definition */)
);
$serializer = new \Junges\Kafka\Message\Serializers\AvroSerializer($registry, $recordSerializer /*, AvroEncoderInterface::ENCODE_BODY */);
$producer = \Junges\Kafka\Facades\Kafka::publish('broker')->onTopic('topic')->usingSerializer($serializer);
How can I help you explore Laravel packages today?