google/cloud-pubsub
Idiomatic PHP client for Google Cloud Pub/Sub. Publish and consume messages between services using REST or gRPC (streaming supported). Install via Composer (google/cloud-pubsub) and authenticate with Google Cloud credentials.
Installation:
composer require google/cloud-pubsub
For gRPC (recommended for streaming), install the grpc PHP extension:
pecl install grpc
First Use Case: Publish a message to a topic:
use Google\Cloud\PubSub\PubSubClient;
$pubSub = new PubSubClient(['projectId' => 'your-project-id']);
$topic = $pubSub->topic('your-topic-name'); // The topic must already exist
$topic->publish(['data' => 'Hello, Pub/Sub!']); // Blocking call; returns the published message IDs
Key Files:
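Batching: Topic::publishBatch() sends several messages in one request, and Topic::batchPublisher() queues messages and publishes them in the background.
// $topic is a Google\Cloud\PubSub\Topic
$topic->publishBatch([
    ['data' => 'first'],
    ['data' => 'second'],
]);
$batchPublisher = $topic->batchPublisher(['batchOptions' => ['batchSize' => 500]]); // Flush after 500 queued messages
$batchPublisher->publish(['data' => 'queued']);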
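Batching by message count and payload size can be sketched independently of the client. The helper below is a minimal illustration, not part of google/cloud-pubsub: the name chunkMessages() is hypothetical, and it counts only the 'data' field, so real request sizes will be somewhat larger.

```php
<?php
/**
 * Split messages into batches limited by message count and total 'data' bytes.
 * chunkMessages() is a hypothetical helper, not a library function.
 */
function chunkMessages(array $messages, int $maxMessages, int $maxBytes): array
{
    if ($maxMessages < 1) {
        throw new InvalidArgumentException('maxMessages must be at least 1');
    }

    $batches = [];
    $current = [];
    $currentBytes = 0;

    foreach ($messages as $message) {
        $size = strlen($message['data'] ?? '');

        // Close the current batch when adding this message would break a limit.
        $countFull = count($current) >= $maxMessages;
        $bytesFull = $current !== [] && $currentBytes + $size > $maxBytes;
        if ($countFull || $bytesFull) {
            $batches[] = $current;
            $current = [];
            $currentBytes = 0;
        }

        $current[] = $message;
        $currentBytes += $size;
    }

    // Keep the final, partially filled batch.
    if ($current !== []) {
        $batches[] = $current;
    }

    return $batches;
}
```

Each returned batch could then be handed to a batch publish call, for example one Topic::publishBatch() call per batch, assuming $topic is a Google\Cloud\PubSub\Topic.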
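Set orderingKey in the message for in-order delivery per key (the subscription must have message ordering enabled):
// $topic is a Google\Cloud\PubSub\Topic
$topic->publish(['data' => 'data', 'orderingKey' => 'user-123'], ['enableMessageOrdering' => true]);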
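Push subscriptions deliver messages to an HTTPS endpoint:
// $pubSub is a Google\Cloud\PubSub\PubSubClient
$subscription = $pubSub->subscription('your-subscription', 'your-topic-name');
$subscription->create([
    'pushConfig' => ['pushEndpoint' => 'https://your-endpoint.com/pubsub']
]);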
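On the receiving side, a push endpoint gets a JSON envelope whose message.data field is base64-encoded (this assumes the default wrapped payload format). The following is a minimal sketch of decoding that body; decodePushBody() is a hypothetical helper name, not part of the library.

```php
<?php
/**
 * Decode the JSON body that Pub/Sub POSTs to a push endpoint.
 * decodePushBody() is a hypothetical helper, not a library function.
 */
function decodePushBody(string $body): array
{
    $envelope = json_decode($body, true, 512, JSON_THROW_ON_ERROR);

    $message = is_array($envelope) ? ($envelope['message'] ?? null) : null;
    if (!is_array($message)) {
        throw new InvalidArgumentException('Push body has no "message" object');
    }

    // "data" may be absent when a message carries only attributes.
    $data = base64_decode($message['data'] ?? '', true);
    if ($data === false) {
        throw new InvalidArgumentException('Message data is not valid base64');
    }

    return [
        'messageId'    => $message['messageId'] ?? null,
        'data'         => $data,
        'attributes'   => $message['attributes'] ?? [],
        'subscription' => $envelope['subscription'] ?? null,
    ];
}
```

In a controller this might be called as decodePushBody(file_get_contents('php://input')), returning a 2xx status to acknowledge the message once it has been handled.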
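Streaming pull (gRPC required) is a method on the generated SubscriberClient:
use Google\Cloud\PubSub\V1\AcknowledgeRequest;
use Google\Cloud\PubSub\V1\Client\SubscriberClient;
use Google\Cloud\PubSub\V1\StreamingPullRequest;
$subscriber = new SubscriberClient();
$subscriptionName = SubscriberClient::subscriptionName('your-project-id', 'your-subscription');
$stream = $subscriber->streamingPull();
$stream->write((new StreamingPullRequest())->setSubscription($subscriptionName)->setStreamAckDeadlineSeconds(60));
foreach ($stream->closeWriteAndReadAll() as $response) {
    foreach ($response->getReceivedMessages() as $message) {
        echo $message->getMessage()->getData(); // Process message
        $subscriber->acknowledge((new AcknowledgeRequest())->setSubscription($subscriptionName)->setAckIds([$message->getAckId()]));
    }
}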
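Extend the ack deadline when processing takes longer than the subscription's default:
// $message is a Google\Cloud\PubSub\Message returned by $subscription->pull()
$subscription->modifyAckDeadline($message, 300); // 5 minutes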
Attach a schema when creating a topic (schemas are named projects/{project}/schemas/{schema}):
$topic->create(['schemaSettings' => ['schema' => 'projects/p/schemas/s']]);
$transform = new \Google\Cloud\PubSub\V1\MessageTransform();
$transform->setDisabled(false);
$transform->setType('AI_INFERENCE');
$topic->create(['messageTransforms' => [$transform]]);
RetrySettings for pull operations:
$retrySettings = $subscriber->retrySettings();
$retrySettings->setMaxAttempts(5);
$subscriber->setRetrySettings($retrySettings);
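The retry limit above caps an operation at five attempts. As a library-agnostic illustration of the same idea, here is a minimal sketch of retrying a callable with exponential backoff. retryWithBackoff() and its parameters are hypothetical names, and catching every Throwable is a simplification; real code would retry only transient errors.

```php
<?php
/**
 * Call $operation until it succeeds or $maxAttempts is reached,
 * doubling the delay after each failure.
 * retryWithBackoff() is a hypothetical helper, not a library function.
 */
function retryWithBackoff(
    callable $operation,
    int $maxAttempts = 5,
    int $initialDelayMs = 100,
    ?callable $sleep = null
) {
    // Tests can inject a fake sleeper; the default sleeps for real.
    $sleep = $sleep ?? static function (int $ms): void {
        usleep($ms * 1000);
    };

    $delayMs = $initialDelayMs;
    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation();
        } catch (Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // Out of attempts: surface the last error.
            }
            $sleep($delayMs);
            $delayMs *= 2;
        }
    }
}
```

A pull could then be wrapped as retryWithBackoff(fn () => $subscription->pull()), assuming $subscription is a Google\Cloud\PubSub\Subscription.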
$subscription->create(['deadLetterPolicy' => ['deadLetterTopic' => 'projects/p/topics/dlq']]);
Laravel integration: register the client as a singleton and publish from a queued job:
use Google\Cloud\PubSub\PubSubClient;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Support\ServiceProvider;
class PubSubServiceProvider extends ServiceProvider {
    public function register() {
        $this->app->singleton(PubSubClient::class, function ($app) {
            return new PubSubClient();
        });
    }
}
// ShouldQueue is an interface, so the job implements it rather than extending it.
class PubSubQueueWorker implements ShouldQueue {
    protected $topicName;
    protected $payload;
    public function __construct(string $topicName, string $payload) {
        $this->topicName = $topicName;
        $this->payload = $payload;
    }
    public function handle() {
        app(PubSubClient::class)->topic($this->topicName)->publish(['data' => $this->payload]);
    }
}
Authentication:
keyFile and keyFilePath are deprecated; use credentials instead:
$client = new PublisherClient(['credentials' => $serviceAccount]);
gRPC vs REST:
For streaming pull, ensure the gRPC extension is installed. Fall back to REST for simple pull:
$pubSub = new PubSubClient(['transport' => 'rest']);
$messages = $pubSub->subscription('your-subscription')->pull(['maxMessages' => 10]);
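Message Size Limits:
Pub/Sub rejects messages larger than 10 MB; keep batches under that limit with the batch settings.
Idempotency:
Use ackId for acknowledgments. Never rely on message content.
Schema Mismatches:
Schema violations on ingestion surface as IngestionFailureEvent. Monitor with:
$policy = $subscription->iam()->policy(); // Check for monitoring permissions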
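Pub/Sub delivers messages at least once, so a handler may see the same message more than once. One common way to stay idempotent is to deduplicate on the message ID. This is a minimal in-memory sketch: the class name InMemoryDeduplicator is hypothetical, and a production consumer would need a shared store such as a database or cache instead of a PHP array.

```php
<?php
/**
 * Remember which message IDs have been handled so redeliveries are skipped.
 * InMemoryDeduplicator is a hypothetical class, not part of the library.
 */
final class InMemoryDeduplicator
{
    /** @var array<string, bool> */
    private array $seen = [];

    /**
     * Run $handler only the first time $messageId is seen.
     * Returns true if the handler ran, false for a duplicate.
     */
    public function handleOnce(string $messageId, callable $handler): bool
    {
        if (isset($this->seen[$messageId])) {
            return false; // Redelivery: skip the work, but the caller should still ack.
        }

        $handler();

        // Mark as seen only after the handler finished without throwing.
        $this->seen[$messageId] = true;

        return true;
    }
}
```

The message should be acknowledged in both cases; only the side effects are skipped for a duplicate.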
Enable Logging:
$client = new PublisherClient([
'logger' => new \Monolog\Logger('pubsub', [
new \Monolog\Handler\StreamHandler('php://stderr', \Monolog\Logger::DEBUG)
])
]);
Common Errors:
INVALID_ARGUMENT: Validate topic/subscription names (format: projects/{project}/topics/{topic}).
PERMISSION_DENIED: Ensure IAM roles (roles/pubsub.publisher, roles/pubsub.subscriber).
DEADLINE_EXCEEDED: Increase timeout for large pulls:
$pubSub = new PubSubClient(['requestTimeout' => 60]); // Seconds
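Many INVALID_ARGUMENT errors can be caught before the request is sent by checking the resource name locally. The sketch below validates the projects/{project}/topics/{topic} format. isValidTopicName() is a hypothetical helper, and its rules are a simplified reading of the Pub/Sub naming rules (topic ID of 3 to 255 characters, starting with a letter, not starting with "goog"); the project segment is only checked for being non-empty.

```php
<?php
/**
 * Check that $name looks like projects/{project}/topics/{topic}.
 * isValidTopicName() is a hypothetical helper, not a library function.
 */
function isValidTopicName(string $name): bool
{
    // Topic ID: a letter, then 2 to 254 more characters from the allowed set.
    $pattern = '#^projects/([^/]+)/topics/([A-Za-z][A-Za-z0-9\-_.~+%]{2,254})$#';
    if (preg_match($pattern, $name, $matches) !== 1) {
        return false;
    }

    // Topic IDs starting with "goog" are reserved.
    return strncmp($matches[2], 'goog', 4) !== 0;
}
```

The same pattern can be adapted for subscriptions by swapping the topics segment for subscriptions.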
Testing:
Run the Pub/Sub emulator and set PUBSUB_EMULATOR_HOST=localhost:8085 so the client connects to it:
docker run -p 8085:8085 gcr.io/google.com/cloudsdktool/cloud-pubsub-emulator
Mock the Topic class in tests:
$mock = $this->createMock(\Google\Cloud\PubSub\Topic::class);
$mock->method('publish')->willReturn(['messageIds' => ['1']]);
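Where mocking a client class is awkward, an alternative is to depend on a small interface of your own and swap in an in-memory fake during tests. This is a sketch under that assumption: MessagePublisher and InMemoryPublisher are hypothetical names, and the production implementation that wraps the real client is left out.

```php
<?php
/**
 * Application-level publishing contract (hypothetical, not part of the library).
 */
interface MessagePublisher
{
    /** Publish $message to $topic and return a message ID. */
    public function publish(string $topic, array $message): string;
}

/**
 * Test double that records messages instead of sending them.
 */
final class InMemoryPublisher implements MessagePublisher
{
    /** @var array<int, array{topic: string, message: array}> */
    public array $published = [];

    public function publish(string $topic, array $message): string
    {
        $this->published[] = ['topic' => $topic, 'message' => $message];

        // Use the running count as a fake, predictable message ID.
        return (string) count($this->published);
    }
}
```

A test can then assert on $publisher->published directly, without the emulator or network access.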
Custom Message Handlers:
Extend the Message class for domain-specific logic:
class OrderMessage extends \Google\Cloud\PubSub\V1\PubsubMessage {
    public function getOrderId(): string {
        return json_decode($this->getData(), true)['order_id'];
    }
}
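getOrderId() above assumes the payload is valid JSON containing an order_id key. A more defensive variant is to parse the payload into a small value object that fails loudly on bad input. This is a minimal sketch: OrderPayload is a hypothetical class, and it takes the raw data string rather than extending a library class.

```php
<?php
/**
 * Value object for an order message payload.
 * OrderPayload is a hypothetical class, not part of the library.
 */
final class OrderPayload
{
    private string $orderId;

    private function __construct(string $orderId)
    {
        $this->orderId = $orderId;
    }

    /** Build the payload from raw message data, rejecting malformed input. */
    public static function fromJson(string $json): self
    {
        try {
            $decoded = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
        } catch (JsonException $e) {
            throw new InvalidArgumentException('Payload is not valid JSON', 0, $e);
        }

        if (!is_array($decoded) || !isset($decoded['order_id']) || !is_scalar($decoded['order_id'])) {
            throw new InvalidArgumentException('Payload has no usable order_id');
        }

        return new self((string) $decoded['order_id']);
    }

    public function getOrderId(): string
    {
        return $this->orderId;
    }
}
```

A consumer would call OrderPayload::fromJson() with the message's data string and decide how to treat the exception, for example by letting the message go to a dead-letter topic.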
Middleware for Consumers:
Wrap the streaming pull loop to add preprocessing:
// $stream is the BidiStream returned by SubscriberClient::streamingPull(), with the initial StreamingPullRequest already written
foreach ($stream->closeWriteAndReadAll() as $response) {
    foreach ($response->getReceivedMessages() as $message) {
        $processed = $this->preprocess($message->getMessage());
        $this->handle($processed);
        $subscriber->acknowledge((new \Google\Cloud\PubSub\V1\AcknowledgeRequest())->setSubscription($subscriptionName)->setAckIds([$message->getAckId()]));
    }
}
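The single preprocessing step above can be generalized into a chain of middleware around the handler. The following is a minimal sketch of that pattern in plain PHP. composeMiddleware() is a hypothetical helper, and each middleware is assumed to take the message and a $next callable.

```php
<?php
/**
 * Wrap $handler with $middleware so the first entry runs outermost.
 * Each middleware has the shape function ($message, callable $next).
 * composeMiddleware() is a hypothetical helper, not a library function.
 */
function composeMiddleware(array $middleware, callable $handler): callable
{
    // Build from the inside out: the last middleware wraps the handler first.
    return array_reduce(
        array_reverse($middleware),
        static function (callable $next, callable $layer): callable {
            return static function ($message) use ($layer, $next) {
                return $layer($message, $next);
            };
        },
        $handler
    );
}
```

Inside the consumer loop, the composed callable would replace the separate preprocess() and handle() calls, and the message is acknowledged only after it returns.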
Event Dispatching:
event(new PubSubMessageReceived($message