queue-interop/amqp-interop
Set of PHP interfaces and contracts for AMQP messaging interoperability. Standardizes producers, consumers, contexts and messages so different AMQP client libraries and queue implementations can be swapped without changing application code.
Installation
Add the package via Composer:
composer require queue-interop/amqp-interop
Ensure your Laravel project uses PHP 7.1+ (compatible with the package).
Basic Setup
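Register the AMQP transport in Laravel’s config/queue.php. This package only defines interfaces and Laravel does not ship an amqp driver, so the configuration below assumes a separate AMQP queue driver package is installed: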
'connections' => [
'amqp' => [
'driver' => 'amqp',
'host' => env('AMQP_HOST', 'localhost'),
'port' => env('AMQP_PORT', 5672),
'user' => env('AMQP_USER', 'guest'),
'password' => env('AMQP_PASSWORD', 'guest'),
'vhost' => env('AMQP_VHOST', '/'),
'queue' => env('AMQP_QUEUE', 'laravel'),
'options' => [
'ssl' => env('AMQP_SSL', false),
'read_write_timeout' => 15,
],
],
],
First Use Case: Dispatching a Job
Create a job (e.g., SendEmailJob) and dispatch it to the AMQP queue:
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class SendEmailJob implements ShouldQueue
{
use Dispatchable, Queueable;
public function handle()
{
// Job logic
}
}
// Dispatch
SendEmailJob::dispatch();
Verify Queue Connection
Run the queue worker:
php artisan queue:work amqp --queue=laravel
Queue Configuration
Use env() to switch queues per environment (e.g., dev, staging, prod).
// Assumes a php-amqplib connection is bound in the container under this name
$connection = app('amqp.connection');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->queue_declare('logs_queue', false, false, false, false);
$channel->queue_bind('logs_queue', 'logs');
Job Prioritization
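Laravel's dispatch chain has no built-in priority() method, so route urgent jobs to a dedicated queue:
SendEmailJob::dispatch()->onQueue('high_priority');
Workers drain queues in the order listed, so name the high-priority queue first:
php artisan queue:work amqp --queue=high_priority,laravel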
Delayed Jobs
Use delay():
SendEmailJob::dispatch()->delay(now()->addMinutes(10));
Consumer Groups
php artisan queue:work amqp --queue=laravel --daemon
Publish/Subscribe
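// Publisher (e.g., in a controller)
// basic_publish() takes the message first, then the exchange name
$connection->channel()->basic_publish(
    new AMQPMessage('Event data'), 'logs'
);
// Consumer (in a worker)
$channel->basic_consume('logs_queue', '', false, false, false, false, function ($msg) {
    // Handle message, then acknowledge it (no_ack is false)
    $msg->ack();
});
while ($channel->is_consuming()) {
    $channel->wait(); // block until the next delivery
}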
use Illuminate\Support\Facades\Event;
Event::listen('order.created', function ($order) use ($connection) {
    // The default exchange ('') routes by queue name, here 'orders'
    $connection->channel()->basic_publish(
        new AMQPMessage(json_encode($order)), '', 'orders'
    );
});
In app/Exceptions/Handler.php:
public function register()
{
$this->reportable(function (JobFailedException $e) {
if ($e->job instanceof SendEmailJob) {
// Log or alert for critical failures
}
});
}
Use php-amqplib’s built-in metrics or integrate with tools like Prometheus via middleware.
Connection Management
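try {
    $channel = $connection->channel();
} catch (AMQPConnectionClosedException | AMQPIOException $e) {
    // Connection dropped: open a fresh one (php-amqplib's stream connection class)
    $connection = new AMQPStreamConnection($host, $port, $user, $pass);
    $channel = $connection->channel();
}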
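A single reconnect attempt can fail again if the broker is still restarting. The following is a minimal sketch of retrying with exponential backoff, not part of any library: connectWithBackoff() and its parameters are hypothetical names, and the $connect callable stands in for whatever constructor call your client library uses.

```php
<?php
declare(strict_types=1);

/**
 * Call $connect until it succeeds or $maxAttempts is reached,
 * doubling the wait between attempts (exponential backoff).
 *
 * $connect: hypothetical factory that returns a connection or throws.
 * $sleep:   receives the delay in milliseconds; injectable so tests need not wait.
 */
function connectWithBackoff(
    callable $connect,
    int $maxAttempts = 5,
    int $baseDelayMs = 100,
    ?callable $sleep = null
) {
    if ($maxAttempts < 1) {
        throw new InvalidArgumentException('maxAttempts must be at least 1');
    }
    $sleep = $sleep ?? static function (int $ms): void {
        usleep($ms * 1000);
    };

    for ($attempt = 1; ; $attempt++) {
        try {
            return $connect();
        } catch (Exception $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // give up and surface the last failure
            }
            // Waits of 100 ms, 200 ms, 400 ms, ... with the default base delay
            $sleep($baseDelayMs * (2 ** ($attempt - 1)));
        }
    }
}
```

Pass a closure that builds the connection as $connect; the helper returns whatever that closure returns, or rethrows the last exception once the attempts are used up.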
Set read_write_timeout in config to avoid indefinite hangs.
Message Serialization
Use json_encode()/json_decode() or a library like spatie/array-to-object:
$message = new AMQPMessage(json_encode($data));
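// basic_get() needs a queue name and returns null when the queue is empty
$msg = $channel->basic_get('queue');
$data = $msg ? json_decode($msg->body, true) : null;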
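By default json_encode() returns false and json_decode() returns null on failure, which is easy to miss in a consumer. A small sketch of a stricter round trip follows, assuming PHP 7.3+ for JSON_THROW_ON_ERROR; encodePayload() and decodePayload() are hypothetical helper names, not part of any library.

```php
<?php
declare(strict_types=1);

/** Serialize a payload, throwing JsonException instead of returning false. */
function encodePayload(array $data): string
{
    return json_encode($data, JSON_THROW_ON_ERROR);
}

/** Parse a message body, rejecting malformed JSON and non-array payloads. */
function decodePayload(string $body): array
{
    $decoded = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($decoded)) {
        throw new UnexpectedValueException('Expected a JSON object or array');
    }
    return $decoded;
}
```

The string from encodePayload() can be used as the message body, and decodePayload() can be applied to the body of a received message before the job logic runs.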
Ack/Nack Behavior
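Explicitly ack/nack messages in consumers:
$channel->basic_consume('queue', '', false, false, false, false, function ($msg) {
    try {
        // Process
        $msg->ack();
    } catch (\Exception $e) {
        // nack($requeue, $multiple): drop or dead-letter the message rather than requeue it;
        // AMQP does not count retries, so requeueing here could loop forever
        $msg->nack(false);
    }
});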
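A nack carries no retry counter, so a retry limit has to be tracked by the application. One common approach, sketched here under assumptions, is to keep the count in a message header, republish a copy with the count incremented, and reject the message once the limit is reached. The header name x-retry-count and the planRetry() helper are hypothetical, not part of AMQP or any library.

```php
<?php
declare(strict_types=1);

// Hypothetical application-level header that carries the retry count
const RETRY_HEADER = 'x-retry-count';

/**
 * Decide what to do with a failed message.
 *
 * Returns ['action' => 'republish' | 'reject', 'headers' => array], where
 * 'headers' holds the headers to attach if the message is republished.
 */
function planRetry(array $headers, int $maxRetries): array
{
    $attempts = (int) ($headers[RETRY_HEADER] ?? 0);

    if ($attempts >= $maxRetries) {
        // Limit reached: the caller should nack without requeue
        return ['action' => 'reject', 'headers' => $headers];
    }

    // Otherwise republish a copy with the counter incremented, then ack the original
    $headers[RETRY_HEADER] = $attempts + 1;
    return ['action' => 'republish', 'headers' => $headers];
}
```

In a consumer's catch block, the returned action would select between publishing a new message with the updated headers and rejecting the original.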
Queue Visibility
Use basic_get() with no_ack: false and manually ack after processing.
SSL/TLS Issues
Configure TLS options in config/queue.php:
'options' => [
'ssl' => [
'local_cert' => env('AMQP_SSL_CERT'),
'passphrase' => env('AMQP_SSL_PASSPHRASE'),
'verify_peer' => false, // Disable for testing only!
],
],
$connection->setReadWriteTimeout(15);
$connection->setDebug(true); // Logs raw AMQP traffic
Use php-amqplib's CLI tools or GUI tools like RabbitMQ Management Plugin to inspect queues/exchanges.
$channel->basic_consume('queue', '', false, false, false, false, function ($msg) {
    logger()->debug('Received:', ['body' => $msg->body]);
});
Extend the queue class provided by your AMQP driver (shown here as AmqpQueue; Laravel itself does not ship one) to add broker-specific logic:
class CustomAmqpQueue extends AmqpQueue
{
public function pushRaw($job, $data, $queue = null)
{
// Custom logic (e.g., set headers)
$message = new AMQPMessage($data, [
'app_id' => 'laravel',
'priority' => $job->priority,
]);
$this->publish($message, $queue);
}
}
class LogAmqpJobMiddleware
{
public function handle($job, $next)
{
logger()->info('AMQP Job dispatched', ['job' => get_class($job)]);
return $next($job);
}
}
Register it on the job class by returning it from a middleware() method:
class SendEmailJob implements ShouldQueue
{
    public function middleware()
    {
        return [new LogAmqpJobMiddleware];
    }
}
Dispatch custom events (e.g., amqp.message in Laravel 8+) via service providers:
public function boot()
{
event(new AMQPMessageEvent($message));
}