symfony/redis-messenger
Redis transport integration for Symfony Messenger, enabling queueing and async message handling backed by Redis. Part of the Symfony ecosystem, with links to contributing, issue reporting, and pull requests in the main Symfony repository.
Install Dependencies:
composer require symfony/messenger symfony/redis-messenger
pecl install redis # Ensure PHP Redis extension is installed
Configure Redis Connection:
Add Redis DSN to .env:
MESSENGER_TRANSPORT_DSN=redis://localhost:6379
Or for Redis Sentinel, list each Sentinel host and name the master set:
MESSENGER_TRANSPORT_DSN=redis:?host[sentinel1:26379]&host[sentinel2:26379]&sentinel_master=master-set
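A single-host DSN can also carry the stream, group, and consumer names as path segments, followed by transport options as query parameters. A minimal sketch of assembling such a DSN, assuming the /stream/group/consumer path layout of the Redis transport; the helper name buildRedisDsn and the sample values are illustrative only:

```php
<?php

declare(strict_types=1);

/**
 * Builds a DSN of the form redis://host:port/stream/group/consumer?option=value
 * (hypothetical helper, not part of symfony/redis-messenger).
 */
function buildRedisDsn(
    string $host,
    int $port,
    string $stream,
    string $group,
    string $consumer,
    array $options = []
): string {
    // Stream, group, and consumer become the three path segments.
    $dsn = sprintf('redis://%s:%d/%s/%s/%s', $host, $port, $stream, $group, $consumer);

    // Transport options are appended as a query string.
    if ($options !== []) {
        $dsn .= '?' . http_build_query($options);
    }

    return $dsn;
}

echo buildRedisDsn('localhost', 6379, 'messages', 'symfony', 'worker-1', [
    'auto_setup' => 'true',
    'delete_after_ack' => 'true',
]), PHP_EOL;
```

Giving each worker its own consumer segment keeps workers from reading under the same consumer name.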
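Register Messenger in Laravel:
Symfony does not ship an official Laravel service provider for Messenger, so treat the class name below as a placeholder for the provider supplied by your integration package or written in your own application. In config/app.php, add:
'providers' => [
    // ...
    Symfony\Component\Messenger\Bridge\Laravel\MessengerServiceProvider::class,
],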
Publish config:
php artisan vendor:publish --provider="Symfony\Component\Messenger\Bridge\Laravel\MessengerServiceProvider"
Define a Message and Handler:
// app/Messages/SendNotification.php
namespace App\Messages;

class SendNotification {
    // Payload carried to the handler.
    public function __construct(public readonly array $data = []) {}
}

// app/Handlers/SendNotificationHandler.php
namespace App\Handlers;

use App\Messages\SendNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class SendNotificationHandler {
    public function __invoke(SendNotification $message): void {
        // Logic here
    }
}
Dispatch a Message:
use Symfony\Component\Messenger\MessageBusInterface;
$bus = app(MessageBusInterface::class);
$bus->dispatch(new SendNotification($data));
Run Worker:
php artisan messenger:consume async -vv
Laravel Integration:
Use Symfony’s MessageBus via Laravel’s service container:
$bus = app(\Symfony\Component\Messenger\MessageBusInterface::class);
$bus->dispatch(new YourMessage($payload));
Or wrap the dispatch in a queued Laravel job:
use Illuminate\Contracts\Queue\ShouldQueue;
use Symfony\Component\Messenger\MessageBusInterface;

class DispatchJob implements ShouldQueue {
    public function handle(): void {
        $bus = app(MessageBusInterface::class);
        $bus->dispatch(new YourMessage());
    }
}
Batching:
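Batching is implemented in the handler, by implementing Symfony’s BatchHandlerInterface (usually with BatchHandlerTrait), so messages are processed in chunks. It is not a transport option, and the transport needs no extra configuration:
# config/messenger.yaml
transports:
    async:
        dsn: '%env(MESSENGER_TRANSPORT_DSN)%'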
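A batch handler might look like the following. This is a minimal sketch, assuming Symfony Messenger 5.4 or later and the App\Messages\SendNotification message defined earlier; the handler name is illustrative, and the batch size is whatever BatchHandlerTrait uses by default:

```php
<?php

namespace App\Handlers;

use App\Messages\SendNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;

#[AsMessageHandler]
class SendNotificationBatchHandler implements BatchHandlerInterface
{
    use BatchHandlerTrait;

    public function __invoke(SendNotification $message, ?Acknowledger $ack = null): mixed
    {
        // The trait buffers the message and calls process() once a batch is ready.
        return $this->handle($message, $ack);
    }

    /**
     * Receives the buffered jobs as [message, acknowledger] pairs.
     */
    private function process(array $jobs): void
    {
        foreach ($jobs as [$message, $ack]) {
            try {
                // Replace this comment with the real work for one message.
                $ack->ack($message);
            } catch (\Throwable $e) {
                // Report the failure so the worker can retry or reject the message.
                $ack->nack($e);
            }
        }
    }
}
```

Each message is acknowledged individually, so one failure in a batch does not force the whole batch to be retried.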
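Worker Configuration: Bound each worker’s lifetime with message and time limits so a process manager can restart it cleanly:
php artisan messenger:consume async --limit=10 --time-limit=300
For high availability, run multiple workers, each with a unique Redis consumer name, because two workers must not share the same stream, group, and consumer. messenger:consume has no --hostname option; this example assumes the transport’s consumer option reads the MESSENGER_CONSUMER_NAME environment variable:
MESSENGER_CONSUMER_NAME=worker-1 php artisan messenger:consume async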
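Middleware: Add middleware for logging, validation, or retries. Custom middleware goes before the send and handle middleware, which close the chain:
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;

// $sendersLocator and $handlersLocator are assumed to be built elsewhere
// (a SendersLocatorInterface and a HandlersLocatorInterface).
$middleware = [
    new YourCustomMiddleware(), // e.g., logging
    new SendMessageMiddleware($sendersLocator),
    new HandleMessageMiddleware($handlersLocator),
];
$bus = new MessageBus($middleware);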
Retry Logic:
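Configure retries on the transport that consumes the messages, and send messages that exhaust their retries to a separate failure transport:
# config/messenger.yaml
failure_transport: failed
transports:
    async:
        dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
        retry_strategy:
            max_retries: 3
            delay: 1000 # milliseconds before the first retry
            multiplier: 2 # each later retry waits twice as long
    failed:
        dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
        options:
            stream: failed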
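With a base delay and a multiplier, each retry waits longer than the last. A small sketch of the resulting schedule, assuming the wait before the n-th retry (counting from zero) is delay × multiplier^n and ignoring any maximum-delay cap or jitter; the function name is illustrative:

```php
<?php

declare(strict_types=1);

/**
 * Returns the wait in milliseconds before each retry
 * (hypothetical helper mirroring a multiplier-based strategy).
 *
 * @return list<int>
 */
function retryDelays(int $maxRetries, int $delayMs, float $multiplier): array
{
    $delays = [];
    for ($retry = 0; $retry < $maxRetries; $retry++) {
        // The first retry uses the base delay; later ones scale by the multiplier.
        $delays[] = (int) round($delayMs * $multiplier ** $retry);
    }

    return $delays;
}

print_r(retryDelays(3, 1000, 2.0)); // 1000, 2000, and 4000 ms
```

With max_retries: 3, delay: 1000, and multiplier: 2, a message that keeps failing is therefore retried for about seven seconds in total before it is given up on.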
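Failed Messages: The Redis transport stores messages in streams, not lists, so inspect failed messages with stream commands (replace failed with the stream name of your failure transport):
redis-cli XRANGE failed - +
Or reprocess them with Symfony’s failed-message console commands, assuming your integration exposes them through Artisan:
php artisan messenger:failed:show
php artisan messenger:failed:retry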
Use DelayStamp to delay delivery of a message (the value is in milliseconds):
use Symfony\Component\Messenger\Stamp\DelayStamp;

$bus->dispatch(
    new YourMessage(),
    [new DelayStamp(60000)] // 60-second delay
);
Route messages to specific queues:
# config/messenger.yaml
routing:
    'App\Messages\HighPriorityMessage': async
    'App\Messages\LowPriorityMessage': low_priority
Define multiple transports:
transports:
    async:
        dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
    low_priority:
        dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
        options:
            stream: low_priority_queue # the Redis transport names its queue with the stream option
Create a Laravel ShouldQueue job that dispatches Symfony messages:
class DispatchSymfonyMessageJob implements ShouldQueue {
    public function handle() {
        $bus = app(MessageBusInterface::class);
        $bus->dispatch(new SymfonyMessage());
    }
}
Run Laravel’s queue:work alongside the Symfony Messenger worker. messenger:consume takes transport names as arguments rather than a --queue option:
php artisan queue:work --queue=laravel_queue &
php artisan messenger:consume async &
Convert Laravel events to Symfony messages:
class OrderCreatedListener {
    public function handle(OrderCreated $event) {
        $bus = app(MessageBusInterface::class);
        $bus->dispatch(new ProcessOrderMessage($event->order));
    }
}
Message Duplication:
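Redis streams give at-least-once delivery, so a message can be handled twice, for example when redeliver_timeout expires while a slow handler is still running. TransportMessageIdStamp only records the ID the transport assigned to a message; it does not deduplicate. Keep handlers idempotent and set redeliver_timeout (in seconds) above your longest handler run time:
# config/messenger.yaml
transports:
    async:
        dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
        options:
            redeliver_timeout: 3600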
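One way to make a handler safe against duplicate delivery is to track the IDs it has already processed. A minimal sketch, assuming every message carries its own unique ID; the class names and the in-memory store are hypothetical, and a real application would keep seen IDs in shared storage such as Redis or a database:

```php
<?php

declare(strict_types=1);

final class ProcessOrderMessage
{
    public function __construct(
        public readonly string $id, // unique per logical message
        public readonly int $orderId
    ) {
    }
}

final class IdempotentOrderHandler
{
    /** @var array<string, true> IDs already handled (in memory, for illustration) */
    private array $seen = [];

    /** @var list<int> orders that were actually processed */
    public array $processed = [];

    public function __invoke(ProcessOrderMessage $message): void
    {
        // Skip a redelivered copy instead of repeating its side effects.
        if (isset($this->seen[$message->id])) {
            return;
        }

        $this->processed[] = $message->orderId;
        $this->seen[$message->id] = true;
    }
}

$handler = new IdempotentOrderHandler();
$handler(new ProcessOrderMessage('msg-1', 42));
$handler(new ProcessOrderMessage('msg-1', 42)); // duplicate delivery, ignored
$handler(new ProcessOrderMessage('msg-2', 43));

var_dump($handler->processed === [42, 43]); // bool(true)
```

The ID is recorded only after the work succeeds, so a message whose handling threw an exception is still processed on its next delivery.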
Connection Management:
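// Sketch only: assumes a 'messenger.transport.async' binding whose transport
// exposes getConnection(); Symfony's RedisTransport does not document such a method.
public function register() {
    $this->app->afterResolving(MessageBusInterface::class, function () {
        $transport = $this->app->get('messenger.transport.async');
        $transport->getConnection()->close();
    });
}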
Sentinel/Cluster Auth:
MESSENGER_TRANSPORT_DSN=redis:?host[sentinel1:26379]&...&sentinel_master=master-set&auth=pass
Serialization:
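Messages are encoded by a service implementing Symfony\Component\Messenger\Transport\Serialization\SerializerInterface, selected with the transport-level serializer key. Symfony ships PhpSerializer and a Symfony Serializer-based implementation; a custom one such as an igbinary serializer is a class you write yourself (App\Messenger\IgBinarySerializer below is hypothetical):
transports:
    async:
        dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
        serializer: App\Messenger\IgBinarySerializer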
Worker Stuck on Messages:
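Configure a failure_transport so messages that keep failing are moved aside instead of blocking the worker, and monitor it:
failure_transport: failed
transports:
    failed:
        dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
        options:
            stream: failed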
Redis CLI Inspection:
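redis-cli XLEN messages # number of entries; messages is the default stream name
redis-cli XRANGE messages - + COUNT 10 # first ten entries
redis-cli XPENDING messages symfony # delivered but unacknowledged, for the default group symfony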
Symfony Profiler (Symfony framework applications only; it shows dispatched messages in the debug toolbar):
composer require --dev symfony/web-profiler-bundle
Logging:
# config/packages/monolog.yaml
handlers:
    messenger:
        type: stream
        path: "%kernel.logs_dir%