mateusjunges/laravel-kafka
Laravel Kafka brings a clean Laravel-friendly API for producing and consuming Kafka messages, with an emphasis on developer experience and easier testing. Ideal for integrating Kafka streams and event-driven workflows into your Laravel applications.
No breaking changes. Notable additions:
* Implementations of Junges\Kafka\Contracts\ContextAware will now have their context forwarded as headers when messages are sent to the DLQ. See the configuring consumer options docs for details.
* withFlushCallback() on the producer builder, to be notified when async messages are flushed.
* @internal annotations were removed from public interfaces and traits, making them safe to implement/use in userland code.
Batch-related APIs to migrate away from:
* Batch producing (MessageBatch, sendBatch, produceBatch). Use Kafka::asyncPublish() instead for better performance.
* Batch consuming options (enableBatching(), withBatchSizeLimit(), withBatchReleaseInterval()). Process messages individually in your consumer handler.
* Batch classes and contracts: BatchMessageConsumer, HandlesBatchConfiguration, BatchConfig, NullBatchConfig, CallableBatchConsumer, etc.
* Batch events: BatchMessagePublished, MessageBatchPublished, PublishingMessageBatch.
The only breaking change in this version was the change in the Junges\Kafka\Contracts\Handler contract signature.
The __invoke method now requires a second parameter of type Junges\Kafka\Contracts\MessageConsumer.
Here's the updated signature:
class MyHandler implements Handler {
-    public function __invoke(ConsumerMessage $message): void {
+    public function __invoke(ConsumerMessage $message, MessageConsumer $consumer): void {
        // Process message here...
    }
}
If you are handling your messages using a closure, no changes are needed as the closure signature already supports the second parameter.
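As a fuller illustration of the new signature, a handler class might look like the sketch below. The class name LogAndStopHandler and the "shutdown" body key are hypothetical, and the sketch assumes that getBody() on the message and stopConsuming() on the consumer are available in your installed version.

```php
<?php

use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Contracts\Handler;
use Junges\Kafka\Contracts\MessageConsumer;

// Hypothetical handler, for illustration only.
final class LogAndStopHandler implements Handler
{
    public function __invoke(ConsumerMessage $message, MessageConsumer $consumer): void
    {
        $body = $message->getBody();

        // Laravel's logger() helper writes to the default log channel.
        logger()->info('Kafka message received', ['body' => $body]);

        // Assumed convention: a truthy "shutdown" key asks the consumer to stop.
        if (is_array($body) && ! empty($body['shutdown'])) {
            $consumer->stopConsuming();
        }
    }
}
```

Receiving the consumer as the second argument is what lets a handler influence the consuming loop without holding a reference to it elsewhere.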
* \Junges\Kafka\Contracts\CanProduceMessages contract was renamed to \Junges\Kafka\Contracts\MessageProducer
* \Junges\Kafka\Contracts\KafkaProducerMessage contract was renamed to \Junges\Kafka\Contracts\ProducerMessage
* \Junges\Kafka\Contracts\CanConsumeMessages was renamed to \Junges\Kafka\Contracts\MessageConsumer
* \Junges\Kafka\Contracts\KafkaConsumerMessage was renamed to \Junges\Kafka\Contracts\ConsumerMessage
* \Junges\Kafka\Contracts\CanPublishMessagesToKafka contract was removed.
* \Junges\Kafka\Contracts\CanConsumeMessagesFromKafka was removed.
* \Junges\Kafka\Contracts\CanConsumeBatchMessages contract was renamed to \Junges\Kafka\Contracts\BatchMessageConsumer
* \Junges\Kafka\Contracts\Manager, used by the \Junges\Kafka\Factory class
withSasl method signature was changed
The withSasl method now accepts all SASL parameters instead of a Sasl object.
public function withSasl(string $username, string $password, string $mechanisms, string $securityProtocol = 'SASL_PLAINTEXT');
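A call using the new signature could look like the following sketch. The credentials are placeholders, and it assumes the consumer builder exposes withSasl with the parameter names shown in the signature above.

```php
<?php

use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::consumer(['topic'])
    ->withConsumerGroupId('group')
    // Placeholder credentials; named arguments mirror the signature above.
    ->withSasl(
        username: 'kafka-user',
        password: 'kafka-password',
        mechanisms: 'PLAIN',
        securityProtocol: 'SASL_PLAINTEXT',
    );
```

Named arguments are optional here; positional arguments in the same order work too, and securityProtocol can be omitted to use its default.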
In v2, handler functions and handler classes require a \Junges\Kafka\Contracts\MessageConsumer as a second argument.
$consumer = Kafka::consumer(['topic'])
    ->withConsumerGroupId('group')
-    ->withHandler(function(ConsumerMessage $message) {
+    ->withHandler(function(ConsumerMessage $message, MessageConsumer $consumer) {
        //
    })
createConsumer method
The Kafka::createConsumer method has been renamed to just consumer.
publishOn method
The Kafka::publishOn method has been renamed to publish, and it no longer accepts the $topics parameter.
Chain a call to onTopic to specify the topic the message should be published to.
\Junges\Kafka\Facades\Kafka::publish('broker')->onTopic('topic-name');
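A complete publish call might then read as in the sketch below. The payload key and value are hypothetical, and the sketch assumes withBodyKey() and send() are available on the producer builder in your installed version.

```php
<?php

use Junges\Kafka\Facades\Kafka;

Kafka::publish('broker')
    ->onTopic('topic-name')            // replaces the old $topics argument
    ->withBodyKey('order_id', 123)     // hypothetical payload entry
    ->send();
```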
onStopConsuming callbacks
To set onStopConsuming callbacks, define them while building the consumer, instead of after calling the build method as in v1.13.x:
$consumer = Kafka::consumer(['topic'])
    ->withConsumerGroupId('group')
    ->withHandler(new Handler)
+    ->onStopConsuming(static function () {
+        // Do something when the consumer stops consuming messages
+    })
    ->build()
-    ->onStopConsuming(static function () {
-        // Do something when the consumer stops consuming messages
-    })
PHP 8.2 Required
This package now requires PHP 8.2 or higher.
You can use tools such as Rector to upgrade your app to PHP 8.2.