becklyn/eventor-symfony
Minimal pub/sub abstraction for Symfony, with built-in support for Dapr’s Pub/Sub API. Configure via env vars and publish typed messages to topics, then register handlers and expose simple subscription and topic endpoints through a controller/registry.
Installation:
composer require becklyn/eventor-symfony
Ensure symfony/framework-bundle (v6.1+) is installed (required dependency).
Configure Environment:
Add these to your .env:
DAPR_HOST=http://localhost:3500 # Dapr endpoint (required)
DAPR_PUBSUB=pubsubname # Pub/sub name (required)
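For orientation, Dapr's HTTP pub/sub API publishes a message with POST <host>/v1.0/publish/<pubsub>/<topic>. The sketch below shows how the two variables would combine into that URL. The helper name daprPublishUrl is hypothetical and not part of the package, and whether the package builds the URL exactly this way is an assumption.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper, not part of becklyn/eventor-symfony.
 * Builds the Dapr publish URL for a topic from the host and pub/sub name.
 */
function daprPublishUrl(string $host, string $pubsub, string $topic): string
{
    return sprintf(
        '%s/v1.0/publish/%s/%s',
        rtrim($host, '/'),      // tolerate a trailing slash in DAPR_HOST
        rawurlencode($pubsub),
        rawurlencode($topic)
    );
}

echo daprPublishUrl('http://localhost:3500', 'pubsubname', 'orders.created'), PHP_EOL;
// http://localhost:3500/v1.0/publish/pubsubname/orders.created
```

If publishing fails, comparing the URL built from your own values against the Dapr sidecar's address is a quick first check.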
First Use Case: Publish an Event
Define a message class (e.g., src/Domain/Events/Message.php):
class Message {
public function __construct(
public string $id,
public string $body,
) {}
}
Publish it in a service:
use Becklyn\Eventor\Publisher;
class OrderService {
public function __construct(private Publisher $publisher) {}
public function createOrder() {
$this->publisher->publish("orders.created", new Message(
id: "123",
body: "Order #123 created",
));
}
}
First Use Case: Subscribe to an Event
Register a subscriber in a controller (e.g., src/Controller/DaprSubscriptionController.php):
use Becklyn\Eventor\DaprSubscriptionRegistry;
use Becklyn\Eventor\On;
use Psr\Log\LoggerInterface;
class DaprSubscriptionController {
    public function __construct(
        private DaprSubscriptionRegistry $registry,
        private LoggerInterface $logger, // injected PSR-3 logger
    ) {}
    public function __invoke() {
        // Register a handler for the "orders.created" topic.
        new On(
            fn(Message $msg) => $this->logger->info($msg->body),
            $this->registry,
            "orders.created"
        );
    }
}
Expose Dapr Endpoints:
Add routes in config/routes.yaml:
dapr_subscribe:
    path: /dapr/subscribe
    controller: Becklyn\Eventor\DaprSubscriptionController
    methods: [GET]
dapr_topic:
    path: /dapr/{pubsub}/orders.created
    controller: Becklyn\Eventor\DaprSubscriptionController
    methods: [POST]
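For context on the first route: Dapr calls GET /dapr/subscribe on the application and expects a JSON array of subscriptions, each with pubsubname, topic, and route. The sketch below builds such a body. The helper daprSubscriptions is hypothetical, and the route layout is an assumption modelled on the /dapr/{pubsub}/orders.created path above; the package's registry may produce a different shape.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper, not part of the package: builds the JSON body
 * Dapr expects from GET /dapr/subscribe for a list of topics.
 *
 * @param list<string> $topics
 */
function daprSubscriptions(string $pubsub, array $topics): string
{
    $entries = array_map(
        static fn (string $topic): array => [
            'pubsubname' => $pubsub,
            'topic' => $topic,
            // Dapr POSTs each event for the topic to this route.
            'route' => sprintf('/dapr/%s/%s', $pubsub, $topic),
        ],
        $topics
    );

    return json_encode($entries, JSON_THROW_ON_ERROR | JSON_UNESCAPED_SLASHES);
}

echo daprSubscriptions('pubsubname', ['orders.created']), PHP_EOL;
// [{"pubsubname":"pubsubname","topic":"orders.created","route":"/dapr/pubsubname/orders.created"}]
```

Requesting /dapr/subscribe by hand and comparing the response with this shape helps confirm that Dapr can discover the topics.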
Event Publishing:
Use Publisher to emit events with a topic and payload.
$this->publisher->publish("user.updated", new UserUpdatedEvent($userId, $changes));
… EventDispatcher or EventBus pattern).
Subscription Management:
Register handlers with On in a controller or service.
new On(
fn(UserUpdatedEvent $event) => $this->notifyUser($event),
$this->registry,
"user.updated"
);
… UserEventSubscriber).
Dependency Injection:
Inject Publisher and DaprSubscriptionRegistry into services/controllers.
class AnalyticsService {
public function __construct(
private Publisher $publisher,
private DaprSubscriptionRegistry $registry
) {}
}
Error Handling:
new On(
fn(UserUpdatedEvent $event) => $this->handleUpdate($event),
$this->registry,
"user.updated"
);
// Handle exceptions at the controller level or wrap in a try-catch.
Symfony Serialization:
… Serializer for payload serialization.
… #[SerializedName] or implement __serialize()).
OpenTelemetry:
… symfony/otel) separately.
Testing:
Publisher: Use Mockery or PHPUnit to stub publish() calls.
… DaprSubscriptionRegistry in tests to verify event handling.
$mockPublisher = Mockery::mock(Publisher::class);
$mockPublisher->shouldReceive('publish')->once();
static::getContainer()->set(Publisher::class, $mockPublisher); // inside a KernelTestCase or WebTestCase
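Where a mocking library is not wanted, a hand-written test double can record published messages instead. The sketch below is self-contained: PublisherLike is a hypothetical stand-in for whatever contract the package's Publisher exposes, and RecordingPublisher is an illustrative name, so both would need adapting to the real types.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the package's publisher contract; the real
// type to implement or extend may differ.
interface PublisherLike
{
    public function publish(string $topic, mixed $payload): void;
}

// Test double that records every published message for later assertions.
final class RecordingPublisher implements PublisherLike
{
    /** @var list<array{topic: string, payload: mixed}> */
    public array $published = [];

    public function publish(string $topic, mixed $payload): void
    {
        $this->published[] = ['topic' => $topic, 'payload' => $payload];
    }
}

$publisher = new RecordingPublisher();
$publisher->publish('user.updated', ['id' => '42']);
```

A test can then assert on $publisher->published to check both the topic and the payload, which a bare once() expectation does not verify.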
Dynamic Topics:
For dynamic topics (e.g., user.{id}.updated), use a factory or service locator to register subscribers.
Exception Handling:
Wrap handler logic in try-catch or handle errors at the controller level.
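new On(
    // An arrow function cannot contain try/catch, so use a full closure.
    function ($event): void {
        try { $this->handle($event); }
        catch (\Throwable $e) { $this->logger->error($e->getMessage(), ['exception' => $e]); } // $this->logger: injected Psr\Log\LoggerInterface (assumed)
    },
    $this->registry,
    "topic"
);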
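To avoid repeating the try-catch in every subscriber, the error handling can be factored into a small wrapper. This is a sketch only: guarded is a hypothetical helper, not part of the package. If On relies on the handler's parameter type to deserialize the payload (not confirmed here), a wrapper typed as mixed would hide that type, so check this before adopting the pattern.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: wraps a handler so that any Throwable is passed
 * to $onError instead of propagating to the caller.
 */
function guarded(callable $handler, callable $onError): \Closure
{
    return static function (mixed $event) use ($handler, $onError): void {
        try {
            $handler($event);
        } catch (\Throwable $e) {
            $onError($e, $event);
        }
    };
}

$errors = [];
$handler = guarded(
    static function (mixed $event): void { throw new \RuntimeException('boom'); },
    static function (\Throwable $e) use (&$errors): void { $errors[] = $e->getMessage(); }
);
$handler(['id' => '1']); // does not throw; $errors now contains "boom"
```

The wrapped closure can be passed wherever a plain handler closure would go, with the error callback delegating to a logger.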
Dapr Configuration:
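Missing DAPR_HOST or DAPR_PUBSUB in .env will cause silent failures.
if (($_ENV['DAPR_HOST'] ?? $_SERVER['DAPR_HOST'] ?? '') === '') { // Symfony's Dotenv fills $_ENV and $_SERVER; getenv() is not populated by default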
throw new \RuntimeException('DAPR_HOST is not configured.');
}
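The same check can cover both required variables at once. The sketch below is an illustration under assumptions: missingDaprVars is a hypothetical helper, and it takes the environment as an array so it can be exercised without touching real configuration.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: returns the names of required Dapr variables
 * that are missing or empty in the given environment array.
 *
 * @param array<string, mixed> $env
 * @return list<string>
 */
function missingDaprVars(array $env): array
{
    $required = ['DAPR_HOST', 'DAPR_PUBSUB'];

    return array_values(array_filter(
        $required,
        static fn (string $name): bool => !isset($env[$name]) || trim((string) $env[$name]) === ''
    ));
}

$missing = missingDaprVars(['DAPR_HOST' => 'http://localhost:3500']);
if ($missing !== []) {
    echo 'Missing: ', implode(', ', $missing), PHP_EOL; // Missing: DAPR_PUBSUB
}
```

In an application, passing $_ENV at boot and throwing a \RuntimeException when the result is non-empty turns the silent failure into an immediate one.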
Serialization Issues:
Use #[SerializedName] or implement __serialize()/__unserialize():
use Symfony\Component\Serializer\Annotation\SerializedName;
class UserEvent {
    #[SerializedName('user_id')]
    public string $id;
}
Route Conflicts:
… /dapr/{pubsub}/{topic}). Conflicts with other routes may occur.
Use a _dapr prefix or route priorities:
_dapr:
    resource: "@BecklynEventorDaprSubscriptionController"
    path: /_dapr/{pubsub}/{topic}
Dependency Versioning:
Requires symfony/framework-bundle:^6.1. Downgrading may break functionality.
… composer.json if using older Symfony.
Log Subscriptions:
Enable debug logging for Becklyn\Eventor to trace subscription calls:
# config/packages/monolog.yaml
monolog:
    handlers:
        main:
            level: debug
            channels: ["eventor"]
Dapr HTTP Errors:
… http://localhost:3500/v1.0/log) for pub/sub failures.
Verify the Content-Type: application/json header in Dapr requests.
Payload Inspection:
Use var_dump() or an injected logger in subscribers to inspect incoming events:
new On(
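fn($event) => $this->logger->debug('Received event', ['event' => $event]), // PSR-3 context must be an array; $this->logger is an injected LoggerInterface (assumed)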
$this->registry,
"topic"
);
Custom Brokers:
Extend Publisher or SubscriptionRegistry to support other brokers (e.g., RabbitMQ, Kafka).
class RabbitMqPublisher implements PublisherInterface {
public function publish(string $topic, mixed $payload): void {
// Custom RabbitMQ logic
}
}
Middleware:
Wrap Publisher or On handlers with middleware:
$publisher = new MiddlewarePublisher(
    new Publisher(),
    fn($topic, $payload) => $this->logger->info("Publishing to $topic") // $this->logger: injected Psr\Log\LoggerInterface (assumed)
);
Dynamic Subscriptions:
$subscriber = new DynamicSubscriber($this->registry);
$subscriber->registerFromConfig($config);
Event Validation:
use Symfony\Component\Validator\Validator\ValidatorInterface;
class ValidatingPublisher implements PublisherInterface {
public function __construct(
private Publisher $decorated,
private ValidatorInterface $validator
) {}
public function publish(string $topic, mixed $payload): void {
$errors = $this->validator->validate($payload);
if ($errors->count() > 0) {
throw new \InvalidArgumentException('Validation failed');
}
$this->decorated->publish($topic, $payload);
}
}