## Getting Started
### First Steps
1. **Installation**
```bash
composer require prooph/pdo-event-store:^1.16.5
Ensure pdo and pdo_mysql (or your preferred DB driver) are installed via PECL.
Basic Setup
Register the service provider in config/app.php:
Prooph\EventStore\PDO\EventStorePDO::class,
Publish the config (optional):
php artisan vendor:publish --provider="Prooph\EventStore\PDO\EventStorePDO"
First Use Case: Storing an Event
use Prooph\EventStore\EventStore;
use Prooph\EventStore\PDO\EventStorePDO;
$eventStore = new EventStorePDO(
new PDO('mysql:host=localhost;dbname=test', 'user', 'pass'),
new \Prooph\EventStore\Uuid\Uuid()
);
$event = new \Prooph\EventStore\TestEvent([
'message' => 'Hello World'
]);
$eventStore->store([
new \Prooph\EventStore\StreamName('my-stream'),
$event
]);
New Feature: Gap Detection in Projections
The PdoEventStoreReadModelProjector now implements GapDetector for seamless gap handling during read model projections. This is critical for maintaining data consistency when reprocessing streams.
Leverage the new GapDetector capability in PdoEventStoreReadModelProjector:
use Prooph\EventStore\PDO\PdoEventStoreReadModelProjector;
use Prooph\EventStore\Projection\GapHandler;
$projector = new PdoEventStoreReadModelProjector(
$pdo,
$eventStore,
new \Prooph\EventStore\Uuid\Uuid(),
new \Prooph\EventStore\Serializer\JsonSerializer()
);
// Set a custom gap handler
$projector->setGapHandler(new class implements GapHandler {
public function handleGap($streamName, $expectedPosition, $actualPosition) {
Log::warning("Gap detected in {$streamName}: expected {$expectedPosition}, got {$actualPosition}");
// Optionally: Trigger reprocessing or notify stakeholders
}
});
$projector->run([new \Prooph\EventStore\Criteria\StreamName('user-activity')]);
Append events to a stream atomically (unchanged):
$streamName = new \Prooph\EventStore\StreamName('order-' . $orderId);
$eventStore->appendToStream($streamName, [
new OrderCreated($orderId, $customerId),
new PaymentProcessed($orderId, $amount)
]);
Mark events as deleted (unchanged):
$eventStore->delete(
new \Prooph\EventStore\StreamName('my-stream'),
0, // position
1 // count
);
Combine with prooph/service-bus for CQRS workflows (unchanged):
$eventStore = new EventStorePDO($pdo, $uuid);
$commandBus = new \Prooph\ServiceBus\CommandBus(
new \Prooph\ServiceBus\Plugin\RouterPlugin($router),
new \Prooph\ServiceBus\Plugin\EventStorePlugin($eventStore)
);
GapHandler for read models to ensure graceful degradation when gaps occur.LogGapHandler for debugging:
$projector->setGapHandler(new \Prooph\EventStore\Projection\LogGapHandler());
$projector->run([new \Prooph\EventStore\Criteria\StreamName('large-stream')], 100);
Prooph\EventStore\Uuid\Uuid for UUID generation.Ramsey\Uuid\Uuid or Symfony\Uid\Uuid with an adapter if needed:
$uuid = new \Prooph\EventStore\Uuid\Uuid(
new \Ramsey\Uuid\UuidFactory()
);
$pdo->beginTransaction();
try {
$eventStore->appendToStream($streamName, [$event]);
$pdo->commit();
} catch (\Exception $e) {
$pdo->rollBack();
throw $e;
}
strtolower()) to avoid issues.max_allowed_packet limit (default: 4MB).PdoEventStoreReadModelProjector now detects gaps in event streams. Ensure your GapHandler is implemented to handle interruptions gracefully.$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);
Use Laravel’s query logging or PDO’s PDO::ATTR_STATEMENT_CLASS:
$pdo->setAttribute(
PDO::ATTR_STATEMENT_CLASS,
[\Prooph\EventStore\PDO\DebugStatement::class]
);
GapHandler to track gaps during projection:
class CustomGapHandler implements GapHandler
{
public function handleGap($streamName, $expectedPosition, $actualPosition)
{
Log::debug("Gap detected: Stream {$streamName}, Expected {$expectedPosition}, Actual {$actualPosition}");
}
}
Override the default JSON serializer:
$eventStore = new EventStorePDO(
$pdo,
$uuid,
new \Prooph\EventStore\Serializer\JsonSerializer()
);
Or implement Prooph\EventStore\Serializer\EventSerializer.
Implement Prooph\EventStore\Projection\GapHandler to customize gap detection behavior:
class CustomGapHandler implements GapHandler
{
public function handleGap($streamName, $expectedPosition, $actualPosition)
{
// Custom logic: e.g., trigger reprocessing or notify stakeholders
event(new EventStreamGapDetected($streamName, $expectedPosition, $actualPosition));
}
}
$projector->setGapHandler(new CustomGapHandler());
Add custom metadata to events:
$event = new \Prooph\EventStore\TestEvent(['message' => 'Hello']);
$event->setMetadata(['user_id' => 123, 'processed_at' => now()->toIso8601String()]);
Use Prooph’s middleware system to intercept events:
$eventStore = new EventStorePDO($pdo, $uuid);
$eventStore->addPlugin(new \Prooph\EventStore\Plugin\LoggingPlugin());
Use appendToStream() for multiple events in a single transaction.
Ensure stream_name and position are indexed:
CREATE INDEX idx_stream_position ON event_store_event (stream_name, position);
Reuse PDO connections to avoid overhead:
$pdo = new PDO('mysql:host=localhost;dbname=test', 'user', 'pass');
$pdo->setAttribute(PDO::ATTR_PERSISTENT, true);
GapDetector to skip known gaps, reducing unnecessary reprocessing.How can I help you explore Laravel packages today?