Redis Messenger Laravel Package

symfony/redis-messenger

Redis transport integration for Symfony Messenger. Uses Redis streams to send, receive, and retry messages with Messenger workers. Part of the Symfony ecosystem; issues and PRs are handled in the main symfony/symfony repository.

Getting Started

Minimal Setup for Laravel

  1. Install Dependencies:

    pecl install redis
    composer require symfony/messenger symfony/redis-messenger
    
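  2. Configure the Redis Connection. Laravel keeps Redis settings under the redis key in config/database.php (there is no config/redis.php by default). The Messenger transport does not read this file; it connects through its own DSN (step 3), so point both at the same server:

    'redis' => [
        'client' => env('REDIS_CLIENT', 'phpredis'),
        'options' => [
            'prefix' => env('REDIS_PREFIX', 'laravel_'),
        ],
        'default' => [
            'host' => env('REDIS_HOST', '127.0.0.1'),
            'port' => env('REDIS_PORT', '6379'),
        ],
    ],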
    
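  3. Set Up Symfony Messenger. Laravel does not ship a Messenger config file, so config/messenger.php here is an app-defined file that your own service provider reads. The Redis transport names its queue with the stream option (queue_name belongs to other transports), and the message serializer is passed to the transport separately rather than as a connection option:

    'transports' => [
        'async' => [
            'dsn' => env('MESSENGER_TRANSPORT_DSN', 'redis://localhost:6379'),
            'options' => [
                'stream' => 'laravel_async',
                'group' => 'laravel',
            ],
        ],
    ],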
    
  4. Create a Message Class (e.g., app/Messages/ProcessOrder.php):

    namespace App\Messages;
    
    class ProcessOrder
    {
        public function __construct(public string $orderId) {}
    }
    
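  5. Dispatch a Message (in a Laravel controller):

    use App\Messages\ProcessOrder;
    use App\Models\Order;
    use Symfony\Component\Messenger\MessageBusInterface;
    
    class OrderController
    {
        // The bus is resolved from Laravel's container
        public function __construct(private MessageBusInterface $bus) {}
    
        public function handleOrder(Order $order) {
            // ProcessOrder expects a string ID
            $this->bus->dispatch(new ProcessOrder((string) $order->id));
        }
    }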
    
  6. Create a Message Handler (e.g., app/Handlers/ProcessOrderHandler.php):

    namespace App\Handlers;
    
    use App\Messages\ProcessOrder;
    use Symfony\Component\Messenger\Attribute\AsMessageHandler;
    
    #[AsMessageHandler]
    class ProcessOrderHandler
    {
        public function __invoke(ProcessOrder $message) {
            // Process order logic here
        }
    }
    
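  7. Run the Worker (in a separate terminal). Laravel has no bin/console; the command below assumes Messenger's ConsumeMessagesCommand has been registered with Artisan:

    php artisan messenger:consume async -vv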
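
The transport DSN from step 3 can also carry the stream, group, and consumer names as path segments (redis://host:port/stream/group/consumer). The sketch below only illustrates that layout with parse_url; describeRedisDsn is a hypothetical helper, not part of symfony/redis-messenger, and the fallback values are the defaults described in the Symfony documentation.

```php
<?php
// Hypothetical helper for illustration; the real parsing lives inside the transport.
function describeRedisDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    // Path segments are read as /stream/group/consumer; empty segments are dropped.
    $segments = array_values(array_filter(explode('/', $parts['path'] ?? ''), 'strlen'));

    return [
        'host'     => $parts['host'] ?? '127.0.0.1',
        'port'     => $parts['port'] ?? 6379,
        'stream'   => $segments[0] ?? 'messages',  // documented default stream
        'group'    => $segments[1] ?? 'symfony',   // documented default group
        'consumer' => $segments[2] ?? 'consumer',  // documented default consumer
    ];
}

print_r(describeRedisDsn('redis://localhost:6379/laravel_async/laravel/worker-1'));
```

Give each worker process its own consumer name when several workers read the same stream and group.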
    

First Use Case: Background Job Offloading

Replace a blocking Laravel job (e.g., sending emails) with a Symfony message:

// Dispatch
$this->bus->dispatch(new SendWelcomeEmail($user->email));

// Handler
#[AsMessageHandler]
class SendWelcomeEmailHandler
{
    public function __invoke(SendWelcomeEmail $message) {
        Mail::to($message->email)->send(new WelcomeEmail());
    }
}

Implementation Patterns

Workflow: Laravel + Symfony Messenger Integration

  1. Dispatch Messages:

    • Use Symfony’s MessageBusInterface in Laravel controllers/services.
    • Example: Dispatch after user registration.
    $this->bus->dispatch(new NotifyUserRegistered($user->id));
    
  2. Consume Messages:

    • Run workers with messenger:consume. Laravel’s queue:work does not read Messenger transports, so the Messenger command must be registered with Artisan.
    • Example: Consume the async transport, stopping after 10 messages or 300 seconds.
    php artisan messenger:consume async --limit=10 --time-limit=300
    
  3. Handle Failures:

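    • Leverage Symfony’s retry logic. In a Symfony application this lives in messenger.yaml under framework.messenger; in Laravel, map the same keys onto your own wiring. retry_strategy sits next to dsn, not under options, and belongs on the transport being retried:
    failure_transport: failed
    transports:
        async:
            dsn: 'redis://localhost:6379/laravel_async'
            retry_strategy:
                max_retries: 3
                delay: 1000
                multiplier: 2
        failed:
            dsn: 'redis://localhost:6379/failed'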
    
  4. Monitoring:

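    • RedisTransport implements MessageCountAwareInterface, so getMessageCount() reports how many messages are waiting. Recent Symfony versions also expose this through the messenger:stats command.
    • Example: Track queue depth.
    use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
    use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransport;
    
    $connection = Connection::fromDsn('redis://localhost:6379/laravel_async');
    $transport = new RedisTransport($connection);
    $pending = $transport->getMessageCount();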
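
To make the retry settings from step 3 concrete, the sketch below works out the wait before each retry for delay: 1000, multiplier: 2, max_retries: 3. It is plain arithmetic under the assumption that the wait is delay × multiplier^(retries so far); retryDelays is a hypothetical helper, and any jitter or max_delay setting is ignored.

```php
<?php
// Illustrative arithmetic only; not Symfony's retry strategy class.
function retryDelays(int $delayMs, float $multiplier, int $maxRetries): array
{
    $delays = [];
    for ($retry = 0; $retry < $maxRetries; $retry++) {
        // The first retry waits the base delay; each later one is multiplied again.
        $delays[] = (int) round($delayMs * $multiplier ** $retry);
    }
    return $delays;
}

echo implode(', ', retryDelays(1000, 2, 3)), PHP_EOL; // 1000, 2000, 4000
```

With these values a message that keeps failing reaches the failure transport after roughly seven seconds of cumulative waiting.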
    

Patterns for Common Scenarios

1. Fanout Publishing (Pub/Sub)

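The transport is built on Redis streams with consumer groups, not Redis pub/sub. Within one group each message is delivered to a single consumer, so to fan a message out, read the same stream from several transports that use different group names. A handler can then push real-time updates (e.g., notifications):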

// Publisher (dispatch)
$this->bus->dispatch(new UserUpdated($user->id));

// Subscriber (handler)
#[AsMessageHandler]
class UserUpdatedSubscriber
{
    public function __invoke(UserUpdated $message) {
        // Broadcast to WebSocket clients
        broadcast(new UserUpdatedEvent($message->userId));
    }
}

2. Delayed Messages

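Delay is set per message rather than per transport. Attach a DelayStamp (in milliseconds) when dispatching:

use Symfony\Component\Messenger\Stamp\DelayStamp;

// Deliver about one minute after dispatch
$this->bus->dispatch(new ProcessOrder($order->id), [new DelayStamp(60000)]);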

3. Batch Processing

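Process messages in batches for efficiency. A handler receives one message per call; to batch, implement BatchHandlerInterface and let BatchHandlerTrait buffer messages until the batch is flushed:

// Handler
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;

#[AsMessageHandler]
class BatchOrderProcessor implements BatchHandlerInterface
{
    use BatchHandlerTrait;

    public function __invoke(ProcessOrder $message, ?Acknowledger $ack = null): mixed {
        return $this->handle($message, $ack); // buffers until the batch is flushed
    }

    private function process(array $jobs): void {
        foreach ($jobs as [$message, $ack]) {
            // Process each order, then acknowledge it
            $ack->ack($message);
        }
    }
}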

Route the message to a transport (under framework.messenger in messenger.yaml):

routing:
    'App\Messages\ProcessOrder': async

4. Message Prioritization

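Messenger has no priority option. Define one transport per priority level and list them in priority order when starting the worker, which checks earlier transports first:

transports:
    high_priority:
        dsn: 'redis://localhost:6379/high_priority'
    low_priority:
        dsn: 'redis://localhost:6379/low_priority'

php artisan messenger:consume high_priority low_priority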
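
Priority in Messenger comes from the order in which a worker checks its transports: after handling a message it starts again from the first transport. The toy model below shows the resulting processing order; drainInPriorityOrder and the in-memory arrays are hypothetical stand-ins for the worker and the Redis streams.

```php
<?php
// Toy model of a worker loop; not Symfony's Worker class.
function drainInPriorityOrder(array $queues): array
{
    $handled = [];
    do {
        $found = false;
        foreach ($queues as $name => &$messages) {
            if ($messages !== []) {
                $handled[] = $name . ':' . array_shift($messages);
                $found = true;
                break; // go back to the highest-priority queue
            }
        }
        unset($messages); // drop the reference before the next pass
    } while ($found);

    return $handled;
}

print_r(drainInPriorityOrder([
    'high_priority' => ['a', 'b'],
    'low_priority'  => ['x'],
]));
```

Here low_priority:x is handled only after both high-priority messages, which is the behavior to expect from listing high_priority first.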

Integration Tips

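  1. Laravel Service Provider: Bind Symfony Messenger components in AppServiceProvider. Outside Symfony's container the #[AsMessageHandler] attribute is not discovered automatically, so map each message class to its handlers explicitly:

    public function register() {
        $this->app->bind(MessageBusInterface::class, function ($app) {
            return new MessageBus([
                // Handles in-process; put a SendMessageMiddleware first to route to Redis
                new HandleMessageMiddleware(new HandlersLocator([
                    ProcessOrder::class => [new ProcessOrderHandler()],
                    // Add other handlers
                ])),
            ]);
        });
    }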
    
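  2. Scheduler Integration: Instead of a long-running daemon, let Laravel’s scheduler start a short-lived Messenger worker every minute (this does not extend queue:work, and it assumes messenger:consume is registered with Artisan):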

    // app/Console/Kernel.php
    protected function schedule(Schedule $schedule) {
        $schedule->command('messenger:consume async --time-limit=60')
                 ->everyMinute();
    }
    
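  3. Redis Connection Settings: RedisTransport takes a Messenger Connection, not a raw Redis client. Build its DSN from Laravel’s Redis configuration so both use the same server:

    $redis = config('database.redis.default');
    $dsn = sprintf('redis://%s:%d/laravel_async', $redis['host'], $redis['port']);
    $transport = new RedisTransport(Connection::fromDsn($dsn));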
    
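  4. Message Serialization: Customize serialization for complex objects by wrapping a Symfony Serializer in Messenger's own serializer (requires symfony/serializer):

    use Symfony\Component\Messenger\Transport\Serialization\Serializer as MessengerSerializer;
    use Symfony\Component\Serializer\Encoder\JsonEncoder;
    use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
    use Symfony\Component\Serializer\Serializer;
    
    $serializer = new MessengerSerializer(
        new Serializer([new ObjectNormalizer()], [new JsonEncoder()])
    );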
    

Gotchas and Tips

Pitfalls

  1. Message Duplication:

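    • A duplication issue was fixed in v7.4.8+; make sure you are on a recent version.
    • Redelivery can still happen by design: a message that is not acknowledged in time is claimed and delivered again. Use idempotent handlers or deduplication logic.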
  2. Redis Connection Issues:

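    • The transport wraps connection errors in Messenger’s TransportException, so catch that rather than RedisException, and handle failures gracefully:
    try {
        $this->bus->dispatch($message);
    } catch (\Symfony\Component\Messenger\Exception\TransportException $e) {
        Log::error("Redis connection failed: " . $e->getMessage());
        // Fallback to database queue
    }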
    
  3. Missing TransportMessageIdStamp:

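    • TransportMessageIdStamp is attached by the transport as an envelope passes through it; #[AsMessageHandler] only registers a handler and adds no stamps. An envelope that was handled synchronously and never reached the transport will not carry the stamp. Keep message classes serializable so the envelope survives the round trip.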
  4. Sentinel/Cluster Configuration:

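    • Keep the redis:// (or rediss://) scheme, which is what the transport recognizes, and name the Sentinel master with the sentinel_master option. Check the multi-host DSN syntax against the documentation for your version:
    transports:
        redis:
            dsn: 'redis://user:pass@host1:26379/messages'
            options:
                sentinel_master: 'mymaster'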
    
  5. PHP Redis Extension Version:

    • Requires ext-redis 6.1+. php -m only shows whether the extension is loaded; print the installed version with:
    php -r 'echo phpversion("redis"), PHP_EOL;'
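
The idempotent-handler workaround from pitfall 1 can be sketched as follows. This is a minimal illustration, not the package's own mechanism: the handler name is hypothetical, and the in-memory array stands in for a shared store such as a Redis key or a database table.

```php
<?php
// Hypothetical message and handler for illustration.
final class ProcessOrder
{
    public function __construct(public string $orderId) {}
}

final class IdempotentProcessOrderHandler
{
    /** @var array<string, true> IDs that have already been handled */
    private array $processed = [];

    /** Counts how often the real work ran, to make the effect visible */
    public int $sideEffects = 0;

    public function __invoke(ProcessOrder $message): void
    {
        if (isset($this->processed[$message->orderId])) {
            return; // duplicate delivery: nothing to do
        }
        $this->sideEffects++; // real order processing would go here
        $this->processed[$message->orderId] = true;
    }
}

$handler = new IdempotentProcessOrderHandler();
$handler(new ProcessOrder('42'));
$handler(new ProcessOrder('42')); // redelivered copy
echo $handler->sideEffects, PHP_EOL; // 1
```

With several workers the check and the write need to be a single atomic operation in the shared store; the in-memory array here only works within one process.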
    

Debugging Tips

  1. Enable Verbose Logging:

    php artisan messenger:consume async -vv
    
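  2. Inspect Redis Keys: Use redis-cli to debug. Messages are stored in a stream, so use the stream commands rather than LRANGE:

    redis-cli --scan --pattern "laravel_*"
    redis-cli XLEN "laravel_async"
    redis-cli XRANGE "laravel_async" - + COUNT 10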
    
  3. Check Failed Messages:

    php artisan messenger:failed:show
    php artisan messenger:failed:remove <message-id>
    
  4. Monitor Queue Depth: RedisTransport reports its pending message count through getMessageCount():

    $count = $transport->getMessageCount();
    Log::info('Messages in queue', ['count' => $count]);
    

Configuration Quirks

  1. Queue Naming:

    • The default stream name is messages. Customize it via the stream option (or the first path segment of the DSN):
    transports:
        async:
            dsn: 'redis://localhost:6379'
            options:
                stream: 'laravel_async'
    
  2. Serialization:

    • Messenger’s transport serializer defaults to PHP’s native serialize(); the serializer based on Symfony\Component\Serializer\Serializer is opt-in. For Laravel’s native objects, extend the normalizer: