cedricziel/messenger-pubsub
Symfony Messenger transport bridge for Google Cloud Pub/Sub. Adds a pubsub:// transport via PubSubTransportFactory for publishing and consuming messages from Pub/Sub. Note: Pub/Sub doesn’t support delayed delivery, so DelayStamp is ignored.
Install the Package
composer require cedricziel/messenger-pubsub
For Symfony Framework, prefer the bundle:
composer require cedricziel/symfony-messenger-pubsub-bundle
Configure the Transport Factory
Register the factory in config/services.yaml:
services:
CedricZiel\Symfony\Messenger\Bridge\GcpPubSub\Transport\PubSubTransportFactory:
tags: [messenger.transport_factory]
Define a Pub/Sub Transport
Add a transport in config/packages/messenger.yaml:
framework:
messenger:
transports:
my-pubsub: 'pubsub://default/project-id/topic-name'
Publish a Test Message Use the Messenger component to send a message:
use Symfony\Component\Messenger\MessageBusInterface;
$bus->dispatch(new MyMessage());
Publish a message to trigger a background job:
// config/packages/messenger.yaml
framework:
messenger:
transports:
async_tasks: 'pubsub://default/my-project/tasks-topic'
routing:
'App\Message\ProcessTask': async_tasks
Dispatch a message:
$bus->dispatch(new ProcessTask($taskId));
Message Routing Route messages to Pub/Sub for async processing:
# config/packages/messenger.yaml
framework:
messenger:
transports:
pubsub_queue: 'pubsub://default/my-project/queue-topic'
routing:
'App\Message\QueueableMessage': pubsub_queue
Handling Subscriptions Subscribe to a topic programmatically:
use CedricZiel\Symfony\Messenger\Bridge\GcpPubSub\Transport\PubSubTransport;
$transport = $container->get('messenger.transport.pubsub_queue');
$subscription = $transport->getSubscription('my-subscription');
Batch Processing
Use PubSubTransport to pull messages in batches:
$messages = $transport->getMessages(10); // Fetch 10 messages
foreach ($messages as $message) {
$bus->dispatch($message->body());
$transport->acknowledge($message);
}
Symfony Workflow Integration
Combine with symfony/workflow for stateful message processing:
use Symfony\Component\Workflow\WorkflowInterface;
$workflow = $container->get('workflow.task_workflow');
$transition = $workflow->apply($message, 'process');
Retry Logic Implement retry logic for failed messages:
try {
$bus->dispatch($message);
} catch (\Throwable $e) {
$transport->nacknowledge($message);
// Log or trigger a retry workflow
}
Dead Letter Queues Configure a dead-letter topic for failed messages:
# config/packages/messenger.yaml
framework:
messenger:
transports:
pubsub_dlq: 'pubsub://default/my-project/dlq-topic'
failure_transport: pubsub_dlq
No Delay Support
Google Cloud Pub/Sub does not support DelayStamp. Workaround:
DelayStamp handler that stores messages in a database with a scheduled timestamp.Acknowledgment Deadlines
Messages must be acknowledged within the subscription’s ack_deadline_seconds (default: 10s). For long-running tasks:
$transport->modifyAckDeadline($message, 30); // Extend to 30s
Idempotency Pub/Sub does not guarantee message ordering or uniqueness. Use:
Subscription Management Subscriptions must be created manually (via GCP Console, SDK, or this package). The bridge does not auto-create them.
Check Subscription State Verify subscriptions exist in GCP Console or via SDK:
gcloud pubsub subscriptions list --topic=projects/my-project/topics/my-topic
Log Unacknowledged Messages Enable debug logging for unacknowledged messages:
# config/packages/messenger.yaml
framework:
messenger:
transports:
my-pubsub:
dsn: 'pubsub://default/my-project/topic-name'
options:
logger: true
Test Locally Use the Emulator for local testing:
# config/packages/messenger.yaml
framework:
messenger:
transports:
pubsub_local:
dsn: 'pubsub://default/project-id/topic-name?emulator=true'
Custom Message Serializer Override the default serializer (e.g., for custom message formats):
# config/services.yaml
services:
CedricZiel\Symfony\Messenger\Bridge\GcpPubSub\Serializer\PubSubSerializer:
arguments:
$format: 'json' # or 'xml', 'custom'
Middleware for Pub/Sub Add middleware to transform messages before/after Pub/Sub:
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
class PubSubMiddleware implements MiddlewareInterface {
public function handle(Envelope $envelope, HandleMiddleware $next): Envelope {
$message = $envelope->getMessage();
// Modify message before sending
return $next->handle($envelope);
}
}
Event Listeners Listen to Pub/Sub events (e.g., message published, subscription created):
use CedricZiel\Symfony\Messenger\Bridge\GcpPubSub\Event\PubSubEvents;
$dispatcher->addListener(PubSubEvents::MESSAGE_PUBLISHED, function ($event) {
// Log or track published messages
});
DSN Format
The DSN must include pubsub://default/project-id/topic-name. Omitting default may cause errors.
Credentials
Ensure GCP credentials are set (via GOOGLE_APPLICATION_CREDENTIALS or environment variables):
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
Region-Specific Topics For multi-region projects, specify the region in the DSN:
transports:
pubsub_europe: 'pubsub://europe-west1/project-id/topic-name'
How can I help you explore Laravel packages today?