adtechpotok/messenger-adapter
Symfony Messenger transport built on Enqueue, enabling send/receive via supported brokers (e.g., AMQP/RabbitMQ). Configure Messenger with an enqueue:// DSN, route messages, and consume workers; supports queue/exchange options and per-message topic overrides.
Installation
composer require enqueue/messenger-adapter
Ensure symfony/messenger, enqueue/enqueue-bundle, and an Enqueue transport package for your broker (e.g., enqueue/amqp-ext for AMQP) are also installed.
Configure Enqueue Bundle
Add the EnqueueBundle to config/bundles.php and set up .env:
ENQUEUE_DSN=amqp://guest:guest@localhost:5672/%2f
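As a rough sketch of the bundle registration mentioned above, config/bundles.php might contain an entry like the one below. The FrameworkBundle line is shown only for context, and a real project will list other bundles too; Enqueue\Bundle\EnqueueBundle is assumed to be the bundle class provided by enqueue/enqueue-bundle.

```php
<?php
// config/bundles.php (illustrative excerpt, not a complete file)

return [
    Symfony\Bundle\FrameworkBundle\FrameworkBundle::class => ['all' => true],
    // Assumed bundle class from enqueue/enqueue-bundle; enabled in every environment.
    Enqueue\Bundle\EnqueueBundle::class => ['all' => true],
];
```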
Configure Messenger Transport
Define a transport in config/packages/messenger.yaml:
framework:
    messenger:
        transports:
            amqp: enqueue://default
Route a Message
Route a message class to the amqp transport in config/packages/messenger.yaml:
framework:
    messenger:
        routing:
            'App\Message\MyMessage': amqp
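The routing entry above refers to a message class that is not shown. A minimal sketch of such a class might look like this; the `content` property and its getter are hypothetical and only illustrate that a message is a plain object carrying the data its handler needs.

```php
<?php

namespace App\Message;

// Hypothetical message class: a plain data holder with no framework dependencies.
final class MyMessage
{
    // The default value lets the message be created without arguments.
    public function __construct(private string $content = '')
    {
    }

    public function getContent(): string
    {
        return $this->content;
    }
}
```

Keeping messages this small makes them easy to serialize, whichever serializer the transport uses.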
Dispatch a Message
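use App\Message\MyMessage;
use Symfony\Component\Messenger\MessageBusInterface;

// MyService is a placeholder name for any service that dispatches messages.
final class MyService
{
    public function __construct(private MessageBusInterface $bus) {}

    public function handle(): void
    {
        // Hand the message to Messenger; routing decides which transport receives it.
        $this->bus->dispatch(new MyMessage());
    }
}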
Consume Messages
php bin/console messenger:consume amqp
Use this package to offload long-running tasks (e.g., sending emails, processing files) to a queue. For example:
// Dispatch a message to process an order
$this->bus->dispatch(new ProcessOrder($orderId));
Then run the consumer in the background:
php bin/console messenger:consume amqp --time-limit=300
Producer-Consumer Pattern
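Producer: dispatch the message from your application code:
$this->bus->dispatch(new SendWelcomeEmail($userId));
Consumer: handle it in a class marked with the #[AsMessageHandler] attribute:
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
final class SendWelcomeEmailHandler
{
    public function __invoke(SendWelcomeEmail $message): void
    {
        // Send email logic
    }
}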
Retry Failed Messages
Configure retry logic in messenger.yaml:
framework:
    messenger:
        transports:
            amqp:
                dsn: enqueue://default
                retry_strategy:
                    max_retries: 3
                    delay: 1000
                    multiplier: 2
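To make the retry settings above concrete, the sketch below computes the wait before each retry, assuming the delay grows as delay × multiplier^n and ignoring any jitter the framework may add. The function name `retryDelays` and the optional cap parameter are hypothetical; this is an illustration of the arithmetic, not Symfony's implementation.

```php
<?php

/**
 * Returns the wait in milliseconds before each retry, assuming
 * delay * multiplier^n growth and no jitter.
 *
 * @return int[]
 */
function retryDelays(int $maxRetries, int $delayMs, float $multiplier, int $maxDelayMs = 0): array
{
    $delays = [];
    for ($retry = 0; $retry < $maxRetries; $retry++) {
        $wait = (int) ($delayMs * $multiplier ** $retry);
        // A cap of 0 means "no cap" in this sketch.
        if ($maxDelayMs > 0 && $wait > $maxDelayMs) {
            $wait = $maxDelayMs;
        }
        $delays[] = $wait;
    }

    return $delays;
}

// With max_retries: 3, delay: 1000, multiplier: 2 the waits are 1000, 2000 and 4000 ms.
print_r(retryDelays(3, 1000, 2));
```

Under these assumptions a message that keeps failing is abandoned (or sent to a failure transport, if one is configured) about seven seconds after its first attempt.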
Delayed Messages
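Attach a DelayStamp (value in milliseconds) when dispatching; the delay only takes effect if the transport and the underlying broker support delayed delivery:
use Symfony\Component\Messenger\Stamp\DelayStamp;

$this->bus->dispatch(
    new SendReminderEmail($userId),
    [new DelayStamp(3600 * 1000)] // Delay by 1 hour
);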
Batching
Cap the number of messages a worker handles before it exits, so that workers are recycled after each batch:
php bin/console messenger:consume amqp --limit=50
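MessengerCommandBus trait to dispatch messages from commands.
postPersist).
MessengerTestTrait or mock the MessageBusInterface in unit tests:
// $this->bus is assumed to be created with $this->createMock(MessageBusInterface::class)
$this->bus
    ->expects($this->once())
    ->method('dispatch')
    ->with($message)
    ->willReturn(new Envelope($message)); // dispatch() must return an Envelope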
DSN Configuration
Ensure ENQUEUE_DSN in .env matches your broker (e.g., amqp://, redis://, sqs://).
Default credentials (guest/guest) may not work in production. Use a dedicated user:
ENQUEUE_DSN=amqp://user:password@localhost:5672/%2f
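To show how a DSN like the one above breaks down, here is a small sketch built on PHP's `parse_url()`. The helper name `describeDsn` is hypothetical, and Enqueue uses its own DSN parser; this only illustrates which part of the string is the scheme, host, port and virtual host.

```php
<?php

/**
 * Splits a broker DSN into its parts using PHP's parse_url().
 * Illustration only; this is not how Enqueue parses DSNs.
 */
function describeDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || !isset($parts['scheme'])) {
        throw new InvalidArgumentException('Malformed DSN: ' . $dsn);
    }

    return [
        'scheme' => $parts['scheme'],
        'user'   => $parts['user'] ?? null,
        'host'   => $parts['host'] ?? null,
        'port'   => $parts['port'] ?? null,
        // The path carries the URL-encoded vhost; "%2f" decodes to "/".
        'vhost'  => isset($parts['path']) ? rawurldecode(substr($parts['path'], 1)) : null,
    ];
}

$info = describeDsn('amqp://user:password@localhost:5672/%2f');
echo $info['scheme'], ' ', $info['host'], ':', $info['port'], ' vhost=', $info['vhost'], PHP_EOL;
```

A check like this can help when a worker fails to connect: a wrong scheme or an unencoded vhost is easy to spot once the parts are printed separately.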
Transport Naming
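The name used in routing must match a transport defined under transports in messenger.yaml (e.g., amqp). The part after enqueue:// (here, default) refers to the Enqueue configuration to use, so it must match a name configured for the EnqueueBundle.
Consumer Lifecycle
If a handler is not picked up by autoconfiguration, tag it explicitly in services.yaml:
services:
    App\MessageHandler\MyMessageHandler:
        tags: ['messenger.message_handler']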
Message Serialization
To use the Symfony Serializer, ensure your message classes are serializable (e.g., avoid private properties without getters/setters) and select it in messenger.yaml:
framework:
    messenger:
        serializer:
            default_serializer: messenger.transport.symfony_serializer
            symfony_serializer:
                format: json
                context: { }
Queue Visibility
Use the --time-limit flag to stop the worker after a set number of seconds so that a process manager can restart it:
php bin/console messenger:consume amqp --time-limit=60
Check Queue Status
Use Enqueue’s CLI tools to inspect queues:
enqueue:consume --dsn=amqp://guest:guest@localhost:5672/%2f
Or use a GUI tool like RabbitMQ Management or RedisInsight.
Log Consumption
Enable debug logging in config/packages/monolog.yaml:
monolog:
    handlers:
        messenger:
            type: stream
            path: "%kernel.logs_dir%/%kernel.environment%.messenger.log"
            level: debug
            # Restrict this handler to the messenger channel
            channels: ["messenger"]
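Handle Exceptions
The worker catches exceptions thrown by a handler and applies the retry strategy, so a failing handler does not crash the consumer. Catch exceptions in the handler only when you need to add context before rethrowing:
use Symfony\Component\Messenger\Exception\TransportException;

public function __invoke(MyMessage $message): void
{
    try {
        // Handle message
    } catch (\Exception $e) {
        // Rethrow with the original exception chained so the failure is logged and retried.
        throw new TransportException($e->getMessage(), (int) $e->getCode(), $e);
    }
}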
Custom Transports
Extend the EnqueueTransport class to support custom brokers or configurations:
use Enqueue\Symfony\Transport\EnqueueTransport;

class CustomEnqueueTransport extends EnqueueTransport
{
    public function __construct(ConnectionInterface $connection, array $options = [])
    {
        $options['queue']['routingKey']['name'] = 'custom_queue';
        parent::__construct($connection, $options);
    }
}
Middleware
Add middleware to the message bus for logging, validation, or transformation. Middleware is configured per bus, not per transport:
framework:
    messenger:
        buses:
            messenger.bus.default:
                middleware:
                    - 'App\Middleware\LogMessageMiddleware'
        transports:
            amqp:
                dsn: enqueue://default
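Dynamic Routing
Choose the transport at dispatch time based on message attributes. The second argument of dispatch() takes stamps, so wrap the transport name in a TransportNamesStamp (available in Symfony 6.2+):
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;

public function send(DynamicMessage $message): void
{
    // getTransport() is assumed to return a transport name defined in messenger.yaml.
    $transport = $message->getTransport();
    $this->bus->dispatch($message, [new TransportNamesStamp([$transport])]);
}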
Environment-Specific Configs
Use per-environment configuration files to switch transports between environments:
# config/packages/dev/messenger.yaml
framework:
    messenger:
        transports:
            amqp: enqueue://default
# config/packages/prod/messenger.yaml
framework:
    messenger:
        transports:
            amqp: enqueue://prod_amqp_connection