## Getting Started
### Minimal Setup
1. **Installation**:

```bash
composer require php-amqplib/rabbitmq-bundle
```

Enable the bundle in `config/bundles.php`:

```php
return [
    // ...
    OldSound\RabbitMqBundle\OldSoundRabbitMqBundle::class => ['all' => true],
];
```
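2. **Configuration**:

Add RabbitMQ connection details to `config/packages/rabbitmq.yaml`. The bundle's configuration root is `old_sound_rabbit_mq`, and host settings live under `connections`:

```yaml
old_sound_rabbit_mq:
    connections:
        default:
            host: 'localhost'
            port: 5672
            user: 'guest'
            password: 'guest'
            vhost: '/'
    producers:
        default:
            connection: default
            exchange_options: { name: 'message', type: direct }
```

Consumers need a callback service, so they are configured in the consuming step below.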
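3. **First Use Case**: Publish a message from a controller:

```php
use OldSound\RabbitMqBundle\RabbitMq\Producer;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;

class MessageController extends AbstractController
{
    public function send(Producer $producer): Response
    {
        // publish() takes the message body first, then the routing key
        $producer->publish('Hello RabbitMQ!', 'message');

        return new Response('Message sent!');
    }
}
```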
4. **Consuming Messages**:

Create a consumer service (e.g., `src/Message/MessageConsumer.php`):

```php
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class MessageConsumer implements ConsumerInterface
{
    public function execute(AMQPMessage $msg)
    {
        // Process message; $msg->getBody() holds the raw payload
        return ConsumerInterface::MSG_ACK;
    }
}
```
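Register it in `config/packages/rabbitmq.yaml`, where `callback` is the ID of the consumer service:

```yaml
old_sound_rabbit_mq:
    consumers:
        default:
            connection: default
            exchange_options: { name: 'message', type: direct }
            queue_options: { name: 'message', routing_keys: ['message'] }
            callback: message_consumer
```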
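**Direct Publishing**:

```php
// The exchange comes from the producer's configuration, so only the body,
// routing key, and optional message properties are passed here
$producer->publish($message, 'routing_key', $properties);
```

Use for simple fire-and-forget messaging.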
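**Delayed Messages** (via the `x-delay` header, which requires the RabbitMQ delayed message exchange plugin):

```yaml
# config/packages/rabbitmq.yaml
old_sound_rabbit_mq:
    producers:
        delayed:
            connection: default
            exchange_options:
                name: 'delayed'
                type: 'x-delayed-message'
                arguments: { 'x-delayed-type': ['S', 'direct'] }
```

```php
// The fourth argument sets message headers; x-delay is in milliseconds
$producer->publish($message, 'delayed.key', [], ['x-delay' => 10000]); // 10 seconds
```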
**Batch Publishing**:

```php
// Producer::publish() sends one message per call, so loop over the batch
foreach ([$msg1, $msg2, $msg3] as $msg) {
    $producer->publish($msg, 'routing_key');
}
```
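**Asynchronous Consumption**:

Use Symfony’s Messenger component to decouple consumers:

```yaml
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            rabbitmq: '%env(RABBITMQ_DSN)%'
        routing:
            'App\Message\ProcessMessage': rabbitmq
```

```php
// In a consumer service
use App\Message\ProcessMessage;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\Messenger\MessageBusInterface;

class AsyncConsumer implements ConsumerInterface
{
    public function __construct(private MessageBusInterface $bus)
    {}

    public function execute(AMQPMessage $msg)
    {
        // Hand the raw body to Messenger, then acknowledge the AMQP message
        $this->bus->dispatch(new ProcessMessage($msg->getBody()));

        return ConsumerInterface::MSG_ACK;
    }
}
```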
**Prefetch Count** (limit unacknowledged messages):

```yaml
old_sound_rabbit_mq:
    consumers:
        default:
            qos_options: { prefetch_size: 0, prefetch_count: 10, global: false }
```
**RPC Client**:

```php
$rpc = new \OldSound\RabbitMqBundle\RabbitMq\RpcClient($connection);
$rpc->initClient();
$rpc->addRequest('Hello', 'rpc_queue', 'request_1', '', 1); // Expires after 1s
$result = $rpc->getReplies()['request_1'];
```
**Fanout Exchange**:

```yaml
old_sound_rabbit_mq:
    producers:
        fanout:
            connection: default
            exchange_options: { name: 'fanout_exchange', type: fanout }
```

```php
$producer->publish($message); // No routing key
```
**Topic Exchange**:

```yaml
old_sound_rabbit_mq:
    producers:
        topic:
            connection: default
            exchange_options: { name: 'topic_exchange', type: topic }
```

```php
$producer->publish($message, 'logs.error');
```
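To make topic routing concrete, here is a minimal sketch of how AMQP topic patterns match routing keys: `*` stands for exactly one dot-separated word and `#` for zero or more. `topicMatches()` and `matchWords()` are hypothetical helpers written for illustration; they are not part of the bundle, and the broker performs this matching itself.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of the bundle): checks whether a routing key
 * matches an AMQP topic binding pattern.
 */
function topicMatches(string $pattern, string $routingKey): bool
{
    $patternWords = $pattern === '' ? [] : explode('.', $pattern);
    $keyWords = $routingKey === '' ? [] : explode('.', $routingKey);

    return matchWords($patternWords, 0, $keyWords, 0);
}

/** Recursively matches pattern words from index $i against key words from index $j. */
function matchWords(array $p, int $i, array $k, int $j): bool
{
    if ($i === count($p)) {
        // Pattern exhausted: it matches only if the key is exhausted too
        return $j === count($k);
    }
    if ($p[$i] === '#') {
        // '#' may consume zero or more words; try every possible split
        for ($n = $j; $n <= count($k); $n++) {
            if (matchWords($p, $i + 1, $k, $n)) {
                return true;
            }
        }
        return false;
    }
    if ($j === count($k)) {
        return false; // Key exhausted but the pattern still expects a word
    }
    if ($p[$i] === '*' || $p[$i] === $k[$j]) {
        return matchWords($p, $i + 1, $k, $j + 1);
    }
    return false;
}

var_dump(topicMatches('logs.*', 'logs.error'));    // bool(true)
var_dump(topicMatches('logs.*', 'logs.error.db')); // bool(false)
var_dump(topicMatches('logs.#', 'logs.error.db')); // bool(true)
```

A queue bound with `logs.*` would therefore receive the `logs.error` message published above, while `logs.#` would also catch deeper keys such as `logs.error.db`.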
**Dependency Injection**:

Inject `Producer` or `ConsumerInterface` directly into services:

```php
public function __construct(private Producer $producer) {}
```
**Event Listeners**: Trigger messages on events:

```php
use OldSound\RabbitMqBundle\RabbitMq\Producer;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

class OrderEventSubscriber implements EventSubscriberInterface
{
    public function __construct(private Producer $producer) {}

    public static function getSubscribedEvents(): array
    {
        return [OrderCreatedEvent::class => 'onOrderCreated'];
    }

    public function onOrderCreated(OrderCreatedEvent $event): void
    {
        // The body must be a string, so serialize the order before publishing
        $this->producer->publish(json_encode($event->getOrder()), 'order.created');
    }
}
```
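Configure a dead-letter exchange through the queue arguments in `rabbitmq.yaml`:

```yaml
old_sound_rabbit_mq:
    consumers:
        default:
            queue_options:
                arguments:
                    'x-dead-letter-exchange': ['S', 'dead_letter']
                    'x-dead-letter-routing-key': ['S', 'dead.letter']
```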
Use for failed message recovery.

Use `symfony/monolog-bundle` to log message flow:

```php
$this->logger->info('Message published', ['exchange' => 'exchange', 'routing_key' => 'key']);
```
`reconnect` config:

```yaml
rabbit_mq:
    hosts:
        default:
            reconnect: true
            reconnect_delay: 5000 # 5s
```
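Reconnection can also be handled in application code, independently of any configuration option. The sketch below retries a callable with a fixed delay between attempts. `retryWithDelay()` and its parameters are hypothetical names, and the callable stands in for whatever opens your connection or publishes a message; swap `\RuntimeException` for the exception types your client actually throws.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: runs $operation, retrying up to $maxAttempts times
 * and sleeping $delayMs milliseconds after each failed attempt.
 * The last failure is rethrown once the attempts are used up.
 */
function retryWithDelay(callable $operation, int $maxAttempts, int $delayMs)
{
    if ($maxAttempts < 1) {
        throw new \InvalidArgumentException('maxAttempts must be at least 1');
    }

    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation($attempt);
        } catch (\RuntimeException $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // Out of attempts: surface the last error
            }
            usleep($delayMs * 1000); // usleep() takes microseconds
        }
    }
}

// Usage: a stand-in operation that fails twice, then succeeds
$calls = 0;
$result = retryWithDelay(function (int $attempt) use (&$calls): string {
    $calls++;
    if ($attempt < 3) {
        throw new \RuntimeException('Connection refused');
    }
    return 'connected';
}, 5, 10);
```

A fixed delay keeps the sketch short; an exponential backoff may be a better fit when the broker is down for longer periods.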
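Handle exceptions inside the consumer (wrap the body of `execute()` in try-catch).

Acknowledge or reject explicitly through the return value of `execute()`:

```php
public function execute(AMQPMessage $msg)
{
    try {
        // Process
        return ConsumerInterface::MSG_ACK;
    } catch (\Exception $e) {
        // Reject without requeue so a failing message is not redelivered forever
        return ConsumerInterface::MSG_REJECT;
    }
}
```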
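One way to refine the try-catch above is to separate failures worth retrying from ones that will never succeed. The sketch below is self-contained: the `Outcome` constants, the `TransientFailure` exception, and `handleMessage()` are hypothetical stand-ins, so map them onto whatever acknowledgement mechanism your consumer actually uses.

```php
<?php
declare(strict_types=1);

/** Hypothetical outcome labels; not constants from the bundle. */
final class Outcome
{
    public const ACK = 'ack';         // Processed successfully
    public const REQUEUE = 'requeue'; // Temporary problem, try again later
    public const REJECT = 'reject';   // Permanent problem, drop or dead-letter
}

/** Hypothetical marker for errors that may succeed on retry (e.g. a timeout). */
class TransientFailure extends \RuntimeException
{
}

/** Runs $process on the message body and maps the result to an outcome. */
function handleMessage(string $body, callable $process): string
{
    try {
        $process($body);
        return Outcome::ACK;
    } catch (TransientFailure $e) {
        return Outcome::REQUEUE;
    } catch (\Throwable $e) {
        return Outcome::REJECT;
    }
}

// Usage with three stand-in processors
$ok = handleMessage('{"id":1}', function (string $body): void {
    // Processing succeeded
});
$retry = handleMessage('{"id":2}', function (string $body): void {
    throw new TransientFailure('Database timeout');
});
$drop = handleMessage('not json', function (string $body): void {
    throw new \InvalidArgumentException('Malformed payload');
});
```

Rejected messages are the ones a dead-letter exchange would collect, while requeued ones return to the queue for another attempt.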
Serialize message bodies with `json_encode()` or a serializer:

```php
$producer->publish(json_encode($object), 'key');
```
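A sketch of the JSON round trip: encode on the producer side, decode in the consumer, and fail loudly on malformed payloads. `encodeMessage()` and `decodeMessage()` are hypothetical helper names, and `JSON_THROW_ON_ERROR` requires PHP 7.3 or later.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: turns a payload array into a JSON message body. */
function encodeMessage(array $payload): string
{
    return json_encode($payload, JSON_THROW_ON_ERROR);
}

/** Hypothetical helper: decodes a JSON body, throwing on invalid input. */
function decodeMessage(string $body): array
{
    // Throws \JsonException on malformed JSON instead of returning null
    $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($data)) {
        throw new \UnexpectedValueException('Expected a JSON object or array');
    }

    return $data;
}

$body = encodeMessage(['orderId' => 42, 'status' => 'created']);
$decoded = decodeMessage($body);
```

Throwing on bad input lets the consumer's error handling decide whether to reject the message, rather than silently processing a `null` payload.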
Use a `MessageSerializer` for custom logic.

Close the connection in `onShutdown()`:

```php
public function onShutdown(): void
{
    $this->getConnection()->close();
}
```
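Enable consumer logging:

```yaml
old_sound_rabbit_mq:
    consumers:
        default:
            enable_logger: true
```

Logs will appear in `var/log/dev.log`.

The RabbitMQ management UI is available at http://localhost:15672 (default credentials: guest/guest).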
| Error | Cause | Solution |
|---|---|---|
| Connection refused | RabbitMQ not running | Start RabbitMQ (`sudo service rabbitmq-server start`) |
| Access refused | Incorrect credentials | Verify `user`/`password` in config |
| Not found (exchange/queue) | Exchange/queue doesn’t exist | Declare exchanges/queues in config |
| Precondition failed | Queue/exchange already exists | Use `ignore_declare: true` in config |
| Channel error | Invalid routing key | Validate routing keys against bindings |
ConsumerInterface:
$consumer = $this->createMock(Consumer