enqueue/amqp-bunny
AMQP transport for Enqueue built on the Bunny PHP AMQP client. It implements amqp-interop for RabbitMQ-compatible brokers and provides a lightweight driver for Enqueue’s messaging/queue ecosystem.
Install the package:
composer require enqueue/amqp-bunny
Composer also installs bunny/bunny (the AMQP client) and queue-interop/amqp-interop as dependencies of this package.
Configure the transport:
Add to config/queue.php under connections:
'amqp-bunny' => [
    'dsn' => env('AMQP_DSN', 'amqp://guest:guest@localhost:5672/%2f'),
    'options' => [
        'connection_timeout' => 3.0,
        'read_write_timeout' => 3.0,
    ],
    'queue' => env('AMQP_QUEUE', 'default'),
    'exchange' => env('AMQP_EXCHANGE', 'default'),
    'routing_key' => env('AMQP_ROUTING_KEY', 'default'),
    'prefetch_count' => env('AMQP_PREFETCH', 1),
],
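The DSN packs host, port, credentials, and virtual host into one string, and the trailing %2f is the URL-encoded default vhost "/". The following is a minimal sketch of how such a DSN breaks down, using only PHP's built-in parse_url(). The helper name describeAmqpDsn() and its fallback values are assumptions made for illustration; it is not part of enqueue/amqp-bunny and is not how the library parses DSNs internally.

```php
<?php
// Hypothetical helper for illustration only; not part of enqueue/amqp-bunny.
function describeAmqpDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || !isset($parts['host'])) {
        throw new InvalidArgumentException("Malformed AMQP DSN: $dsn");
    }

    // The path carries the URL-encoded vhost: "/%2f" becomes "/".
    $vhost = rawurldecode(substr($parts['path'] ?? '', 1));

    return [
        'host'  => $parts['host'],
        'port'  => $parts['port'] ?? 5672,      // assumed fallback: standard AMQP port
        'user'  => $parts['user'] ?? 'guest',   // assumed fallback
        'pass'  => $parts['pass'] ?? 'guest',   // assumed fallback
        'vhost' => $vhost !== '' ? $vhost : '/',
    ];
}

$config = describeAmqpDsn('amqp://guest:guest@localhost:5672/%2f');
echo $config['host'], ':', $config['port'], ' vhost=', $config['vhost'], PHP_EOL;
```

A quick check like this can help confirm that the value of AMQP_DSN points at the intended broker and vhost before it is handed to the transport.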
First use case: Push a job to the queue:
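use Enqueue\AmqpBunny\AmqpConnectionFactory;
// The factory takes the DSN; createContext() returns the AMQP context, kept in $connection here.
$connectionFactory = new AmqpConnectionFactory(config('queue.connections.amqp-bunny.dsn'));
$connection = $connectionFactory->createContext();
// Declare the queue so it exists before the first message is sent.
$queue = $connection->createQueue(config('queue.connections.amqp-bunny.queue'));
$connection->declareQueue($queue);
$producer = $connection->createProducer();
$producer->send($queue, $connection->createMessage('Hello, AMQP!'));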
Producer-Consumer Pattern:
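// $connection is the AMQP context returned by the connection factory's createContext().
$queue = $connection->createQueue(config('queue.connections.amqp-bunny.queue'));
$message = $connection->createMessage('Job payload');
$message->setPriority(5);
$producer = $connection->createProducer();
$producer->send($queue, $message);
$consumer = $connection->createConsumer($queue);
// receive() blocks until a message arrives; pass a timeout in milliseconds to return null instead.
while ($message = $consumer->receive()) {
    try {
        // Process message
        $consumer->acknowledge($message);
    } catch (\Exception $e) {
        $consumer->reject($message);
    }
}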
Laravel Queue Integration: Extend Laravel’s queue system to use AMQP:
// config/queue.php
'default' => env('QUEUE_CONNECTION', 'amqp-bunny'),
Dispatch jobs as usual:
dispatch(new ProcessPodcast());
Error Handling:
Use the consumer's reject() with the requeue flag: pass true to requeue after a transient failure, false to discard the message:
if ($message->getBody() === 'fail') {
    $consumer->reject($message, false); // Do not requeue
}
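Whether to set the requeue flag usually depends on the kind of failure and on how many times the message has already been tried. The sketch below shows one way to make that decision. TransientFailure and shouldRequeue() are hypothetical names introduced for this example, and how the attempt count is tracked (for instance in a message header) is left to the application.

```php
<?php
// Hypothetical exception type marking failures that are worth retrying.
final class TransientFailure extends RuntimeException
{
}

// Hypothetical helper: returns the requeue flag to use for a failed message.
function shouldRequeue(Throwable $error, int $attempt, int $maxAttempts = 3): bool
{
    // Permanent errors (bad payload, logic bugs) are never retried.
    if (!$error instanceof TransientFailure) {
        return false;
    }

    // Transient errors are retried until the attempt budget is spent.
    return $attempt < $maxAttempts;
}

var_dump(shouldRequeue(new TransientFailure('broker timeout'), 1));
var_dump(shouldRequeue(new LogicException('bad payload'), 1));
```

The returned boolean can be passed as the requeue flag when rejecting the message, so that permanent failures are dropped instead of looping through the queue indefinitely.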
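HorizonServiceProvider and overriding connection().
$consumer = $connection->createConsumer($connection->createQueue('dlq'));
$subscription = $connection->createSubscriptionConsumer();
$subscription->subscribe($consumer, function ($message, $consumer) {
    // Log and reprocess
    $consumer->acknowledge($message);
    return true; // Returning false stops consumption
});
$subscription->consume();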
$connectionFactory = new AmqpConnectionFactory($dsn);
$connection = $connectionFactory->createContext();
$producer = $connection->createProducer();
Connection Timeouts:
read_write_timeout in config.
config/queue.php:
'options' => [
    'read_write_timeout' => 10.0,
],
Message Ordering:
priority headers or a single-consumer queue for ordered processing.
Consumer Lag:
A high prefetch_count can overwhelm workers. Start with 1 and adjust based on workload.
Queue Binding:
routing_key:
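use Interop\Amqp\Impl\AmqpBind;
$queue = $connection->createQueue(config('queue.connections.amqp-bunny.queue'));
$topic = $connection->createTopic(config('queue.connections.amqp-bunny.exchange'));
// Bind the queue to the exchange under the configured routing key.
$connection->bind(new AmqpBind($topic, $queue, config('queue.connections.amqp-bunny.routing_key')));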
BUNNY_DEBUG environment variable to log raw AMQP commands:
export BUNNY_DEBUG=1
bunny/bunny's Connection::isConnected() to verify connectivity before producing/consuming.
Custom Message Serialization: Override the default JSON serializer:
$producer->setSerializer(new \Enqueue\AmqpBunny\Serializer\JsonSerializer());
Or implement MessageSerializerInterface for custom formats (e.g., Protobuf).
Middleware: Add processing logic before/after message handling:
$consumer->setMiddleware([
    new \Enqueue\AmqpBunny\Middleware\LoggingMiddleware(),
    new class implements Middleware {
        public function handle(Message $message, callable $next) {
            // Pre-processing
            $result = $next($message);
            // Post-processing
            return $result;
        }
    },
]);
Dynamic Queue Routing:
Use Message::setProperty() to dynamically route messages:
$message->setProperty('x-priority', 10); // High priority
$producer->send($message);