hosmelq/sse
WHATWG-compliant PHP 8.2+ client for consuming Server-Sent Events. Connect via GET/POST using Guzzle, iterate events with a memory-efficient generator, and access data/event/id/retry fields with optional JSON decoding.
Start by installing the package via Composer:
composer require hosmelq/sse
For your first use case, create an SSE client and connect to an endpoint in a Laravel Artisan command or controller:
use HosmelQ\SSE\Client;
use Illuminate\Console\Command;
class ProcessSseEventsCommand extends Command
{
    protected $signature = 'sse:process';
    protected $description = 'Process Server-Sent Events';
    public function handle()
    {
        $client = new Client();
        $eventSource = $client->get('https://example.com/sse-endpoint');
        foreach ($eventSource->events() as $event) {
            $this->info('Received event: ' . $event->data);
            // Process event data (e.g., dispatch Laravel jobs, update models)
        }
    }
}
Key files to reference:
vendor/hosmelq/sse/src/Client.php (core implementation)
vendor/hosmelq/sse/src/Exceptions/ (error handling)
Client Initialization
// Basic usage
$client = new Client();
// With custom HTTP client (e.g., Guzzle with middleware)
$httpClient = new GuzzleHttp\Client(['timeout' => 30]);
$client = new Client($httpClient);
Event Processing Loop
$eventSource = $client->get('https://api.example.com/stream');
foreach ($eventSource->events() as $event) {
    // Process each event
    $this->handleEvent($event);
}
Laravel Integration
// In a service class
public function listenToEvents()
{
    $client = app(Client::class);
    $eventSource = $client->get(config('services.sse.endpoint'));
    foreach ($eventSource->events() as $event) {
        event(new SseEvent($event->data, $event->event));
    }
}
Reconnection Logic
$lastEventId = null;
while (true) {
    try {
        // Send Last-Event-ID only once an ID has been seen
        $headers = $lastEventId !== null ? ['Last-Event-ID' => $lastEventId] : [];
        $eventSource = $client->get('https://api.example.com/stream', [
            'headers' => $headers,
        ]);
        foreach ($eventSource->events() as $event) {
            // Keep the previous ID when an event carries none
            $lastEventId = $event->id ?? $lastEventId;
            // Process event
        }
    } catch (SSEConnectionException $e) {
        $this->error('Connection failed, retrying...');
        sleep(5);
    }
}
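The fixed five-second sleep above retries at the same rate however long the server stays down. A common alternative is exponential backoff; the following is a minimal sketch of that idea, not part of the package. `reconnectDelay()` and its default values are hypothetical names chosen for illustration.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: seconds to wait before reconnect attempt $attempt (1-based).
 * The delay doubles from $baseSeconds on each attempt and is capped at $maxSeconds.
 */
function reconnectDelay(int $attempt, int $baseSeconds = 1, int $maxSeconds = 60): int
{
    $attempt = max(1, $attempt);
    // Cap the exponent so the multiplication cannot overflow during long outages.
    $exponent = min($attempt - 1, 16);

    return min($maxSeconds, $baseSeconds * (2 ** $exponent));
}
```

In the loop above, this could replace `sleep(5)` with `sleep(reconnectDelay(++$attempt))`, resetting `$attempt` to 0 once an event has been received.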
POST Requests with Authentication
$eventSource = $client->post('https://api.example.com/stream', [
    'json' => ['auth' => $token],
    'headers' => ['Authorization' => 'Bearer ' . $token],
]);
Event Data Parsing
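foreach ($eventSource->events() as $event) {
    $data = json_decode($event->data, true);
    // Skip payloads that are not valid JSON objects
    if (! is_array($data)) {
        continue;
    }
    if ($event->event === 'notification') {
        // Notification::send() expects notifiables and a notification instance;
        // User and StreamMessageNotification are hypothetical application classes
        Notification::send(User::find($data['user_id']), new StreamMessageNotification($data['message']));
    }
}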
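A long-running stream will probably deliver a malformed payload sooner or later, so it can help to isolate the decoding step. The sketch below uses only PHP's built-in `json_decode()` with `JSON_THROW_ON_ERROR`. `decodeEventData()` is a hypothetical helper name and does not represent the package's own JSON decoding feature.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: decode an event payload into an array.
 * Returns null when the payload is not valid JSON or is a JSON scalar.
 */
function decodeEventData(string $data): ?array
{
    try {
        $decoded = json_decode($data, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException) {
        return null;
    }

    return is_array($decoded) ? $decoded : null;
}
```

Returning null instead of throwing keeps the event loop running; callers that need to see bad payloads can log the raw string before skipping it.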
Queue Job for Background Processing
class ProcessSseEvents implements ShouldQueue
{
    public function handle()
    {
        $client = new Client();
        // Read the endpoint from config; env() returns null once the configuration is cached
        $eventSource = $client->get(config('services.sse.endpoint'));
        foreach ($eventSource->events() as $event) {
            // Dispatch additional jobs or update models
            ProcessEventJob::dispatch($event->data);
        }
    }
}
Service Provider Binding
public function register()
{
    $this->app->singleton(Client::class, function ($app) {
        return new Client($app->make(\GuzzleHttp\Client::class));
    });
}
Event Listeners for SSE Events
class HandleSseEvent
{
    public function handle(SseEvent $event)
    {
        // Process the event data
    }
}
Connection Timeouts
$client = new Client(new GuzzleHttp\Client([
    'timeout' => 0, // No timeout
    'read_timeout' => 60, // 60s read timeout
]));
Memory Leaks
Use break or return in loops, or process events in batches.
Last-Event-ID Handling
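$lastEventId = Cache::get('sse_last_event_id');
$eventSource = $client->get('https://api.example.com/stream', [
    // Omit the header when no ID has been cached yet
    'headers' => $lastEventId !== null ? ['Last-Event-ID' => $lastEventId] : [],
]);
foreach ($eventSource->events() as $event) {
    // Events without an ID should not overwrite the cached value
    if ($event->id !== null) {
        Cache::put('sse_last_event_id', $event->id, now()->addMinutes(5));
    }
}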
Protocol Violations
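A malformed stream (e.g., a missing data: field) throws SSEProtocolException.
try {
    foreach ($eventSource->events() as $event) {
        // Compare against '' explicitly: empty() would also skip a payload of "0"
        if ((string) $event->data === '') {
            continue;
        }
        // Process valid events
    }
} catch (SSEProtocolException $e) {
    Log::error('Invalid SSE format: ' . $e->getMessage());
}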
Log Raw Events
foreach ($eventSource->events() as $event) {
    Log::debug('Raw event: ' . $event->raw());
}
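When reading raw frames in the logs, it helps to recall how the wire format maps onto the data, event, id, and retry fields. The sketch below follows the field rules of the WHATWG event stream format for a single event block. It is an illustration only, not the package's parser, and `parseSseFrame()` is a hypothetical name.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: parse one SSE event block (the lines between blank lines).
 *
 * @return array{data: ?string, event: ?string, id: ?string, retry: ?int}
 */
function parseSseFrame(string $frame): array
{
    $event = ['data' => null, 'event' => null, 'id' => null, 'retry' => null];
    $dataLines = [];

    foreach (preg_split('/\r\n|\r|\n/', $frame) as $line) {
        if ($line === '' || $line[0] === ':') {
            continue; // blank line or comment (often used as a keep-alive)
        }
        // A line without a colon is a field name with an empty value.
        [$field, $value] = array_pad(explode(':', $line, 2), 2, '');
        if (str_starts_with($value, ' ')) {
            $value = substr($value, 1); // strip exactly one leading space
        }
        switch ($field) {
            case 'data':
                $dataLines[] = $value;
                break;
            case 'event':
                $event['event'] = $value;
                break;
            case 'id':
                if (!str_contains($value, "\0")) {
                    $event['id'] = $value; // IDs containing NUL are ignored
                }
                break;
            case 'retry':
                if (ctype_digit($value)) {
                    $event['retry'] = (int) $value; // milliseconds, digits only
                }
                break;
            // Unknown fields are ignored.
        }
    }

    if ($dataLines !== []) {
        $event['data'] = implode("\n", $dataLines); // multiple data lines join with "\n"
    }

    return $event;
}
```

Feeding a logged frame through a helper like this shows quickly whether an unexpected value comes from the server's output or from later processing.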
Check HTTP Headers
Ensure Accept: text/event-stream is set (the package handles this automatically).
Test with a Mock Server
Use a tool like EventSource-Test to simulate SSE endpoints locally.
Custom Event Classes
Extend the base Event class for type safety:
class NotificationEvent extends \HosmelQ\SSE\Event
{
    public function getUserId(): int
    {
        return json_decode($this->data, true)['user_id'];
    }
}
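Middleware for HTTP Client
Add authentication or logging middleware through a Guzzle handler stack (Guzzle has no 'middleware' request option):
$stack = GuzzleHttp\HandlerStack::create();
// Attach the bearer token to every outgoing request
$stack->push(GuzzleHttp\Middleware::mapRequest(
    fn (Psr\Http\Message\RequestInterface $request) => $request->withHeader('Authorization', 'Bearer ' . $token)
));
// Log requests and responses through Laravel's logger
$stack->push(GuzzleHttp\Middleware::log(logger(), new GuzzleHttp\MessageFormatter()));
$httpClient = new GuzzleHttp\Client(['handler' => $stack]);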
Event Filtering
Create a decorator to filter events:
class FilteredEventSource
{
    public function __construct(private EventSource $eventSource, private string $eventType)
    {
    }
    public function events(): Generator
    {
        foreach ($this->eventSource->events() as $event) {
            if ($event->event === $this->eventType) {
                yield $event;
            }
        }
    }
}
Guzzle vs. Laravel HTTP Client
// Laravel's HTTP client wraps Guzzle; buildClient() returns the underlying GuzzleHttp\Client
$client = new Client(\Illuminate\Support\Facades\Http::buildClient());
Case Sensitivity
Field names (e.g., event) are case-sensitive. Ensure consistency with the server.
Retry Headers
The retry field is in milliseconds. Convert to seconds for Laravel's retry logic:
$retryDelay = $event->retry / 1000;
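The division above yields a float and does not handle an event that carries no retry field. The sketch below shows one defensive way to do the conversion, assuming the retry property is a nullable integer. That assumption, the `retrySeconds()` name, and the five-second fallback are all illustrative.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: convert an SSE retry value (milliseconds) to whole seconds.
 * Falls back to $defaultSeconds when the server sent no usable retry value.
 */
function retrySeconds(?int $retryMs, int $defaultSeconds = 5): int
{
    if ($retryMs === null || $retryMs < 0) {
        return $defaultSeconds;
    }

    // Round up so the client never reconnects earlier than the server asked.
    return (int) ceil($retryMs / 1000);
}
```

The integer result can be passed directly to `sleep()` or to a queued job's delay.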
Batch Processing
For high-volume streams, process events in batches:
$batch = [];
foreach ($eventSource->events() as $event) {
    $batch[] = $event;
    if (count($batch) >= 100) {
        $this->processBatch($batch);
        $batch = [];
    }
}
// Flush the final partial batch once the stream ends
if ($batch !== []) {
    $this->processBatch($batch);
}
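The batching logic can also be separated from the consumer as a small generator, so the same code works on any iterable of events. This is a minimal sketch; `inBatches()` is a hypothetical helper, not something the package provides.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: group items from any iterable into arrays of at most $size.
 * The final, possibly smaller, batch is yielded when the input ends.
 */
function inBatches(iterable $items, int $size): Generator
{
    if ($size < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1.');
    }

    $batch = [];
    foreach ($items as $item) {
        $batch[] = $item;
        if (count($batch) === $size) {
            yield $batch;
            $batch = [];
        }
    }

    if ($batch !== []) {
        yield $batch;
    }
}
```

With it, the loop becomes `foreach (inBatches($eventSource->events(), 100) as $batch) { ... }`. On a live stream a partial batch waits until the batch fills or the stream ends, so low-traffic endpoints may need a time-based flush as well.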
Connection Pooling
Reuse the HTTP client for multiple SSE connections to reduce overhead:
// Share one Guzzle client instance; Guzzle has no 'pooling' option
$httpClient = new GuzzleHttp\Client();
$client = new Client($httpClient);
Memory Management
Avoid storing large event histories. Use generators and process events immediately.
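One way to keep a consumer bounded is to wrap the event generator in another generator that stops after a fixed number of events, so a worker can exit and be restarted by its supervisor. The sketch below illustrates the pattern; `takeEvents()` is a hypothetical helper and works on any iterable.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: yield at most $limit items, then stop pulling from the source.
 * Nothing is buffered, so memory use does not grow with the length of the stream.
 */
function takeEvents(iterable $events, int $limit): Generator
{
    if ($limit < 1) {
        return;
    }

    $seen = 0;
    foreach ($events as $event) {
        yield $event;
        // Check after yielding so no extra item is read from the source.
        if (++$seen >= $limit) {
            return;
        }
    }
}
```

For example, `foreach (takeEvents($eventSource->events(), 1000) as $event) { ... }` processes a thousand events and then returns control to the caller.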