# iamfarhad/laravel-rabbitmq
Production-ready RabbitMQ queue driver for Laravel with native Queue integration. Built on ext-amqp with connection/channel pooling, configurable topology, Horizon hooks, Octane-safe resets, and optional high-performance basic_consume workers plus admin Artisan commands.
## Getting Started
### Minimal Setup
1. **Installation**
   ```bash
   composer require iamfarhad/laravel-rabbitmq:^1.2.0
   ```
   Publish the config (optional but recommended for customization):
   ```bash
   php artisan vendor:publish --provider="Farhad\RabbitMQ\RabbitMQServiceProvider" --tag="config"
   ```
2. **Configure `.env`**
   ```env
   QUEUE_CONNECTION=rabbitmq
   RABBITMQ_HOST=localhost
   RABBITMQ_PORT=5672
   RABBITMQ_USER=guest
   RABBITMQ_PASSWORD=guest
   ```
3. **First Use Case: Dispatch a Job**
   ```php
   use App\Jobs\ProcessOrder;

   ProcessOrder::dispatch($orderId); // Uses RabbitMQ as the default queue connection
   ```
4. **Verify Connection**
   Run the `rabbitmq:monitor` command to check connection health:
   ```bash
   php artisan rabbitmq:monitor
   ```
The package uses `queue` as the default exchange/queue name (configurable in `config/rabbitmq.php`).

Custom queues are defined in `config/rabbitmq.php` under `queues`:

```php
'queues' => [
    'orders' => [
        'exchange' => 'orders_exchange',
        'queue' => 'orders_queue',
        'binding_key' => 'order.*',
        'durable' => true,
        'dead_letter_exchange' => 'orders_dlq',
        'dead_letter_routing_key' => 'dlq.order',
    ],
],
```
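The `binding_key` value `order.*` reads like an AMQP topic pattern. Assuming `orders_exchange` is declared as a topic exchange (the config above does not state the exchange type), `*` stands for exactly one dot-separated word and `#` for zero or more words. The sketch below reproduces that rule in plain PHP so a pattern can be checked before it is deployed; `topicMatches()` and `matchWords()` are illustrative helpers, not part of the package.

```php
<?php
declare(strict_types=1);

/**
 * Returns true if $routingKey matches the topic $pattern.
 * '*' matches exactly one word, '#' matches zero or more words.
 */
function topicMatches(string $pattern, string $routingKey): bool
{
    $patternWords = $pattern === '' ? [] : explode('.', $pattern);
    $keyWords = $routingKey === '' ? [] : explode('.', $routingKey);

    return matchWords($patternWords, $keyWords);
}

function matchWords(array $pattern, array $key): bool
{
    if ($pattern === []) {
        // Pattern exhausted: only a match if the key is exhausted too.
        return $key === [];
    }

    $head = array_shift($pattern);

    if ($head === '#') {
        // '#' may absorb zero, one, or many words; try every split point.
        for ($i = 0; $i <= count($key); $i++) {
            if (matchWords($pattern, array_slice($key, $i))) {
                return true;
            }
        }
        return false;
    }

    if ($key === []) {
        return false;
    }

    $word = array_shift($key);

    return ($head === '*' || $head === $word) && matchWords($pattern, $key);
}

$created = topicMatches('order.*', 'order.created');
$nested = topicMatches('order.*', 'order.created.eu');
```

With this rule, `order.*` accepts `order.created` but rejects `order.created.eu`; a binding that should also catch nested keys would need `order.#` instead.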
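Declare a queue at runtime:

```php
$this->rabbitMQ->queue('dynamic_queue', ['durable' => true]);
```

Dispatch to a specific queue:

```php
ProcessOrder::dispatch($orderId)->onQueue('orders');
```

Use the `priority` option. Priority queues are part of RabbitMQ core (3.5.0 and later), so no plugin is needed, but the queue must be declared with an `x-max-priority` argument:

```php
ProcessOrder::dispatch($orderId)->onQueue('high_priority')->priority(1);
```

Delay a job:

```php
ProcessOrder::dispatch($orderId)->delay(now()->addMinutes(10));
```

Run a worker for a specific queue, or as a daemon with sleep and retry limits:

```bash
php artisan queue:work rabbitmq --queue=orders
php artisan queue:work rabbitmq --daemon --sleep=3 --tries=3
```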
Consume a queue with a custom callback:

```php
$this->rabbitMQ->consume('orders', function ($job) {
    // Custom processing logic
    $job->delete(); // Manually acknowledge
});
```

Access the underlying connection and channel:

```php
$connection = $this->rabbitMQ->connection();
$channel = $connection->channel();
```

Pool settings (in `config/rabbitmq.php`):

```php
'pool' => [
    'connections' => 5,
    'channels' => 10,
],
```

Publish and subscribe:

```php
$this->rabbitMQ->publish('order.created', ['order_id' => 123]);

$this->rabbitMQ->subscribe('order.*', function ($message) {
    // Handle message
});
```
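```php
ProcessOrder::dispatch($orderId)->retryAfter(5);

// The closure must import $orderId explicitly; PHP closures do not capture outer variables.
ProcessOrder::dispatch($orderId)->retryUntil(function () use ($orderId) {
    return DB::table('orders')->where('id', $orderId)->exists();
});
```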
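Replace the `database`/`sync` drivers with `rabbitmq` in `QUEUE_CONNECTION`.

```php
// event() returns the listener results, so onQueue() cannot be chained onto it.
// A queued listener selects its queue through its own $queue property (e.g. 'events').
event(new OrderCreated($orderId));
```

Use `chunk()` for large datasets:

```php
User::chunk(100, function ($users) {
    ProcessUsers::dispatch($users)->onQueue('batch_users');
});
```

Report generation on a dedicated queue:

```php
GenerateReport::dispatch($reportId)->onQueue('reports');
```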
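```php
$stats = $this->rabbitMQ->stats();
// Access: $stats->connections, $stats->channels, $stats->messages
```

Monitor with the `rabbitmq:monitor` command or custom middleware.

**Connection Leaks**

Ensure `try`/`finally` blocks close resources:

```php
$channel = null;
try {
    $channel = $this->rabbitMQ->channel();
    // Work with channel
} finally {
    // Close only if the channel was actually opened
    if ($channel !== null) {
        $channel->close();
    }
}
```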
Use `RabbitMQ::withChannel()` for scoped channel management:

```php
$this->rabbitMQ->withChannel(function ($channel) {
    // Channel auto-closes after block
});
```
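To make the scoped pattern concrete, here is a minimal sketch of how a `withChannel()`-style helper can guarantee cleanup. It does not show the package's implementation: `FakeChannel` and the free function `withChannel()` are hypothetical stand-ins that only demonstrate the `try`/`finally` idea without needing a broker.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a real channel; it only records whether close() ran.
final class FakeChannel
{
    public bool $closed = false;

    public function close(): void
    {
        $this->closed = true;
    }
}

/**
 * Runs $callback with the channel and always closes the channel afterwards,
 * whether the callback returns normally or throws.
 */
function withChannel(FakeChannel $channel, callable $callback): mixed
{
    try {
        return $callback($channel);
    } finally {
        $channel->close();
    }
}

// Normal path: the callback's return value is passed through.
$ok = new FakeChannel();
$result = withChannel($ok, fn (FakeChannel $c) => 'published');

// Failure path: the exception still propagates, but the channel is closed first.
$failing = new FakeChannel();
try {
    withChannel($failing, function (FakeChannel $c) {
        throw new RuntimeException('publish failed');
    });
} catch (RuntimeException $e) {
    // Handled by the caller; cleanup has already happened.
}
```

The benefit over a hand-written `try`/`finally` at every call site is that the cleanup lives in one place and cannot be forgotten.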
**Message Loss**

Acknowledge messages explicitly:

```php
$this->rabbitMQ->consume('queue', function ($job) {
    try {
        // Process job
        $job->ack(); // Explicit acknowledge
    } catch (\Exception $e) {
        $job->nack(); // Negative acknowledge (requeue)
    }
});
```

Set `auto_ack: false` in queue config for manual control.

**Exponential Backoff Misconfiguration**

Configure retry delays in `config/rabbitmq.php`:

```php
'retry' => [
    'max_attempts' => 5,
    'backoff' => [1, 2, 3, 5, 8], // Custom delays in seconds
],
```
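The delay list above grows Fibonacci-style rather than exponentially, which may or may not be intended. The sketch below shows one plausible way such a list could be consumed (attempts past the end reuse the last delay, which is how Laravel's own worker treats a `backoff` array) and how a doubling schedule could be generated instead. Both function names are illustrative assumptions, and the package's actual lookup may differ.

```php
<?php
declare(strict_types=1);

/**
 * Delay in seconds before retry number $attempt (1-based).
 * Attempts beyond the end of the list reuse the last configured delay.
 */
function delayForAttempt(array $backoff, int $attempt): int
{
    if ($backoff === []) {
        return 0;
    }

    // Clamp the attempt into the valid index range of the list.
    $index = min(max($attempt, 1), count($backoff)) - 1;

    return (int) $backoff[$index];
}

/**
 * Builds a doubling schedule: base, base*2, base*4, ... capped at $cap seconds.
 */
function exponentialBackoff(int $base, int $attempts, int $cap): array
{
    $delays = [];
    for ($i = 0; $i < $attempts; $i++) {
        $delays[] = min($base * (2 ** $i), $cap);
    }

    return $delays;
}

$configured = [1, 2, 3, 5, 8];
$third = delayForAttempt($configured, 3);
$beyond = delayForAttempt($configured, 9);
$schedule = exponentialBackoff(1, 5, 60);
```

A generated schedule like `$schedule` could be pasted into the `backoff` key when true exponential growth is wanted.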
**Queue Binding Errors**

An incorrect `binding_key` or missing exchanges can cause silent failures.

```bash
php artisan rabbitmq:monitor
```

Use `declareQueue()` to ensure queues/exchanges exist:

```php
$this->rabbitMQ->declareQueue('orders', ['durable' => true]);
```

**Memory Leaks in Consumers**

Add `sleep()` between batches or limit consumer runtime:

```bash
php artisan queue:work rabbitmq --sleep=5 --memory=128
```
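For a custom consumer loop that does not go through `queue:work`, the same safeguards can be applied by hand. The sketch below assumes a hypothetical `shouldStop()` helper that mirrors the `--memory=128` limit from the command above and adds a wall-clock limit; the loop body is a stand-in for real message handling.

```php
<?php
declare(strict_types=1);

/**
 * Decides whether a long-running consumer loop should exit so that a
 * process supervisor can restart it with a clean memory footprint.
 * A $maxSeconds of 0 disables the runtime limit.
 */
function shouldStop(
    float $memoryUsedMb,
    int $memoryLimitMb,
    int $startedAt,
    int $now,
    int $maxSeconds
): bool {
    if ($memoryUsedMb >= $memoryLimitMb) {
        return true;
    }

    return $maxSeconds > 0 && ($now - $startedAt) >= $maxSeconds;
}

$startedAt = time();
$processed = 0;

while ($processed < 3) { // stand-in for "while messages are available"
    $processed++; // stand-in for handling one message

    $usedMb = memory_get_usage(true) / 1024 / 1024;
    if (shouldStop($usedMb, 128, $startedAt, time(), 3600)) {
        break; // exit cleanly; the supervisor starts a fresh process
    }
}
```

Checking after each message rather than on a timer keeps the exit point between jobs, so no message is abandoned halfway through processing.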
**Compatibility Issues**

```bash
composer update laravel/framework --with-all-dependencies
```

**Enable Debug Logging**

Add to `.env`:

```env
RABBITMQ_LOG_LEVEL=debug
```

View logs with:

```bash
tail -f storage/logs/laravel.log
```
**RabbitMQ Management UI**

Access http://localhost:15672 (default credentials: guest/guest) to inspect queues, messages, and connections.

**Common Errors**

- `AMQPConnectionException`: Check credentials, network, or RabbitMQ service status.
- `ChannelException`: Likely a channel leak or misconfiguration. Run `rabbitmq:monitor` to check pool stats.
- `NotFoundException`: Queue/exchange not declared. Use `declareQueue()` or verify bindings.

**Configuration Overrides**

Override settings per environment:

```php
config(['rabbitmq.queues.orders.durable' => env('RABBITMQ_ORDERS_DURABLE', true)]);
```
**Dynamic Queue Routing**

Route jobs dynamically based on payload:

```php
ProcessOrder::dispatch($orderId)
```
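One way to keep payload-based routing testable is to isolate the decision in a pure function and pass its result to `onQueue()`. The sketch below is an assumption about how that might look: `queueForOrder()`, the payload fields `priority` and `total`, the threshold, and the `orders_large` queue name are all hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Picks a queue name from an order payload.
 * Field names, the 1000 threshold, and 'orders_large' are hypothetical.
 */
function queueForOrder(array $order): string
{
    if (($order['priority'] ?? 'normal') === 'high') {
        return 'high_priority';
    }

    return ($order['total'] ?? 0) >= 1000 ? 'orders_large' : 'orders';
}

$queue = queueForOrder(['id' => 123, 'total' => 1500]);

// In a Laravel app the result would feed onQueue(), for example:
// ProcessOrder::dispatch($orderId)->onQueue($queue);
```

Because the function has no framework dependencies, the routing rules can be unit-tested without a broker or a running worker.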