Installation:
composer require constantable/rabbitmq-bundle
Add to config/bundles.php:
return [
    // ...
    Constantable\RabbitMqBundle\RabbitMqBundle::class => ['all' => true],
];
Configure config/packages/rabbitmq.yaml:
rabbit_mq:
    hosts:
        default:
            host: 'localhost'
            port: 5672
            user: 'guest'
            password: 'guest'
            vhost: '/'
    connections:
        default: ~
    producers:
        upload_picture_producer:
            connection: default
            exchange_options: { name: 'upload_pictures', type: direct }
    consumers:
        upload_picture_consumer:
            connection: default
            exchange_options: { name: 'upload_pictures', type: direct }
            queue_options: { name: 'upload_pictures' }
            callback: 'app.callback.upload_picture'
First Use Case: Publish a message from a controller:
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;

class UploadController extends AbstractController
{
    public function uploadPicture(): Response
    {
        $message = ['user_id' => 123, 'image_path' => '/path/to/image.jpg'];
        // Serialize the payload and hand it to the configured producer.
        $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($message));

        return new Response('Message published!');
    }
}
Consume Messages via CLI:
php bin/console rabbitmq:consumer upload_picture_consumer
Producer-Consumer Pattern:
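Producer: publish through a configured producer service (e.g., upload_picture_producer).
$producer = $this->get('old_sound_rabbit_mq.upload_picture_producer');
// Encode the body as JSON so that it matches the declared content_type.
$producer->publish(json_encode($data), ['content_type' => 'application/json']);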
Consumer: register a callback in the configuration (rabbitmq.yaml).
consumers:
    image_processor:
        callback: 'App\Consumer\ImageProcessor::handle'
Message Serialization:
Use serialize() for simple data or JSON for structured payloads:
$producer->publish(json_encode($data), ['content_type' => 'application/json']);
public static function handle($message, $messageInfo, RabbitMqConnection $connection)
{
    $data = json_decode($message, true);
    // Process $data
}
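
The JSON round trip above can fail silently, because json_decode() returns null for malformed input. A minimal sketch of a stricter pair of helpers follows. The names encodeMessage() and decodeMessage() are hypothetical and not part of the bundle; only PHP's built-in JSON functions are used.

```php
<?php
/**
 * Encode a payload as JSON, throwing instead of returning false on failure.
 * Hypothetical helper, not provided by the bundle.
 */
function encodeMessage(array $data): string
{
    return json_encode($data, JSON_THROW_ON_ERROR);
}

/**
 * Decode a JSON message body. Returns null when the body is malformed
 * or does not decode to an array, so the caller can reject the message.
 */
function decodeMessage(string $body): ?array
{
    try {
        $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $e) {
        return null;
    }

    return is_array($data) ? $data : null;
}

$body = encodeMessage(['user_id' => 123, 'image_path' => '/path/to/image.jpg']);
$decoded = decodeMessage($body);
```

A consumer callback could call decodeMessage() first and reject the message when it returns null, instead of processing a half-parsed payload.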
Error Handling:
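Reject failed messages with reject() or nack() and optionally requeue:
if ($data['user_id'] === 0) {
    // Invalid payload: reject it without requeueing.
    $connection->reject($messageInfo->getDeliveryTag(), false, false);
    return;
}
// Processing succeeded: acknowledge the message.
$connection->ack($messageInfo->getDeliveryTag());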
Dynamic Routing:
exchanges:
    logs:
        name: 'logs'
        type: topic
queues:
    error_logs:
        name: 'error_logs'
        exchange: 'logs'
        routing_key: 'error.#'
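
To see which routing keys a binding such as 'error.#' would receive, the topic-matching rules can be sketched in plain PHP. In AMQP topic exchanges, "*" matches exactly one dot-separated word and "#" matches zero or more words. The function topicMatches() is a hypothetical illustration of those rules; the broker performs the real matching.

```php
<?php
/**
 * Check whether a routing key matches an AMQP topic binding pattern.
 * "*" matches exactly one word, "#" matches zero or more words.
 * Hypothetical helper for illustration only.
 */
function topicMatches(string $pattern, string $routingKey): bool
{
    $match = function (array $p, array $k) use (&$match): bool {
        if ($p === []) {
            // Pattern exhausted: match only if the key is exhausted too.
            return $k === [];
        }
        $head = array_shift($p);
        if ($head === '#') {
            // "#" may absorb 0..n words, so try every split point.
            for ($i = 0; $i <= count($k); $i++) {
                if ($match($p, array_slice($k, $i))) {
                    return true;
                }
            }
            return false;
        }
        if ($k === []) {
            return false;
        }
        $word = array_shift($k);

        return ($head === '*' || $head === $word) && $match($p, $k);
    };

    return $match(explode('.', $pattern), explode('.', $routingKey));
}
```

With the binding above, keys such as error.db.timeout and the bare key error would reach the error_logs queue, while warning.db would not.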
Prefetch Count:
Limit the number of unacknowledged messages delivered to a consumer (in rabbitmq.yaml):
consumers:
    worker:
        prefetch_count: 10
Symfony Events:
$producer->publish($message);
$this->get('event_dispatcher')->dispatch(new MessagePublishedEvent($message));
Doctrine Integration:
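$entityManager->beginTransaction();
try {
    // Note: the publish is not part of the database transaction. If commit()
    // fails, the message has already been sent and is not rolled back.
    $producer->publish($message);
    $entityManager->commit();
} catch (\Exception $e) {
    $entityManager->rollback();
    throw $e;
}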
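
Because a broker publish cannot be rolled back together with the database transaction, one option is to buffer messages and send them only after the commit succeeds. The sketch below assumes a hypothetical DeferredPublisher class that is not part of the bundle or of Doctrine. It wraps any callable that does the actual publishing.

```php
<?php
/**
 * Buffers message bodies during a database transaction and publishes them
 * only once the caller confirms the commit. Hypothetical helper class.
 */
final class DeferredPublisher
{
    /** @var string[] Message bodies waiting for the commit. */
    private $pending = [];

    /** @var callable Receives one message body per call. */
    private $publish;

    public function __construct(callable $publish)
    {
        $this->publish = $publish;
    }

    /** Remember a message without sending it yet. */
    public function queue(string $body): void
    {
        $this->pending[] = $body;
    }

    /** Call after a successful commit. Returns the number of messages sent. */
    public function flush(): int
    {
        $count = 0;
        foreach ($this->pending as $body) {
            ($this->publish)($body);
            $count++;
        }
        $this->pending = [];

        return $count;
    }

    /** Call after a rollback to drop the buffered messages. */
    public function discard(): void
    {
        $this->pending = [];
    }
}
```

In the transaction above, queue() would replace the direct publish call, flush() would follow commit(), and discard() would follow rollback(). A crash between commit and flush can still lose messages, so this narrows the window rather than closing it.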
Retry Logic:
queues:
    failed_jobs:
        name: 'failed_jobs'
        exchange: 'dead_letter_exchange'
Monitoring:
Log message metadata (e.g., $messageInfo->getDeliveryTag()) for debugging.
Connection Management:
Run rabbitmq:setup to verify connections and wrap consumers in retry logic:
while (true) {
    try {
        $connection->consume();
        break; // consume() returned normally
    } catch (\Exception $e) {
        sleep(5); // wait before trying again
    }
}
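
A fixed five-second pause retries forever at the same rate. A bounded retry with exponential backoff is a common alternative. The sketch below is a generic helper in plain PHP. The name retryWithBackoff() and its parameters are assumptions for illustration, not part of the bundle.

```php
<?php
/**
 * Run $operation, retrying on exceptions with exponential backoff.
 * $sleep is injectable so that the delay can be replaced in tests.
 * Hypothetical helper, not provided by the bundle.
 */
function retryWithBackoff(callable $operation, int $maxAttempts = 5, int $baseDelay = 1, ?callable $sleep = null)
{
    $sleep = $sleep ?? 'sleep';

    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation();
        } catch (\Exception $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // Give up and surface the last error.
            }
            // The delay doubles on each attempt: 1s, 2s, 4s, ...
            $sleep($baseDelay * (2 ** ($attempt - 1)));
        }
    }
}
```

Assuming $connection is the consumer connection from the snippet above, a call could look like retryWithBackoff(function () use ($connection) { return $connection->consume(); });.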
Serialization Mismatches:
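Use a single format (JSON or serialize()) across the app. Avoid mixing formats.
Consumer Lifecycle:
Consumers run until they are stopped. Stop them with SIGINT, or let them exit once the queue is empty: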
php bin/console rabbitmq:consumer worker --stop-when-empty
Configuration Overrides:
Options that are not set fall back to php-amqplib defaults. Override them in rabbitmq.yaml (e.g., host, port).
Queue Visibility:
Set queue_options: { flags: { 'x-message-ttl': 300000 } } (5-minute TTL) so that stale messages expire automatically.
RabbitMQ Management UI:
Open http://localhost:15672 (default credentials: guest/guest) to inspect queues, exchanges, and messages.
Logging:
Add a dedicated handler in config/packages/monolog.yaml:
handlers:
    rabbitmq:
        type: stream
        path: "%kernel.logs_dir%/rabbitmq.log"
        level: debug
Consumer Debugging:
Use --limit to test consumers with a subset of messages:
php bin/console rabbitmq:consumer worker --limit=10
Message Inspection:
error_log('Raw message: ' . $message);
Custom Producers/Consumers:
Extend OldSound\RabbitMqBundle\Producer or OldSound\RabbitMqBundle\Consumer for reusable logic:
class AsyncEmailProducer extends Producer
{
    public function sendEmail($to, $subject, $body)
    {
        // Publish the email fields as a JSON object.
        $this->publish(json_encode(compact('to', 'subject', 'body')));
    }
}
Middleware:
class MessageLoggerSubscriber implements EventSubscriberInterface
{
    public static function getSubscribedEvents()
    {
        return [
            'rabbitmq.message.publish' => 'onPublish',
        ];
    }

    public function onPublish(PublishEvent $event)
    {
        $this->logger->info('Published message', ['data' => $event->getMessage()]);
    }
}
Dynamic Exchanges:
$channel = $connection->getChannel();
$channel->exchangeDeclare('dynamic_exchange', 'topic', false, false, false);
Health Checks:
class RabbitMqHealthCheck extends AbstractHealthIndicator
{
    public function check(HealthIndicatorInterface $indicator, ?string $name = null, array $options = []): bool
    {
        return $this->connection->isConnected();
    }
}