Pipeline Laravel Package

amphp/pipeline

Fiber-safe concurrent iterators and collection operators for AMPHP. Build pipelines from iterables or async sources, then consume them safely from multiple fibers via foreach or manual iteration, with backpressure-friendly async iteration on PHP 8.1+.
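The two consumption styles mentioned above can be sketched as follows. This is a minimal illustration, assuming the package was installed with Composer and the autoloader lives at `vendor/autoload.php`; the sample values and variable names are made up for the example.

```php
<?php
// Assumption: amphp/pipeline was installed with Composer in this directory.
require __DIR__ . '/vendor/autoload.php';

use Amp\Pipeline\Pipeline;

// Style 1: consume the pipeline with foreach.
$squares = [];
foreach (Pipeline::fromIterable([1, 2, 3])->map(fn (int $n): int => $n * $n) as $value) {
    $squares[] = $value;
}

// Style 2: iterate manually through the ConcurrentIterator.
$iterator = Pipeline::fromIterable(['a', 'b'])->getIterator();
$letters = [];
while ($iterator->continue()) {
    // continue() advances to the next value and returns false once the iterator completes.
    $letters[] = $iterator->getValue();
}

var_dump($squares, $letters);
```

Manual iteration is mostly useful when several fibers share one iterator, since each call to continue() hands the calling fiber its own value.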

v1.2.3
  • Fixed failing any pending value consumption when a Queue is disposed (regression in 1.2.2).

Full Changelog: https://github.com/amphp/pipeline/compare/v1.2.2...v1.2.3

v1.2.2

What's Changed

  • Fixed clearing back-pressure when a Queue was completed while still containing emitted values that had not been consumed, and the consumer then explicitly disposed of the associated iterator (by calling ConcurrentIterator::dispose()) without consuming those values. Previously, the futures were not properly resolved with a DisposedException; they now resolve as expected.

Full Changelog: https://github.com/amphp/pipeline/compare/v1.2.1...v1.2.2

v1.2.1

What's Changed

  • Fixed a potential race condition when using a Queue to create a ConcurrentIterator. An Error with the message "Must call suspend() before calling throw()" was thrown when a Cancellation provided to ConcurrentIterator::continue() was cancelled at the same moment the underlying Queue instance was completed. See #22.
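For context, passing a Cancellation to ConcurrentIterator::continue() looks roughly like the sketch below. It does not reproduce the race itself; it only shows the ordinary cancellation path. The 0.1 second timeout and the never-filled queue are assumptions chosen for illustration.

```php
<?php
// Assumption: amphp/pipeline (and its amphp/amp dependency) installed with Composer.
require __DIR__ . '/vendor/autoload.php';

use Amp\CancelledException;
use Amp\Pipeline\Queue;
use Amp\TimeoutCancellation;

$queue = new Queue();
$iterator = $queue->iterate();

try {
    // Nothing is ever pushed to the queue, so the timeout fires first.
    $iterator->continue(new TimeoutCancellation(0.1));
    $outcome = 'value available';
} catch (CancelledException $exception) {
    $outcome = 'cancelled';
}

echo $outcome, "\n";
```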

Full Changelog: https://github.com/amphp/pipeline/compare/v1.2.0...v1.2.1

v1.2.0

What's Changed

Full Changelog: https://github.com/amphp/pipeline/compare/v1.1.0...v1.2.0

v1.1.0
  • Added Pipeline::merge() which combines multiple iterators, emitting a value whenever any iterator emits a value.
  • Fixed Pipeline::take() not completing until a value beyond the given count was emitted. The pipeline now completes immediately after emitting the last value.
  • Marked the template type of ConcurrentIterator as covariant.
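A small sketch of merge() and take() as described in this release. The input values are illustrative; because merge() emits values as they arrive, the merged result is sorted here before being inspected rather than relying on any particular interleaving.

```php
<?php
// Assumption: amphp/pipeline >= 1.1.0 installed with Composer.
require __DIR__ . '/vendor/autoload.php';

use Amp\Pipeline\Pipeline;

// merge() emits a value whenever any source emits one; order across sources is not assumed.
$merged = Pipeline::merge([
    Pipeline::fromIterable([1, 2, 3]),
    Pipeline::fromIterable([10, 20, 30]),
])->toArray();
\sort($merged);

// take() completes the pipeline after the requested number of values.
$taken = Pipeline::fromIterable([1, 2, 3, 4, 5])->take(2)->toArray();

var_dump($merged, $taken);
```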
v1.0.0

Initial stable release 🎉

Changes from 1.0.0 Beta 7

  • Marked ConcurrentArrayIterator, ConcurrentChainedIterator, and ConcurrentIterableIterator as @internal. Instead of these classes, use Pipeline::fromIterable() or Pipeline::concat().
  • Pipeline::concat() now accepts an array of any iterable, not only other Pipeline objects.
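As a sketch of the second change, concat() can be given a mix of arrays, generators, and Pipeline objects. The sample values are illustrative only.

```php
<?php
// Assumption: amphp/pipeline installed with Composer.
require __DIR__ . '/vendor/autoload.php';

use Amp\Pipeline\Pipeline;

$generator = (function (): \Generator {
    yield 3;
    yield 4;
})();

// Each source is consumed fully, in the order given, before the next one starts.
$values = Pipeline::concat([
    [1, 2],                          // plain array
    $generator,                      // generator object
    Pipeline::fromIterable([5, 6]),  // another Pipeline
])->toArray();

var_dump($values);
```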
v1.0.0-beta.7
  • A Queue that is destructed without being completed is no longer failed. PHP's unpredictable destruct order can cause the Queue destructor to be invoked before another destructor that would have completed the queue.
v1.0.0-beta.6
  • Add compatibility with Revolt v1.x
  • Improve ConcurrentIterableIterator
v1.0.0-beta.5
  • Added isComplete() to the ConcurrentIterator interface. It returns true when the iterator has been completed (either successfully or with an error) and no further values are pending.
  • Fixed an issue where a reference to the prior value emitted on a ConcurrentIterator was held while awaiting the next value.
v1.0.0-beta.4
  • PHP 8.1 is now required.
  • Fixed circular references in ConcurrentIterableIterator and ConcurrentFlatMapIterator that prevented quick garbage collection, particularly problematic with instances created from Pipeline::fromIterable() using a generator.
v1.0.0-beta.3
  • Pipeline has been changed from an interface to a final class. ConcurrentIterator acts as the interface replacement
  • Pipeline::pipe() has been removed in favor of operator methods directly on Pipeline, such as map() and filter()
  • Emitter has been renamed to Queue
    • yield() has been renamed to push()
    • emit() has been renamed to pushAsync()
  • All functions in the Amp\Pipeline namespace have been removed.
    • fromIterable() is available as Pipeline::fromIterable()
    • concat() is now Pipeline::concat()
    • Most operators are available directly on Pipeline
  • Added Pipeline::generate() that invokes a closure to create each pipeline value.
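A minimal sketch of Pipeline::generate() from the list above. The counter closure is a made-up supplier; since generate() keeps invoking the closure for as long as values are requested, the example bounds it with take().

```php
<?php
// Assumption: amphp/pipeline installed with Composer.
require __DIR__ . '/vendor/autoload.php';

use Amp\Pipeline\Pipeline;

$counter = 0;

// The closure is invoked once for every value the pipeline emits.
$values = Pipeline::generate(function () use (&$counter): int {
    return ++$counter;
})->take(3)->toArray();

var_dump($values);
```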

Example of using Pipeline for concurrency:

use Amp\Pipeline\Pipeline;
use function Amp\delay;

$pipeline = Pipeline::fromIterable(function (): \Generator {
    for ($i = 0; $i < 100; ++$i) {
        yield $i;
    }
});

$results = $pipeline->concurrent(10)
        ->tap(fn () => delay(\random_int(1, 10) / 10))  // Delay for 0.1 to 1 seconds, simulating I/O.
        ->map(fn (int $input): int => $input * 10)
        ->filter(fn (int $input) => $input % 3 === 0); // Filter only values divisible by 3.

foreach ($results as $value) {
    echo $value, "\n";
}
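
The renamed Queue methods listed above (push() and pushAsync()) might be used along these lines. This is a sketch under the assumption that the producer runs in its own fiber via Amp\async(); the string values are placeholders.

```php
<?php
// Assumption: amphp/pipeline (and its amphp/amp dependency) installed with Composer.
require __DIR__ . '/vendor/autoload.php';

use Amp\Pipeline\Queue;
use function Amp\async;

$queue = new Queue();

// Producer fiber.
async(function () use ($queue): void {
    // pushAsync() returns a Future instead of suspending the fiber.
    $future = $queue->pushAsync('first');

    // push() suspends this fiber until back-pressure for the value is relieved.
    $queue->push('second');

    $future->await();
    $queue->complete();
});

// Consumer: iterate the queue from the main fiber.
$received = [];
foreach ($queue->iterate() as $value) {
    $received[] = $value;
}

var_dump($received);
```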
v1.0.0-beta.2
  • Pipeline back-pressure is now relieved immediately when a value is consumed from the pipeline. Previously, another value had to be requested from the pipeline before back-pressure was relieved.
  • Removed AsyncGenerator class. Instead, fromIterable now also accepts a Closure returning an iterable, which can be a generator function.
  • concurrentOrdered has been removed and concurrentUnordered renamed to concurrent. Unfortunately, ordered iteration broke if using operators that would not always emit a value, so support has been dropped.
  • Added an optional $bufferSize parameter to the Emitter constructor that sets a number of items that can be emitted before awaiting back-pressure from the pipeline consumer. This value defaults to 0, which will await back-pressure with every emitted value.
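A sketch of the buffer size option, written against the later API in which Emitter was renamed to Queue (see the v1.0.0-beta.3 notes). The buffer size of 2 and the sample values are arbitrary; with a non-zero buffer, the producer can get ahead of the consumer by up to that many values before it has to wait.

```php
<?php
// Assumption: amphp/pipeline 1.x installed with Composer (Queue is the renamed Emitter).
require __DIR__ . '/vendor/autoload.php';

use Amp\Pipeline\Queue;
use function Amp\async;

// Allow up to 2 values to be buffered before push() waits on the consumer.
$queue = new Queue(2);

// Producer fiber: pushes three values, then completes the queue.
async(function () use ($queue): void {
    foreach ([1, 2, 3] as $value) {
        $queue->push($value);
    }
    $queue->complete();
});

// Consumer: collect everything the queue emits, in order.
$values = $queue->pipe()->toArray();

var_dump($values);
```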
v1.0.0-beta.1

Initial beta release!
