Install the Bundle:
composer require cmobi/rabbitmq-bundle
Enable the bundle in config/bundles.php:
return [
// ...
Cmobi\RabbitMqBundle\CmobiRabbitMqBundle::class => ['all' => true],
];
Configure RabbitMQ (config/packages/cmobi_rabbitmq.yaml):
cmobi_rabbitmq:
host: 'localhost'
port: 5672
user: 'guest'
password: 'guest'
vhost: '/'
lazy: true # Recommended for production
First Use Case: Publishing a Message
Inject the RabbitMqProducer service and publish a message:
use Cmobi\RabbitMqBundle\Producer\RabbitMqProducer;
class MyService
{
public function __construct(private RabbitMqProducer $producer)
{}
public function sendMessage()
{
$this->producer->publish(
'my_queue',
json_encode(['key' => 'value'])
);
}
}
Producer Pattern (Publish Messages)
RabbitMqProducer for one-off or batched messages:
$producer->publish('queue_name', $message, [
'content_type' => 'application/json',
'delivery_mode' => AMQP_MSG_PERSISTENT, // Ensure durability
]);
foreach ($messages as $msg) {
try {
$producer->publish('queue', $msg);
} catch (\Exception $e) {
$this->handleFailure($msg, $e);
}
}
Consumer Pattern (Subscribe to Queues)
RabbitMqConsumerInterface:
use Cmobi\RabbitMqBundle\Consumer\RabbitMqConsumerInterface;
class MyConsumer implements RabbitMqConsumerInterface
{
public function execute($message, $deliveryInfo, $properties)
{
// Process $message (e.g., json_decode)
return AMQP_CONSUME_ACK; // Acknowledge
}
}
services.yaml:
services:
App\Consumer\MyConsumer:
tags:
- { name: 'cmobi_rabbitmq.consumer', queue: 'my_queue' }
Exchange/Routing Key Usage
$producer->publishToExchange(
'my_exchange',
'routing.key',
$message,
['content_type' => 'text/plain']
);
cmobi_rabbitmq:
exchanges:
my_exchange:
type: direct
queues:
- my_queue
bindings:
- { queue: 'my_queue', routing_key: 'routing.key' }
Dependency Injection
public function __construct(
private RabbitMqProducer $producer,
private RabbitMqConsumerRegistry $consumerRegistry
) {}
$connection = $this->producer->getConnection();
Symfony Events
Bind RabbitMQ actions to Symfony events (e.g., kernel.terminate for graceful shutdown):
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
class RabbitMqShutdownSubscriber implements EventSubscriberInterface
{
public static function getSubscribedEvents()
{
return ['kernel.terminate' => 'shutdown'];
}
public function shutdown()
{
$this->producer->closeConnection();
}
}
Retry Logic Implement exponential backoff for transient failures:
$attempts = 0;
$maxAttempts = 3;
while ($attempts < $maxAttempts) {
try {
$producer->publish('queue', $message);
break;
} catch (\Exception $e) {
$attempts++;
sleep(2 ** $attempts); // Exponential backoff
}
}
Monitoring Log message metadata (e.g., routing key, timestamp) for debugging:
$producer->publish('queue', $message, [
'app_metadata' => [
'user_id' => auth()->id(),
'timestamp' => now()->toIso8601String(),
],
]);
Testing
Use a test container (e.g., rabbitmq:3-management) in phpunit.xml:
<env name="RABBITMQ_HOST" value="localhost"/>
<env name="RABBITMQ_PORT" value="5672"/>
Mock the producer in unit tests:
$this->mockBuilder(RabbitMqProducer::class)
->disableOriginalConstructor()
->getMock()
->method('publish')
->willReturn(true);
Connection Management
lazy: true setting (default) delays connection until first use. Ensure connections are closed explicitly during shutdown to avoid leaks:
$producer->closeConnection(); // Call this on app shutdown
Message Persistence
delivery_mode: AMQP_MSG_PERSISTENT for critical messages:
$producer->publish('queue', $message, ['delivery_mode' => AMQP_MSG_PERSISTENT]);
Consumer Lifecycle
AMQP_CONSUME_ACK or AMQP_CONSUME_REJECT. Forgetting this causes messages to pile up.$consumerTag = uniqid();
$channel->consume($this->execute(...), $consumerTag);
Configuration Overrides
config/packages/cmobi_rabbitmq.yaml:
cmobi_rabbitmq:
host: '%env(RABBITMQ_HOST)%'
Queue Declarations
cmobi_rabbitmq:
queues:
my_queue:
durable: true
exclusive: false
auto_delete: false
arguments:
'x-message-ttl': 86400000 # 24h TTL in ms
Enable Debugging
Set debug: true in config to log connection events:
cmobi_rabbitmq:
debug: true
Check RabbitMQ Management UI
http://localhost:15672 (default credentials: guest/guest) to inspect queues, messages, and consumer status.Common Errors
ConnectionRefusedError: Verify RabbitMQ is running and credentials are correct.Channel.Close: Often indicates a consumer error. Check logs for AMQP_* exceptions.NotFound for Queues/Exchanges: Ensure queues/exchanges are declared before use or pre-configured in cmobi_rabbitmq.yaml.Logging Add a custom logger to the producer:
$producer->setLogger($this->container->get('logger'));
Custom Producers/Consumers Extend the base classes to add functionality:
class CustomProducer extends RabbitMqProducer
{
public function publishWithRetry($queue, $message, $retries = 3)
{
// Implement retry logic
}
}
Register the custom service in services.yaml:
services:
App\Producer\CustomProducer:
parent: 'cmobi_rabbitmq.producer'
arguments: ['@cmobi_rabbitmq.connection']
Middleware for Messages Add preprocessing/postprocessing via a decorator:
class LoggingProducer
How can I help you explore Laravel packages today?