enqueue/amqp-tools
AMQP Tools for PHP Enqueue: adds practical utilities not covered by the AMQP spec, built on top of AMQP concepts. Works with any amqp-interop compatible transport. Links to docs, support, and issue tracker included.
Installation:
composer require enqueue/amqp-tools
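Install an amqp-interop compatible transport as well, for example enqueue/amqp-lib (built on php-amqplib, installed via Composer) or enqueue/amqp-ext (requires the PECL amqp extension).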
Basic Connection:
use Enqueue\AmqpTools\AmqpConnectionFactory;
$connectionFactory = new AmqpConnectionFactory();
$connection = $connectionFactory->createConnection([
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
    'vhost' => '/',
]);
First Use Case: Publish a message to a queue:
$channel = $connection->createChannel();
$channel->queueDeclare('test_queue', false, true, false, false);
$channel->basicPublish('', 'test_queue', '', [
    'message' => 'Hello, AMQP!'
]);
See the php-enqueue/amqp-tools tests for real-world usage.
See php-enqueue/amqp-ext for the transport built on the PECL amqp extension.
Producer Pattern:
Use AmqpConnectionFactory to create reusable connections:
class MessagePublisher
{
    protected $connection;

    public function __construct(AmqpConnectionFactory $factory)
    {
        $this->connection = $factory->createConnection(config('amqp'));
    }

    public function publish(string $queue, array $data): void
    {
        $channel = $this->connection->createChannel();
        $channel->basicPublish('', $queue, '', json_encode($data));
    }
}
Consumer Pattern:
Use AmqpConsumer or AmqpExt\Consumer (from amqp-ext) for Laravel.
amqp-ext:
use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\Client\Producer;
$connectionFactory = new AmqpConnectionFactory();
$producer = new Producer($connectionFactory, [
    'connection' => 'default',
]);
$producer->send(new Message('test_queue', json_encode($data)));
Laravel Service Provider: Bind the connection factory and producer/consumer to the container:
public function register()
{
    $this->app->singleton(AmqpConnectionFactory::class, function ($app) {
        return new AmqpConnectionFactory();
    });

    $this->app->bind(Producer::class, function ($app) {
        return new Producer(
            $app->make(AmqpConnectionFactory::class),
            ['connection' => 'default']
        );
    });
}
Queue Configuration:
Store AMQP settings in config/amqp.php:
return [
    'host' => env('AMQP_HOST', 'localhost'),
    'port' => env('AMQP_PORT', 5672),
    'user' => env('AMQP_USER', 'guest'),
    'password' => env('AMQP_PASSWORD', 'guest'),
    'vhost' => env('AMQP_VHOST', '/'),
];
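Some clients accept a single DSN string instead of a settings array. As a minimal sketch, assuming the array keys shown in config/amqp.php, the settings could be turned into an AMQP URI like this. The helper name buildAmqpDsn is hypothetical and not part of enqueue/amqp-tools.

```php
<?php
// Hypothetical helper, not part of enqueue/amqp-tools: turns a settings
// array shaped like config/amqp.php into an AMQP URI.
function buildAmqpDsn(array $config): string
{
    return sprintf(
        'amqp://%s:%s@%s:%d/%s',
        rawurlencode($config['user']),
        rawurlencode($config['password']),
        $config['host'],
        (int) $config['port'],
        // The vhost is a single path segment, so "/" is percent-encoded.
        rawurlencode($config['vhost'])
    );
}
```

Percent-encoding the credentials and the vhost keeps characters such as "/" or "@" from being misread as URI delimiters.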
Retry Logic:
Use Enqueue\AmqpTools\RetryStrategy for transient failures:
$retryStrategy = new RetryStrategy();
$connection = $retryStrategy->createConnection($connectionFactory, config('amqp'));
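The retry idea can also be sketched independently of any library. The helper below is hypothetical (retryWithBackoff is not part of enqueue/amqp-tools) and assumes that a transient failure surfaces as a thrown exception. It retries a callable with exponentially growing delays and rethrows the last error once the attempts are exhausted.

```php
<?php
// Hypothetical helper, not part of enqueue/amqp-tools: retries a callable
// with exponential backoff whenever it throws.
function retryWithBackoff(
    callable $operation,
    int $maxAttempts = 3,
    int $baseDelayMs = 100,
    ?callable $sleep = null
) {
    // The sleep function is injectable so tests do not have to wait.
    $sleep = $sleep ?? static function (int $ms): void {
        usleep($ms * 1000);
    };

    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation($attempt);
        } catch (\Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // Give up and surface the last failure.
            }
            // The delay doubles on each attempt: base, 2x base, 4x base, ...
            $sleep($baseDelayMs * (2 ** ($attempt - 1)));
        }
    }
}
```

A connection attempt could then be wrapped as retryWithBackoff(function () use ($connectionFactory) { return $connectionFactory->createConnection(config('amqp')); }), assuming the factory throws on failure.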
Monitoring: Log connection/disconnection events:
$connection->addConnectionListener(function ($connection, $event) {
    Log::debug("AMQP Connection {$event->getType()}");
});
Connection Leaks:
Use try-finally or Laravel's illuminate/support Manager to auto-close:
$channel = $connection->createChannel();
try {
    $channel->basicPublish('', 'queue', '', $data);
} finally {
    $channel->close();
    $connection->close();
}
Serialization:
Malformed payloads (e.g., json_encode with non-UTF-8 input) can corrupt messages. Fail on encoding errors instead:
$message = json_encode($data, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);
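The effect of JSON_THROW_ON_ERROR on invalid UTF-8 can be seen in a small sketch. The function name encodePayload is hypothetical. The example assumes PHP 7.3 or later, where JsonException is available.

```php
<?php
// Hypothetical helper: encode a payload and fail loudly instead of
// silently returning false when the data cannot be represented as JSON.
function encodePayload(array $data): string
{
    return json_encode($data, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);
}

try {
    // "\xB1\x31" is not a valid UTF-8 sequence, so encoding is rejected.
    encodePayload(['name' => "\xB1\x31"]);
    $error = null;
} catch (\JsonException $e) {
    // The bad payload is caught here, before anything is published.
    $error = $e->getMessage();
}
```

Without the flag, json_encode returns false for the same input, which is easy to publish by accident as an empty body.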
Consumer Lag:
$channel->basic_qos(null, 100, null); // Prefetch 100 messages
Race Conditions:
Use basic_ack/basic_nack with no_ack=false and implement idempotent logic.
Enable AMQP Debugging:
Set AMQP_DEBUG environment variable or configure php-amqplib:
$connection = new AMQPConnection([
    'host' => 'localhost',
    'login' => 'guest',
    'password' => 'guest',
    'vhost' => '/',
    'insist' => false,
    'connection_timeout' => 3.0,
    'read_write_timeout' => 3.0,
    'heartbeat' => 0,
    'keepalive' => false,
    'loglevel' => AMQP_ERR, // Enable debug logs
]);
Check Queue Metrics:
Use rabbitmqctl or management plugin to inspect queues:
rabbitmqctl list_queues name messages consumers
Custom Connection Factories:
Extend AmqpConnectionFactory for advanced scenarios (e.g., SSL/TLS):
class SslAmqpConnectionFactory extends AmqpConnectionFactory
{
    public function createConnection(array $params): AMQPConnection
    {
        $params['ssl_options'] = [
            'cafile' => '/path/to/cert.pem',
            'local_cert' => '/path/to/client-cert.pem',
            'local_key' => '/path/to/client-key.pem',
            'verify_peer' => true,
            'passphrase' => 'password',
        ];

        return parent::createConnection($params);
    }
}
Message Transformers:
Create a decorator for Enqueue\Message\Message to add metadata:
class MetadataMessage implements MessageInterface
{
    protected $message;
    protected $metadata;

    public function __construct(MessageInterface $message, array $metadata)
    {
        $this->message = $message;
        $this->metadata = $metadata;
    }

    public function getBody(): string
    {
        return $this->message->getBody() . json_encode($this->metadata);
    }
}
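Appending encoded metadata directly to a JSON body yields a string that is no longer a single valid JSON document, so a consumer cannot decode it in one step. One alternative, sketched here under the assumption that both sides agree on the envelope keys, is to wrap body and metadata together. The function names and the keys body and metadata are hypothetical.

```php
<?php
// Hypothetical helpers: carry the body and its metadata in one JSON envelope.
function wrapWithMetadata(string $body, array $metadata): string
{
    return json_encode(
        ['body' => $body, 'metadata' => $metadata],
        JSON_THROW_ON_ERROR
    );
}

// Reverses wrapWithMetadata() and returns [body, metadata].
function unwrapEnvelope(string $payload): array
{
    $envelope = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);

    return [$envelope['body'], $envelope['metadata']];
}
```

The consumer calls unwrapEnvelope on the received payload and gets the original body string back unchanged, alongside the metadata array.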
Event Listeners: Subscribe to AMQP events for observability:
$connection->addConnectionListener(function ($connection, $event) {
    if ($event->getType() === AMQPConnection::EVENT_DISCONNECTED) {
        Log::error("AMQP Disconnected: {$event->getReason()}");
        // Trigger reconnection logic
    }
});
The examples above publish to the default exchange (''). Ensure your queues are bound to the correct exchange if using named exchanges.
Use passive: false, durable: true when declaring queues to persist across restarts:
$channel->queueDeclare('critical_queue', false, true, false, false);
Set heartbeat and keepalive:
$connection = new AMQPConnection([
    'heartbeat' => 60, // Send heartbeats every 60s
    'keepalive' => true,
]);