ddd-module/broadway
Broadway provides infrastructure and testing helpers for building CQRS and event-sourced PHP applications. It offers loosely coupled components for command handling, event storage, and projection workflows, designed to stay out of your way and be used together or separately.
Install the Laravel Package:
composer require nwidart/laravel-broadway
Publish the config:
php artisan vendor:publish --provider="Nwidart\Broadway\BroadwayServiceProvider"
Configure Event Store (config/broadway.php):
'event_store' => [
'connection' => 'mysql',
'table' => 'broadway_events',
],
Run Migrations:
php artisan migrate
First Use Case: Event-Sourced Aggregate
Define an aggregate root (e.g., app/Domain/Invoice.php):
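use Broadway\EventSourcing\EventSourcedAggregateRoot;

class Invoice extends EventSourcedAggregateRoot
{
    private string $invoiceId;
    private float $amount;

    public function createInvoice(string $invoiceId, float $amount): void
    {
        // apply() records the event and routes it to applyInvoiceCreated().
        $this->apply(new InvoiceCreated($invoiceId, $amount));
    }

    public function getAggregateRootId(): string
    {
        return $this->invoiceId;
    }

    protected function applyInvoiceCreated(InvoiceCreated $event): void
    {
        $this->invoiceId = $event->invoiceId;
        $this->amount = $event->amount;
    }
}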
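To make the record-and-apply mechanics concrete, here is a minimal, dependency-free sketch of an event-sourced aggregate. The `SketchAggregate`, `SketchInvoice`, and `SketchInvoiceCreated` classes are hypothetical stand-ins written for illustration and are not Broadway's implementation; the sketch only assumes that events are routed to `apply<EventName>` methods by class name.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for an event-sourced base class; not Broadway's code.
abstract class SketchAggregate
{
    /** @var object[] events recorded since the aggregate was created or loaded */
    private array $uncommitted = [];
    private int $playhead = -1;

    // Record a new event and immediately apply it to the in-memory state.
    protected function record(object $event): void
    {
        $this->mutate($event);
        $this->uncommitted[] = $event;
    }

    // Rebuild state from history without recording anything new.
    public function replay(iterable $events): void
    {
        foreach ($events as $event) {
            $this->mutate($event);
        }
    }

    /** @return object[] */
    public function uncommittedEvents(): array
    {
        return $this->uncommitted;
    }

    public function playhead(): int
    {
        return $this->playhead;
    }

    // Route the event to apply<ShortClassName>() when such a method exists.
    private function mutate(object $event): void
    {
        $parts = explode('\\', get_class($event));
        $method = 'apply' . end($parts);
        if (method_exists($this, $method)) {
            $this->$method($event);
        }
        $this->playhead++;
    }
}

final class SketchInvoiceCreated
{
    public function __construct(public string $invoiceId, public float $amount) {}
}

final class SketchInvoice extends SketchAggregate
{
    public string $invoiceId = '';
    public float $amount = 0.0;

    public function create(string $invoiceId, float $amount): void
    {
        $this->record(new SketchInvoiceCreated($invoiceId, $amount));
    }

    protected function applySketchInvoiceCreated(SketchInvoiceCreated $event): void
    {
        $this->invoiceId = $event->invoiceId;
        $this->amount = $event->amount;
    }
}

// Usage: state changes only through events, so replaying them rebuilds the aggregate.
$invoice = new SketchInvoice();
$invoice->create('INV-123', 100.0);

$rebuilt = new SketchInvoice();
$rebuilt->replay($invoice->uncommittedEvents());
```

The design point is that the command method never assigns state directly; it only records an event, and the same `apply` method runs both when the event is first recorded and when history is replayed.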
Register the aggregate in config/broadway.php:
'aggregates' => [
'App\\Domain\\Invoice' => [
'repository' => 'App\\Domain\\InvoiceRepository',
],
],
Dispatch Commands:
$commandBus = app('broadway.command_bus');
$commandBus->dispatch(new CreateInvoiceCommand('INV-123', 100.0));
Handle Commands (via SimpleCommandHandler):
use Broadway\CommandHandling\SimpleCommandHandler;
class CreateInvoiceHandler extends SimpleCommandHandler
{
public function handleCreateInvoiceCommand(CreateInvoiceCommand $command)
{
$invoice = new Invoice();
$invoice->createInvoice($command->getInvoiceId(), $command->getAmount());
$invoiceRepository = app('broadway.repository.invoice');
$invoiceRepository->save($invoice);
}
}
Register Handlers (via service provider):
$this->app->bind('broadway.command_handler.create_invoice', function () {
return new CreateInvoiceHandler();
});
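The handler above depends on a method-naming convention: the command's short class name is appended to `handle` to find the method to call. The following sketch shows that lookup in isolation. All class names here (`SketchCommandHandler`, `SketchCreateInvoiceCommand`, and so on) are hypothetical, and the boolean return value is added only so the outcome is observable.

```php
<?php
declare(strict_types=1);

// Hypothetical base class showing convention-based dispatch; not Broadway's code.
abstract class SketchCommandHandler
{
    // Returns true when a matching handle<CommandName>() method was found and called.
    public function handle(object $command): bool
    {
        $parts = explode('\\', get_class($command));
        $method = 'handle' . end($parts);
        if (!method_exists($this, $method)) {
            return false; // no matching method: the command is silently skipped
        }
        $this->$method($command);
        return true;
    }
}

final class SketchCreateInvoiceCommand
{
    public function __construct(public string $invoiceId) {}
}

final class SketchCancelInvoiceCommand
{
    public function __construct(public string $invoiceId) {}
}

final class SketchInvoiceHandler extends SketchCommandHandler
{
    /** @var string[] invoice ids seen by the handler */
    public array $handled = [];

    protected function handleSketchCreateInvoiceCommand(SketchCreateInvoiceCommand $command): void
    {
        $this->handled[] = $command->invoiceId;
    }

    // Missing the "handle" prefix, so the dispatcher never finds this method.
    protected function sketchCancelInvoiceCommand(SketchCancelInvoiceCommand $command): void
    {
        $this->handled[] = 'cancelled:' . $command->invoiceId;
    }
}

$handler = new SketchInvoiceHandler();
$created = $handler->handle(new SketchCreateInvoiceCommand('INV-123'));
$cancelled = $handler->handle(new SketchCancelInvoiceCommand('INV-123'));
```

Because an unmatched command produces no error, a misspelled method name tends to show up only as missing events, which is worth covering with a scenario test.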
Project Events to Read Models:
use Broadway\Domain\DomainMessage;
use Broadway\ReadModel\Projector;

class InvoiceProjector extends Projector
{
    // Projector::handle() unwraps the DomainMessage payload and calls apply<EventName>().
    protected function applyInvoiceCreated(InvoiceCreated $event, DomainMessage $domainMessage): void
    {
        $readModel = new InvoiceReadModel($event->invoiceId, $event->amount);
        $readModelRepository = app('broadway.read_model.invoice');
        $readModelRepository->save($readModel);
    }
}
Register Projector:
$this->app->bind('broadway.projector.invoice', function () {
return new InvoiceProjector();
});
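A projection is essentially a fold over the event stream into a query-friendly shape. The sketch below shows that idea without any framework. `ProjMessage`, `ProjInvoiceCreated`, `ProjInvoicePaid`, and `projectInvoices()` are hypothetical names, and the read model is a plain array rather than a repository.

```php
<?php
declare(strict_types=1);

// Hypothetical envelope: the event travels as the payload of a message.
final class ProjMessage
{
    public function __construct(public string $aggregateId, public object $payload) {}
}

final class ProjInvoiceCreated
{
    public function __construct(public float $amount) {}
}

final class ProjInvoicePaid
{
}

/**
 * Fold a stream of messages into rows keyed by invoice id.
 *
 * @param iterable<ProjMessage> $messages
 * @return array<string, array{amount: float, status: string}>
 */
function projectInvoices(iterable $messages): array
{
    $rows = [];
    foreach ($messages as $message) {
        // Unwrap first: the envelope itself is never an instance of the event class.
        $event = $message->payload;
        if ($event instanceof ProjInvoiceCreated) {
            $rows[$message->aggregateId] = ['amount' => $event->amount, 'status' => 'open'];
        } elseif ($event instanceof ProjInvoicePaid && isset($rows[$message->aggregateId])) {
            $rows[$message->aggregateId]['status'] = 'paid';
        }
    }
    return $rows;
}

$rows = projectInvoices([
    new ProjMessage('INV-123', new ProjInvoiceCreated(100.0)),
    new ProjMessage('INV-124', new ProjInvoiceCreated(40.0)),
    new ProjMessage('INV-123', new ProjInvoicePaid()),
]);
```

Since the read model is derived entirely from events, it can be discarded and rebuilt by running the same fold over the stored stream.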
Scenario-Based Tests:
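use Broadway\CommandHandling\CommandHandler;
use Broadway\CommandHandling\Testing\CommandHandlerScenarioTestCase;
use Broadway\EventHandling\EventBus;
use Broadway\EventStore\EventStore;

class InvoiceTest extends CommandHandlerScenarioTestCase
{
    protected function createCommandHandler(EventStore $eventStore, EventBus $eventBus): CommandHandler
    {
        // Assumes the handler receives its repository through the constructor.
        return new CreateInvoiceHandler(new InvoiceRepository($eventStore, $eventBus));
    }

    public function testCreateInvoice(): void
    {
        $this->scenario
            ->withAggregateId('INV-123')
            ->given([])
            ->when(new CreateInvoiceCommand('INV-123', 100.0))
            ->then([new InvoiceCreated('INV-123', 100.0)]);
    }
}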
Mock Event Store for Tests:
$eventStore = new InMemoryEventStore();
$this->setEventStore($eventStore);
Process Events Asynchronously:
use Broadway\Domain\DomainMessage;
use Broadway\Processor\EventProcessingResult;
use Broadway\Processor\SimpleEventProcessor;
class InvoiceEventProcessor extends SimpleEventProcessor
{
    public function process(DomainMessage $domainMessage): EventProcessingResult
    {
        // The event is wrapped in the DomainMessage, so inspect its payload.
        if ($domainMessage->getPayload() instanceof InvoiceCreated) {
            // Async logic (e.g., send email, update external API)
        }
        return new EventProcessingResult();
    }
}
Register Processor:
$this->app->bind('broadway.processor.invoice', function () {
return new InvoiceEventProcessor();
});
Playhead Management:
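The event store throws DuplicatePlayheadException when two writers append an event with the same playhead for one aggregate, which guards against race conditions.
loadFromPlayhead() on the event store returns the aggregate's events starting at a given playhead:
$events = $eventStore->loadFromPlayhead($invoiceId, $playhead);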
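The playhead acts as a per-aggregate version number, which is what makes optimistic concurrency possible. The in-memory sketch below illustrates the check; `SketchEventStore` and `SketchDuplicatePlayhead` are hypothetical classes, and a database-backed store would typically enforce the same rule with a unique index on aggregate id and playhead instead of an `isset()` check.

```php
<?php
declare(strict_types=1);

final class SketchDuplicatePlayhead extends RuntimeException
{
}

// Hypothetical in-memory store; it only illustrates the playhead rule.
final class SketchEventStore
{
    /** @var array<string, array<int, object>> events keyed by aggregate id, then playhead */
    private array $streams = [];

    public function append(string $id, int $playhead, object $event): void
    {
        // A playhead may be written once per aggregate; a second write means a lost race.
        if (isset($this->streams[$id][$playhead])) {
            throw new SketchDuplicatePlayhead("Playhead $playhead already used for $id");
        }
        $this->streams[$id][$playhead] = $event;
    }

    /** @return object[] events with a playhead greater than or equal to $playhead, in order */
    public function loadFromPlayhead(string $id, int $playhead): array
    {
        $events = $this->streams[$id] ?? [];
        ksort($events);
        return array_values(array_filter(
            $events,
            fn (int $p): bool => $p >= $playhead,
            ARRAY_FILTER_USE_KEY
        ));
    }
}

$store = new SketchEventStore();
$store->append('INV-123', 0, new stdClass());
$store->append('INV-123', 1, new stdClass());

// A second writer that loaded the aggregate at playhead 0 now tries to write playhead 1.
$conflict = false;
try {
    $store->append('INV-123', 1, new stdClass());
} catch (SketchDuplicatePlayhead $e) {
    $conflict = true;
}

$tail = $store->loadFromPlayhead('INV-123', 1);
```

On a conflict, the usual response is to reload the aggregate, re-run the command against the fresh state, and try the save again.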
Database Schema:
The broadway:event-store:schema:create command creates a table with playhead, aggregate_root_id, aggregate_type, recorded_on, and payload.
The payload column must support JSON serialization (e.g., LONGTEXT in MySQL).
Custom Serializers:
Events can implement JsonSerializable, or use the ReflectionSerializer:
$serializer = new ReflectionSerializer();
$serializer->serialize($event);
Avoid Circular References:
Circular references (e.g., EventA references EventB, which references EventA) will fail serialization. Use DTOs or break cycles manually.
Handler Naming Convention:
SimpleCommandHandler methods must be named handle<CommandName> (e.g., handleCreateInvoiceCommand). Without the handle prefix, the handler is ignored.
Dependency Injection:
Inject dependencies through the handler's constructor:
public function __construct(private InvoiceRepository $invoiceRepository) {}
Reset Event Store Between Tests:
Create a fresh InMemoryEventStore in setUp():
protected function setUp(): void
{
parent::setUp();
$this->eventStore = new InMemoryEventStore();
}
Verify Projections:
Extend ReadModelTestCase to assert read model state:
$this->assertReadModelEquals(
new InvoiceReadModel('INV-123', 100.0),
$this->readModelRepository->find('INV-123')
);
Batch Processing:
Use Broadway\Processor\BatchEventProcessor to process events in batches.
Snapshot Aggregates:
$repository = new SnapshottingAggregateRepository(
$eventStore,
new InMemorySnapshotStore(),
$serializer
);
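Snapshotting shortens load time by starting from a saved state and replaying only the events recorded after it. The sketch below shows that arithmetic on a deliberately tiny model where each event is a balance change. `SketchSnapshot` and `restoreBalance()` are hypothetical and do not reflect any snapshotting package's API.

```php
<?php
declare(strict_types=1);

// Hypothetical snapshot: the aggregate state as of a given playhead.
final class SketchSnapshot
{
    public function __construct(public int $playhead, public float $balance) {}
}

/**
 * Restore a balance from an optional snapshot plus the remaining events.
 *
 * @param array<int, float> $deltasByPlayhead balance changes keyed by playhead
 */
function restoreBalance(?SketchSnapshot $snapshot, array $deltasByPlayhead): float
{
    $balance = $snapshot?->balance ?? 0.0;
    // Events up to and including the snapshot playhead are already reflected in it.
    $from = $snapshot === null ? 0 : $snapshot->playhead + 1;

    ksort($deltasByPlayhead);
    foreach ($deltasByPlayhead as $playhead => $delta) {
        if ($playhead >= $from) {
            $balance += $delta;
        }
    }
    return $balance;
}

$events = [0 => 100.0, 1 => -30.0, 2 => 5.0];

$fromScratch = restoreBalance(null, $events);
$fromSnapshot = restoreBalance(new SketchSnapshot(1, 70.0), $events);
```

Both paths must yield the same state; a snapshot is only a cache of a replay, so it can be dropped and regenerated whenever the aggregate's apply logic changes.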
Enable Auditing:
Register a CommandLogger:
$this->app->bind('broadway.event_listener.command_logger', function () {
return new CommandLogger(app('logger'));
});
Inspect Event Streams:
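The EventStoreManagement interface (visitEvents()) scans events across aggregates; to read a single aggregate's stream, call load() on the event store: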
$events = $eventStore->load($aggregateId);
Service Provider Binding:
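$this->app->bind('broadway.repository.invoice', function () {
    // EventSourcingRepository takes the event store, event bus, aggregate class, and an aggregate factory.
    // The 'broadway.event_bus' binding name is assumed to follow the naming used above.
    return new EventSourcingRepository(
        app('broadway.event_store'),
        app('broadway.event_bus'),
        Invoice::class,
        new PublicConstructorAggregateFactory()
    );
});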
Artisan Commands:
php artisan broadway:event-store:schema:create
php artisan broadway:event-store:replay-projections
Custom Event Store:
Implement Broadway\EventStore\EventStore for non-DBAL backends (e.g., Redis, MongoDB).
Custom Serializer:
Implement Broadway\Serializer\Serializer for domain-specific needs:
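use Broadway\Serializer\Serializer;

class CustomSerializer implements Serializer
{
    public function serialize($object): array
    {
        // Keep the class name next to the data so deserialize() can rebuild the object.
        return ['class' => get_class($object), 'payload' => $object->serialize()];
    }

    public function deserialize(array $serializedObject)
    {
        // Assumes the class exposes a static deserialize(array) counterpart to serialize().
        return $serializedObject['class']::deserialize($serializedObject['payload']);
    }
}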
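One common shape for an event serializer is to emit an array holding the class name and a payload, so the stored JSON carries enough information to rebuild the object. The sketch below runs that round trip end to end. `SketchSerializable`, `RoundTripInvoiceCreated`, `sketchSerialize()`, and `sketchDeserialize()` are hypothetical names chosen for illustration.

```php
<?php
declare(strict_types=1);

// Hypothetical contract: objects convert themselves to and from plain arrays.
interface SketchSerializable
{
    public function serialize(): array;

    public static function deserialize(array $data): self;
}

final class RoundTripInvoiceCreated implements SketchSerializable
{
    public function __construct(public string $invoiceId, public float $amount) {}

    public function serialize(): array
    {
        return ['invoiceId' => $this->invoiceId, 'amount' => $this->amount];
    }

    public static function deserialize(array $data): self
    {
        return new self($data['invoiceId'], $data['amount']);
    }
}

// Wrap the payload with its class name so it can be rebuilt later.
function sketchSerialize(SketchSerializable $object): array
{
    return ['class' => get_class($object), 'payload' => $object->serialize()];
}

function sketchDeserialize(array $serialized): SketchSerializable
{
    $class = $serialized['class'];
    // Refuse class names that do not implement the contract.
    if (!is_subclass_of($class, SketchSerializable::class)) {
        throw new InvalidArgumentException("Cannot deserialize $class");
    }
    return $class::deserialize($serialized['payload']);
}

// Round trip through JSON, as a database payload column would store it.
$json = json_encode(sketchSerialize(new RoundTripInvoiceCreated('INV-123', 100.0)), JSON_THROW_ON_ERROR);
$restored = sketchDeserialize(json_decode($json, true, 512, JSON_THROW_ON_ERROR));
```

Checking the class name before calling `deserialize()` matters because the stored string ends up in a static method call; without the check, tampered or stale rows could invoke arbitrary classes.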