amphp/pipeline
Fiber-safe concurrent iterators and collection operators for AMPHP. Build pipelines from iterables or async sources, then consume them safely from multiple fibers via foreach or manual iteration, with backpressure-friendly async iteration on PHP 8.1+.
Queue is disposed (regression in 1.2.2).
Full Changelog: https://github.com/amphp/pipeline/compare/v1.2.2...v1.2.3
Queue was completed while containing emitted values which had not been consumed, and the consumer then explicitly disposed of the associated iterator (by calling ConcurrentIterator::dispose()) without consuming those values. Previously, the futures were not properly resolved with a DisposedException; they now resolve as expected.
Full Changelog: https://github.com/amphp/pipeline/compare/v1.2.1...v1.2.2
Queue to create a ConcurrentIterator. An Error with message "Must call suspend() before calling throw()" was thrown when a Cancellation provided to ConcurrentIterator::continue() was cancelled and the underlying Queue instance was simultaneously completed. See #22.
Full Changelog: https://github.com/amphp/pipeline/compare/v1.2.0...v1.2.1
Pipeline::buffer(), which provides control of the number of values buffered by the pipeline before back-pressure is applied to the data source, by @trowski in https://github.com/amphp/pipeline/pull/21
Full Changelog: https://github.com/amphp/pipeline/compare/v1.1.0...v1.2.0
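A minimal sketch of how Pipeline::buffer() might be used with a slow consumer. It assumes a Composer install of amphp/pipeline (the autoload path is an assumption), and the buffer size of 3 and the 10 ms delay are arbitrary illustrative values.

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoload path.

use Amp\Pipeline\Pipeline;
use function Amp\delay;

// A generator-backed source of five integers.
$source = Pipeline::fromIterable(function (): \Generator {
    for ($i = 0; $i < 5; ++$i) {
        yield $i;
    }
});

// Let up to 3 values be buffered before back-pressure reaches the source.
$buffered = $source->buffer(3);

$seen = [];
foreach ($buffered as $value) {
    delay(0.01); // Simulate a slow consumer.
    $seen[] = $value;
}

var_dump($seen);
```

A larger buffer lets the source run further ahead of the consumer at the cost of holding more values in memory.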
Pipeline::merge() which combines multiple iterators, emitting a value whenever any iterator emits a value.
Pipeline::take() not completing until a value beyond the given count was emitted. The pipeline now completes immediately after emitting the last value.
ConcurrentIterator as covariant.
Initial stable release 🎉
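The two operators named above can be sketched as follows, assuming a Composer install of amphp/pipeline (the autoload path is an assumption). Because merge() emits values as soon as any source produces one, the example sorts the result rather than relying on a particular interleaving.

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoload path.

use Amp\Pipeline\Pipeline;

// merge(): combine two pipelines; values arrive as each source emits them.
$merged = Pipeline::merge([
    Pipeline::fromIterable([1, 2, 3]),
    Pipeline::fromIterable([10, 20, 30]),
])->toArray();

\sort($merged); // Interleaving order is not guaranteed, so sort before use.

// take(): complete the pipeline after the first two values.
$firstTwo = Pipeline::fromIterable([1, 2, 3, 4, 5])
    ->take(2)
    ->toArray();

var_dump($merged, $firstTwo);
```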
ConcurrentArrayIterator, ConcurrentChainedIterator, and ConcurrentIterableIterator as @internal. Instead of these classes, use Pipeline::fromIterable() or Pipeline::concat().
Pipeline::concat() now accepts an array of any iterable, not only other Pipeline objects.
Queue that is destructed without being completed. PHP's random destruct order will sometimes lead to the Queue destructor being invoked before another destructor that would have completed the queue.
ConcurrentIterableIterator
isComplete() to the ConcurrentIterator interface that returns true when the iterator has been completed (either successfully or with an error) and no further values are pending.
ConcurrentIterator was held while awaiting the next value.
ConcurrentIterableIterator and ConcurrentFlatMapIterator that prevented quick garbage collection, particularly problematic with instances created from Pipeline::fromIterable() using a generator.
Pipeline has been changed from an interface to a final class. ConcurrentIterator acts as the interface replacement.
Pipeline::pipe() has been removed in favor of operator methods directly on Pipeline, such as map() and filter().
Emitter has been renamed to Queue.
yield() has been renamed to push()
emit() has been renamed to pushAsync()
Amp\Pipeline have been removed.
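As a rough illustration of the renamed methods, the sketch below pushes values into a Queue from one fiber and consumes them manually through a ConcurrentIterator. It assumes a Composer install of amphp/pipeline (the autoload path is an assumption), and the values pushed are arbitrary.

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoload path.

use Amp\Pipeline\Queue;
use function Amp\async;

$queue = new Queue();
$iterator = $queue->iterate();

// Producer fiber: push() waits for back-pressure from the consumer.
async(function () use ($queue): void {
    foreach (['a', 'b', 'c'] as $value) {
        $queue->push($value);
    }
    $queue->complete(); // Signal that no further values will be pushed.
});

// Manual iteration: continue() returns false once the queue is complete.
$received = [];
while ($iterator->continue()) {
    $received[] = $iterator->getValue();
}

var_dump($received);
```

Where the producer should not wait for the consumer, pushAsync() returns a Future that can be awaited later instead.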
fromIterable() is available as Pipeline::fromIterable()
concat() is now Pipeline::concat()
Pipeline
Pipeline::generate() that invokes a closure to create each pipeline value.
Example of using Pipeline for concurrency:
use Amp\Pipeline\Pipeline;
use function Amp\delay;

$pipeline = Pipeline::fromIterable(function (): \Generator {
    for ($i = 0; $i < 100; ++$i) {
        yield $i;
    }
});

$results = $pipeline->concurrent(10)
    ->tap(fn () => delay(\random_int(1, 10) / 10)) // Delay for 0.1 to 1 seconds, simulating I/O.
    ->map(fn (int $input): int => $input * 10)
    ->filter(fn (int $input) => $input % 3 === 0); // Filter only values divisible by 3.

foreach ($results as $value) {
    echo $value, "\n";
}
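The static constructors mentioned in these notes, Pipeline::concat() and Pipeline::generate(), might be used as in the sketch below. It assumes a Composer install of amphp/pipeline (the autoload path is an assumption). The counter closure is a hypothetical supplier chosen for illustration, and take() is used to bound a pipeline that would otherwise keep generating values.

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoload path.

use Amp\Pipeline\Pipeline;

// concat(): emit every value of the first iterable, then the second.
$joined = Pipeline::concat([[1, 2], [3, 4]])->toArray();

// generate(): the closure is invoked to create each pipeline value.
$counter = 0;
$generated = Pipeline::generate(function () use (&$counter): int {
    return ++$counter; // Hypothetical supplier: a simple running counter.
})->take(3)->toArray();

var_dump($joined, $generated);
```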
AsyncGenerator class. Instead, fromIterable now also accepts a Closure returning an iterable, which can be a generator function.
concurrentOrdered has been removed and concurrentUnordered renamed to concurrent. Unfortunately, ordered iteration broke if using operators that would not always emit a value, so support has been dropped.
$bufferSize parameter to the Emitter constructor that sets the number of items that can be emitted before awaiting back-pressure from the pipeline consumer. This value defaults to 0, which will await back-pressure with every emitted value.
Initial beta release!
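A small sketch of the buffer size parameter, written against the later class name Queue (per the rename of Emitter noted earlier in these release notes). It assumes a Composer install of amphp/pipeline (the autoload path is an assumption), and the buffer size of 2 is an arbitrary illustrative value.

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoload path.

use Amp\Pipeline\Queue;
use function Amp\async;

// Allow 2 values to be pushed before the producer awaits back-pressure.
$queue = new Queue(2);

// Producer fiber; with a buffer it can run slightly ahead of the consumer.
async(function () use ($queue): void {
    for ($i = 1; $i <= 4; ++$i) {
        $queue->push($i);
    }
    $queue->complete();
});

// Consume everything through the Pipeline view of the queue.
$values = $queue->pipe()->toArray();

var_dump($values);
```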