sulu/messenger
Symfony Messenger add-on for Sulu providing stamps and middlewares to configure the Sulu message bus. Includes UnpackExceptionMiddleware to surface real handler errors and LockMiddleware to prevent concurrent access. Usable standalone in any Symfony app.
Install the Package:
composer require sulu/messenger
Ensure your Laravel app uses Symfony Messenger (via spatie/laravel-messenger or direct symfony/messenger integration).
Register the Bundle (if not using Laravel’s service provider):
Add to config/bundles.php (Symfony-style) or manually bind services in Laravel’s AppServiceProvider:
// config/bundles.php (Symfony)
Sulu\Messenger\Infrastructure\Symfony\HttpKernel\SuluMessengerBundle::class => ['all' => true],
First Use Case: Async Job with Locking
Define a message and handler:
// app/Messages/ProcessOrder.php
// Messenger messages are plain PHP objects; no interface is required
class ProcessOrder {
    public function __construct(public string $orderId) {}
}
// app/Handlers/ProcessOrderHandler.php
class ProcessOrderHandler {
    public function __invoke(ProcessOrder $message) {
        // Business logic here
        return new OrderProcessed($message->orderId);
    }
}
Dispatch with locking:
use Sulu\Messenger\Infrastructure\Symfony\Messenger\LockMiddleware\LockStamp;
use Symfony\Component\Messenger\Envelope;
$orderId = 'order-123';
$this->bus->dispatch(
    new Envelope(new ProcessOrder($orderId), [
        new LockStamp('order-lock-' . $orderId, 60.0) // 60s TTL
    ])
);
Configure Transport (e.g., Doctrine or AMQP):
# config/packages/messenger.yaml (Symfony-style)
framework:
    messenger:
        transports:
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
        routing:
            'App\Messages\ProcessOrder': async
Use LockStamp for critical sections (e.g., inventory updates, payment processing):
$this->bus->dispatch(
    new Envelope(new UpdateInventory($productId), [
        new LockStamp('inventory-' . $productId, 30.0, true) // Auto-release
    ])
);
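Keys such as 'inventory-' . $productId follow a resource-plus-identifier pattern. A minimal sketch of a helper that builds such keys consistently is shown here; lockKey() and its parameters are hypothetical and not part of sulu/messenger.

```php
<?php
declare(strict_types=1);

/**
 * Builds a narrowly scoped lock key such as "inventory-42".
 * lockKey() is a hypothetical helper, not part of sulu/messenger.
 */
function lockKey(string $resource, string|int $id, ?string $scope = null): string
{
    $parts = [$resource, (string) $id];
    if ($scope !== null) {
        $parts[] = $scope;
    }

    // Normalise each part (lowercase, dashes only) so keys stay predictable across callers.
    $parts = array_map(
        static fn (string $part): string => strtolower(
            trim(preg_replace('/[^A-Za-z0-9]+/', '-', $part), '-')
        ),
        $parts
    );

    return implode('-', $parts);
}

echo lockKey('inventory', 42), PHP_EOL;        // inventory-42
echo lockKey('user', 123, 'profile'), PHP_EOL; // user-123-profile
```

The resulting string can be passed as the first argument of LockStamp, which keeps the key format in one place instead of repeating string concatenation at every dispatch site.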
Access LockFactory in a Laravel service:
$lockFactory = $this->container->get('lock.factory');
$lock = $lockFactory->createLock('key', 60.0);
Attach EnableFlushStamp to avoid global transactions:
$this->bus->dispatch(
    new Envelope(new SaveUserProfile($userId), [
        new EnableFlushStamp()
    ])
);
Catch HandlerFailedException and map it to HTTP responses (e.g., 422 for validation errors):
try {
    $this->bus->dispatch(new ValidateOrder($orderId));
} catch (HandlerFailedException $e) {
    // getPrevious() returns the first exception thrown by a handler
    $originalException = $e->getPrevious() ?? $e;
    return response()->json(['error' => $originalException->getMessage()], 422);
}
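When more than one kind of failure must be mapped, a lookup table keeps the status codes in one place. The following is a minimal sketch in plain PHP: httpStatusFor() and the chosen exception-to-status mapping are assumptions for illustration, and a plain RuntimeException stands in for Messenger's wrapper exception.

```php
<?php
declare(strict_types=1);

/**
 * Picks an HTTP status for an exception thrown by a handler.
 * The mapping is illustrative; replace it with your own exception classes.
 */
function httpStatusFor(\Throwable $e): int
{
    $map = [
        \InvalidArgumentException::class => 422, // validation-style errors
        \DomainException::class          => 409, // business rule conflicts
    ];

    foreach ($map as $class => $status) {
        if ($e instanceof $class) {
            return $status;
        }
    }

    return 500; // anything unmapped is treated as a server error
}

// A wrapper exception carrying the real failure as its "previous" exception.
$wrapped = new \RuntimeException(
    'Handling failed',
    0,
    new \InvalidArgumentException('Order ID is empty')
);

// Unwrap one level, falling back to the wrapper itself.
$original = $wrapped->getPrevious() ?? $wrapped;

echo httpStatusFor($original), PHP_EOL; // 422
```

Because the lookup uses instanceof, subclasses of a mapped exception inherit its status, so more specific classes should be listed before their parents.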
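Chain Envelope dispatches to model workflows (e.g., order → payment → shipping):
$envelope = new Envelope(new ProcessOrder($orderId));
$envelope = $this->bus->dispatch($envelope);
// Envelope::all() groups stamps by class, so flatten them before reuse
$stamps = array_merge(...array_values($envelope->all()));
$this->bus->dispatch(new Envelope(new ShipOrder($orderId), $stamps));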
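Route messages through Laravel's queue (this assumes a bridge that registers a laravel-queue:// transport; Symfony Messenger does not ship one):
// config/messenger.php
'transports' => [
    'laravel_queue' => [
        'dsn' => 'laravel-queue://',
        'options' => [
            'queue' => 'default',
        ],
    ],
],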
Register the middlewares on a bus. Symfony runs custom middleware before its built-in send_message and handle_message middleware:
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            messenger.bus.default:
                middleware:
                    - Sulu\Messenger\Infrastructure\Symfony\Messenger\UnpackExceptionMiddleware
                    - Sulu\Messenger\Infrastructure\Symfony\Messenger\LockMiddleware\LockMiddleware
                    - Sulu\Messenger\Infrastructure\Symfony\Messenger\FlushMiddleware\DoctrineFlushMiddleware
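The order of the middleware list matters because each entry wraps the ones after it. The sketch below is a simplified model in plain PHP that uses closures rather than Symfony's MiddlewareInterface and StackInterface; buildPipeline() and the trace helper are hypothetical names used only to show the nesting.

```php
<?php
declare(strict_types=1);

/**
 * Composes middleware so the first entry becomes the outermost layer.
 * Simplified model for illustration; not Symfony's actual implementation.
 *
 * @param list<callable(string, callable): string> $middlewares
 */
function buildPipeline(array $middlewares, callable $handler): callable
{
    $next = $handler;
    // Wrap from the inside out, so the last middleware sits closest to the handler.
    foreach (array_reverse($middlewares) as $middleware) {
        $next = static fn (string $message): string => $middleware($message, $next);
    }

    return $next;
}

$log = [];

// Creates a middleware that records when it is entered and left.
$trace = static function (string $name) use (&$log): callable {
    return static function (string $message, callable $next) use ($name, &$log): string {
        $log[] = "enter $name";
        $result = $next($message);
        $log[] = "leave $name";

        return $result;
    };
};

$pipeline = buildPipeline(
    [$trace('unpack'), $trace('lock'), $trace('flush')],
    static function (string $message) use (&$log): string {
        $log[] = 'handler';

        return "handled $message";
    }
);

echo $pipeline('ProcessOrder'), PHP_EOL;
echo implode(' > ', $log), PHP_EOL;
// enter unpack > enter lock > enter flush > handler > leave flush > leave lock > leave unpack
```

In this model the first middleware sees every exception raised further in, and the last one runs its "leave" step immediately after the handler returns, which mirrors the reasoning behind listing the unpacking middleware first and the flush middleware last.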
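Mock the message bus in tests:
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())
    ->method('dispatch')
    ->with($this->isInstanceOf(Envelope::class))
    ->willReturnArgument(0); // dispatch() must return an Envelope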
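Doctrine Dependency:
DoctrineFlushMiddleware requires Doctrine ORM (not Eloquent). Workaround:
Write a counterpart to DoctrineFlushMiddleware to support Eloquent:
use Illuminate\Support\Facades\DB;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
class EloquentFlushMiddleware implements MiddlewareInterface {
    public function handle(Envelope $envelope, StackInterface $stack): Envelope {
        if (null === $envelope->last(EnableFlushStamp::class)) {
            return $stack->next()->handle($envelope, $stack);
        }
        // Commit after the handler succeeds; DB::transaction() rolls back on exceptions
        return DB::transaction(fn () => $stack->next()->handle($envelope, $stack));
    }
}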
Lock Granularity:
Overly broad lock keys (e.g., global-lock) can block entire workflows. Use specific keys (e.g., user-123-profile).
Exception Unpacking:
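UnpackExceptionMiddleware rethrows the handler's original exception instead of Symfony's HandlerFailedException wrapper. Ensure your error handlers (e.g., API controllers) catch those original exception types. Without the middleware, unwrap the failure yourself:
try {
    $this->bus->dispatch(new YourMessage());
} catch (HandlerFailedException $e) {
    // getPrevious() returns the first exception thrown by a handler
    $error = ($e->getPrevious() ?? $e)->getMessage();
    // Log or return $error
}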
Middleware Order:
Place UnpackExceptionMiddleware first to catch handler errors.
Place LockMiddleware before business logic.
Place DoctrineFlushMiddleware last to ensure flushing happens after all operations.
PHP 8.2+ Requirement:
Lock Issues:
Use LockFactory directly to inspect locks:
$lockFactory = $this->container->get('lock.factory');
$lock = $lockFactory->createLock('test-key');
if (!$lock->acquire()) {
    // Handle lock failure
}
Doctrine Flush:
Verify that EnableFlushStamp is present in the Envelope:
$envelope->last(EnableFlushStamp::class); // Returns stamp or null
Log flushes with a Doctrine event listener, registered as a tagged service:
# config/services.yaml
services:
    App\Doctrine\FlushLogger:
        tags:
            - { name: doctrine.event_listener, event: postFlush }
Performance:
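Custom Middleware:
use Illuminate\Support\Facades\Log;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
class LogMiddleware implements MiddlewareInterface {
    public function handle(Envelope $envelope, StackInterface $stack): Envelope {
        // Log the message class, then pass the envelope to the next middleware
        Log::info('Message dispatched', ['message' => $envelope->getMessage()::class]);
        return $stack->next()->handle($envelope, $stack);
    }
}
Register it in config/packages/messenger.yaml:
framework:
    messenger:
        buses:
            messenger.bus.default:
                middleware:
                    - App\Middleware\LogMiddleware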
Custom Stamps:
Define your own stamps (e.g., RetryStamp):
class RetryStamp implements StampInterface {
    public function __construct(public int $attempts) {}
}