amphp/pipeline
Fiber-safe concurrent iterators and collection operators for AMPHP. Build pipelines from iterables or async sources, then consume them safely from multiple fibers via foreach or manual iteration, with backpressure-friendly async iteration on PHP 8.1+.
Install via Composer (composer require amphp/pipeline) on PHP 8.1 or later. Start by creating a Pipeline from an iterable with Pipeline::fromIterable([...]), or from a supplier callback with Pipeline::generate(fn () => ...). Consume it with foreach, either from the main script or from inside fibers. For plain async iteration without concurrency, the interface resembles PHP's standard Iterator, so adoption is straightforward. A typical first use case is turning a generator-based data source into a concurrent pipeline: call ->concurrent($n), then ->map() and ->filter(), to process items asynchronously (e.g., HTTP requests, DB queries). Order is preserved by default; call ->unordered() to allow out-of-order emission.
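The first use case described above can be sketched roughly as follows. This is a minimal illustration, not the package's canonical example: the generator of numeric IDs and the delay() call standing in for real I/O are assumptions made for the sake of a runnable script, and it presumes a Composer project with amphp/pipeline installed.

```php
<?php

declare(strict_types=1);

// Assumes amphp/pipeline was installed with Composer in this directory.
require __DIR__ . '/vendor/autoload.php';

use Amp\Pipeline\Pipeline;
use function Amp\delay;

// Hypothetical data source: a generator standing in for IDs read from a file or an API.
$source = (function (): \Generator {
    foreach ([1, 2, 3, 4, 5, 6] as $id) {
        yield $id;
    }
})();

$results = Pipeline::fromIterable($source)
    ->concurrent(3) // up to three items are processed at the same time
    ->map(function (int $id): int {
        delay(0.05); // stand-in for async I/O such as an HTTP request or DB query
        return $id * $id;
    })
    ->filter(fn (int $square): bool => $square % 2 === 0) // keep even squares only
    ->toArray(); // terminal operation: collects the remaining values into an array

print_r($results);
```

Adding ->unordered() right after ->concurrent(3) would let results be emitted as soon as each item finishes rather than in source order, which only matters once the per-item work takes varying amounts of time.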
Start from Pipeline::fromIterable($source), set ->concurrent($parallelism) (plus ->unordered() if order doesn't matter) before the operators that should run concurrently, then chain operators like map(), filter(), delay(), tap(). Well suited to batches of API calls, file I/O, or ETL jobs where parallelization wins.
Queue: Use Queue when production and consumption rates differ significantly (e.g., streaming logs, event ingestion). push() suspends the producer until the value is consumed; pushAsync() returns a Future for fire-and-forget or monitorable production. Always call complete() (or error()) when done.
Send Pipeline results directly into sinks: ->forEach($sink), ->toArray(), or ->reduce($callback). Combine streams with Pipeline::concat() or Pipeline::merge() (emits values as any source yields).
Use ConcurrentIterator methods (continue(), getValue(), getPosition()) in low-level fiber loops, though foreach is preferred unless fine-grained state management is needed.
Always call complete() on Queue; otherwise consumers hang indefinitely. Wrap producer fibers in try/finally to guarantee completion.
concurrent() + unordered() maximizes throughput. Avoid ordered behavior unless strictly required, because a slow item can stall the items queued behind it.
buffer($size) controls how many items are queued before back-pressure kicks in. Default is 0 (strict back-pressure per item).
DisposedException handling: catch this when push()/pushAsync() fails because the consumer terminated early (e.g., timeout, cancel). In high-reliability systems, either ignore it or log it and discard the value.
Use tap() for logging/observability; it observes each value without changing it.
Patch releases fixed issues in v1.2.1 and disposal bugs in v1.2.2 and v1.2.3. Lock to ^1.2 to avoid regressions.
Add tap(fn ($v) => error_log("Processing: $v")) early in the pipeline to inspect data flow without disrupting iteration logic.
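The Queue advice above (complete in a finally block, handle DisposedException, iterate manually only when needed) might look like the sketch below. The three log lines are hypothetical sample data, and the isComplete() guard is a defensive assumption added here to avoid completing the queue twice; it presumes a Composer project with amphp/pipeline installed.

```php
<?php

declare(strict_types=1);

// Assumes amphp/pipeline was installed with Composer in this directory.
require __DIR__ . '/vendor/autoload.php';

use Amp\Pipeline\DisposedException;
use Amp\Pipeline\Queue;
use function Amp\async;

$queue = new Queue(); // default buffer size of 0: strict back-pressure per item

// Producer fiber: pushes hypothetical log lines, then always completes the queue.
$producer = async(function () use ($queue): void {
    try {
        foreach (['line a', 'line b', 'line c'] as $line) {
            $queue->push($line); // suspends this fiber until the value is consumed
        }
    } catch (DisposedException) {
        // The consumer stopped early; nothing is left to deliver.
    } finally {
        if (!$queue->isComplete()) {
            $queue->complete(); // without this, the consumer below would wait forever
        }
    }
});

// Consumer: manual iteration using the ConcurrentIterator methods.
$received = [];
$iterator = $queue->iterate();
while ($iterator->continue()) {
    $received[$iterator->getPosition()] = $iterator->getValue();
}

$producer->await(); // surface any unexpected producer error

print_r($received);
```

A plain foreach over $queue->iterate() (or $queue->pipe()) would do the same job with less code; the manual loop is shown only because it exposes getPosition() alongside each value.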