enqueue/amqp-lib
AMQP transport for Enqueue implementing amqp-interop on top of php-amqplib. Connect to RabbitMQ/AMQP brokers to publish and consume messages, with links to docs, support, and issue tracking. MIT licensed.
Installation:
composer require enqueue/amqp-lib enqueue/amqp-ext
Ensure php-amqplib is installed (php-amqplib/php-amqplib).
Configure RabbitMQ Connection:
Add to config/queue.php under connections:
'amqp' => [
'driver' => 'amqp',
'host' => env('AMQP_HOST', 'localhost'),
'port' => env('AMQP_PORT', 5672),
'user' => env('AMQP_USER', 'guest'),
'password' => env('AMQP_PASSWORD', 'guest'),
'vhost' => env('AMQP_VHOST', '/'),
'options' => [
'ssl_options' => [
'cafile' => env('AMQP_SSL_CAFILE', null),
],
],
],
First Use Case: Dispatch a job using the AMQP queue:
use Illuminate\Support\Facades\Queue;
Queue::connection('amqp')->push(new YourJobClass);
Verify Connection: Run a worker:
php artisan queue:work --queue=default --connection=amqp
Job Dispatching:
Use Laravel’s Queue::connection('amqp')->push() for AMQP-specific queues.
Queue::connection('amqp')->push(
new ProcessPodcast,
'podcasts' // Custom queue name
);
Consuming Messages:
Extend Enqueue\AmqpLib\AmqpContext for custom AMQP logic:
$context = new AmqpContext([
'host' => 'localhost',
'user' => 'guest',
'pass' => 'guest',
]);
$consumer = new AmqpConsumer($context, 'your_queue');
$consumer->consume(function (Message $message) {
// Process message
$message->ack();
});
Delayed Jobs:
Use delay() with AMQP’s built-in scheduling:
Queue::connection('amqp')->later(now()->addMinutes(10), new YourJob);
Fanout Exchanges: For pub/sub patterns, configure a fanout exchange:
$context = new AmqpContext([...]);
$exchange = new FanoutExchange($context, 'events');
$exchange->publish(new AmqpMessage('data'));
event(new PodcastProcessed($podcast))
->onQueue('podcast_events', 'amqp');
'amqp' => [
'driver' => 'amqp',
'options' => [
'dead_letter_exchange' => 'failed_jobs',
],
],
amqp-tools for queue inspection:
composer require enqueue/amqp-tools
php vendor/bin/amqp-tools list-queues
Connection Handling:
try {
$context->createProducer()->send($message);
} catch (AmqpConnectionException $e) {
retry()->times(3)->then(function () use ($message) {
$context->createProducer()->send($message);
});
}
reconnect_delay in options for graceful retries.Message Persistence:
$queue = new Queue($context, 'your_queue', Queue::DURABLE);
Consumer Lag:
$consumer->setPrefetchCount(10); // Process 10 messages at a time
SSL/TLS:
ssl_options are correctly configured. Test with:
openssl s_client -connect localhost:5671 -showcerts
$context = new AmqpContext([...], [
'logger' => new MonologLogger(new Logger('amqp')),
]);
/var/log/rabbitmq/rabbit@hostname.log.Custom Serializers: Override the default JSON serializer:
$context = new AmqpContext([...], [
'serializer' => new YourCustomSerializer(),
]);
Middleware: Add AMQP-specific middleware to Laravel’s queue:
Queue::connection('amqp')->middleware([
new AmqpLoggingMiddleware(),
]);
Plugin System:
Extend AmqpContext to add custom plugins (e.g., metrics):
$context = new AmqpContext([...]);
$context->addPlugin(new AmqpMetricsPlugin());
How can I help you explore Laravel packages today?