Installation:
composer require php-amqplib/rabbitmq-bundle
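Register the bundle. Symfony 4+ registers bundles in config/bundles.php (Symfony Flex normally adds the entry on install); older applications add it to AppKernel.php:
new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),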
Configure RabbitMQ (config/packages/old_sound_rabbit_mq.yaml):
old_sound_rabbit_mq:
    connections:
        default:
            host: 'localhost'
            port: 5672
            user: 'guest'
            password: 'guest'
            vhost: '/'
            lazy: false
    producers:
        upload_picture_producer:
            connection: default
            exchange_options: {name: 'uploads', type: direct}
    consumers:
        upload_picture_consumer:
            connection: default
            exchange_options: {name: 'uploads', type: direct}
            queue_options: {name: 'upload.picture'}
            callback: upload_picture_callback
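First Producer Use Case: In a controller or service, inject and use the producer:
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;

class UploadController extends AbstractController
{
    // If several producers are configured, bind $producer to the intended service in services.yaml.
    public function uploadPicture(ProducerInterface $producer): Response
    {
        $msg = ['user_id' => 123, 'path' => '/image.jpg'];
        $producer->publish(serialize($msg), 'upload.picture');

        return new Response('Upload queued.');
    }
}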
First Consumer CLI Command: Run the consumer and stop after 50 messages:
php bin/console rabbitmq:consumer upload_picture_consumer -m 50
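Producer-Consumer Pattern:
Producer: publish messages through a configured producer (e.g., upload_picture_producer).
$producer->publish(serialize($data), 'routing_key');
Consumer: process messages with a configured consumer (e.g., upload_picture_consumer).
Define a callback in config/packages/old_sound_rabbit_mq.yaml:
callback: App\Service\UploadPictureConsumer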
Implement the callback as a service:
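namespace App\Service;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class UploadPictureConsumer implements ConsumerInterface
{
    public function execute(AMQPMessage $message)
    {
        $data = unserialize($message->body);
        // Process $data...
    }
}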
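The body of execute() is easier to test when it delegates to a plain function. The sketch below is only an illustration and not part of the bundle: handleUploadMessage is a hypothetical helper, and the user_id and path keys are assumed from the producer example above. It validates the unserialized payload and returns a boolean that the callback could use to decide whether the message counts as processed.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: validates a serialized upload message.
 * Returns true when the payload is usable, false when it is malformed.
 */
function handleUploadMessage(string $body): bool
{
    // allowed_classes => false prevents object instantiation from untrusted input.
    // The @ silences the notice/warning PHP emits for an unparsable string.
    $data = @unserialize($body, ['allowed_classes' => false]);

    if (!is_array($data) || !isset($data['user_id'], $data['path'])) {
        return false; // malformed payload
    }

    // Real processing (e.g. resizing the picture) would go here.
    return true;
}
```

Inside execute() this could be called as handleUploadMessage($message->body). A malformed message will never become valid, so requeueing it indefinitely is usually not what you want; how a false result should map to acknowledge, reject, or requeue is a decision for the application.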
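Message Serialization:
Use serialize()/unserialize() for simplicity.
Prefer JSON when non-PHP services consume the messages (json_encode() + json_decode()).
Error Handling:
Wrap publish calls in try/catch and log failures: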
try {
    $producer->publish($msg);
} catch (\Exception $e) {
    $this->logger->error('RabbitMQ publish failed', ['error' => $e->getMessage()]);
}
Use nack() for failed messages (requeue or dead-letter):
if (!$this->processMessage($message)) {
    $message->nack(true); // Negative-acknowledge and requeue; pass false to drop or dead-letter instead
}
Dynamic Routing:
// Connection services are registered as old_sound_rabbit_mq.connection.<name>
$connection = $this->get('old_sound_rabbit_mq.connection.default');
$channel = $connection->channel();
$channel->queue_bind('queue_name', 'exchange_name', 'tenant_routing_key');
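When bindings are created per tenant, it helps to build the routing key in one place. The following is a minimal sketch under stated assumptions: tenantRoutingKey is a hypothetical helper, and the tenant.<id>.<event> layout is an illustrative convention, not something the bundle prescribes.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: builds a routing key of the form tenant.<id>.<event>.
 */
function tenantRoutingKey(string $tenantId, string $event): string
{
    // Dots separate the words of a routing key, so any character outside
    // [A-Za-z0-9_-] in the tenant id is replaced to keep it a single word.
    $safeTenant = preg_replace('/[^A-Za-z0-9_-]/', '_', $tenantId);

    return sprintf('tenant.%s.%s', $safeTenant, $event);
}
```

The result can be passed as the third argument of queue_bind() and as the routing key of publish(), so both sides agree on the format.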
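Background Processing:
Run consumers as long-lived processes. -m 0 sets no message limit and -l caps memory (in MB) so a process supervisor can restart the consumer; an idle timeout is set with idle_timeout in the consumer's config rather than on the command line:
php bin/console rabbitmq:consumer consumer_name -m 0 -l 256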
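Symfony Events:
Publish from event listeners (e.g., kernel.terminate for async tasks):
$eventDispatcher->addListener('kernel.terminate', function () use ($data) {
    // Producer services are registered as old_sound_rabbit_mq.<name>_producer
    $this->get('old_sound_rabbit_mq.upload_picture_producer_producer')->publish($data);
});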
Doctrine Integration:
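// Event names come from Doctrine\ORM\Events; RabbitMqEventListener is an application-defined class
$entityManager->getEventManager()->addEventListener(
    [Events::preUpdate, Events::postUpdate],
    new RabbitMqEventListener($this->get('old_sound_rabbit_mq.upload_picture_producer_producer'))
);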
Retry Logic:
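$attempts = 0;
while (true) {
    try {
        $this->execute($message);
        break;
    } catch (\Exception $e) {
        if (++$attempts >= 3) {
            throw $e; // Give up after three attempts instead of failing silently
        }
        sleep(2 ** $attempts); // Exponential backoff: 2s, then 4s
    }
}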
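The retry loop can be factored into a reusable function. This is a sketch rather than a bundle feature: retryWithBackoff and its parameters are hypothetical names, and the injectable $sleep callable exists only so the delays can be observed in tests without waiting.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: runs $operation, retrying on any exception with
 * exponential backoff. Rethrows the last exception once attempts run out.
 */
function retryWithBackoff(
    callable $operation,
    int $maxAttempts = 3,
    int $baseDelaySeconds = 2,
    ?callable $sleep = null
) {
    $sleep = $sleep ?? 'sleep'; // default to PHP's built-in sleep()

    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation();
        } catch (\Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // out of attempts: surface the failure
            }
            $sleep($baseDelaySeconds ** $attempt); // 2, 4, 8, ... seconds
        }
    }
}
```

Blocking sleeps hold up the consumer while it waits, so for long delays a delayed or dead-letter queue may be a better fit than in-process retries.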
Monitoring:
Log RabbitMQ activity to a dedicated file (e.g., with monolog):
# config/packages/monolog.yaml
monolog:
    handlers:
        rabbitmq:
            type: stream
            path: "%kernel.logs_dir%/rabbitmq.log"
            level: debug
Use rabbitmqctl list_connections on the broker to check connections:
rabbitmqctl list_connections
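Connection Management:
Set lazy: true in config so the connection is opened only on first use, which avoids connection overhead on requests that never publish.
connections:
    default:
        lazy: true
Set heartbeat (in seconds) in config to detect dead connections:
connections:
    default:
        heartbeat: 30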
Message Ordering:
Queues deliver messages in FIFO order; set a maximum priority in queue options if needed:
queue_options:
    name: 'priority_queue'
    arguments: {'x-max-priority': ['I', 10]}
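Consumer Stuck:
Make sure the callback handles its own exceptions (e.g., with try-catch).
Use -m in CLI to auto-stop after a fixed number of messages, or set idle_timeout in the consumer config to stop an idle consumer:
php bin/console rabbitmq:consumer consumer_name -m 100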
Serialization Issues:
$producer->publish(json_encode($data), 'routing_key');
In consumer:
$data = json_decode($message->body, true);
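json_encode() and json_decode() return false or null on failure by default, which is easy to miss. The sketch below shows one way to make failures explicit with JSON_THROW_ON_ERROR (PHP 7.3+); encodeMessage and decodeMessage are hypothetical helper names, not part of the bundle.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: encodes a payload, throwing JsonException on failure. */
function encodeMessage(array $data): string
{
    return json_encode($data, JSON_THROW_ON_ERROR);
}

/** Hypothetical helper: decodes a message body into an associative array. */
function decodeMessage(string $body): array
{
    $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($data)) {
        // Valid JSON, but a scalar such as 42 or "text" rather than an object/array.
        throw new InvalidArgumentException('Expected a JSON object or array.');
    }

    return $data;
}
```

The producer would then call publish(encodeMessage($data), 'routing_key') and the consumer decodeMessage($message->body), catching JsonException to handle corrupt messages.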
Permissions:
rabbitmqctl set_permissions -p / guest ".*" ".*" ".*"
Check Queue Status:
rabbitmqctl list_queues
rabbitmqctl list_exchanges
Inspect Messages:
Use rabbitmq:consumer with -m 1 to process a single message (the message is consumed, not just peeked at):
php bin/console rabbitmq:consumer consumer_name -m 1
Use rabbitmqadmin get queue=<queue_name> to look at a message from the broker side.
Enable Debugging:
Set debug: true in config to log connection details:
old_sound_rabbit_mq:
    debug: true
Common Errors:
ConnectionException: Verify RabbitMQ server is running and credentials are correct.
ChannelException: Check for misconfigured exchanges/queues (e.g., typo in names).
AMQPTimeoutException: Increase timeout in config or check network latency.
Custom Producers/Consumers:
Implement ProducerInterface/ConsumerInterface for reusable logic:
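use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;

class AsyncEmailProducer implements ProducerInterface
{
    // $inner is the wrapped producer, e.g. the old_sound_rabbit_mq.email_producer service
    public function __construct(private ProducerInterface $inner) {}

    public function publish($msgBody, $routingKey = '', $additionalProperties = [])
    {
        $this->inner->publish($msgBody, $routingKey, $additionalProperties);
    }
}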
Dynamic Exchanges/Queues:
$channel = $this->get('old_sound_rabbit_mq.connection.default')->channel();
$channel->exchange_declare('dynamic_exchange', 'direct', false, false, false);
$channel->queue_declare('dynamic_queue', false, false, false, false);
$channel->queue_bind('dynamic_queue', 'dynamic_exchange', 'routing_key');
Middleware for Messages:
$producer->publish(
json_encode(['data' => $data, 'timestamp' => time()]),
'routing_key'
);
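The envelope above only pays off if the consumer reads it back. As a rough sketch, assuming the same data and timestamp keys, the pair of functions below wraps a payload and rejects messages older than a given age. wrapMessage, unwrapMessage, and the $now parameter (there only to make the logic testable) are hypothetical names.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: wraps a payload in an envelope with a Unix timestamp. */
function wrapMessage(array $data, ?int $now = null): string
{
    return json_encode(
        ['data' => $data, 'timestamp' => $now ?? time()],
        JSON_THROW_ON_ERROR
    );
}

/**
 * Hypothetical helper: returns the payload, or null when the envelope is
 * malformed or older than $maxAgeSeconds.
 */
function unwrapMessage(string $body, int $maxAgeSeconds, ?int $now = null): ?array
{
    $envelope = json_decode($body, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($envelope) || !isset($envelope['data'], $envelope['timestamp'])) {
        return null; // not an envelope produced by wrapMessage()
    }

    $age = ($now ?? time()) - (int) $envelope['timestamp'];

    return $age <= $maxAgeSeconds ? (array) $envelope['data'] : null;
}
```

A consumer could call unwrapMessage($message->body, 300) and skip messages for which it returns null.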
Testing: