Weave Code
Code Weaver
Helps Laravel developers discover, compare, and choose open-source packages. See popularity, security, maintainers, and scores at a glance to make better decisions.
Feedback
Share your thoughts, report bugs, or suggest improvements.
Subject
Message

Laravel Kafka Laravel Package

mateusjunges/laravel-kafka

Laravel Kafka makes it easy to produce and consume Kafka messages in Laravel with a clean, expressive API and improved testability. Build producers and consumers quickly, integrate with your app workflows, and avoid painful Kafka testing setups.

View on GitHub
Deep Wiki
Context7

title: Custom serializers weight: 4

Serialization is the process of converting messages to bytes. Deserialization is the inverse process - converting a stream of bytes into and object. In a nutshell, it transforms the content into readable and interpretable information.

<x-sponsors.request-sponsor/>

Basically, in order to prepare the message for transmission from the producer we use serializers. This package supports three serializers out of the box:

  • NullSerializer / NullDeserializer
  • JsonSerializer / JsonDeserializer
  • AvroSerializer / JsonDeserializer

If the default JsonSerializer does not fulfill your needs, you can make use of custom serializers.

To create a custom serializer, you need to create a class that implements the \Junges\Kafka\Contracts\MessageSerializer contract. This interface force you to declare the serialize method.

You can inform your producer which serializer should be used with the usingSerializer method:

$producer = \Junges\Kafka\Facades\Kafka::publish('broker')->onTopic('topic')->usingSerializer(new MyCustomSerializer());

To create a custom serializer, you need to create a class that implements the \Junges\Kafka\Contracts\MessageSerializer contract. This interface force you to declare the serialize method.

Using AVRO serializer

To use the AVRO serializer, add the AVRO serializer:

use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use GuzzleHttp\Client;

$cachedRegistry = new CachedRegistry(
    new BlockingRegistry(
        new PromisingRegistry(
            new Client(['base_uri' => 'kafka-schema-registry:9081'])
        )
    ),
    new AvroObjectCacheAdapter()
);

$registry = new AvroSchemaRegistry($cachedRegistry);
$recordSerializer = new RecordSerializer($cachedRegistry);

//if no version is defined, latest version will be used
//if no schema definition is defined, the appropriate version will be fetched form the registry
$registry->addBodySchemaMappingForTopic(
    'test-topic',
    new \Junges\Kafka\Message\KafkaAvroSchema('bodySchemaName' /*, int $version, AvroSchema $definition */)
);
$registry->addKeySchemaMappingForTopic(
    'test-topic',
    new \Junges\Kafka\Message\KafkaAvroSchema('keySchemaName' /*, int $version, AvroSchema $definition */)
);

$serializer = new \Junges\Kafka\Message\Serializers\AvroSerializer($registry, $recordSerializer /*, AvroEncoderInterface::ENCODE_BODY */);

$producer = \Junges\Kafka\Facades\Kafka::publish('broker')->onTopic('topic')->usingSerializer($serializer);
Weaver

How can I help you explore Laravel packages today?

Conversation history is not saved when not logged in.
Prompt
Add packages to context
No packages found.
hexters/coinpayment
rjcodes/rjcms
act-training/laravel-permissions-manager
alimarchal/laravel-chart-of-accounts
babenkoivan/elastic-scout-driver
mkwebdesign/filament-watchdog-v5
renatomarinho/laravel-page-speed
zedmagdy/filament-business-hours
renatovdemoura/blade-elements-ui
devgeek/beacon-admin
benjamin-rqt/data-watcher-bundle
atriumphp/atrium
sandermuller/package-boost-laravel
sandermuller/boost-skills
redaxo/core
yusufgenc/filament-api-forge
l3aro/rating-star-for-filament
leek/filament-subtenant-scope
anil/file-picker
broqit/fields-ai