Rabbitmq Bundle Laravel Package

cmobi/rabbitmq-bundle


Getting Started

Minimal Setup

  1. Install the Bundle:

    composer require cmobi/rabbitmq-bundle
    

    Enable the bundle in config/bundles.php:

    return [
        // ...
        Cmobi\RabbitMqBundle\CmobiRabbitMqBundle::class => ['all' => true],
    ];
    
  2. Configure RabbitMQ (config/packages/cmobi_rabbitmq.yaml):

    cmobi_rabbitmq:
        host: 'localhost'
        port: 5672
        user: 'guest'
        password: 'guest'
        vhost: '/'
        lazy: true  # Recommended for production
    
  3. First Use Case: Publishing a Message. Inject the RabbitMqProducer service and publish a message:

    use Cmobi\RabbitMqBundle\Producer\RabbitMqProducer;
    
    class MyService
    {
        public function __construct(private RabbitMqProducer $producer)
        {}
    
        public function sendMessage()
        {
            $this->producer->publish(
                'my_queue',
                json_encode(['key' => 'value'])
            );
        }
    }
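
The service above passes the result of json_encode() straight to publish(). Without JSON_THROW_ON_ERROR, json_encode() returns false on failure, so a bad payload could be published silently. A minimal sketch of a safer encoding step follows; encodePayload() is a hypothetical helper name, not part of the bundle.

```php
<?php
declare(strict_types=1);

/**
 * Encode a payload for publishing.
 * encodePayload() is a hypothetical helper, not part of the bundle.
 */
function encodePayload(array $payload): string
{
    // JSON_THROW_ON_ERROR raises JsonException instead of returning false.
    return json_encode($payload, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);
}

echo encodePayload(['key' => 'value']), PHP_EOL; // prints {"key":"value"}
```

The returned string can then be handed to the producer in place of the inline json_encode() call.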
    

Implementation Patterns

Core Workflows

  1. Producer Pattern (Publish Messages)

    • Use RabbitMqProducer for one-off or batched messages:
      $producer->publish('queue_name', $message, [
          'content_type' => 'application/json',
          'delivery_mode' => AMQP_MSG_PERSISTENT, // Ensure durability
      ]);
      
    • Bulk Publishing: Loop and publish with error handling:
      foreach ($messages as $msg) {
          try {
              $producer->publish('queue', $msg);
          } catch (\Exception $e) {
              $this->handleFailure($msg, $e);
          }
      }
      
  2. Consumer Pattern (Subscribe to Queues)

    • Define a consumer class implementing RabbitMqConsumerInterface:
      use Cmobi\RabbitMqBundle\Consumer\RabbitMqConsumerInterface;
      
      class MyConsumer implements RabbitMqConsumerInterface
      {
          public function execute($message, $deliveryInfo, $properties)
          {
              // Process $message (e.g., json_decode)
              return AMQP_CONSUME_ACK; // Acknowledge
          }
      }
      
    • Register the consumer in services.yaml:
      services:
          App\Consumer\MyConsumer:
              tags:
                  - { name: 'cmobi_rabbitmq.consumer', queue: 'my_queue' }
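
The "Process $message" step in the consumer above usually starts with decoding and validating the body. The sketch below shows one way to turn that into an acknowledge/reject decision. decideOutcome() and the 'ack'/'reject' strings are illustrative assumptions; map them onto whatever acknowledgment values your consumer returns.

```php
<?php
declare(strict_types=1);

/**
 * Decide what a consumer should do with a raw message body.
 * Returns 'ack' when the body decodes to a JSON object or array,
 * and 'reject' otherwise. Hypothetical helper, not part of the bundle.
 */
function decideOutcome(string $body): string
{
    try {
        $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $e) {
        // Malformed JSON will not become valid on redelivery.
        return 'reject';
    }

    // Scalars such as 42 or "text" are valid JSON but not a usable payload here.
    return is_array($data) ? 'ack' : 'reject';
}
```

Keeping this decision in a plain function makes it testable without a running broker.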
      
  3. Exchange/Routing Key Usage

    • Publish to an exchange with routing keys:
      $producer->publishToExchange(
          'my_exchange',
          'routing.key',
          $message,
          ['content_type' => 'text/plain']
      );
      
    • Bind queues to exchanges in config:
      cmobi_rabbitmq:
          exchanges:
              my_exchange:
                  type: direct
                  queues:
                      - my_queue
                  bindings:
                      - { queue: 'my_queue', routing_key: 'routing.key' }
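
With a direct exchange, as configured above, a message reaches a queue only when its routing key equals the binding's routing key exactly. The following sketch models that rule in plain PHP for illustration. routeDirect() is a hypothetical function and the array shape mirrors the bindings entries in the config; it does not call the bundle or the broker.

```php
<?php
declare(strict_types=1);

/**
 * Return the queues a direct exchange would deliver to: those whose
 * binding routing key equals the message's routing key exactly.
 *
 * @param list<array{queue: string, routing_key: string}> $bindings
 * @return list<string>
 */
function routeDirect(array $bindings, string $routingKey): array
{
    $queues = [];
    foreach ($bindings as $binding) {
        if ($binding['routing_key'] === $routingKey) {
            $queues[] = $binding['queue'];
        }
    }

    // A queue bound twice with the same key still receives one copy.
    return array_values(array_unique($queues));
}

$bindings = [['queue' => 'my_queue', 'routing_key' => 'routing.key']];
```

A message published with any other routing key matches no binding and is not delivered to my_queue.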
      
  4. Dependency Injection

    • Prefer constructor injection for producers/consumers:
      public function __construct(
          private RabbitMqProducer $producer,
          private RabbitMqConsumerRegistry $consumerRegistry
      ) {}
      
    • Access the connection directly (if needed):
      $connection = $this->producer->getConnection();
      

Integration Tips

  1. Symfony Events: Bind RabbitMQ actions to Symfony events (e.g., kernel.terminate for graceful shutdown):

    use Cmobi\RabbitMqBundle\Producer\RabbitMqProducer;
    use Symfony\Component\EventDispatcher\EventSubscriberInterface;
    
    class RabbitMqShutdownSubscriber implements EventSubscriberInterface
    {
        // Inject the producer so shutdown() has a connection to close
        public function __construct(private RabbitMqProducer $producer)
        {}
    
        public static function getSubscribedEvents(): array
        {
            return ['kernel.terminate' => 'shutdown'];
        }
    
        public function shutdown(): void
        {
            $this->producer->closeConnection();
        }
    }
    
  2. Retry Logic: Implement exponential backoff for transient failures, and rethrow once the attempts are exhausted:

    $attempts = 0;
    $maxAttempts = 3;
    while (true) {
        try {
            $producer->publish('queue', $message);
            break;
        } catch (\Exception $e) {
            $attempts++;
            if ($attempts >= $maxAttempts) {
                throw $e; // Give up instead of failing silently
            }
            sleep(2 ** $attempts); // Exponential backoff: 2s, 4s
        }
    }
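
The retry loop above can be factored into a reusable helper so that the backoff policy lives in one place. This is a minimal sketch under stated assumptions: retryWithBackoff() and its parameters are illustrative names, and the sleep function is injectable only so the helper can be tested without real delays.

```php
<?php
declare(strict_types=1);

/**
 * Run $operation, retrying on failure with exponential backoff.
 * Hypothetical helper, not part of the bundle.
 *
 * @param callable(): mixed        $operation   work to attempt, e.g. a publish call
 * @param int                      $maxAttempts total attempts before giving up
 * @param null|callable(int): void $sleep       receives the delay in seconds
 */
function retryWithBackoff(callable $operation, int $maxAttempts = 3, ?callable $sleep = null): mixed
{
    $sleep ??= static function (int $seconds): void {
        sleep($seconds);
    };

    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation();
        } catch (\Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // Out of attempts: surface the last error.
            }
            $sleep(2 ** $attempt); // 2s, then 4s, then 8s, ...
        }
    }
}
```

A call site might look like `retryWithBackoff(fn () => $producer->publish('queue', $message));`, which keeps the publishing code free of loop bookkeeping.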
    
  3. Monitoring: Attach metadata (e.g., user ID, timestamp) to each published message to aid debugging:

    $producer->publish('queue', $message, [
        'app_metadata' => [
            'user_id' => auth()->id(),
            'timestamp' => now()->toIso8601String(),
        ],
    ]);
    
  4. Testing: Run a test container (e.g., rabbitmq:3-management) and point the test environment at it in phpunit.xml:

    <env name="RABBITMQ_HOST" value="localhost"/>
    <env name="RABBITMQ_PORT" value="5672"/>
    

    Mock the producer in unit tests:

    $producer = $this->getMockBuilder(RabbitMqProducer::class)
         ->disableOriginalConstructor()
         ->getMock();
    $producer->method('publish')
         ->willReturn(true);
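
An alternative to mocking is a hand-written test double that records what was published. The sketch below assumes your application code depends on a small interface of its own rather than on the bundle class directly. MessagePublisher and InMemoryPublisher are hypothetical names introduced for illustration.

```php
<?php
declare(strict_types=1);

/** Hypothetical application-level interface; not part of the bundle. */
interface MessagePublisher
{
    public function publish(string $queue, string $body): void;
}

/** Test double that records messages instead of talking to RabbitMQ. */
final class InMemoryPublisher implements MessagePublisher
{
    /** @var list<array{queue: string, body: string}> */
    public array $published = [];

    public function publish(string $queue, string $body): void
    {
        $this->published[] = ['queue' => $queue, 'body' => $body];
    }
}
```

In a test, the service under test receives an InMemoryPublisher, and assertions inspect the $published array afterwards. A production adapter implementing the same interface would delegate to the real producer.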
    

Gotchas and Tips

Pitfalls

  1. Connection Management

    • Lazy Connections: The lazy: true setting (default) delays connection until first use. Ensure connections are closed explicitly during shutdown to avoid leaks:
      $producer->closeConnection(); // Call this on app shutdown
      
    • Connection Pooling: The bundle does not implement pooling. For high-throughput apps, manage connections externally (e.g., via a singleton).
  2. Message Persistence

    • Non-Persistent Messages: By default, messages may be lost if RabbitMQ restarts. Always set delivery_mode: AMQP_MSG_PERSISTENT for critical messages:
      $producer->publish('queue', $message, ['delivery_mode' => AMQP_MSG_PERSISTENT]);
      
  3. Consumer Lifecycle

    • Manual Acknowledgment: Consumers must explicitly return AMQP_CONSUME_ACK or AMQP_CONSUME_REJECT. Without a return value, messages stay unacknowledged and accumulate in the queue.
    • Consumer Tags: Each consumer must have a unique tag. Avoid hardcoding tags; use UUIDs or dynamic values:
      $consumerTag = uniqid();
      $channel->consume($this->execute(...), $consumerTag);
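
The snippet above uses uniqid(), which is derived from the current time and is not a UUID. If you want tags that are actual UUIDs, as the tip suggests, a version 4 UUID can be generated with PHP's built-in CSPRNG. This is a sketch; uuidV4() and the 'consumer-' prefix are illustrative choices.

```php
<?php
declare(strict_types=1);

/** Generate a random RFC 4122 version 4 UUID using random_bytes(). */
function uuidV4(): string
{
    $bytes = random_bytes(16);
    $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40); // set version to 4
    $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80); // set RFC 4122 variant

    // 32 hex characters split into 8 groups of 4, joined as 8-4-4-4-12.
    return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
}

$consumerTag = 'consumer-' . uuidV4();
```

The resulting $consumerTag can be passed wherever the time-based tag was used.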
      
  4. Configuration Overrides

    • Environment Variables: The bundle does not read environment variables on its own. Reference them explicitly with Symfony's %env()% syntax in config/packages/cmobi_rabbitmq.yaml:
      cmobi_rabbitmq:
          host: '%env(RABBITMQ_HOST)%'
      
  5. Queue Declarations

    • Auto-Declaration: The bundle declares queues/exchanges on first use. For production, pre-declare queues with proper durability/TTL:
      cmobi_rabbitmq:
          queues:
              my_queue:
                  durable: true
                  exclusive: false
                  auto_delete: false
                  arguments:
                      'x-message-ttl': 86400000 # 24h TTL in ms
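
The x-message-ttl argument is expressed in milliseconds, which makes literal values easy to get wrong by a factor of 1000. A small conversion helper makes the arithmetic explicit when generating or checking config values. hoursToMilliseconds() is a hypothetical name used only for this sketch.

```php
<?php
declare(strict_types=1);

/** Convert a duration in hours to the milliseconds x-message-ttl expects. */
function hoursToMilliseconds(int $hours): int
{
    // hours * 60 minutes * 60 seconds * 1000 milliseconds
    return $hours * 60 * 60 * 1000;
}

echo hoursToMilliseconds(24), PHP_EOL; // prints 86400000
```

The printed value matches the 24h TTL used in the config above.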
      

Debugging Tips

  1. Enable Debugging: Set debug: true in config to log connection events:

    cmobi_rabbitmq:
        debug: true
    
  2. Check RabbitMQ Management UI

    • Access http://localhost:15672 (default credentials: guest/guest) to inspect queues, messages, and consumer status.
  3. Common Errors

    • ConnectionRefusedError: Verify RabbitMQ is running and credentials are correct.
    • Channel.Close: Often indicates a consumer error. Check logs for AMQP_* exceptions.
    • NotFound for Queues/Exchanges: Ensure queues/exchanges are declared before use or pre-configured in cmobi_rabbitmq.yaml.
  4. Logging: Add a custom logger to the producer:

    $producer->setLogger($this->container->get('logger'));
    

Extension Points

  1. Custom Producers/Consumers: Extend the base classes to add functionality:

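    class CustomProducer extends RabbitMqProducer
    {
        public function publishWithRetry($queue, $message, $retries = 3)
        {
            for ($attempt = 1; ; $attempt++) {
                try {
                    return $this->publish($queue, $message);
                } catch (\Exception $e) {
                    if ($attempt >= $retries) throw $e; // Out of retries
                    sleep(2 ** $attempt); // Exponential backoff
                }
            }
        }
    }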
    

    Register the custom service in services.yaml:

    services:
        App\Producer\CustomProducer:
            parent: 'cmobi_rabbitmq.producer'
            arguments: ['@cmobi_rabbitmq.connection']
    
  2. Middleware for Messages: Add preprocessing/postprocessing via a decorator:

    class LoggingProducer
    