symfony/redis-messenger
Redis transport integration for Symfony Messenger. Use Redis streams/lists to send, receive, and retry messages with Messenger workers. Part of the Symfony ecosystem; issues and PRs handled in the main symfony/symfony repository.
Install Dependencies:
composer require symfony/messenger redis symfony/redis-messenger
pecl install redis
Configure Redis Connection (in config/redis.php):
'client' => [
'dsn' => env('REDIS_DSN', 'redis://localhost:6379'),
'options' => [
'prefix' => env('REDIS_PREFIX', 'laravel_'),
],
],
Set Up Symfony Messenger (in config/messenger.php):
'transports' => [
'async' => [
'dsn' => env('MESSENGER_TRANSPORT_DSN', 'redis://default'),
'options' => [
'queue_name' => 'laravel_async',
'serializer' => Symfony\Component\Serializer\Serializer::class,
],
],
],
Create a Message Class (e.g., app/Messages/ProcessOrder.php):
namespace App\Messages;
class ProcessOrder
{
public function __construct(public string $orderId) {}
}
Dispatch a Message (in a Laravel controller):
use Symfony\Component\Messenger\MessageBusInterface;
public function __construct(private MessageBusInterface $bus) {}
public function handleOrder(Order $order) {
$this->bus->dispatch(new ProcessOrder($order->id));
}
Create a Message Handler (e.g., app/Handlers/ProcessOrderHandler.php):
namespace App\Handlers;
use App\Messages\ProcessOrder;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
class ProcessOrderHandler
{
public function __invoke(ProcessOrder $message) {
// Process order logic here
}
}
Run the Worker (in a separate terminal):
php bin/console messenger:consume async -vv
Replace a blocking Laravel job (e.g., sending emails) with a Symfony message:
// Dispatch
$this->bus->dispatch(new SendWelcomeEmail($user->email));
// Handler
#[AsMessageHandler]
class SendWelcomeEmailHandler
{
public function __invoke(SendWelcomeEmail $message) {
Mail::to($message->email)->send(new WelcomeEmail());
}
}
Dispatch Messages:
MessageBusInterface in Laravel controllers/services.$this->bus->dispatch(new NotifyUserRegistered($user->id));
Consume Messages:
messenger:consume (or integrate with Laravel’s queue:work via custom logic).async queue with retries.php bin/console messenger:consume async --limit=10 --time-limit=300
Handle Failures:
messenger.yaml).failure_transport: failed
transports:
failed:
dsn: 'redis://default?queue_name=failed'
options:
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 2
Monitoring:
TransportStatistics or integrate with Laravel’s queue:failed-table.use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
$serializer = new Serializer([new JsonEncoder()]);
$transport = new RedisTransport($redisClient, $serializer);
$stats = $transport->getStatistics();
Use Redis pub/sub for real-time updates (e.g., notifications):
// Publisher (dispatch)
$this->bus->dispatch(new UserUpdated($user->id));
// Subscriber (handler)
#[AsMessageHandler]
class UserUpdatedSubscriber
{
public function __invoke(UserUpdated $message) {
// Broadcast to WebSocket clients
broadcast(new UserUpdatedEvent($message->userId));
}
}
Use Redis streams or delay option:
# messenger.yaml
transports:
delayed:
dsn: 'redis://default?queue_name=delayed'
options:
delay: 60000 # 1 minute delay
Process messages in batches for efficiency:
// Handler
#[AsMessageHandler]
class BatchOrderProcessor
{
public function __invoke(iterable $messages) {
foreach ($messages as $message) {
// Process each order
}
}
}
Configure in messenger.yaml:
routing:
'App\Messages\ProcessOrder': batch
Use multiple queues with Symfony’s priority option:
transports:
high_priority:
dsn: 'redis://default?queue_name=high_priority'
options:
priority: 1
Laravel Service Provider:
Bind Symfony Messenger components in AppServiceProvider:
public function register() {
$this->app->bind(MessageBusInterface::class, function ($app) {
return new MessageBus([
new Dispatcher([
new ProcessOrderHandler(),
// Add other handlers
]),
]);
});
}
Queue Worker Integration:
Extend Laravel’s queue:work to use Symfony Messenger:
// app/Console/Kernel.php
protected function schedule(Schedule $schedule) {
$schedule->command('messenger:consume async --time-limit=60')
->everyMinute();
}
Redis Connection Sharing: Reuse Laravel’s Redis connection:
$redis = $this->app->make(Redis::class);
$transport = new RedisTransport($redis, new Serializer([new JsonEncoder()]));
Message Serialization: Customize serialization for complex objects:
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
$serializer = new Serializer([
new ObjectNormalizer(),
], [new JsonEncoder()]);
Message Duplication:
Redis Connection Issues:
try {
$this->bus->dispatch($message);
} catch (RedisException $e) {
Log::error("Redis connection failed: " . $e->getMessage());
// Fallback to database queue
}
Missing TransportMessageIdStamp:
#[AsMessageHandler] to auto-add stamps.Sentinel/Cluster Configuration:
sentinel or cluster scheme:transports:
redis:
dsn: 'redis+sentinel://user:pass@host1:26379,host2:26379/?database=0'
PHP Redis Extension Version:
ext-redis 6.1+. Check with:php -m | grep redis
Enable Verbose Logging:
php bin/console messenger:consume async -vv
Inspect Redis Keys:
Use redis-cli to debug:
redis-cli KEYS "laravel_*"
redis-cli LRANGE "laravel_async" 0 -1
Check Failed Messages:
php bin/console messenger:failed:list
php bin/console messenger:failed:remove <message-id>
Monitor Worker Performance:
Use Symfony’s TransportStatistics:
$stats = $transport->getStatistics();
Log::info('Messages in queue:', [$stats->getMessageCount()]);
Queue Naming:
messenger. Customize via queue_name option:transports:
async:
dsn: 'redis://default'
options:
queue_name: 'laravel_async'
Serialization:
Symfony\Component\Serializer\Serializer. For Laravel’s native objects, extend the normalizer:
How can I help you explore Laravel packages today?