Weave Code
Code Weaver
Helps Laravel developers discover, compare, and choose open-source packages. See popularity, security, maintainers, and scores at a glance to make better decisions.
Feedback
Share your thoughts, report bugs, or suggest improvements.
Subject
Message

Rabbitmq Bundle Laravel Package

constantable/rabbitmq-bundle

View on GitHub
Deep Wiki
Context7

Getting Started

Minimal Setup

  1. Installation:

    composer require constantable/rabbitmq-bundle
    

    Add to config/bundles.php:

    return [
        // ...
        Constantable\RabbitMqBundle\RabbitMqBundle::class => ['all' => true],
    ];
    
  2. Configure config/packages/rabbitmq.yaml:

    rabbit_mq:
        hosts:
            default:
                host:     'localhost'
                port:     5672
                user:     'guest'
                password: 'guest'
                vhost:    '/'
        connections:
            default: ~
        producers:
            upload_picture_producer:
                connection:       default
                exchange_options: { name: 'upload_pictures', type: direct }
        consumers:
            upload_picture_consumer:
                connection:       default
                exchange_options: { name: 'upload_pictures', type: direct }
                queue_options:    { name: 'upload_pictures' }
                callback:         'app.callback.upload_picture'
    
  3. First Use Case: Publish a message from a controller:

    use Symfony\Component\HttpFoundation\Response;
    
    class UploadController extends AbstractController
    {
        public function uploadPicture(): Response
        {
            $message = ['user_id' => 123, 'image_path' => '/path/to/image.jpg'];
            $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($message));
            return new Response('Message published!');
        }
    }
    
  4. Consume Messages via CLI:

    php bin/console rabbitmq:consumer upload_picture_consumer
    

Implementation Patterns

Core Workflows

  1. Producer-Consumer Pattern:

    • Producers: Publish messages to exchanges (e.g., upload_picture_producer).
      $producer = $this->get('old_sound_rabbit_mq.upload_picture_producer');
      $producer->publish(serialize($data), ['content_type' => 'application/json']);
      
    • Consumers: Process messages from queues (configured in rabbitmq.yaml).
      consumers:
          image_processor:
              callback: 'App\Consumer\ImageProcessor::handle'
      
  2. Message Serialization:

    • Use serialize() for simple data or JSON for structured payloads:
      $producer->publish(json_encode($data), ['content_type' => 'application/json']);
      
    • Deserialize in the consumer callback:
      public static function handle($message, $messageInfo, RabbitMqConnection $connection)
      {
          $data = json_decode($message, true);
          // Process $data
      }
      
  3. Error Handling:

    • Reject messages with nack() and optionally requeue:
      if ($data['user_id'] === 0) {
          $connection->reject($messageInfo->getDeliveryTag(), false, false);
          return;
      }
      $connection->ack($messageInfo->getDeliveryTag());
      
  4. Dynamic Routing:

    • Bind queues to exchanges with routing keys:
      exchanges:
          logs:
              name:     'logs'
              type:     topic
      queues:
          error_logs:
              name:     'error_logs'
              exchange: 'logs'
              routing_key: 'error.#'
      
  5. Prefetch Count:

    • Limit concurrent messages per consumer (configure in rabbitmq.yaml):
      consumers:
          worker:
              prefetch_count: 10
      

Integration Tips

  1. Symfony Events:

    • Trigger events on message publish/consume:
      $producer->publish($message);
      $this->get('event_dispatcher')->dispatch(new MessagePublishedEvent($message));
      
  2. Doctrine Integration:

    • Use transactions for atomicity:
      $entityManager->beginTransaction();
      try {
          $producer->publish($message);
          $entityManager->commit();
      } catch (\Exception $e) {
          $entityManager->rollback();
          throw $e;
      }
      
  3. Retry Logic:

    • Implement dead-letter exchanges (DLX) for failed messages:
      queues:
          failed_jobs:
              name:     'failed_jobs'
              exchange: 'dead_letter_exchange'
      
  4. Monitoring:

    • Log message metadata (e.g., messageInfo->getDeliveryTag()) for debugging.

Gotchas and Tips

Pitfalls

  1. Connection Management:

    • Issue: RabbitMQ connections may drop silently. Consumers may hang without errors.
    • Fix: Use rabbitmq:setup to verify connections and wrap consumers in retry logic:
      try {
          $connection->consume();
      } catch (\Exception $e) {
          sleep(5);
          retry();
      }
      
  2. Serialization Mismatches:

    • Issue: Serialized data in producers must match deserialization in consumers.
    • Fix: Standardize on JSON or serialize() across the app. Avoid mixing formats.
  3. Consumer Lifecycle:

    • Issue: Consumers run indefinitely in CLI. Terminate gracefully with SIGINT:
      php bin/console rabbitmq:consumer worker --stop-when-empty
      
  4. Configuration Overrides:

    • Issue: Bundle config may conflict with php-amqplib defaults.
    • Fix: Explicitly set all required options in rabbitmq.yaml (e.g., host, port).
  5. Queue Visibility:

    • Issue: Messages may be "stuck" if consumers crash without acknowledging.
    • Fix: Set queue_options: { flags: { 'x-message-ttl': 300000 } } (5-minute TTL) to auto-delete stale messages.

Debugging Tips

  1. RabbitMQ Management UI:

    • Access http://localhost:15672 (default credentials: guest/guest) to inspect queues, exchanges, and messages.
  2. Logging:

    • Enable debug logging in config/packages/monolog.yaml:
      handlers:
          rabbitmq:
              type: stream
              path: "%kernel.logs_dir%/rabbitmq.log"
              level: debug
      
  3. Consumer Debugging:

    • Use --limit to test consumers with a subset of messages:
      php bin/console rabbitmq:consumer worker --limit=10
      
  4. Message Inspection:

    • Dump raw messages in the consumer callback:
      error_log('Raw message: ' . $message);
      

Extension Points

  1. Custom Producers/Consumers:

    • Extend OldSound\RabbitMqBundle\Producer or OldSound\RabbitMqBundle\Consumer for reusable logic:
      class AsyncEmailProducer extends Producer
      {
          public function sendEmail($to, $subject, $body)
          {
              $this->publish(json_encode(compact('to', 'subject', 'body')));
          }
      }
      
  2. Middleware:

    • Intercept messages with a subscriber:
      class MessageLoggerSubscriber implements EventSubscriberInterface
      {
          public static function getSubscribedEvents()
          {
              return [
                  'rabbitmq.message.publish' => 'onPublish',
              ];
          }
      
          public function onPublish(PublishEvent $event)
          {
              $this->logger->info('Published message', ['data' => $event->getMessage()]);
          }
      }
      
  3. Dynamic Exchanges:

    • Create exchanges/queues programmatically:
      $channel = $connection->getChannel();
      $channel->exchangeDeclare('dynamic_exchange', 'topic', false, false, false);
      
  4. Health Checks:

    • Add a Symfony health check endpoint:
      class RabbitMqHealthCheck extends AbstractHealthIndicator
      {
          public function check(HealthIndicatorInterface $indicator, ?string $name = null, array $options = []): bool
          {
              return $this->connection->isConnected();
          }
      }
      
Weaver

How can I help you explore Laravel packages today?

Conversation history is not saved when not logged in.
Prompt
Add packages to context
No packages found.
emuniq/filament-browser-notifications
syriable/filament-translator
hungnm28/livewire-form
wenprise/eloquent
crudly/encrypted
fadion/bouncy
cuci/prototurk-sdk
gos/pubsub-router-bundle
cuci/prototurk-sdk-symfony
clementtalleu/easyadmin-markdown-bundle
codeflextech/permission-manager
karnoweb/livewire-datepicker
sayedenam/sayed-dashboard
milito/query-filter
apiboxsym/user-bundle
apiboxsym/health-check-bundle
jayeshmepani/jpl-moshier-ephemeris-php
elnasnato/laraliveui
labrodev/rest-sdk
sampaui/sampaui