Weave Code
Code Weaver
Helps Laravel developers discover, compare, and choose open-source packages. See popularity, security, maintainers, and scores at a glance to make better decisions.
Feedback
Share your thoughts, report bugs, or suggest improvements.
Subject
Message

Messenger Adapter Laravel Package

adtechpotok/messenger-adapter

Symfony Messenger transport built on Enqueue, enabling send/receive via supported brokers (e.g., AMQP/RabbitMQ). Configure Messenger with an enqueue:// DSN, route messages, and consume workers; supports queue/exchange options and per-message topic overrides.

View on GitHub
Deep Wiki
Context7

Getting Started

Minimal Setup

  1. Installation

    composer require enqueue/messenger-adapter
    

    Ensure php-enqueue/enqueue and symfony/messenger are also installed.

  2. Configure Enqueue Bundle Add the EnqueueBundle to config/bundles.php and set up .env:

    ENQUEUE_DSN=amqp://guest:guest@localhost:5672/%2f
    
  3. Configure Messenger Transport Define a transport in config/packages/messenger.yaml:

    framework:
        messenger:
            transports:
                amqp: enqueue://default
    
  4. Route a Message Route a message class to the amqp transport in config/packages/framework.yaml:

    framework:
        messenger:
            routing:
                'App\Message\MyMessage': amqp
    
  5. Dispatch a Message

    use App\Message\MyMessage;
    use Symfony\Component\Messenger\MessageBusInterface;
    
    public function __construct(private MessageBusInterface $bus) {}
    
    public function handle(): void
    {
        $this->bus->dispatch(new MyMessage());
    }
    
  6. Consume Messages

    php bin/console messenger:consume-messages amqp
    

First Use Case

Use this package to offload long-running tasks (e.g., sending emails, processing files) to a queue. For example:

// Dispatch a message to process an order
$this->bus->dispatch(new ProcessOrder($orderId));

Then run the consumer in the background:

php bin/console messenger:consume-messages amqp --time-limit=300

Implementation Patterns

Workflows

  1. Producer-Consumer Pattern

    • Producers: Dispatch messages from controllers, services, or commands.
      $this->bus->dispatch(new SendWelcomeEmail($userId));
      
    • Consumers: Handle messages in dedicated classes (annotated with @AsMessageHandler).
      use Symfony\Component\Messenger\Attribute\AsMessageHandler;
      
      #[AsMessageHandler]
      public function __invoke(SendWelcomeEmail $message)
      {
          // Send email logic
      }
      
  2. Retry Failed Messages Configure retry logic in messenger.yaml:

    framework:
        messenger:
            transports:
                amqp:
                    dsn: enqueue://default
                    retry_strategy:
                        max_retries: 3
                        delay: 1000
                        multiplier: 2
    
  3. Delayed Messages Use the delay option when dispatching:

    $this->bus->dispatch(
        (new SendReminderEmail($userId))->setDelay(3600) // Delay by 1 hour
    );
    
  4. Batching Process messages in batches for efficiency:

    php bin/console messenger:consume-messages amqp --limit=50
    

Integration Tips

  • Symfony Commands: Use the MessengerCommandBus trait to dispatch messages from commands.
  • Doctrine Events: Dispatch messages in Doctrine lifecycle callbacks (e.g., postPersist).
  • API Endpoints: Dispatch messages asynchronously in API controllers to improve response times.
  • Testing: Use MessengerTestTrait or mock the MessageBusInterface in unit tests:
    $this->bus->expects($this)->dispatch($message);
    

Gotchas and Tips

Pitfalls

  1. DSN Configuration

    • Ensure the ENQUEUE_DSN in .env matches your broker (e.g., amqp://, redis://, sqs://).
    • For RabbitMQ, default credentials (guest/guest) may not work in production. Use a dedicated user:
      ENQUEUE_DSN=amqp://user:password@localhost:5672/%2f
      
  2. Transport Naming

    • The transport name in messenger.yaml (e.g., amqp) must match the DSN prefix (enqueue://default).
    • Mismatches will result in silent failures or unexpected behavior.
  3. Consumer Lifecycle

    • Consumers must be registered as services in services.yaml:
      services:
          App\MessageHandler\MyMessageHandler:
              tags: ['messenger.message_handler']
      
    • Forgetting this tag will cause messages to be ignored.
  4. Message Serialization

    • By default, messages are serialized with Symfony’s Serializer. Ensure your message classes are serializable (e.g., avoid private properties without getters/setters).
    • Customize serialization in messenger.yaml:
      framework:
          messenger:
              serialization:
                  serializer: messenger.transport.symfony_serializer
                  format: 'json'
                  ignore_exception: false
      
  5. Queue Visibility

    • Messages may become "invisible" if the consumer crashes. Use the --time-limit flag to manage this:
      php bin/console messenger:consume-messages amqp --time-limit=60
      

Debugging

  1. Check Queue Status Use Enqueue’s CLI tools to inspect queues:

    enqueue:consume --dsn=amqp://guest:guest@localhost:5672/%2f
    

    Or use a GUI tool like RabbitMQ Management or RedisInsight.

  2. Log Consumption Enable debug logging in config/packages/monolog.yaml:

    monolog:
        handlers:
            messenger:
                type: stream
                path: "%kernel.logs_dir%/%kernel.environment%.messenger.log"
                level: debug
    
  3. Handle Exceptions Wrap message handling in try-catch blocks to avoid consumer crashes:

    public function __invoke(MyMessage $message)
    {
        try {
            // Handle message
        } catch (\Exception $e) {
            throw new TransportException($e->getMessage(), $e->getCode(), $e);
        }
    }
    

Extension Points

  1. Custom Transports Extend the EnqueueTransport class to support custom brokers or configurations:

    use Enqueue\Symfony\Transport\EnqueueTransport;
    
    class CustomEnqueueTransport extends EnqueueTransport
    {
        public function __construct(ConnectionInterface $connection, array $options = [])
        {
            $options['queue']['routingKey']['name'] = 'custom_queue';
            parent::__construct($connection, $options);
        }
    }
    
  2. Middleware Add middleware to the transport for logging, validation, or transformation:

    framework:
        messenger:
            transports:
                amqp:
                    dsn: enqueue://default
                    middleware:
                        - 'App\Middleware\LogMessageMiddleware'
    
  3. Dynamic Routing Use dynamic routing based on message attributes:

    #[AsMessageHandler]
    public function __invoke(DynamicMessage $message)
    {
        $transport = $message->getTransport();
        $this->bus->dispatch($message, [$transport]);
    }
    
  4. Environment-Specific Configs Use Symfony’s parameter bags to switch transports between environments:

    # config/packages/dev/messenger.yaml
    framework:
        messenger:
            transports:
                amqp: enqueue://default
    # config/packages/prod/messenger.yaml
    framework:
        messenger:
            transports:
                amqp: enqueue://prod_amqp_connection
    
Weaver

How can I help you explore Laravel packages today?

Conversation history is not saved when not logged in.
Prompt
Add packages to context
No packages found.
hamzi/corewatch
minionfactory/raw-hydrator
hexters/coinpayment
rjcodes/rjcms
act-training/laravel-permissions-manager
alimarchal/laravel-chart-of-accounts
babenkoivan/elastic-scout-driver
mkwebdesign/filament-watchdog-v5
renatomarinho/laravel-page-speed
zedmagdy/filament-business-hours
renatovdemoura/blade-elements-ui
devgeek/beacon-admin
benjamin-rqt/data-watcher-bundle
atriumphp/atrium
sandermuller/package-boost-laravel
sandermuller/boost-skills
redaxo/core
yusufgenc/filament-api-forge
l3aro/rating-star-for-filament
leek/filament-subtenant-scope