Pipeline Laravel Package

amphp/pipeline

Fiber-safe concurrent iterators and collection operators for AMPHP. Build pipelines from iterables or async sources, then consume them safely from multiple fibers via foreach or manual iteration, with backpressure-friendly async iteration on PHP 8.1+.

Getting Started

Install via Composer (composer require amphp/pipeline) and ensure you're on PHP 8.1+. Begin by creating a simple Pipeline from an iterable with Pipeline::fromIterable([...]), or from a supplier closure with Pipeline::generate(fn () => ...). Consume it with foreach, either from the main script or from inside fibers. For simple async iteration without concurrency, the interface resembles a standard PHP iterator, which keeps adoption easy. A first practical use case: turn a generator-based data source into a concurrent pipeline with ->concurrent($n), ->map(), and ->filter() to process items asynchronously (e.g., HTTP requests, DB queries), either preserving order (the default) or allowing out-of-order emission via ->unordered().
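As a minimal sketch of that first use case, the snippet below squares a few numbers behind a simulated I/O wait and keeps the even results. The workload, the concurrency of 3, and the vendor/autoload.php path are assumptions for illustration; delay() comes from amphp/amp, which amphp/pipeline depends on.

```php
<?php
// Assumes Composer's autoloader sits next to this script.
require __DIR__ . '/vendor/autoload.php';

use Amp\Pipeline\Pipeline;
use function Amp\delay;

// Hypothetical workload: square each number after a short simulated wait.
$pipeline = Pipeline::fromIterable([1, 2, 3, 4, 5, 6])
    ->concurrent(3) // allow up to 3 operations in flight for the operators below
    ->map(function (int $n): int {
        delay(0.01); // stands in for an HTTP request or DB query
        return $n * $n;
    })
    ->filter(fn (int $n): bool => $n % 2 === 0); // keep even squares only

// foreach suspends the current fiber while waiting for the next value.
$results = [];
foreach ($pipeline as $value) {
    $results[] = $value;
}

var_dump($results);
```

Adding ->unordered() right after ->concurrent(3) would let faster items overtake slower ones, at the cost of a non-deterministic result order.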

Implementation Patterns

  • Concurrent Data Processing: Start from Pipeline::fromIterable($source), call ->concurrent($parallelism) (plus ->unordered() if order doesn't matter), then chain operators like map(), filter(), delay(), and tap(). Place concurrent() before the operators it should apply to. Well suited to batches of API calls, file I/O, or ETL jobs where parallelization wins.
  • Back-Pressure with Queue: Use Queue when producing and consuming rates differ significantly (e.g., streaming logs, event ingestion). push() suspends the calling fiber until the value is consumed; pushAsync() returns a Future for fire-and-forget or monitorable production. Always call complete() (or error()) when done.
  • Streaming & Integration: Feed Pipeline results directly into sinks: ->forEach($sink), ->toArray(), or ->reduce($callback). Combine streams with Pipeline::concat() (sources consumed one after another) or Pipeline::merge() (emits values as soon as any source yields them).
  • Structured Fiber Coordination: Use ConcurrentIterator methods (continue(), getValue(), getPosition()) in low-level fiber loops—though foreach is preferred unless fine-grained state management is needed.
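The Queue and sink patterns above can be sketched together as follows. This is an illustrative example, not the package's canonical usage: the log lines, variable names, and autoloader path are assumptions. A producer fiber pushes into a Queue with the default buffer size, and the consumer reduces the resulting pipeline to a single value; a second snippet shows concat() feeding toArray().

```php
<?php
// Assumes Composer's autoloader sits next to this script.
require __DIR__ . '/vendor/autoload.php';

use Amp\Pipeline\Pipeline;
use Amp\Pipeline\Queue;
use function Amp\async;

$queue = new Queue(); // default buffer size of 0: each push() waits for the consumer

// Producer fiber: pushes three hypothetical log lines, then completes the queue.
$producer = async(function () use ($queue): void {
    try {
        foreach (['alpha', 'beta', 'gamma'] as $line) {
            $queue->push($line); // suspends this fiber until the value is consumed
        }
    } finally {
        $queue->complete(); // without this the consumer would wait forever
    }
});

// Consumer: turn the queue into a Pipeline and reduce it to a total length.
$total = $queue->pipe()
    ->map(fn (string $line): int => strlen($line))
    ->reduce(fn (int $carry, int $length): int => $carry + $length, 0);

$producer->await();

// Combining sources: concat() drains the first pipeline before the second.
$combined = Pipeline::concat([
    Pipeline::fromIterable([1, 2]),
    Pipeline::fromIterable([3]),
])->toArray();

var_dump($total, $combined);
```

Swapping push() for pushAsync() would return a Future per value instead of suspending the producer, which is useful when the producer must keep doing other work.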

Gotchas and Tips

  • Always call complete() on Queue — otherwise consumers hang indefinitely. Wrap producer fibers in try/finally to guarantee completion.
  • concurrent() + unordered() maximizes throughput — avoid ordered behavior unless strictly required, as it may stall on slow items blocking later ones.
  • Back-pressure triggers only when buffers fill: buffer($size) controls how many items are queued before back-pressure kicks in. Default is 0 (strict back-pressure per item).
  • DisposedException handling — catch this when push()/pushAsync() fails due to consumer early termination (e.g., timeout, cancel). In high-reliability systems, ignore it or log and discard.
  • Memory management — avoid holding references to yielded values in long-running pipelines. Use tap() for logging/observability without side-effect pollution.
  • Upgrade readiness — recent patches fixed race conditions (v1.2.1) and disposal bugs (v1.2.2, v1.2.3). Lock to ^1.2 to avoid regressions.
  • Debugging tip — use tap(fn ($v) => error_log("Processing: $v")) early in the pipeline to inspect data flow without disrupting iteration logic.
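To make the completion and disposal tips concrete, here is a hedged sketch of a producer that survives its consumer stopping early. The item count, the cut-off after three values, and the variable names are assumptions for illustration. The guard in finally is deliberately defensive, since completing a queue that was already completed or disposed may not be allowed.

```php
<?php
// Assumes Composer's autoloader sits next to this script.
require __DIR__ . '/vendor/autoload.php';

use Amp\Pipeline\DisposedException;
use Amp\Pipeline\Queue;
use function Amp\async;

$queue = new Queue();
$stoppedEarly = false;

$producer = async(function () use ($queue, &$stoppedEarly): void {
    try {
        for ($i = 1; $i <= 100; $i++) {
            $queue->push($i); // throws DisposedException once the consumer is gone
        }
    } catch (DisposedException) {
        // The consumer terminated early: stop producing and note it.
        $stoppedEarly = true;
    } finally {
        // Defensive guard: only complete a queue that is still open.
        if (!$queue->isComplete() && !$queue->isDisposed()) {
            $queue->complete();
        }
    }
});

// Manual iteration via ConcurrentIterator; dispose after three values.
$iterator = $queue->iterate();
$seen = [];
while ($iterator->continue()) {
    $seen[] = $iterator->getValue();
    if (count($seen) === 3) {
        $iterator->dispose(); // signals the producer that nobody is listening
        break;
    }
}

$producer->await();

var_dump($seen, $stoppedEarly);
```

Whether to log or silently discard the DisposedException depends on whether early termination is expected in the surrounding system.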