enqueue/amqp-ext
AMQP transport for Enqueue that implements amqp-interop on top of the PHP amqp extension (ext-amqp). It connects Enqueue to RabbitMQ and other AMQP brokers through a native, extension-based driver.
Installation:
composer require enqueue/amqp-ext
Ensure ext-amqp is enabled in your PHP environment (php -m | grep amqp).
Basic Usage:
use Enqueue\AmqpExt\AmqpConnectionFactory;
$connectionFactory = new AmqpConnectionFactory('amqp://guest:guest@localhost');
// The context is created by the factory, not instantiated directly
$context = $connectionFactory->createContext();
$queue = $context->createQueue('test');
$context->declareQueue($queue);
$producer = $context->createProducer();
$producer->send($queue, $context->createMessage('Hello, AMQP!'));
First Use Case:
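// Delay strategies ship with enqueue/amqp-tools
$delayedQueue = $context->createQueue('delayed');
$context->declareQueue($delayedQueue);
$producer = $context->createProducer();
$producer->setDelayStrategy(new \Enqueue\AmqpTools\RabbitMqDlxDelayStrategy());
$producer->setDeliveryDelay(1000); // milliseconds (1 second)
$producer->send($delayedQueue, $context->createMessage('Delayed'));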
Use AmqpConnectionFactory for connection configuration (host, credentials, SSL).
Use AmqpContext for managing producers/consumers.
Producer-Consumer Pattern:
// Producer
$tasksQueue = $context->createQueue('tasks');
$context->declareQueue($tasksQueue);
$context->createProducer()->send($tasksQueue, $context->createMessage('Task'));
// Consumer (with manual acknowledgment)
$consumer = $context->createConsumer($tasksQueue);
while ($message = $consumer->receive(5000)) { // wait up to 5 seconds per message
    // Process message
    $consumer->acknowledge($message);
}
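The consume loop above can be hardened so that one failing message does not stop the consumer. The following is a minimal sketch, not part of the package: `drainQueue` is a hypothetical helper name, and `$consumer` is typed as `object` only so the sketch runs without the library. In real use it would be the `Interop\Queue\Consumer` returned by `createConsumer()`, whose `receive()`, `acknowledge()`, and `reject()` methods are the only ones called here.

```php
<?php
declare(strict_types=1);

/**
 * Receive messages until the queue stays empty for $timeoutMs, acknowledging
 * each message the handler processes and rejecting the ones that throw.
 *
 * Returns the number of acknowledged and rejected messages.
 */
function drainQueue(object $consumer, callable $handler, int $timeoutMs = 1000): array
{
    $stats = ['acknowledged' => 0, 'rejected' => 0];

    while (null !== ($message = $consumer->receive($timeoutMs))) {
        try {
            $handler($message);
            $consumer->acknowledge($message);
            ++$stats['acknowledged'];
        } catch (\Throwable $e) {
            // false = do not requeue, so a poison message is not redelivered forever
            $consumer->reject($message, false);
            ++$stats['rejected'];
        }
    }

    return $stats;
}
```

Rejecting without requeue is a design choice for this sketch. Pass `true` instead if failures are expected to be transient, or route rejected messages to a dead-letter exchange configured on the broker.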
Connection Management:
Reuse one AmqpContext for multiple producers/consumers.
Set connection timeouts through the AmqpConnectionFactory config array:
$factory = new AmqpConnectionFactory([
    'dsn' => 'amqp://user:pass@localhost',
    'read_timeout' => 5.0,  // seconds
    'write_timeout' => 5.0, // seconds
]);
Error Handling:
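Wrap transport calls in try-catch for Interop\Queue\Exception (the queue-interop exception interface) and \AMQPException (raised by ext-amqp).
For graceful error recovery, consume through AmqpContext::createSubscriptionConsumer() with a callable and reject failed messages inside it.
Laravel Integration:
Bind the context to Laravel’s service container: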
$app->singleton(AmqpContext::class, function ($app) {
    return (new AmqpConnectionFactory(config('queue.amqp.url')))->createContext();
});
Use Laravel’s queue system with enqueue/laravel-queue:
composer require enqueue/laravel-queue
Retry Logic:
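Enqueue\AmqpExt\AmqpConnectionFactory does not provide a retry strategy setter. Retry transient failures in application code: catch \AMQPConnectionException, wait for a short and increasing delay, then create a new context and repeat the operation.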
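Retrying can also be handled entirely in application code, independent of any library feature. The sketch below is a generic exponential backoff helper, not part of Enqueue: `retryWithBackoff`, its parameters, and the injectable `$sleep` callable are hypothetical names chosen for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Call $operation until it succeeds or $maxAttempts is reached, doubling the
 * wait after each failure. $sleep receives the delay in milliseconds and
 * defaults to usleep(); it is injectable so the helper can be tested.
 */
function retryWithBackoff(
    callable $operation,
    int $maxAttempts = 5,
    int $baseDelayMs = 100,
    ?callable $sleep = null
) {
    $sleep = $sleep ?? static function (int $ms): void {
        usleep($ms * 1000);
    };

    for ($attempt = 1; ; ++$attempt) {
        try {
            return $operation();
        } catch (\Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // give up and surface the last error
            }
            // Delay doubles on each failed attempt: base, 2x base, 4x base, ...
            $sleep($baseDelayMs * 2 ** ($attempt - 1));
        }
    }
}
```

A typical call would wrap the broker operation in a closure, for example `retryWithBackoff(function () use ($producer, $queue, $message) { $producer->send($queue, $message); })`. The sketch retries on any `\Throwable`; narrowing the catch to `\AMQPConnectionException` avoids retrying errors that will never succeed.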
Monitoring:
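enqueue/amqp-tools does not collect metrics. For a basic queue-depth reading, use the return value of declareQueue(), which is the number of messages currently in the queue:
$queue = $context->createQueue('tasks');
$messageCount = $context->declareQueue($queue); // messages waiting in the queue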
Connection Leaks:
Register AmqpContext as a singleton or ensure close() is called when you are done with it:
$context->close();
Message Serialization:
Encode complex payloads as JSON and set the content type on the message:
$message = $context->createMessage(json_encode(['data' => 'complex']));
$message->setContentType('application/json');
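Because a message body travels as a string, structured payloads have to be encoded before sending and decoded after receiving. The following sketch shows one way to do that with plain `json_encode`/`json_decode`; `encodePayload` and `decodePayload` are hypothetical helper names, not functions provided by Enqueue.

```php
<?php
declare(strict_types=1);

/** Encode a payload as JSON, throwing \JsonException instead of returning false. */
function encodePayload(array $payload): string
{
    return json_encode($payload, JSON_THROW_ON_ERROR);
}

/** Decode a JSON message body back into an associative array. */
function decodePayload(string $body): array
{
    $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($data)) {
        // Valid JSON scalars such as "42" are not usable as a payload here
        throw new \UnexpectedValueException('Expected a JSON object or array.');
    }

    return $data;
}
```

On the producer side the encoded string would be passed as the message body, and on the consumer side `decodePayload($message->getBody())` would restore the array. Throwing on malformed input lets the consumer reject a bad message explicitly rather than process a null payload.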
Consumer Lag:
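Raise the prefetch count so the broker delivers several messages to the consumer at once:
$context->setQos(0, 10, false); // prefetch size, prefetch count, global
$consumer = $context->createConsumer($context->createQueue('queue'));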
SSL/TLS:
$factory = new AmqpConnectionFactory([
    'dsn' => 'amqps://user:pass@localhost', // amqps scheme enables TLS
    'ssl_cacert' => '/path/to/ca.crt',
    'ssl_verify' => true,
]);
Enable AMQP Debugging:
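ext-amqp does not read an AMQP_DEBUG environment variable (AMQP_DEBUG is a php-amqplib constant). To inspect traffic on RabbitMQ, enable the broker's firehose tracer:
rabbitmqctl trace_on
Traced messages are published to the amq.rabbitmq.trace exchange.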
Check Connection Status:
if (!$context->getExtChannel()->isConnected()) {
    $context = $factory->createContext(); // build a fresh context and channel
}
Custom Message Handlers:
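Use Interop\Amqp\AmqpMessage (created with $context->createMessage()) for custom headers and properties.
Middleware:
The transport has no middleware layer. To run code around message processing, use consumption extensions from enqueue/enqueue:
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension;
use Enqueue\Consumption\QueueConsumer;
use Interop\Queue\Message;
use Interop\Queue\Processor;
$queueConsumer = new QueueConsumer($context, new ChainExtension([
    new LimitConsumedMessagesExtension(100), // stop after 100 messages
]));
$queueConsumer->bindCallback('tasks', function (Message $message) {
    // Process message
    return Processor::ACK;
});
$queueConsumer->consume();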
Plugin System:
Install enqueue/amqp-tools for advanced features such as message delay strategies.
Default Exchange:
Sending to a queue publishes through the broker's default (direct) exchange. For other exchange types (e.g., fanout), declare a topic, which is the interop name for an exchange, on the AmqpContext:
$exchange = $context->createTopic('custom_exchange');
$exchange->setType(\Interop\Amqp\AmqpTopic::TYPE_FANOUT);
$context->declareTopic($exchange);
Queue Declarations:
Ensure queues/exchanges are declared before use. Declare them through AmqpContext:
$queue = $context->createQueue('critical_tasks');
$queue->addFlag(\Interop\Amqp\AmqpQueue::FLAG_DURABLE);
$context->declareQueue($queue);