
Amqp Tools Laravel Package

enqueue/amqp-tools

AMQP Tools for PHP Enqueue: adds practical utilities not covered by the AMQP spec, built on top of AMQP concepts. Works with any amqp-interop compatible transport. Links to docs, support, and issue tracker included.


Getting Started

Minimal Setup

  1. Installation:

    composer require enqueue/amqp-tools
    

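    amqp-tools is a helper library and does not include a transport. Install an amqp-interop transport alongside it: enqueue/amqp-lib (pure PHP, built on php-amqplib), enqueue/amqp-ext (requires the PECL amqp extension), or enqueue/amqp-bunny.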

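  2. Basic Connection: The connection factory comes from the transport package (enqueue/amqp-lib here), not from amqp-tools. Settings go to the constructor, and createContext() opens the session:

    use Enqueue\AmqpLib\AmqpConnectionFactory;
    
    $connectionFactory = new AmqpConnectionFactory([
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'pass' => 'guest',
        'vhost' => '/',
    ]);
    $context = $connectionFactory->createContext();
    
  3. First Use Case: Publish a message to a queue:

    use Interop\Amqp\AmqpQueue;
    
    // Declare a durable queue, then send through the default exchange.
    $queue = $context->createQueue('test_queue');
    $queue->addFlag(AmqpQueue::FLAG_DURABLE);
    $context->declareQueue($queue);
    
    $message = $context->createMessage('Hello, AMQP!');
    $context->createProducer()->send($queue, $message);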
    


Implementation Patterns

Workflows

  1. Producer Pattern:

    • Create the connection factory once and reuse the context it returns.
    • Declare queues/exchanges in a service class for consistency.
    use Interop\Amqp\AmqpConnectionFactory;
    
    class MessagePublisher
    {
        /** @var \Interop\Amqp\AmqpContext */
        protected $context;
    
        public function __construct(AmqpConnectionFactory $factory)
        {
            // Connection settings belong to the factory's constructor, not to createContext().
            $this->context = $factory->createContext();
        }
    
        public function publish(string $queue, array $data): void
        {
            $message = $this->context->createMessage(json_encode($data, JSON_THROW_ON_ERROR));
            $this->context->createProducer()->send($this->context->createQueue($queue), $message);
        }
    }
    
  2. Consumer Pattern:

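    • Create a consumer from the context. The transport (amqp-ext, amqp-lib, or amqp-bunny) supplies the Interop\Amqp\AmqpConsumer implementation.
    • Example with amqp-ext:
    use Enqueue\AmqpExt\AmqpConnectionFactory;
    
    $connectionFactory = new AmqpConnectionFactory(config('amqp'));
    $context = $connectionFactory->createContext();
    
    $consumer = $context->createConsumer($context->createQueue('test_queue'));
    // receive() takes a timeout in milliseconds and returns null if nothing arrived.
    if ($message = $consumer->receive(5000)) {
        $data = json_decode($message->getBody(), true);
        $consumer->acknowledge($message);
    }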
    
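  3. Laravel Service Provider: Bind the connection factory and the context to the container. Producers and consumers are then created from the context:

    // AmqpConnectionFactory is the transport's factory (for example Enqueue\AmqpLib\AmqpConnectionFactory);
    // AmqpContext is Interop\Amqp\AmqpContext.
    public function register()
    {
        $this->app->singleton(AmqpConnectionFactory::class, function ($app) {
            // config('amqp') must use enqueue's key names: host, port, user, pass, vhost.
            return new AmqpConnectionFactory(config('amqp'));
        });
    
        $this->app->singleton(AmqpContext::class, function ($app) {
            return $app->make(AmqpConnectionFactory::class)->createContext();
        });
    }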
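
For unit tests it can help to hide the broker behind a small interface, so a publishing service like the one in the Producer Pattern can be exercised without RabbitMQ. The sketch below is an assumption, not part of amqp-tools or amqp-interop: QueueWriter, InMemoryQueueWriter, and OrderPublisher are hypothetical names, and a production QueueWriter would delegate to the AMQP context. It needs PHP 8.0 or later for constructor promotion.

```php
<?php

declare(strict_types=1);

// Hypothetical abstraction over "send this body to that queue".
interface QueueWriter
{
    public function write(string $queue, string $body): void;
}

// Test double that records messages in memory instead of talking to a broker.
final class InMemoryQueueWriter implements QueueWriter
{
    /** @var array<string, list<string>> bodies grouped by queue name */
    public array $sent = [];

    public function write(string $queue, string $body): void
    {
        $this->sent[$queue][] = $body;
    }
}

// Hypothetical service: encodes the payload and hands it to the writer.
final class OrderPublisher
{
    public function __construct(private QueueWriter $writer)
    {
    }

    public function publish(string $queue, array $data): void
    {
        $this->writer->write($queue, json_encode($data, JSON_THROW_ON_ERROR));
    }
}

$writer = new InMemoryQueueWriter();
(new OrderPublisher($writer))->publish('orders', ['id' => 42]);
```

In a Laravel test, the in-memory writer could be bound to the interface in the container, leaving the service code unchanged.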
    

Integration Tips

  • Queue Configuration: Store AMQP settings in config/amqp.php, using the key names the enqueue factories read (note pass, not password):

    return [
        'host' => env('AMQP_HOST', 'localhost'),
        'port' => (int) env('AMQP_PORT', 5672),
        'user' => env('AMQP_USER', 'guest'),
        'pass' => env('AMQP_PASSWORD', 'guest'),
        'vhost' => env('AMQP_VHOST', '/'),
    ];
    
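  • Retry Logic: amqp-tools does not ship a connection retry helper. What it does provide are delay strategies (RabbitMqDlxDelayStrategy and RabbitMqDelayPluginDelayStrategy), which let you re-send a failed message after a delay:

    use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
    
    $context->createProducer()
        ->setDelayStrategy(new RabbitMqDlxDelayStrategy())
        ->setDeliveryDelay(5000) // milliseconds
        ->send($queue, $message);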
    
  • Monitoring: The interop API does not expose connection-event listeners, so log around the calls that reach the broker:

    try {
        $context->createProducer()->send($queue, $message);
    } catch (\Throwable $e) {
        Log::error("AMQP publish failed: {$e->getMessage()}");
        throw $e;
    }
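
One way to handle transient failures around connecting or sending is a small generic retry helper with exponential backoff. This is a plain-PHP sketch, not an amqp-tools API: retryWithBackoff and its parameters are hypothetical names, and the sleep function is injectable only so the example can run without waiting. In practice the operation would be something like creating the context or sending a message.

```php
<?php

declare(strict_types=1);

/**
 * Runs $operation, retrying on any exception with exponential backoff.
 * $sleep receives the delay in milliseconds; it is injectable so tests need not wait.
 */
function retryWithBackoff(callable $operation, int $maxAttempts = 3, int $baseDelayMs = 100, ?callable $sleep = null)
{
    $sleep ??= static function (int $ms): void {
        usleep($ms * 1000);
    };

    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation($attempt);
        } catch (\Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // out of attempts: surface the last failure
            }
            // Delay doubles each time: base, 2x base, 4x base, ...
            $sleep($baseDelayMs * (2 ** ($attempt - 1)));
        }
    }
}

// Usage: the operation fails twice, then succeeds on the third attempt.
$delays = [];
$result = retryWithBackoff(
    static function (int $attempt): string {
        if ($attempt < 3) {
            throw new \RuntimeException('broker unavailable');
        }
        return 'connected';
    },
    5,
    100,
    static function (int $ms) use (&$delays): void {
        $delays[] = $ms; // record the delay instead of sleeping
    }
);
```

Catching every Throwable keeps the sketch short; real code would usually retry only on connection-level exceptions and let programming errors fail immediately.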
    

Gotchas and Tips

Pitfalls

  1. Connection Leaks:

    • Issue: Forgetting to close the context can exhaust AMQP server resources.
    • Fix: In short-lived scripts, close the context in a finally block. Long-running workers should keep one context open rather than reconnect per message:
      $context = $connectionFactory->createContext();
      try {
          $context->createProducer()->send($queue, $message);
      } finally {
          $context->close();
      }
      
  2. Serialization:

    • Issue: AMQP message bodies are strings. json_encode returns false on invalid UTF-8 unless JSON_THROW_ON_ERROR is set, so an unchecked encode can publish an empty body.
    • Fix: Encode so that failures raise a JsonException before anything is published:
      $message = json_encode($data, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);
      
  3. Consumer Lag:

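    • Issue: Consumers may fall behind if processing is slow, while a large backlog of unacknowledged messages sits on one worker.
    • Fix: Set a prefetch count on the context to limit unacknowledged messages per consumer:
      $context->setQos(0, 100, false); // prefetch size, prefetch count (100 messages), global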
      
  4. Race Conditions:

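    • Issue: A queue delivers each message to one consumer at a time, but an unacknowledged message is redelivered if that consumer dies or rejects it, so the same message can be processed more than once.
    • Fix: Acknowledge explicitly ($consumer->acknowledge($message) on success, $consumer->reject($message, true) to requeue), leave the consumer's no-ack flag off, and make handlers idempotent.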
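
The idempotency advice in the last pitfall can be sketched as a wrapper that remembers which message IDs it has already handled. This is an illustration under assumptions: IdempotentHandler is a hypothetical class, the ID is assumed to be set by the publisher and read from the message on the consumer side, and the in-memory array stands in for a shared store such as Redis or a database table.

```php
<?php

declare(strict_types=1);

// Hypothetical handler wrapper: skips messages whose ID was already processed.
final class IdempotentHandler
{
    /** @var array<string, true> IDs seen so far (in-memory for the sketch only) */
    private array $processed = [];

    /** @var callable */
    private $handler;

    public function __construct(callable $handler)
    {
        $this->handler = $handler;
    }

    /** Returns true if the handler ran, false if the message was a duplicate. */
    public function handle(string $messageId, string $body): bool
    {
        if (isset($this->processed[$messageId])) {
            return false; // redelivery: safe to acknowledge without reprocessing
        }

        ($this->handler)($body);
        // Mark as processed only after the handler succeeded, so failures can be retried.
        $this->processed[$messageId] = true;

        return true;
    }
}

$count = 0;
$handler = new IdempotentHandler(static function (string $body) use (&$count): void {
    $count++;
});

$first = $handler->handle('msg-1', '{"id":1}');
$second = $handler->handle('msg-1', '{"id":1}'); // simulated redelivery
```

Either return value means the message can be acknowledged; only an exception from the handler should lead to a reject or requeue.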

Debugging

  • Enable AMQP Debugging: With the amqp-lib transport, php-amqplib prints protocol-level traces when the AMQP_DEBUG constant is defined before the connection is created. Short timeouts also make connection problems surface quickly:

    define('AMQP_DEBUG', true); // must be defined before connecting
    
    $connectionFactory = new AmqpConnectionFactory([
        'host' => 'localhost',
        'user' => 'guest',
        'pass' => 'guest',
        'vhost' => '/',
        'connection_timeout' => 3.0,
        'read_timeout' => 3.0,
        'write_timeout' => 3.0,
        'heartbeat' => 0,
    ]);
    
  • Check Queue Metrics: Use rabbitmqctl or management plugin to inspect queues:

    rabbitmqctl list_queues name messages consumers
    

Extension Points

  1. Custom Connection Settings: SSL/TLS does not require a factory subclass. The connection factory accepts ssl_* options (an amqps: DSN also switches SSL on):

    $connectionFactory = new AmqpConnectionFactory([
        'host' => 'localhost',
        'port' => 5671,
        'ssl_on' => true,
        'ssl_verify' => true,
        'ssl_cacert' => '/path/to/cert.pem',
        'ssl_cert' => '/path/to/client-cert.pem',
        'ssl_key' => '/path/to/client-key.pem',
        'ssl_passphrase' => 'password',
    ]);
    
  2. Message Transformers: Attach metadata as message properties rather than appending it to the body, which would leave a payload the consumer cannot decode:

    use Interop\Queue\Message;
    
    function withMetadata(Message $message, array $metadata): Message
    {
        // Properties travel with the message and leave the body untouched.
        foreach ($metadata as $key => $value) {
            $message->setProperty($key, $value);
        }
    
        return $message;
    }
    
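  3. Event Listeners: The interop connection classes do not emit connection events. To observe a dropped connection, catch the exception around receive() or send(), log it, and create a fresh context:

    try {
        $message = $consumer->receive(5000);
    } catch (\Throwable $e) {
        Log::error("AMQP connection lost: {$e->getMessage()}");
        // Reconnect: build a new context and consumer from the factory.
        $context = $connectionFactory->createContext();
        $consumer = $context->createConsumer($queue);
    }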
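
When metadata has to travel inside the body rather than in message properties, wrapping payload and metadata in a single JSON envelope keeps the body parseable; concatenating two JSON strings does not. The following is a minimal sketch with a hypothetical envelope layout (the meta and data keys and both function names are assumptions, not an enqueue convention).

```php
<?php

declare(strict_types=1);

/** Wraps payload and metadata in one JSON document (hypothetical envelope format). */
function encodeEnvelope(array $payload, array $metadata): string
{
    return json_encode(['meta' => $metadata, 'data' => $payload], JSON_THROW_ON_ERROR);
}

/** Inverse of encodeEnvelope(); returns [payload, metadata]. */
function decodeEnvelope(string $body): array
{
    $decoded = json_decode($body, true, 512, JSON_THROW_ON_ERROR);

    return [$decoded['data'], $decoded['meta']];
}

$body = encodeEnvelope(['order' => 7], ['trace_id' => 'abc123']);
[$payload, $metadata] = decodeEnvelope($body);
```

Producer and consumer both need to agree on the envelope, so a version field in meta is a common addition once the format starts to change.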
    

Config Quirks

  • Default Exchange: Sending to a queue publishes through the broker's default exchange (''), which routes by queue name. If you send to a named exchange (a topic in interop terms) instead, bind your queues to it first.
  • Durable Queues: Add the durable flag when declaring a queue so it survives broker restarts:
    $queue = $context->createQueue('critical_queue');
    $queue->addFlag(\Interop\Amqp\AmqpQueue::FLAG_DURABLE);
    $context->declareQueue($queue);
    
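  • Heartbeats and Lazy Connections: For high-latency or idle-prone environments, configure heartbeat and keepalive on the factory. With lazy enabled, the connection opens on first use:
    $connectionFactory = new AmqpConnectionFactory([
        'heartbeat' => 60, // negotiate a 60s heartbeat with the broker
        'keepalive' => true, // TCP keepalive (amqp-lib transport option)
        'lazy' => true, // connect on first use
    ]);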
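
The enqueue connection factories also accept a DSN string in place of the options array, which is convenient when the whole configuration lives in one environment variable. The helper below is a hedged sketch of building such a DSN: buildAmqpDsn is a hypothetical function, and the query options shown (heartbeat, lazy) are assumed to mirror the array keys of the same name.

```php
<?php

declare(strict_types=1);

/**
 * Builds an AMQP DSN from connection parts plus optional query-string options.
 * Hypothetical helper; which option names are meaningful depends on the transport.
 */
function buildAmqpDsn(string $host, int $port, string $user, string $pass, string $vhost, array $options = []): string
{
    $dsn = sprintf(
        'amqp://%s:%s@%s:%d/%s',
        rawurlencode($user),
        rawurlencode($pass),
        $host,
        $port,
        rawurlencode($vhost) // the default vhost "/" is encoded as %2F
    );

    return $options === [] ? $dsn : $dsn . '?' . http_build_query($options);
}

$dsn = buildAmqpDsn('localhost', 5672, 'guest', 'guest', '/', ['heartbeat' => 60, 'lazy' => 1]);
```

Encoding the user, password, and vhost matters as soon as any of them contains a character such as "/" or "@" that would otherwise change how the DSN is parsed.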
    