queue-interop/amqp-interop
AMQP interop interfaces for PHP message queues. Defines common contracts to work with AMQP brokers (e.g., RabbitMQ) across different clients and frameworks, enabling portable producers/consumers, exchanges, queues, and message handling without vendor lock-in.
Installation Add the package via Composer:
composer require queue-interop/amqp-interop
Ensure your Laravel project uses PHP 7.1+ (compatible with the package).
Basic Setup
Register the AMQP transport in Laravel’s config/queue.php:
'connections' => [
'amqp' => [
'driver' => 'amqp',
'host' => env('AMQP_HOST', 'localhost'),
'port' => env('AMQP_PORT', 5672),
'user' => env('AMQP_USER', 'guest'),
'password' => env('AMQP_PASSWORD', 'guest'),
'vhost' => env('AMQP_VHOST', '/'),
'queue' => env('AMQP_QUEUE', 'laravel'),
'options' => [
'ssl' => env('AMQP_SSL', false),
'read_write_timeout' => 15,
],
],
],
First Use Case: Dispatching a Job
Create a job (e.g., SendEmailJob) and dispatch it to the AMQP queue:
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class SendEmailJob implements ShouldQueue
{
use Dispatchable, Queueable;
public function handle()
{
// Job logic
}
}
// Dispatch
SendEmailJob::dispatch();
Verify Queue Connection Run the queue worker:
php artisan queue:work amqp --queue=laravel
Queue Configuration
env() to switch queues per environment (e.g., dev, staging, prod).$connection = app('amqp.connection');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->queue_declare('logs_queue', false, false, false, false);
$channel->queue_bind('logs_queue', 'logs');
Job Prioritization
SendEmailJob::dispatch()->onQueue('high_priority')->priority(10);
Delayed Jobs
delay():
SendEmailJob::dispatch()->delay(now()->addMinutes(10));
Consumer Groups
php artisan queue:work amqp --queue=laravel --daemon
Publish/Subscribe
// Publisher (e.g., in a controller)
$connection->channel()->basic_publish(
'', 'logs', new AMQPMessage('Event data')
);
// Consumer (in a worker)
$channel->consume('logs_queue', function ($msg) {
// Handle message
}, ['no_ack' => false]);
use Illuminate\Support\Facades\Event;
Event::listen('order.created', function ($order) {
$connection->channel()->basic_publish(
'', 'orders', new AMQPMessage(json_encode($order))
);
});
app/Exceptions/Handler.php:
public function register()
{
$this->reportable(function (JobFailedException $e) {
if ($e->job instanceof SendEmailJob) {
// Log or alert for critical failures
}
});
}
php-amqplib’s built-in metrics or integrate with tools like Prometheus via middleware.Connection Management
try {
$channel = $connection->channel();
} catch (AMQPConnectionException $e) {
$connection = new AMQPConnection($host, $port, $user, $pass);
$channel = $connection->channel();
}
read_write_timeout in config to avoid indefinite hangs.Message Serialization
json_encode()/json_decode() or a library like spatie/array-to-object:
$message = new AMQPMessage(json_encode($data));
$data = json_decode($channel->basic_get()->body, true);
Ack/Nack Behavior
ack/nack messages in consumers:
$channel->consume('queue', function ($msg) {
try {
// Process
$msg->ack();
} catch (\Exception $e) {
$msg->nack(false, false, 5); // Requeue after 5 retries
}
});
Queue Visibility
basic_get() with no_ack: false and manually ack after processing.SSL/TLS Issues
config/queue.php:
'options' => [
'ssl' => [
'local_cert' => env('AMQP_SSL_CERT'),
'passphrase' => env('AMQP_SSL_PASSPHRASE'),
'verify_peer' => false, // Disable for testing only!
],
],
$connection->setReadWriteTimeout(15);
$connection->setDebug(true); // Logs raw AMQP traffic
php-amqplib's CLI tools or GUI tools like RabbitMQ Management Plugin to inspect queues/exchanges.$channel->consume('queue', function ($msg) {
logger()->debug('Received:', ['body' => $msg->body]);
});
Illuminate\Queue\AmqpQueue to add broker-specific logic:
class CustomAmqpQueue extends AmqpQueue
{
public function pushRaw($job, $data, $queue = null)
{
// Custom logic (e.g., set headers)
$message = new AMQPMessage($data, [
'app_id' => 'laravel',
'priority' => $job->priority,
]);
$this->publish($message, $queue);
}
}
class LogAmqpJobMiddleware
{
public function handle($job, $next)
{
logger()->info('AMQP Job dispatched', ['job' => get_class($job)]);
return $next($job);
}
}
Register in app/Console/Kernel.php:
protected function schedule(Schedule $schedule)
{
$schedule->job(SendEmailJob::class)
->middleware(LogAmqpJobMiddleware::class);
}
amqp.message in Laravel 8+) via service providers:
public function boot()
{
event(new AMQPMessageEvent($message));
}
How can I help you explore Laravel packages today?