Weave Code
Code Weaver
Helps Laravel developers discover, compare, and choose open-source packages. See popularity, security, maintainers, and scores at a glance to make better decisions.
Feedback
Share your thoughts, report bugs, or suggest improvements.
Subject
Message

Message Outbox For Doctrine Laravel Package

eventsauce/message-outbox-for-doctrine

View on GitHub
Deep Wiki
Context7

Getting Started

Minimal Steps

  1. Install Dependencies

    composer require doctrine/dbal eventsauce/eventsauce eventsauce/message-outbox-for-doctrine
    
  2. Set Up Database Schema Run the provided migration or define the message_outbox table manually:

    Schema::create('message_outbox', function (Blueprint $table) {
        $table->id();
        $table->string('message_type');
        $table->text('message_body');
        $table->string('aggregate_id');
        $table->string('aggregate_type');
        $table->timestamp('occurred_on');
        $table->timestamp('created_at')->useCurrent();
        $table->timestamp('processed_at')->nullable();
        $table->index(['aggregate_id', 'aggregate_type']);
        $table->index(['processed_at']);
    });
    
  3. First Use Case: Publishing an Event

    use EventSauce\MessageOutbox\MessageOutbox;
    use EventSauce\MessageOutboxForDoctrine\DoctrineMessageOutbox;
    use Doctrine\DBAL\Connection;
    
    // In a service or repository
    $connection = app(Connection::class);
    $outbox = new DoctrineMessageOutbox($connection, new MessageOutbox());
    
    // Publish an event (e.g., after saving an entity)
    $outbox->publish(
        new UserRegistered('user-123', 'user@example.com'),
        new \DateTimeImmutable(),
        'user-123',
        'User'
    );
    
  4. Process Outbox Events Create a Laravel command to poll and process pending events:

    use Illuminate\Console\Command;
    use EventSauce\MessageOutbox\MessageOutbox;
    
    class ProcessOutboxEvents extends Command
    {
        protected $signature = 'events:process-outbox';
        protected $description = 'Process pending events from the outbox';
    
        public function handle()
        {
            $outbox = new DoctrineMessageOutbox(app(Connection::class), new MessageOutbox());
            $outbox->processPendingMessages(function (array $messages) {
                foreach ($messages as $message) {
                    // Dispatch to your message bus or queue
                    dispatch(new HandleOutboxEvent($message['payload']));
                    $outbox->markAsPublished($message['id']);
                }
            });
        }
    }
    

Implementation Patterns

1. Transaction-Safe Event Publishing

Pattern: Publish events after database commits by leveraging Doctrine transactions.

use Doctrine\DBAL\Connection;

public function registerUser(User $user)
{
    $connection = app(Connection::class);
    $outbox = new DoctrineMessageOutbox($connection, new MessageOutbox());

    $connection->beginTransaction();
    try {
        // Save user to DB
        $user->save();

        // Publish event *after* commit
        $connection->commit();
        $outbox->publish(
            new UserRegistered($user->id, $user->email),
            new \DateTimeImmutable(),
            $user->id,
            'User'
        );
    } catch (\Exception $e) {
        $connection->rollBack();
        throw $e;
    }
}

2. Event-Driven Workflows with Laravel Queues

Pattern: Combine the outbox with Laravel queues for async processing.

// Publish to outbox
$outbox->publish($event, $occurredOn, $aggregateId, $aggregateType);

// Process outbox in a queue job
ProcessOutboxEvents::dispatch();

3. Event Serialization

Pattern: Use EventSauce’s MessageConverter for consistent serialization.

use EventSauce\EventSauce;
use EventSauce\MessageConverter\JsonMessageConverter;

$converter = new JsonMessageConverter();
$eventSauce = new EventSauce($messageBus, $eventStore, $converter);

// Pass to DoctrineMessageOutbox
$outbox = new DoctrineMessageOutbox($connection, $eventSauce);

4. Aggregate Root Tracking

Pattern: Use aggregate_id and aggregate_type to track events per entity.

// In a repository
$outbox->publish(
    new OrderShipped($order->id),
    new \DateTimeImmutable(),
    $order->id,
    'Order'
);

5. Polling Consumer with Retries

Pattern: Implement a robust polling loop with exponential backoff.

public function handle()
{
    $outbox = new DoctrineMessageOutbox(app(Connection::class), new MessageOutbox());
    $attempts = 0;
    $maxAttempts = 3;

    while ($attempts < $maxAttempts) {
        try {
            $outbox->processPendingMessages(function (array $messages) {
                foreach ($messages as $message) {
                    $this->dispatchToBroker($message['payload']);
                    $outbox->markAsPublished($message['id']);
                }
            });
            break;
        } catch (\Exception $e) {
            $attempts++;
            sleep(2 ** $attempts); // Exponential backoff
        }
    }
}

6. Integration with Laravel Events (Hybrid Approach)

Pattern: Use the outbox alongside Laravel’s event system for flexibility.

// Publish to both outbox and Laravel events
event(new UserRegisteredEvent($user));
$outbox->publish(
    new UserRegistered($user->id, $user->email),
    new \DateTimeImmutable(),
    $user->id,
    'User'
);

Gotchas and Tips

Pitfalls

  1. Transaction Management

    • Gotcha: Forgetting to commit the Doctrine transaction before publishing events.
    • Fix: Ensure publish() is called after commit() in your transaction block.
    • Tip: Use a service layer to abstract this logic.
  2. Event Serialization Mismatches

    • Gotcha: Events serialized differently between publishing and processing.
    • Fix: Use EventSauce’s MessageConverter consistently.
    • Tip: Test serialization/deserialization in unit tests.
  3. Outbox Polling Deadlocks

    • Gotcha: Long-running consumers blocking new events.
    • Fix: Limit batch size (e.g., 100 messages per poll) and use short timeouts.
    • Tip: Use Laravel’s DB::transaction() for atomic processing.
  4. Duplicate Events

    • Gotcha: Events reprocessed after consumer crashes.
    • Fix: Implement idempotency in your event handlers (e.g., check processed_at).
    • Tip: Use aggregate_id + occurred_on as a composite key for deduplication.
  5. Schema Changes

    • Gotcha: Breaking changes to the outbox table schema.
    • Fix: Use migrations with backward-compatible changes (e.g., add nullable columns).
    • Tip: Archive old events before schema changes.
  6. Performance Bottlenecks

    • Gotcha: High event volume causing DB locks.
    • Fix: Batch inserts and use connection pooling.
    • Tip: Monitor message_outbox table size and optimize queries.

Debugging Tips

  1. Check Unprocessed Events

    SELECT * FROM message_outbox WHERE processed_at IS NULL ORDER BY created_at DESC;
    
  2. Log Event Payloads Add logging in the processPendingMessages callback:

    Log::debug('Processing event', ['payload' => $message['payload']]);
    
  3. Simulate Failures Temporarily modify the consumer to fail randomly to test retries:

    if (rand(0, 10) === 0) {
        throw new \RuntimeException('Simulated failure');
    }
    

Extension Points

  1. Custom Message Converter Extend JsonMessageConverter for custom serialization:

    use EventSauce\MessageConverter\MessageConverter;
    
    class CustomMessageConverter implements MessageConverter
    {
        public function toMessage(string $messageType, string $messageBody): Message
        {
            // Custom logic
        }
    }
    
  2. Outbox Table Customization Override the default schema in your migration:

    $table->string('custom_field')->nullable();
    
  3. Event Filtering Filter events before processing:

    $outbox->processPendingMessages(function (array $messages) {
        $filtered = array_filter($messages, fn($m) => strpos($m['message_type'], 'Order') !== false);
        // Process $filtered
    });
    
  4. Async Processing with Queues Dispatch outbox processing to a queue job:

    ProcessOutboxJob::dispatch()->onQueue('events');
    

Laravel-Specific Quirks

  1. Doctrine DBAL in Laravel
    • Gotcha: Laravel’s DB facade doesn’t work directly with Doctrine.
    • Fix: Use doctrine/dbal explicitly:
      $connection = Doctrine\DBAL\DriverManager::getConnection([
          'url' => 'mysql://user:
      
Weaver

How can I help you explore Laravel packages today?

Conversation history is not saved when not logged in.
Prompt
Add packages to context
No packages found.
nasirkhan/laravel-sharekit
directorytree/privacy-filter-classifier
directorytree/privacy-filter
datacore/hub-sdk
develia/commons
cuci/prototurk-sdk
cuci/prototurk-sdk-symfony
develia/geo-bundle
dreamzy/livewire-charts
touchestate-sdk/php-sdk
22h/doctrine-garbage-collection-bundle
agtp/agtp-php
agtp/mod-php
splash/sonata-admin
splash/metadata
splash/openapi
splash/scopes
splash/toolkit
testo/output-teamcity
testo/bridge-symfony