Installation
Add the package via Composer:
composer require xlabs/rabbitmqbundle
Register the bundle in config/app.php under providers:
Xlabs\RabbitMQBundle\RabbitMQServiceProvider::class,
Configuration
Publish the default config:
php artisan vendor:publish --provider="Xlabs\RabbitMQBundle\RabbitMQServiceProvider" --tag=config
Update config/rabbitmq.php with your RabbitMQ server details (host, port, credentials, etc.).
First Use Case: Publishing a Message
Inject the RabbitMQConnection service and publish a message:
use Xlabs\RabbitMQBundle\RabbitMQConnection;
class MessagePublisher // hypothetical class name, for illustration
{
    private $rabbitMQ;
    public function __construct(RabbitMQConnection $rabbitMQ)
    {
        $this->rabbitMQ = $rabbitMQ;
    }
    public function sendMessage()
    {
        $this->rabbitMQ->publish(
            'exchange_name',
            'routing_key',
            'message_body',
            ['content_type' => 'application/json']
        );
    }
}
Second Use Case: Consuming a Message
Define a consumer class and register it in config/rabbitmq.php under consumers:
'consumers' => [
'my_consumer' => [
'queue' => 'my_queue',
'class' => \App\Consumers\MyConsumer::class,
'callback' => 'handleMessage',
],
],
Implement the consumer:
namespace App\Consumers;
class MyConsumer
{
public function handleMessage($message)
{
// Process message
return $message; // Acknowledge
}
}
Producer Workflow
Use publish() for direct exchanges or queuePublish() for queues:
$this->rabbitMQ->publish('exchange', 'routing_key', $message, [
'content_type' => 'application/json',
'expiration' => 5000, // 5 seconds in ms
]);
$this->rabbitMQ->txSelect();
foreach ($messages as $message) {
$this->rabbitMQ->publish('exchange', 'key', $message);
}
$this->rabbitMQ->txCommit();
Consumer Workflow
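Register consumers in config/rabbitmq.php with their queues, classes, and methods.
Set prefetch_count in the config to limit how many unacknowledged messages a consumer receives at once.
Use try-catch in consumer methods and log failures: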
public function handleMessage($message)
{
try {
// Process logic
} catch (\Exception $e) {
\Log::error('Consumer error: ' . $e->getMessage());
throw $e; // Reject message
}
}
Use RabbitMQConnection directly to declare queues/exchanges and bind consumers at runtime.
Integration with Laravel Jobs
// Inside consumer
dispatch(new ProcessOrder($orderData));
// Inside job
$this->rabbitMQ->publish('orders', 'new_order', $orderData);
Exchange and Queue Management
$this->rabbitMQ->declareExchange('my_exchange', 'direct', false, false, false);
$this->rabbitMQ->declareQueue('my_queue', false, false, false);
$this->rabbitMQ->bindQueue('my_queue', 'my_exchange', 'routing.key');
Message Serialization
'serializer' => [
'class' => \App\Services\CustomSerializer::class,
'method' => 'serialize',
],
Implement CustomSerializer:
class CustomSerializer
{
public function serialize($data) { /* ... */ }
public function unserialize($data) { /* ... */ }
}
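The stub above leaves both method bodies open. As a minimal sketch, assuming message bodies are JSON (the source does not say which format the package expects), the two methods could be filled in as follows. Only the class and method names come from the text; the JSON choice and the sample payload are illustrative.

```php
<?php
declare(strict_types=1);

// Hypothetical JSON-backed serializer. Only the names CustomSerializer,
// serialize and unserialize come from the document; the rest is assumed.
final class CustomSerializer
{
    /** Encode any JSON-representable value as a string message body. */
    public function serialize($data): string
    {
        // JSON_THROW_ON_ERROR raises JsonException instead of returning false.
        return json_encode($data, JSON_THROW_ON_ERROR);
    }

    /** Decode a message body back into PHP arrays and scalars. */
    public function unserialize(string $data)
    {
        // true => decode JSON objects as associative arrays; 512 is the default depth.
        return json_decode($data, true, 512, JSON_THROW_ON_ERROR);
    }
}

// Round-trip a sample payload (illustrative data only).
$serializer = new CustomSerializer();
$body = $serializer->serialize(['order_id' => 42, 'status' => 'new']);
$roundTrip = $serializer->unserialize($body);
```

Throwing on malformed JSON, rather than returning null or false, lets a consumer's try-catch decide whether to reject the message.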
Retry Mechanism
Use requeue or implement a retry queue:
public function handleMessage($message)
{
if ($failed) {
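$this->rabbitMQ->reject($message, false); // Reject; if the second argument is the AMQP requeue flag, false discards (or dead-letters) the message and true requeues it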
}
}
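A retry queue usually needs a rule for when to stop retrying. The sketch below shows one way to make that decision with exponential backoff. The function retryDecision, its parameters, and the action names are hypothetical and not part of the package; how the attempt count is tracked (for example in a message header) is left to the application.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: nothing here is part of the package's API.
// It only decides what to do with a failed message, given how many
// times the message has been attempted so far (counting from 1).
function retryDecision(int $attempt, int $maxAttempts = 3, int $baseDelayMs = 1000): array
{
    if ($attempt >= $maxAttempts) {
        // Out of retries: hand the message to a dead-letter queue.
        return ['action' => 'dead_letter', 'delay_ms' => 0];
    }

    // Exponential backoff: base, 2x base, 4x base, ... for attempts 1, 2, 3, ...
    $delayMs = $baseDelayMs * (2 ** max(0, $attempt - 1));

    return ['action' => 'retry', 'delay_ms' => $delayMs];
}

$first = retryDecision(1);
$second = retryDecision(2);
$third = retryDecision(3);
```

With the defaults, the first two failures are retried after 1000 ms and 2000 ms, and the third failure is dead-lettered.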
Monitoring and Metrics
\Log::info('Message published', ['exchange' => 'exchange_name', 'routing_key' => 'key']);
Connection Management
Call reconnect() or set a heartbeat in the config:
'connection' => [
'heartbeat' => 60, // seconds
],
while (true) {
try {
$this->rabbitMQ->consume();
} catch (\Exception $e) {
sleep(5);
$this->rabbitMQ->reconnect();
}
}
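The loop above retries forever with a fixed five-second sleep. A capped exponential backoff with a maximum attempt count is a common alternative. The following is a minimal, self-contained sketch: runWithBackoff and its parameters are hypothetical and not part of the package, and the operation is a stand-in for the consume() and reconnect() calls.

```php
<?php
declare(strict_types=1);

/**
 * Run $operation, retrying with capped exponential backoff on failure.
 * $sleep is injectable so the loop can be exercised without real waiting;
 * in production it could simply be 'sleep'.
 */
function runWithBackoff(
    callable $operation,
    int $maxAttempts,
    callable $sleep,
    int $baseSeconds = 1,
    int $capSeconds = 30
) {
    $attempt = 0;
    while (true) {
        try {
            return $operation();
        } catch (\Exception $e) {
            $attempt++;
            if ($attempt >= $maxAttempts) {
                throw $e; // give up and surface the last error
            }
            // Wait 1s, 2s, 4s, ... capped at $capSeconds.
            $sleep(min($capSeconds, $baseSeconds * (2 ** ($attempt - 1))));
        }
    }
}

// Stand-in operation that fails twice before succeeding.
$calls = 0;
$waits = [];
$result = runWithBackoff(
    function () use (&$calls) {
        $calls++;
        if ($calls < 3) {
            throw new \RuntimeException('connection lost');
        }
        return 'consumed';
    },
    5,
    function (int $seconds) use (&$waits) {
        $waits[] = $seconds; // record instead of sleeping
    }
);
```

Bounding the attempts lets a process supervisor restart the worker when the broker stays unreachable, instead of leaving it spinning.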
Message Acknowledgment
Set auto_ack: false in config and manually acknowledge:
'consumers' => [
'my_consumer' => [
'auto_ack' => false,
// ...
],
],
public function handleMessage($message) {
// Process
return $message; // Acknowledges
}
Queue Binding
$this->rabbitMQ->bindQueue('queue', 'exchange', 'routing.key');
Use declareQueue() with passive: true to check if a queue exists:
$this->rabbitMQ->declareQueue('queue', true, false, false, false);
Configuration Overrides
php artisan config:clear
'host' => env('RABBITMQ_HOST', 'localhost'),
Enable Debugging
Set debug: true in config to log connection and message details:
'debug' => env('RABBITMQ_DEBUG', false),
Check Connection
$this->rabbitMQ->isConnected();
Inspect Queues/Exchanges
$queues = $this->rabbitMQ->listQueues();
$exchanges = $this->rabbitMQ->listExchanges();
Message Inspection
\Log::debug('Raw message', ['body' => $message->body]);
Custom Middleware
'middleware' => [
\App\Middleware\LoggingMiddleware::class,
],
class LoggingMiddleware
{
public function handle($message, \Closure $next)
{
\Log::info('Message processed', ['body' => $message->body]);
return $next($message);
}
}
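The handle($message, \Closure $next) signature above follows the usual chain-of-responsibility shape. The sketch below shows one way such a chain can be composed and run; it is an illustration only, and the package may wire its middleware differently. runMiddleware and TagMiddleware are hypothetical names, and the message is modelled as a plain array rather than the package's message object.

```php
<?php
declare(strict_types=1);

// Hypothetical pipeline runner; not taken from the package.
function runMiddleware(array $middleware, $message, \Closure $handler)
{
    // Wrap the handler from the inside out so the first middleware runs first.
    $pipeline = array_reduce(
        array_reverse($middleware),
        function (\Closure $next, $layer): \Closure {
            return function ($message) use ($layer, $next) {
                return $layer->handle($message, $next);
            };
        },
        $handler
    );

    return $pipeline($message);
}

// Stand-in middleware that records the order in which layers run.
class TagMiddleware
{
    private string $tag;

    public function __construct(string $tag)
    {
        $this->tag = $tag;
    }

    public function handle($message, \Closure $next)
    {
        $message['trace'][] = $this->tag;
        return $next($message);
    }
}

$result = runMiddleware(
    [new TagMiddleware('first'), new TagMiddleware('second')],
    ['body' => 'hello', 'trace' => []],
    function ($message) {
        $message['trace'][] = 'handler'; // the consumer callback runs last
        return $message;
    }
);
```

Each layer can act before calling $next, after it returns, or skip the call entirely to short-circuit processing.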
Event Dispatching
event(new OrderProcessed($orderData));
Dynamic Queue Declaration
$queueName = 'dynamic_queue_' . $userId;
$this->rabbitMQ->declareQueue($queueName, false, false, false, ['exclusive' => true]);
Plugin System
$this->rabbitMQ->plugin('monitoring')->track('event_name');