prooph/event-store
Common interfaces and classes for Prooph Event Store implementations. Provides the core building blocks to work with event stores, with persistent implementations available via separate TCP and HTTP client packages. Supports PHP 7.4+ (v7).
Installation
composer require prooph/event-store
Ensure your project meets PHP 8.0+ requirements (updated from PHP 7.4+).
Basic Event Store Initialization
use Prooph\EventStore\EventStore;
use Prooph\EventStore\StreamName;
$eventStore = new EventStore(
new \Prooph\EventStore\Pdo\MySql\EventStoreConnection(
new \PDO('mysql:host=localhost;dbname=event_store', 'user', 'pass')
)
);
First Use Case: Appending an Event
$event = new \Prooph\EventStore\Model\Event(
uuid('event-uuid'),
'my-aggregate',
'EventOccurred',
['data' => 'payload']
);
$eventStore->appendToStream(
StreamName::fromString('my-stream'),
[$event]
);
Where to Look First
src/Prooph/EventStore/ for core classes.tests/ for usage examples (note: abstract test classes have been renamed due to PHPUnit compatibility changes in v7.12.3).class MyAggregate {
private $events = [];
public function apply(Event $event) {
$this->events[] = $event;
// Apply event logic (e.g., update state)
}
public function recordThat(EventOccurred $event) {
$this->apply($event);
return $event;
}
}
// Usage:
$aggregate = new MyAggregate();
$event = $aggregate->recordThat(new EventOccurred('payload'));
$eventStore->appendToStream($streamName, [$event]);
$events = $eventStore->load(
StreamName::fromString('my-stream'),
0, // from version
10 // limit
);
foreach ($events as $event) {
// Process event (e.g., update a read model)
}
// Take a snapshot
$snapshot = new \Prooph\EventStore\Model\Snapshot(
uuid('snapshot-uuid'),
'my-aggregate',
10, // version
['state' => 'serialized']
);
$eventStore->appendToStream($streamName, [$snapshot]);
// Load snapshot
$snapshot = $eventStore->loadSnapshot(
StreamName::fromString('my-stream'),
10
);
use Prooph\ServiceBus\EventBus;
use Prooph\EventStore\Plugin\AggregateTranslatorPlugin;
$eventBus = new EventBus();
$eventBus->plug(new AggregateTranslatorPlugin($eventStore));
{aggregateId}-{aggregateType}).occurredOn in events for time-based queries.appendToStream in transactions or retry logic for idempotency.EventStore for unit tests.Prooph\EventStore\Pdo\Sqlite\EventStoreConnection) for integration tests.Concurrency Conflicts
expectedVersion to handle conflicts:
$eventStore->appendToStream(
$streamName,
[$event],
5 // expectedVersion (throws if stream is ahead)
);
Performance with Large Streams
loadFrom() with version ranges or paginate:
$events = $eventStore->loadFrom(
$streamName,
0,
100 // batch size
);
Metadata Quirks
headers) is stored as JSON. Ensure serialization/deserialization is consistent:
$event = new Event(
uuid(),
'stream',
'Event',
['data' => 'payload'],
['custom' => json_encode(['key' => 'value'])] // Avoid nested JSON
);
Snapshot Gaps
$snapshot = $eventStore->loadSnapshot($streamName, $version);
if (!$snapshot) {
// Rebuild from events
}
PHPUnit Test Compatibility
Prooph\EventStore\Test\AbstractTest).$pdo = new \PDO('mysql:host=localhost;dbname=event_store', 'user', 'pass');
$pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);
$pdo->setAttribute(\PDO::ATTR_EMULATE_PREPARES, false);
streamExists() before appending to avoid errors:
if (!$eventStore->streamExists($streamName)) {
$eventStore->createStream($streamName);
}
prooph/event-store-pdo).Custom Event Stores
Implement Prooph\EventStore\EventStoreInterface for non-PDO backends (e.g., Redis, Elasticsearch):
class CustomEventStore implements EventStoreInterface {
public function appendToStream(StreamName $streamName, array $events, ?int $expectedVersion = null) {
// Custom logic
}
// ... other methods
}
Plugins
Extend functionality with plugins (e.g., AggregateTranslatorPlugin for service bus integration). Example:
class MyPlugin implements PluginInterface {
public function handle(Envelope $command) {
// Custom command handling
}
}
Event Serialization Override the default JSON serializer for custom formats (e.g., MessagePack):
$eventStore->setSerializer(new \Prooph\EventStore\Serializer\MessagePackSerializer());
Middleware Use middleware for cross-cutting concerns (e.g., logging, validation):
$eventStore->plug(new class implements PluginInterface {
public function handle(Envelope $command) {
// Pre/post processing
}
});
How can I help you explore Laravel packages today?