mateusjunges/laravel-kafka
Laravel Kafka makes it easy to produce and consume Kafka messages in Laravel with a clean, expressive API and improved testability. Build producers and consumers quickly, integrate with your app workflows, and avoid painful Kafka testing setups.
No breaking changes. Notable additions:
Junges\Kafka\Contracts\ContextAware will now have their context forwarded as headers when messages are sent to the DLQ. See the configuring consumer options docs for details.
Added withFlushCallback() on the producer builder to be notified when async messages are flushed.
Removed @internal annotations from public interfaces and traits, making them safe to implement/use in userland code.

MessageBatch, sendBatch, produceBatch). Use Kafka::asyncPublish() instead for better performance.
enableBatching(), withBatchSizeLimit(), withBatchReleaseInterval()). Process messages individually in your consumer handler.
BatchMessageConsumer, HandlesBatchConfiguration, BatchConfig, NullBatchConfig, CallableBatchConsumer, etc.
BatchMessagePublished, MessageBatchPublished, PublishingMessageBatch

The only breaking change in this version was the change in the Junges\Kafka\Contracts\Handler contract signature.
The handler's __invoke method now requires a second parameter of type Junges\Kafka\Contracts\MessageConsumer.
Here's the updated signature:
  class MyHandler implements Handler {
-     public function __invoke(ConsumerMessage $message): void {
+     public function __invoke(ConsumerMessage $message, MessageConsumer $consumer): void {
          // Process message here...
      }
  }
If you are handling your messages using a closure, no changes are needed as the closure signature already supports the second parameter.
\Junges\Kafka\Contracts\CanProduceMessages contract was renamed to \Junges\Kafka\Contracts\MessageProducer.
\Junges\Kafka\Contracts\KafkaProducerMessage contract was renamed to \Junges\Kafka\Contracts\ProducerMessage.
\Junges\Kafka\Contracts\CanConsumeMessages contract was renamed to \Junges\Kafka\Contracts\MessageConsumer.
\Junges\Kafka\Contracts\KafkaConsumerMessage contract was renamed to \Junges\Kafka\Contracts\ConsumerMessage.
\Junges\Kafka\Contracts\CanPublishMessagesToKafka contract was removed.
\Junges\Kafka\Contracts\CanConsumeMessagesFromKafka contract was removed.
\Junges\Kafka\Contracts\CanConsumeBatchMessages contract was renamed to \Junges\Kafka\Contracts\BatchMessageConsumer.
\Junges\Kafka\Contracts\Manager used by \Junges\Kafka\Factory class

withSasl method signature was changed.
The withSasl method now accepts all SASL parameters instead of a Sasl object.
public function withSasl(string $username, string $password, string $mechanisms, string $securityProtocol = 'SASL_PLAINTEXT');
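As a minimal sketch of what a call against the new signature can look like, the snippet below uses a stand-in class instead of the package's builder so that it runs on its own. SaslBuilderSketch, the credentials, the 'PLAIN' mechanism value, and the fluent return type are all assumptions made for illustration; only the parameter list is taken from the signature above.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the package's builder, used only so this sketch
// runs without laravel-kafka installed. The parameter list mirrors the
// withSasl() signature shown above; the return type is an assumption.
final class SaslBuilderSketch
{
    /** @var array<string, string> */
    public array $sasl = [];

    public function withSasl(
        string $username,
        string $password,
        string $mechanisms,
        string $securityProtocol = 'SASL_PLAINTEXT'
    ): self {
        // Keep the values so the caller can inspect what was configured.
        $this->sasl = compact('username', 'password', 'mechanisms', 'securityProtocol');

        return $this;
    }
}

// Named arguments keep the call readable now that no Sasl object is passed.
$builder = (new SaslBuilderSketch())->withSasl(
    username: 'app-user',   // placeholder credentials
    password: 'app-secret',
    mechanisms: 'PLAIN',
);

// securityProtocol was omitted, so the default from the signature applies.
echo $builder->sasl['securityProtocol'], PHP_EOL;
```

Code that previously built a Sasl object would pass the same four values directly to withSasl on the real builder.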
In v2, handler functions and handler classes require a \Junges\Kafka\Contracts\MessageConsumer as a second argument.
  $consumer = Kafka::consumer(['topic'])
      ->withConsumerGroupId('group')
-     ->withHandler(function(ConsumerMessage $message) {
+     ->withHandler(function(ConsumerMessage $message, MessageConsumer $consumer) {
          //
      })
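One reason the second argument is useful is that the handler can act on the consumer itself. The sketch below assumes the real contracts expose getBody() on the message and stopConsuming() on the consumer; the two interfaces declared here are hypothetical stand-ins so the snippet runs without the package, and the "shutdown" rule is purely illustrative.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins for the package contracts so the sketch runs on its
// own; in an application, import the real Junges\Kafka\Contracts interfaces.
interface ConsumerMessage
{
    public function getBody(): mixed; // assumed accessor on the real contract
}

interface MessageConsumer
{
    public function stopConsuming(): void; // assumed to exist on the real contract
}

// A v2-style handler: the consumer arrives as the second argument.
$handler = static function (ConsumerMessage $message, MessageConsumer $consumer): void {
    // Illustrative rule: a "shutdown" body asks the consumer to stop.
    if ($message->getBody() === 'shutdown') {
        $consumer->stopConsuming();
    }
};

// Minimal fakes to exercise the handler without a broker.
$message = new class implements ConsumerMessage {
    public function getBody(): mixed
    {
        return 'shutdown';
    }
};

$consumer = new class implements MessageConsumer {
    public bool $stopped = false;

    public function stopConsuming(): void
    {
        $this->stopped = true;
    }
};

$handler($message, $consumer);
var_dump($consumer->stopped);
```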
createConsumer method
The Kafka::createConsumer method has been renamed to consumer.

publishOn method
The Kafka::publishOn method has been renamed to publish, and it no longer accepts the $topics parameter.
Chain a call to onTopic to specify the topic the message should be published to.
\Junges\Kafka\Facades\Kafka::publish('broker')->onTopic('topic-name');
onStopConsuming callbacks
To set onStopConsuming callbacks, define them while building the consumer instead of after calling the build method, as was done in v1.13.x:
  $consumer = Kafka::consumer(['topic'])
      ->withConsumerGroupId('group')
      ->withHandler(new Handler)
+     ->onStopConsuming(static function () {
+         // Do something when the consumer stops consuming messages
+     })
      ->build()
-     ->onStopConsuming(static function () {
-         // Do something when the consumer stops consuming messages
-     })
PHP 8.2 Required
This package now requires PHP 8.2 or higher.
You can use tools such as Rector to upgrade your app to PHP 8.2.
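A minimal Rector configuration for that upgrade might look like the following. It assumes Rector 1.x or later is installed as a dev dependency and that the application code lives in app/; both are assumptions to adjust for your project.

```php
<?php

declare(strict_types=1);

use Rector\Config\RectorConfig;

// rector.php at the project root.
// Assumes application code lives in app/; add other directories as needed.
return RectorConfig::configure()
    ->withPaths([__DIR__ . '/app'])
    // Apply the rule sets that modernize code up to PHP 8.2.
    ->withPhpSets(php82: true);
```

Running vendor/bin/rector process --dry-run previews the proposed changes without writing them to disk.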