react/promise-stream
ReactPHP helper functions bridging promises and streams. Buffer an entire readable stream into a promise, get the first chunk, collect all chunks, or unwrap promises to readable/writable streams with proper error, cancel, and max-length handling.
Start by installing the package via Composer:
composer require react/promise-stream:^1.7
This library provides bridging utilities between ReactPHP’s Promise API and stream events. Your first use case will likely be converting a streaming response (e.g., HTTP, socket, or file) into a single string (e.g., for JSON parsing), using buffer():
use function React\Promise\Stream\buffer;
$stream = $httpClient->request('GET', 'https://example.com/data.json');
buffer($stream)->then(function (string $contents) {
$data = json_decode($contents, true);
// Process complete payload
});
Check the use function React\Promise\Stream\* imports—this package only contains a few pure functions, no classes.
buffer() to collect full stream contents into memory (safe for small payloads), or all() to get all chunks as an array if per-chunk processing is needed.first() when you only need the first chunk (e.g., to read a magic header or version byte before switching parsers).unwrapReadable() or unwrapWritable() to return a proxy stream immediately—your consumer code can listen for data/write before the underlying stream resolves.unwrapWritable(), writes before resolution are buffered transparently. Combine with backpressure-aware consumers (e.g., stream_pipeline or manual write()/drain handling) to avoid memory bloat.Example: upload with streaming body built after auth resolution:
$promise = $authClient->getToken()->then(fn($token) => createUploadStream($token));
$uploadStream = React\Promise\Stream\unwrapWritable($promise);
$uploadStream->write($jsonData);
$uploadStream->end();
buffer(), first(), and all() reject if cancelled. Always handle this, especially when implementing timeouts (e.g., using React\Promise\timeout() or manual timer + $promise->cancel()).buffer($stream, 1024) to avoid OOM on large/unbounded streams. The resulting OverflowException is more explicit than a silent crash.all() and first() may emit null if events carry no data (e.g., custom events like 'end' or 'close'). Sanitize before type-casting or decoding.unwrapReadable()/unwrapWritable() only emit errors correctly if the promise is pending at call time. If the promise resolves synchronously (e.g., cached or already fulfilled), error events may be lost—guard against this with is_fulfilled($promise) or use unwrapPromise() manually if needed.unwrapReadable() returns a new proxy each time. If reusing streams, explicitly close proxies to avoid leaks or uncancelled pending promises.ReadableStreamInterface<string> implies each chunk is a string, but PHP streams may yield binary or mixed. Validate expected chunk types before json_decode() or unserialize().>=1.6.0 if you run long-lived apps where memory leaks are critical.How can I help you explore Laravel packages today?