Installation
Add the bundle via Composer:
composer require ecommit/amqp-bundle
Enable the bundle in config/bundles.php:
Ecommit\AmqpBundle\EcommitAmqpBundle::class => ['all' => true],
Configuration
Print the bundle's default configuration reference:
php bin/console config:dump-reference EcommitAmqpBundle
Configure config/packages/ecommit_amqp.yaml:
ecommit_amqp:
    host: 'amqp://guest:guest@localhost:5672'
    exchange: 'your_exchange'
    queue: 'your_queue'
    routing_key: 'your.routing.key'
First Use Case
Inject the AmqpProducer service in a controller or command:
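use Ecommit\AmqpBundle\Producer\AmqpProducerInterface;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;

class ExampleController extends AbstractController
{
    public function __construct(private AmqpProducerInterface $amqpProducer) {}

    public function sendMessage(): Response
    {
        // Publish a plain-text message to the configured exchange.
        $this->amqpProducer->publish('Hello AMQP!');

        return new Response('Message sent!');
    }
}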
Message Production
Use AmqpProducerInterface to publish messages:
$this->amqpProducer->publish(
    json_encode(['event' => 'order.created', 'data' => $order]),
    ['content_type' => 'application/json']
);
Message Consumption
Create a consumer service implementing AmqpConsumerInterface:
namespace App\Consumer;

use Ecommit\AmqpBundle\Consumer\AmqpConsumerInterface;

class OrderConsumer implements AmqpConsumerInterface
{
    public function consume($message)
    {
        // Decode the JSON body into an associative array.
        $data = json_decode($message, true);
        // Process $data['event'] and $data['data']
    }
}
Register the consumer in services.yaml:
services:
    App\Consumer\OrderConsumer:
        tags:
            - { name: ecommit_amqp.consumer, queue: 'order_queue' }
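The consumer above decodes the body with a bare json_decode(), which returns null on malformed input. A minimal sketch of a stricter decoding step follows, assuming messages use the event/data envelope shown in the producer example. decodeEnvelope() is a hypothetical helper introduced here for illustration; it is not part of the bundle.

```php
<?php
declare(strict_types=1);

/**
 * Decode a JSON message body and check that it carries the expected envelope.
 * Hypothetical helper for illustration only, not a bundle API.
 *
 * @return array{event: string, data: mixed}
 */
function decodeEnvelope(string $message): array
{
    // JSON_THROW_ON_ERROR raises JsonException instead of silently returning null.
    $payload = json_decode($message, true, 512, JSON_THROW_ON_ERROR);

    // Reject bodies that are valid JSON but do not match the envelope shape.
    if (
        !is_array($payload)
        || !isset($payload['event'])
        || !is_string($payload['event'])
        || !array_key_exists('data', $payload)
    ) {
        throw new InvalidArgumentException('Message is missing "event" or "data".');
    }

    return $payload;
}
```

Calling such a helper at the top of consume() would let malformed messages fail fast, so that whatever error handling is configured can deal with them.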
Error Handling
Implement AmqpErrorHandlerInterface for custom error logic:
namespace App\Error;

use Ecommit\AmqpBundle\Error\AmqpErrorHandlerInterface;

class CustomErrorHandler implements AmqpErrorHandlerInterface
{
    public function handle(\PhpAmqpLib\Exception\AMQPException $e)
    {
        // Log, retry, or notify
    }
}
Configure in config/packages/ecommit_amqp.yaml:
ecommit_amqp:
    error_handler: 'App\Error\CustomErrorHandler'
Use AmqpTransport to integrate with Symfony Messenger:
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            amqp:
                dsn: '%env(AMQP_DSN)%'
                options:
                    exchange: ['your_exchange', 'direct']
Use AmqpRetryStrategy for transient failures:
ecommit_amqp:
    retry_strategy:
        max_retries: 3
        delay: 1000
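To make the two retry settings concrete, here is a minimal sketch of a fixed-delay retry loop in plain PHP. It is not the bundle's implementation. It assumes that max_retries counts attempts after the first failure and that delay is expressed in milliseconds, neither of which the text above states. retryWithDelay() is a hypothetical name.

```php
<?php
declare(strict_types=1);

/**
 * Run $operation, retrying up to $maxRetries times after the first failure.
 * Hypothetical helper; $delayMs is assumed to be in milliseconds.
 */
function retryWithDelay(callable $operation, int $maxRetries, int $delayMs): mixed
{
    $retries = 0;

    while (true) {
        try {
            return $operation();
        } catch (Throwable $e) {
            if ($retries >= $maxRetries) {
                // Retries exhausted: surface the last error to the caller.
                throw $e;
            }
            $retries++;
            // usleep() takes microseconds, so convert from milliseconds.
            usleep($delayMs * 1000);
        }
    }
}
```

Under these assumptions, max_retries: 3 allows up to four attempts in total, with a pause between each.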
Connection Management
Call AmqpProducer::close() in shutdown events, or implement __destruct() in consumers.
Message Serialization
publish() uses serialize(). To send JSON, encode the payload yourself and set content_type explicitly:
$this->amqpProducer->publish(json_encode($data), ['content_type' => 'application/json']);
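json_encode() returns false on failure, so an unencodable payload could be published as an empty body without any warning. The sketch below shows one way to fail loudly instead, using only PHP's standard JSON functions. encodeEvent() and the event/data envelope are illustrative assumptions, not bundle APIs.

```php
<?php
declare(strict_types=1);

/**
 * Encode an event payload as JSON, throwing instead of returning false.
 * Hypothetical helper for illustration only.
 */
function encodeEvent(string $event, array $data): string
{
    // JSON_THROW_ON_ERROR raises JsonException on unencodable input (PHP 7.3+).
    return json_encode(['event' => $event, 'data' => $data], JSON_THROW_ON_ERROR);
}

$body = encodeEvent('order.created', ['id' => 42]);
// $body is now '{"event":"order.created","data":{"id":42}}' and can be passed
// to publish() together with ['content_type' => 'application/json'].
```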
Queue Binding
Inspect the AmqpManager service:
php bin/console debug:container 'Ecommit\AmqpBundle\Manager\AmqpManager'
Consumer Lifecycle
The ecommit_amqp.consumer tag accepts queue and concurrency (default: 1) attributes.
Enable AMQP Debugging
Add to config/packages/ecommit_amqp.yaml:
ecommit_amqp:
    debug: true
Logs will appear in var/log/dev.log.
Check Queue Status
Use php-amqplib CLI tools or the RabbitMQ Management UI to verify queue status.
Custom Serializers
Override AmqpProducer to support custom formats:
class CustomAmqpProducer extends AbstractAmqpProducer
{
    protected function serialize($message, array $headers = [])
    {
        return json_encode($message);
    }
}
Register as a service with the amqp.producer tag.
Dynamic Routing
Use AmqpProducer::publishWithRoutingKey() for dynamic keys:
$this->amqpProducer->publishWithRoutingKey(
    $message,
    'dynamic.' . $userId . '.key'
);
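When part of a routing key comes from runtime data such as a user ID, a stray dot or wildcard character in that value changes how the key is split into words on a topic exchange. The following sketch builds keys from sanitized segments. buildRoutingKey() is a hypothetical helper, and the allowed character set is an assumption chosen for illustration. The 255-byte limit comes from AMQP 0-9-1, where routing keys are short strings.

```php
<?php
declare(strict_types=1);

/**
 * Join segments into a dot-separated routing key.
 * Hypothetical helper; characters outside [A-Za-z0-9_-] are replaced with "_".
 */
function buildRoutingKey(string ...$segments): string
{
    $words = [];

    foreach ($segments as $segment) {
        // Strip dots and wildcards so one segment always maps to one word.
        $word = preg_replace('/[^A-Za-z0-9_-]/', '_', $segment);
        if ($word === null || $word === '') {
            throw new InvalidArgumentException('Routing key segments must not be empty.');
        }
        $words[] = $word;
    }

    $key = implode('.', $words);

    // AMQP 0-9-1 routing keys are short strings, limited to 255 bytes.
    if (strlen($key) > 255) {
        throw new LengthException('Routing key exceeds 255 bytes.');
    }

    return $key;
}
```

For example, buildRoutingKey('dynamic', '42', 'key') yields 'dynamic.42.key', while a segment such as 'a.b*' becomes 'a_b_'.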
Middleware Support
Extend AmqpProducer to add middleware (e.g., logging, validation):
$producer = new AmqpProducer($connection, $exchange);
$producer->addMiddleware(new LoggingMiddleware());
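The text does not show what a middleware class looks like, so the sketch below illustrates the general pattern in plain PHP: each middleware receives the message and a callable that continues the chain. The PublishMiddleware interface, both classes, and runPipeline() are hypothetical names introduced here. They do not describe what addMiddleware() actually expects.

```php
<?php
declare(strict_types=1);

/** Hypothetical contract: handle the message, then call $next to continue. */
interface PublishMiddleware
{
    public function handle(string $message, callable $next): string;
}

/** Normalizes the message before it reaches later middleware. */
final class TrimMiddleware implements PublishMiddleware
{
    public function handle(string $message, callable $next): string
    {
        return $next(trim($message));
    }
}

/** Stand-in for a logging middleware: records every message it sees. */
final class RecordingMiddleware implements PublishMiddleware
{
    /** @var list<string> */
    public array $seen = [];

    public function handle(string $message, callable $next): string
    {
        $this->seen[] = $message;

        return $next($message);
    }
}

/**
 * Run $message through the middleware in order, ending with $publish.
 *
 * @param list<PublishMiddleware> $middlewares
 */
function runPipeline(array $middlewares, string $message, callable $publish): string
{
    $next = $publish;

    // Wrap from the innermost step outward so the first middleware runs first.
    foreach (array_reverse($middlewares) as $middleware) {
        $next = fn (string $m): string => $middleware->handle($m, $next);
    }

    return $next($message);
}
```

With [new TrimMiddleware(), new RecordingMiddleware()], the recorder sees the already trimmed message, which shows that registration order determines execution order in this design.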