react/stream
Event-driven readable and writable streams for non-blocking I/O in ReactPHP. Process large data in small chunks with async-friendly stream interfaces and resource-based streams, with support for piping, backpressure, errors, and duplex streams.
Start by installing react/stream via Composer: composer require react/stream. Its core purpose is to provide event-driven, non-blocking streams, which are essential for building scalable async PHP apps with ReactPHP. Import the classes you need from the React\Stream namespace (or use fully qualified names, as below) and create a simple readable stream (e.g., from STDIN or a file resource) to handle data incrementally. A minimal use case: read STDIN chunk by chunk without blocking the event loop:
$loop = React\EventLoop\Loop::get();
$stdin = new React\Stream\ReadableResourceStream(STDIN, $loop);
$stdin->on('data', function ($chunk) { echo "Received: $chunk"; });
$loop->run();
This is the foundational pattern: all higher-level ReactPHP components (e.g., HTTP server, TCP clients) rely on these abstractions. Check which branch you are reading before copying examples: the main branch tracks v3, while the 1.x branch holds the stable v1 release, and the two may differ in API.
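The same pattern applies to a file resource. The following is a minimal sketch rather than a reference implementation: it assumes react/stream and react/event-loop (1.2 or later, so the loop argument is optional and Loop::run() is available) are installed via Composer, that the default stream_select-based loop is in use, and it creates a hypothetical temporary file purely for the demo.

```php
<?php
// Sketch: stream a file in chunks and record the data, end, error and close events.
// Assumption: dependencies were installed with Composer into ./vendor.
require __DIR__ . '/vendor/autoload.php';

use React\EventLoop\Loop;
use React\Stream\ReadableResourceStream;

// Hypothetical input: a temporary file created only for this demo.
$path = tempnam(sys_get_temp_dir(), 'stream-demo');
file_put_contents($path, "first line\nsecond line\n");

$events = [];
$stream = new ReadableResourceStream(fopen($path, 'r'));

// data: fired once per chunk read from the resource.
$stream->on('data', function ($chunk) use (&$events) {
    $events[] = 'data:' . strlen($chunk);
});

// end: fired when EOF is reached successfully.
$stream->on('end', function () use (&$events) {
    $events[] = 'end';
});

// error: fired on read failures; the stream closes afterwards.
$stream->on('error', function (Exception $e) use (&$events) {
    $events[] = 'error:' . $e->getMessage();
});

// close: fired once the stream is done, whether it ended or failed.
$stream->on('close', function () use (&$events, $path) {
    $events[] = 'close';
    unlink($path); // clean up the demo file
});

// Runs until no streams or timers remain, i.e. after the file is fully read.
Loop::run();

echo implode(', ', $events), PHP_EOL;
```

Recording the events in an array makes the ordering visible: data chunks arrive first, then end on a clean EOF, then close.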
Chain streams with $source->pipe($transform1)->pipe($transform2)->pipe($dest). Use ThroughStream to implement lightweight filtering/decoding (e.g., JSON line decoder, gzip decompressor).
Use pipe() with pause()/resume() for flow control. Example: throttle an upload stream when disk I/O lags.
Echo a duplex stream back to itself with $connection->pipe($connection).
Wrap PHP stream resources with new ReadableResourceStream(fopen('file.log', 'r'), $loop) or new WritableResourceStream(STDOUT, $loop).
Combine one readable and one writable stream into a single duplex stream with CompositeStream (e.g., pairing STDIN and STDOUT).
Use ThroughStream to build message delimiters (e.g., \n-terminated payloads): buffer partial chunks and emit full lines in data handlers.
end ≠ close. end signals successful EOF; close signals termination (success or failure). Always handle error to avoid silent hangs.
Check isReadable() and isWritable() before operations: write() on a closed stream returns false and the data is discarded.
A stream may still emit data after pause() (e.g., buffered data). Always guard event handlers: if (!$stream->isReadable()) return;.
Example: a \n-delimited parser with a buffer property.
Call $dest->close() manually if the source closes early (e.g., $source->on('close', fn() => $dest->end())).
Treat a wrapped resource as owned by the stream until close() or garbage collection. Reusing resources after wrapping often causes issues.
Version constraint: ^1.11. Check the changelog for EndStreamException removal and constructor parameter changes.
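The buffering approach for \n-terminated payloads described above can be sketched as follows. This is an illustrative sketch, not part of the library: splitLines() is a hypothetical helper name, the code assumes react/stream is installed via Composer, and it ignores backpressure for brevity. A ThroughStream is used as the output because it can emit any number of lines per incoming chunk.

```php
<?php
// Sketch: turn arbitrary chunks into complete "\n"-terminated lines.
// Assumption: dependencies were installed with Composer into ./vendor.
require __DIR__ . '/vendor/autoload.php';

use React\Stream\ReadableStreamInterface;
use React\Stream\ThroughStream;

/**
 * Hypothetical helper: returns a stream that emits one data event per line.
 * Partial lines are buffered until their terminating "\n" arrives.
 */
function splitLines(ReadableStreamInterface $source): ThroughStream
{
    $lines = new ThroughStream();
    $buffer = '';

    $source->on('data', function ($chunk) use (&$buffer, $lines) {
        $buffer .= $chunk;
        // Emit every complete line currently held in the buffer.
        while (($pos = strpos($buffer, "\n")) !== false) {
            $lines->write(substr($buffer, 0, $pos));
            $buffer = substr($buffer, $pos + 1);
        }
    });

    $source->on('end', function () use (&$buffer, $lines) {
        // Flush a trailing line that had no final "\n", then signal EOF.
        if ($buffer !== '') {
            $lines->write($buffer);
        }
        $lines->end();
    });

    // If the source closes without ending, close the output as well.
    // close() is a no-op when the output was already closed by end().
    $source->on('close', function () use ($lines) {
        $lines->close();
    });

    return $lines;
}

// Usage: a ThroughStream stands in for a real source such as a socket.
$source = new ThroughStream();
$collected = [];

splitLines($source)->on('data', function ($line) use (&$collected) {
    $collected[] = $line;
});

$source->write("a\nb");  // "a" is complete, "b" stays buffered
$source->write("c\nd");  // completes "bc", "d" stays buffered
$source->end();          // flushes "d"

echo json_encode($collected), PHP_EOL;
```

Because ThroughStream emits synchronously, the demo needs no running event loop. With a real resource stream as the source, the same helper would be fed by the loop.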