eventsauce/message-repository-for-doctrine
Doctrine DBAL-backed EventSauce MessageRepository for persisting event streams. Supports configurable table/column schemas (default or legacy) and pluggable UUID encoding (binary or string), plus custom headers/columns for indexing and storage optimization.
Install the Package
composer require eventsauce/message-repository-for-doctrine
Configure Doctrine DBAL Connection
Ensure you have a Doctrine DBAL connection configured in your Laravel app (e.g., via config/database.php or a custom connection).
Define the Message Repository
use EventSauce\EventSourcing\Serialization\ConstructingMessageSerializer;
use EventSauce\MessageRepository\DoctrineMessageRepository\DoctrineUuidV4MessageRepository;
use EventSauce\MessageRepository\TableSchema\DefaultTableSchema;
use EventSauce\UuidEncoding\BinaryUuidEncoder;

// Initialize the repository; $doctrineDbalConnection is a Doctrine\DBAL\Connection
$messageRepository = new DoctrineUuidV4MessageRepository(
    connection: $doctrineDbalConnection,
    tableName: 'event_store',
    serializer: new ConstructingMessageSerializer(),
    tableSchema: new DefaultTableSchema(),
    uuidEncoder: new BinaryUuidEncoder(),
);
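The uuidEncoder argument decides whether IDs are written as raw bytes or as text. The sketch below uses plain PHP to illustrate the difference in stored size; the two helper functions are hypothetical and are not the library's encoder classes.

```php
<?php
declare(strict_types=1);

// Illustrative helpers only; these are NOT the package's encoder classes.

/** Pack a canonical 36-character UUID string into its 16-byte binary form. */
function uuidToBinary(string $uuid): string
{
    $hex = str_replace('-', '', $uuid);
    if (strlen($hex) !== 32 || !ctype_xdigit($hex)) {
        throw new InvalidArgumentException("Not a valid UUID: {$uuid}");
    }

    return hex2bin($hex);
}

/** Expand 16 raw bytes back into the canonical 8-4-4-4-12 string form. */
function binaryToUuid(string $bytes): string
{
    if (strlen($bytes) !== 16) {
        throw new InvalidArgumentException('Expected exactly 16 bytes');
    }
    $hex = bin2hex($bytes);

    return sprintf(
        '%s-%s-%s-%s-%s',
        substr($hex, 0, 8),
        substr($hex, 8, 4),
        substr($hex, 12, 4),
        substr($hex, 16, 4),
        substr($hex, 20, 12)
    );
}

$uuid = '123e4567-e89b-42d3-a456-426614174000';
$binary = uuidToBinary($uuid);

echo strlen($uuid), ' characters as a string, ', strlen($binary), " bytes as binary\n";
echo binaryToUuid($binary), "\n";
```

A binary encoding therefore fits a 16-byte column, while a string encoding needs room for 36 characters.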
Create the Database Table
Run a migration to create the table matching the schema (e.g., event_id, aggregate_root_id, version, payload).
Example migration:
Schema::create('event_store', function (Blueprint $table) {
    // binaryUuid() is not a built-in Blueprint method; this assumes a macro that creates a BINARY(16) column
    $table->binaryUuid('event_id')->primary();
    $table->binaryUuid('aggregate_root_id');
    $table->unsignedInteger('version');
    $table->text('payload');
    $table->timestamps(); // Optional: for debugging or auditing
});
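First Use Case: Storing Events
// Message and Header live in EventSauce\EventSourcing; the aggregate root ID and version travel as headers
$message = new Message(new SomeEvent('data'), [Header::AGGREGATE_ROOT_ID => $aggregateRootId, Header::AGGREGATE_ROOT_VERSION => 1]);
$messageRepository->persist($message);
Event Storage
Store a message for an aggregate root:
$messageRepository->persist($message);
Loading Events
Load the messages for an aggregate root (the result is a generator):
$messages = $messageRepository->retrieveAll($aggregateRootId);
Appending Events
Append several messages to an aggregate root in a single call (atomic operation):
$messageRepository->persist(...$messages);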
Custom Table Schema
Extend DefaultTableSchema for custom columns:
use EventSauce\MessageRepository\TableSchema\DefaultTableSchema;

class CustomTableSchema extends DefaultTableSchema
{
    public function additionalColumns(): array
    {
        // Maps each extra column name to the message header that fills it
        return [
            'metadata' => 'metadata',
        ];
    }
}
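Service Provider Binding
Bind the repository in a service provider. Named arguments keep the optional schema and encoder from landing in the wrong constructor position:
$this->app->bind(DoctrineUuidV4MessageRepository::class, function ($app) {
    return new DoctrineUuidV4MessageRepository(
        connection: $app->make(\Doctrine\DBAL\Connection::class),
        tableName: 'event_store',
        serializer: new ConstructingMessageSerializer(),
        tableSchema: new DefaultTableSchema(),
        uuidEncoder: new BinaryUuidEncoder(),
    );
});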
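Event Sourcing Pattern
Use the repository in an event-sourced aggregate root:
class Order
{
    private array $events = [];
    private int $version = 0;

    public function __construct(private AggregateRootId $id) {}

    public function apply(OrderEvent $event): void
    {
        $this->events[] = $event;
        // Apply event logic
    }

    public function getUncommittedEvents(): array
    {
        return $this->events;
    }

    public function persist(MessageRepository $messageRepository): void
    {
        // Wrap each event in a Message; the ID and next version travel as headers
        $messages = array_map(fn (OrderEvent $event) => new Message($event, [
            Header::AGGREGATE_ROOT_ID => $this->id,
            Header::AGGREGATE_ROOT_VERSION => ++$this->version,
        ]), $this->events);
        $messageRepository->persist(...$messages);
        $this->events = [];
    }
}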
Transaction Management
Wrap repository operations in a DBAL transaction:
$connection->beginTransaction();
try {
    $messageRepository->persist(...$messages);
    $connection->commit();
} catch (\Throwable $e) {
    // Undo the partial write, then let the caller handle the failure
    $connection->rollBack();
    throw $e;
}
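Event Deserialization
Customize deserialization by wrapping the serializer in your own MessageSerializer implementation (a decorator):
// CustomMessageSerializer is a hypothetical class implementing EventSauce\EventSourcing\Serialization\MessageSerializer
$serializer = new CustomMessageSerializer(new ConstructingMessageSerializer());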
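UUID Type Mismatch
If the ID columns use a binary or native uuid type, use BinaryUuidEncoder to avoid storage issues.
If you use StringUuidEncoder, ensure the database column is text or varchar with sufficient length (e.g., 36 for UUIDv4).
Table Schema Mismatch
Mismatched column names (e.g., aggregate_root_version vs. version) will cause queries to fail.
Use the TableSchema implementation that matches your table.
Concurrency Issues
Enforce version checks in your application logic. The generator returned by retrieveAll() yields the stored messages and returns the last aggregate root version:
$messages = $messageRepository->retrieveAll($aggregateRootId);
foreach ($messages as $message) {
    // Drain the generator so that its return value becomes available
}
$currentVersion = $messages->getReturn();
if ($currentVersion !== $expectedVersion) {
    throw new ConcurrencyException(); // hypothetical application exception
}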
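An application-level check still leaves a window between reading the version and writing. A unique index on the aggregate root ID and version columns closes that window at the database level. The sketch below imitates such an index with a hypothetical in-memory table (InMemoryEventTable is not part of the package) to show how the second of two competing writers gets rejected.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for the event table; not part of the package.
final class InMemoryEventTable
{
    /** @var array<string, array<int, string>> payloads keyed by aggregate ID, then version */
    private array $rows = [];

    /** Mimics a unique index on (aggregate_root_id, version). */
    public function insert(string $aggregateRootId, int $version, string $payload): void
    {
        if (isset($this->rows[$aggregateRootId][$version])) {
            throw new RuntimeException(
                "Version {$version} already exists for aggregate {$aggregateRootId}"
            );
        }
        $this->rows[$aggregateRootId][$version] = $payload;
    }

    /** Highest stored version for the aggregate, or 0 when nothing is stored. */
    public function currentVersion(string $aggregateRootId): int
    {
        $versions = array_keys($this->rows[$aggregateRootId] ?? []);

        return $versions === [] ? 0 : max($versions);
    }
}

$table = new InMemoryEventTable();
$table->insert('order-1', 1, '{"type":"OrderPlaced"}');

// Two writers both read version 1 and both try to write version 2.
$expectedVersion = $table->currentVersion('order-1');
$table->insert('order-1', $expectedVersion + 1, '{"type":"OrderPaid"}');

try {
    $table->insert('order-1', $expectedVersion + 1, '{"type":"OrderCancelled"}');
    $conflict = false;
} catch (RuntimeException $e) {
    $conflict = true; // the second writer has to reload the stream and retry
}

echo $conflict ? "conflict detected\n" : "no conflict\n";
```

In a real table the rejection would surface as a unique constraint violation from the database driver rather than a RuntimeException.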
Performance with Large Event Streams
Serialization Errors
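Ensure event classes implement the interfaces the serializer expects (e.g., EventSauce\EventSourcing\Serialization\SerializablePayload for ConstructingMessageSerializer).
Enable Doctrine DBAL Logging
Configure DBAL to log SQL queries for debugging. EchoSQLLogger only exists in DBAL 2.x; on DBAL 3.2 and later, pass a PSR-3 logger to the logging middleware on the configuration used to create the connection:
$configuration = new \Doctrine\DBAL\Configuration();
$configuration->setMiddlewares([new \Doctrine\DBAL\Logging\Middleware($psrLogger)]); // $psrLogger: any Psr\Log\LoggerInterface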
Check for Silent Failures
$schemaManager = $connection->createSchemaManager();
if (!$schemaManager->tablesExist(['event_store'])) {
    throw new \RuntimeException('Event store table does not exist');
}
UUID Generation
Ensure UuidV4 is used consistently for event_id and aggregate_root_id to avoid type conflicts.
Custom Headers
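Add metadata to events via additionalColumns in a custom TableSchema. The method returns a map of column name to message header name, so each header must be set on the message before it is persisted:
class MetadataTableSchema extends DefaultTableSchema
{
    public function additionalColumns(): array
    {
        // column name => header name (both header names are examples)
        return [
            'created_at' => 'created_at',
            'user_id' => 'user_id',
        ];
    }
}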
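One way to picture additional columns is as a lookup from column name to message header: for every message, the value of the named header is copied into the matching column. The following plain-PHP sketch illustrates that idea under this assumption; extraColumnValues() and the header names are hypothetical and do not reproduce the repository's internals.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: illustrates a column => header map, not the repository's own code.

/**
 * @param array<string, string> $additionalColumns column name => header name
 * @param array<string, mixed>  $headers           headers carried by one message
 *
 * @return array<string, mixed> extra column values for the insert
 */
function extraColumnValues(array $additionalColumns, array $headers): array
{
    $values = [];
    foreach ($additionalColumns as $column => $header) {
        // A header that is absent ends up as NULL in this sketch.
        $values[$column] = $headers[$header] ?? null;
    }

    return $values;
}

$additionalColumns = ['created_at' => 'recorded_at', 'user_id' => 'user_id'];
$headers = [
    'recorded_at' => '2024-01-01 12:00:00',
    'user_id' => 'user-42',
    'trace' => 'abc', // not mapped, so it stays only in the serialized payload
];

print_r(extraColumnValues($additionalColumns, $headers));
```

Headers that are not mapped are simply ignored by the helper, which is why an indexable value has to be listed explicitly.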
Event Filtering
Extend the repository to filter events by metadata. The createSelectQuery() and loadFromQuery() helpers used here are assumed extension points; confirm that your installed version exposes equivalents before relying on them:
class FilteredMessageRepository extends DoctrineUuidV4MessageRepository
{
    public function loadWithMetadata($aggregateRootId, $metadata)
    {
        $query = $this->createSelectQuery($aggregateRootId);
        $query->andWhere('metadata = :metadata');
        $query->setParameter('metadata', json_encode($metadata));
        return $this->loadFromQuery($query);
    }
}
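If subclassing the repository is not an option, messages can also be filtered in PHP after loading. The sketch below is a minimal illustration that assumes messages are represented as arrays with headers and payload keys; that shape and the filterByHeaders() helper are hypothetical, since real messages are objects.

```php
<?php
declare(strict_types=1);

// Hypothetical message shape (arrays with 'headers' and 'payload'); real messages are objects.

/**
 * Lazily yield only the messages whose headers contain every expected key/value pair.
 *
 * @param iterable<array{headers: array<string, mixed>, payload: mixed}> $messages
 * @param array<string, mixed> $expectedHeaders
 */
function filterByHeaders(iterable $messages, array $expectedHeaders): Generator
{
    foreach ($messages as $message) {
        foreach ($expectedHeaders as $key => $value) {
            if (($message['headers'][$key] ?? null) !== $value) {
                continue 2; // one mismatch is enough to skip this message
            }
        }
        yield $message;
    }
}

$messages = [
    ['headers' => ['user_id' => 'user-42'], 'payload' => 'OrderPlaced'],
    ['headers' => ['user_id' => 'user-7'], 'payload' => 'OrderPaid'],
    ['headers' => ['user_id' => 'user-42'], 'payload' => 'OrderShipped'],
];

$matches = iterator_to_array(filterByHeaders($messages, ['user_id' => 'user-42']), false);

echo count($matches), " matching messages\n";
```

Filtering in PHP reads the whole stream, so a SQL condition on an indexed column remains the better fit for large streams.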
Bulk Operations
Optimize bulk inserts by batching messages:
$connection->beginTransaction();
foreach (array_chunk($allMessages, 100) as $messages) {
    $messageRepository->persist(...$messages);
}
$connection->commit();
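The batching itself does not depend on the repository. As a small illustration, the sketch below splits 250 items into batches of 100 and hands each batch to a callback; persistInBatches() is a hypothetical helper, and the callback stands in for the repository call.

```php
<?php
declare(strict_types=1);

/**
 * Hand $items to $persistBatch in fixed-size batches and return the number of batches.
 *
 * @param array<int, mixed>     $items
 * @param callable(array): void $persistBatch stand-in for the repository call
 */
function persistInBatches(array $items, int $batchSize, callable $persistBatch): int
{
    if ($batchSize < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1');
    }

    $batches = 0;
    foreach (array_chunk($items, $batchSize) as $batch) {
        $persistBatch($batch);
        $batches++;
    }

    return $batches;
}

$sizes = [];
$batches = persistInBatches(range(1, 250), 100, function (array $batch) use (&$sizes): void {
    $sizes[] = count($batch); // record the size instead of writing to a database
});

echo $batches, ' batches: ', implode(', ', $sizes), "\n"; // prints "3 batches: 100, 100, 50"
```

The last batch is smaller than the others whenever the total is not a multiple of the batch size.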
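Read Replicas
For read-heavy workloads, build a second repository on a connection that points at a read replica:
// $replicaParams is a hypothetical array of DBAL connection parameters for the replica
$readConnection = \Doctrine\DBAL\DriverManager::getConnection($replicaParams);
$readRepository = new DoctrineUuidV4MessageRepository(
    connection: $readConnection,
    tableName: 'event_store',
    serializer: $serializer,
    tableSchema: $tableSchema,
    uuidEncoder: $uuidEncoder,
);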
Event Projections
Use the repository to build projections (e.g., materialized views):
$messages = $messageRepository->retrieveAll($aggregateRootId);
$projection = new OrderProjection();
foreach ($messages as $message) {
    $projection->apply($message->payload()); // payload() returns the event object
}
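A projection is essentially a fold over the event stream: each event updates a small piece of read-side state. The sketch below shows one possible shape using hypothetical event classes and a hypothetical OrderTotalProjection; none of these names come from the package.

```php
<?php
declare(strict_types=1);

// Hypothetical events and projection, for illustration only.
final class ItemAdded
{
    public function __construct(public int $priceInCents) {}
}

final class ItemRemoved
{
    public function __construct(public int $priceInCents) {}
}

final class OrderTotalProjection
{
    private int $totalInCents = 0;

    /** Update the running total; events of unknown types are ignored. */
    public function apply(object $event): void
    {
        if ($event instanceof ItemAdded) {
            $this->totalInCents += $event->priceInCents;
        } elseif ($event instanceof ItemRemoved) {
            $this->totalInCents -= $event->priceInCents;
        }
    }

    public function totalInCents(): int
    {
        return $this->totalInCents;
    }
}

$events = [new ItemAdded(1200), new ItemAdded(800), new ItemRemoved(800)];

$projection = new OrderTotalProjection();
foreach ($events as $event) {
    $projection->apply($event);
}

echo $projection->totalInCents(), "\n"; // prints 1200
```

Because the projection only depends on the ordered events, it can be rebuilt from scratch at any time by replaying the stream.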