eventsauce/message-outbox-for-doctrine
Install Dependencies
composer require doctrine/dbal eventsauce/eventsauce eventsauce/message-outbox-for-doctrine
Set Up Database Schema
Run the provided migration or define the message_outbox table manually:
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

Schema::create('message_outbox', function (Blueprint $table) {
    $table->id();
    $table->string('message_type');
    $table->text('message_body');
    $table->string('aggregate_id');
    $table->string('aggregate_type');
    $table->timestamp('occurred_on');
    $table->timestamp('created_at')->useCurrent();
    $table->timestamp('processed_at')->nullable();
    // Look up events per entity, and find unprocessed rows quickly
    $table->index(['aggregate_id', 'aggregate_type']);
    $table->index(['processed_at']);
});
First Use Case: Publishing an Event
use EventSauce\MessageOutbox\MessageOutbox;
use EventSauce\MessageOutboxForDoctrine\DoctrineMessageOutbox;
use Doctrine\DBAL\Connection;
// In a service or repository
$connection = app(Connection::class);
$outbox = new DoctrineMessageOutbox($connection, new MessageOutbox());
// Publish an event (e.g., after saving an entity)
$outbox->publish(
new UserRegistered('user-123', 'user@example.com'),
new \DateTimeImmutable(),
'user-123',
'User'
);
Process Outbox Events
Create a Laravel command to poll and process pending events:
use Doctrine\DBAL\Connection;
use EventSauce\MessageOutbox\MessageOutbox;
use EventSauce\MessageOutboxForDoctrine\DoctrineMessageOutbox;
use Illuminate\Console\Command;

class ProcessOutboxEvents extends Command
{
    protected $signature = 'events:process-outbox';
    protected $description = 'Process pending events from the outbox';

    public function handle()
    {
        $outbox = new DoctrineMessageOutbox(app(Connection::class), new MessageOutbox());

        // PHP closures do not capture outer variables implicitly, hence `use ($outbox)`
        $outbox->processPendingMessages(function (array $messages) use ($outbox) {
            foreach ($messages as $message) {
                // Dispatch to your message bus or queue
                dispatch(new HandleOutboxEvent($message['payload']));
                $outbox->markAsPublished($message['id']);
            }
        });
    }
}
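To make the flow of the command above easier to follow, here is a minimal in-memory sketch of the same idea: a row is marked as published only after the dispatch call returns without throwing. InMemoryOutbox and relayPending are hypothetical names invented for this illustration. They are not part of the package and stand in for the outbox table and the processing callback.

```php
<?php

/** Hypothetical in-memory stand-in for the outbox table; not part of any package. */
final class InMemoryOutbox
{
    /** @var array<int, array{id: int, payload: mixed, processed_at: ?string}> */
    private array $rows = [];
    private int $nextId = 1;

    /** Stores a payload as a pending row and returns its id. */
    public function publish(mixed $payload): int
    {
        $id = $this->nextId++;
        $this->rows[$id] = ['id' => $id, 'payload' => $payload, 'processed_at' => null];

        return $id;
    }

    /** Returns rows that have not been marked as published, in insertion order. */
    public function pending(): array
    {
        return array_values(array_filter(
            $this->rows,
            static fn (array $row): bool => $row['processed_at'] === null
        ));
    }

    public function markAsPublished(int $id): void
    {
        $this->rows[$id]['processed_at'] = date(DATE_ATOM);
    }
}

/**
 * Dispatches each pending row and marks it only after $dispatch returns.
 * If $dispatch throws, the exception propagates and the row stays pending.
 */
function relayPending(InMemoryOutbox $outbox, callable $dispatch): int
{
    $relayed = 0;
    foreach ($outbox->pending() as $row) {
        $dispatch($row['payload']);
        $outbox->markAsPublished($row['id']);
        $relayed++;
    }

    return $relayed;
}
```

Because marking happens after dispatch, a failure leaves the row pending and it is picked up again on the next run. The consumer may therefore see the same message more than once and should tolerate that.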
Pattern: Publish events after database commits by leveraging Doctrine transactions.
use Doctrine\DBAL\Connection;

public function registerUser(User $user)
{
    $connection = app(Connection::class);
    $outbox = new DoctrineMessageOutbox($connection, new MessageOutbox());

    $connection->beginTransaction();
    try {
        // Save user to DB
        $user->save();
        $connection->commit();
    } catch (\Exception $e) {
        $connection->rollBack();
        throw $e;
    }

    // Publish event *after* commit. This sits outside the try block so that a
    // publish failure never calls rollBack() on an already committed transaction.
    $outbox->publish(
        new UserRegistered($user->id, $user->email),
        new \DateTimeImmutable(),
        $user->id,
        'User'
    );
}
Pattern: Combine the outbox with Laravel queues for async processing.
// Publish to outbox
$outbox->publish($event, $occurredOn, $aggregateId, $aggregateType);
// Process outbox in a queue job (a queued job class, not the console command)
ProcessOutboxJob::dispatch();
Pattern: Use EventSauce’s MessageConverter for consistent serialization.
use EventSauce\EventSauce;
use EventSauce\MessageConverter\JsonMessageConverter;
$converter = new JsonMessageConverter();
$eventSauce = new EventSauce($messageBus, $eventStore, $converter);
// Pass to DoctrineMessageOutbox
$outbox = new DoctrineMessageOutbox($connection, $eventSauce);
Pattern: Use aggregate_id and aggregate_type to track events per entity.
// In a repository
$outbox->publish(
new OrderShipped($order->id),
new \DateTimeImmutable(),
$order->id,
'Order'
);
Pattern: Implement a robust polling loop with exponential backoff.
public function handle()
{
    $outbox = new DoctrineMessageOutbox(app(Connection::class), new MessageOutbox());
    $attempts = 0;
    $maxAttempts = 3;

    while ($attempts < $maxAttempts) {
        try {
            // `use ($outbox)` makes the outbox available inside the closure
            $outbox->processPendingMessages(function (array $messages) use ($outbox) {
                foreach ($messages as $message) {
                    $this->dispatchToBroker($message['payload']);
                    $outbox->markAsPublished($message['id']);
                }
            });
            break;
        } catch (\Exception $e) {
            $attempts++;
            if ($attempts >= $maxAttempts) {
                throw $e; // Retries exhausted: surface the failure instead of hiding it
            }
            sleep(2 ** $attempts); // Exponential backoff: 2s, then 4s
        }
    }
}
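The retry logic above can also be pulled out into a small reusable helper. The following is a sketch only: retryWithBackoff is a hypothetical function, not part of the package or of Laravel, and the injectable $sleep callable is an assumption added so the delays can be observed without actually waiting.

```php
<?php

/**
 * Hypothetical helper: runs $operation and retries it with exponential backoff.
 *
 * @param callable      $operation   The work to attempt; its return value is passed through.
 * @param int           $maxAttempts Total number of attempts before giving up.
 * @param callable|null $sleep       Receives the delay in seconds; defaults to sleep().
 */
function retryWithBackoff(callable $operation, int $maxAttempts = 3, ?callable $sleep = null): mixed
{
    $sleep ??= static function (int $seconds): void {
        sleep($seconds);
    };

    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation();
        } catch (\Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // Give up and surface the last failure to the caller
            }
            $sleep(2 ** $attempt); // Delay doubles per failed attempt: 2, 4, 8, ...
        }
    }
}
```

Inside handle(), the call to processPendingMessages could then be wrapped in a closure and passed as $operation, keeping the command body free of loop bookkeeping.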
Pattern: Use the outbox alongside Laravel’s event system for flexibility.
// Publish to both outbox and Laravel events
event(new UserRegisteredEvent($user));
$outbox->publish(
new UserRegistered($user->id, $user->email),
new \DateTimeImmutable(),
$user->id,
'User'
);
Transaction Management
Ensure publish() is called after commit() in your transaction block.
Event Serialization Mismatches
Use MessageConverter consistently.
Outbox Polling Deadlocks
Use DB::transaction() for atomic processing.
Duplicate Events
Track which messages have been processed (e.g., via processed_at). Use aggregate_id + occurred_on as a composite key for deduplication.
Schema Changes
Performance Bottlenecks
Monitor message_outbox table size and optimize queries.
Check Unprocessed Events
SELECT * FROM message_outbox WHERE processed_at IS NULL ORDER BY created_at DESC;
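The deduplication advice under Duplicate Events (aggregate_id + occurred_on as a composite key) could be applied on the consumer side roughly as follows. This is a sketch under assumptions: deduplicationKey and dropDuplicates are hypothetical helpers, and each message is assumed to be an array carrying the aggregate_id and occurred_on columns from the schema.

```php
<?php

/** Hypothetical: builds a composite key from aggregate_id and occurred_on. */
function deduplicationKey(array $message): string
{
    return $message['aggregate_id'] . '|' . $message['occurred_on'];
}

/** Drops messages whose composite key was already seen, keeping the first occurrence. */
function dropDuplicates(array $messages): array
{
    $seen = [];
    $unique = [];

    foreach ($messages as $message) {
        $key = deduplicationKey($message);
        if (isset($seen[$key])) {
            continue; // Same aggregate and timestamp: treat as a duplicate
        }
        $seen[$key] = true;
        $unique[] = $message;
    }

    return $unique;
}
```

Note that this key treats two distinct events for the same aggregate with an identical timestamp as duplicates, so it only fits if occurred_on has enough resolution for your workload.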
Log Event Payloads
Add logging in the processPendingMessages callback:
Log::debug('Processing event', ['payload' => $message['payload']]);
Simulate Failures
Temporarily modify the consumer to fail randomly to test retries:
if (rand(0, 10) === 0) {
    throw new \RuntimeException('Simulated failure');
}
Custom Message Converter
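Implement the MessageConverter interface for custom serialization:
use EventSauce\EventSourcing\Message;
use EventSauce\MessageConverter\MessageConverter;

class CustomMessageConverter implements MessageConverter
{
    public function toMessage(string $messageType, string $messageBody): Message
    {
        // Custom logic: build and return a Message from the type and body
        throw new \LogicException('Not implemented yet.');
    }
}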
Outbox Table Customization
Override the default schema in your migration:
$table->string('custom_field')->nullable();
Event Filtering
Filter events before processing:
$outbox->processPendingMessages(function (array $messages) {
    $filtered = array_filter($messages, fn($m) => strpos($m['message_type'], 'Order') !== false);
    // Process $filtered
});
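The filter in the callback above can be isolated into a plain function, which makes it easy to check without a database. This is a sketch: filterMessagesByType is a hypothetical helper, and it assumes each message is an array with a message_type key, as in the example above. It uses str_contains, which requires PHP 8.0 or later.

```php
<?php

/**
 * Hypothetical helper: keeps only messages whose 'message_type' contains $needle.
 * Messages without a 'message_type' key are dropped.
 */
function filterMessagesByType(array $messages, string $needle): array
{
    // array_values() reindexes the result, since array_filter() preserves keys
    return array_values(array_filter(
        $messages,
        static fn (array $m): bool => str_contains($m['message_type'] ?? '', $needle)
    ));
}
```

Reindexing matters if later code relies on positions such as $filtered[0], because array_filter alone leaves gaps in the keys.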
Async Processing with Queues
Dispatch outbox processing to a queue job:
ProcessOutboxJob::dispatch()->onQueue('events');
Laravel’s DB facade doesn’t work directly with Doctrine.
Use doctrine/dbal explicitly:
$connection = Doctrine\DBAL\DriverManager::getConnection([
'url' => 'mysql://user: