Weave Code
Code Weaver
Helps Laravel developers discover, compare, and choose open-source packages. See popularity, security, maintainers, and scores at a glance to make better decisions.
Feedback
Share your thoughts, report bugs, or suggest improvements.
Subject
Message

Pipeline Laravel Package

amphp/pipeline

Fiber-safe concurrent iterators and collection operators for PHP 8.1+ using AMPHP. Build pipelines from iterables and consume them safely from multiple fibers with foreach or manual iteration, supporting async sets and concurrent processing.

View on GitHub
Deep Wiki
Context7

Getting Started

Minimal Steps

  1. Installation:

    composer require amphp/pipeline
    

    Requires PHP 8.1+.

  2. First Use Case: Transform a simple iterable into a concurrent pipeline:

    use Amp\Pipeline\Pipeline;
    
    $pipeline = Pipeline::fromIterable([1, 2, 3, 4, 5])
        ->map(fn($x) => $x * 2)
        ->filter(fn($x) => $x % 3 === 0);
    
    foreach ($pipeline as $value) {
        echo $value; // Outputs: 6
    }
    
  3. Key Entry Points:

    • Pipeline::fromIterable(): Convert arrays/generators to pipelines.
    • Pipeline::generate(): Create infinite streams (e.g., Pipeline::generate(fn() => rand(1, 100))).
    • Queue: For producer-consumer patterns with back-pressure.

Implementation Patterns

Core Workflows

  1. Data Processing Pipelines:

    Pipeline::fromIterable($data)
        ->concurrent(5)       // Process 5 items concurrently
        ->tap(fn($x) => log($x)) // Side effects
        ->map(fn($x) => $x->process())
        ->filter(fn($x) => $x->isValid())
        ->forEach(fn($x) => $x->save());
    
  2. Producer-Consumer with Back-Pressure:

    $queue = new Queue();
    $consumer = Amp\async(function() use ($queue) {
        foreach ($queue->iterate() as $item) {
            process($item); // Blocks if queue is full
        }
    });
    
    // Producer (runs in background)
    Amp\async(function() use ($queue) {
        foreach ($data as $item) {
            $queue->push($item); // Waits if consumer is slow
        }
        $queue->complete();
    });
    
  3. Merging Streams:

    $pipeline = Pipeline::merge([
        Pipeline::fromIterable($stream1),
        Pipeline::fromIterable($stream2),
    ])->take(10); // First 10 items from either stream
    

Integration Tips

  • Laravel Compatibility: Use with Amp\Loop for async tasks (e.g., in Laravel queues):

    Amp\Loop::run(function() {
        $pipeline = Pipeline::fromIterable($jobs)
            ->concurrent(10)
            ->tap(fn($job) => $job->dispatch())
            ->forEach(fn($result) => log($result));
    });
    
  • Error Handling: Wrap pipelines in try-catch blocks to handle DisposedException or upstream errors:

    try {
        foreach ($pipeline as $item) { ... }
    } catch (DisposedException) {
        // Consumer was canceled
    }
    
  • Performance:

    • Use buffer() to control memory usage:
      $pipeline->buffer(100); // Buffer 100 items before back-pressure
      
    • Prefer unordered() for CPU-bound tasks to maximize concurrency.

Gotchas and Tips

Pitfalls

  1. Forgetting complete():

    • Issue: Queue consumers hang if complete() is omitted.
    • Fix: Always call complete() or error() after pushing all items.
  2. Concurrent Iterator State:

    • Issue: getValue()/getPosition() require continue() to be called first.
    • Fix: Use foreach instead of manual iteration unless necessary.
  3. Race Conditions:

    • Issue: Disposing a Queue while values are pending may cause DisposedException in producers.
    • Fix: Catch DisposedException in producers or ensure graceful shutdown.
  4. Ordered vs. Unordered:

    • Issue: concurrent() emits results out of order by default.
    • Fix: Use unordered() explicitly or avoid concurrency for ordered results.

Debugging Tips

  • Check isComplete():
    if ($iterator->isComplete()) {
        echo "Pipeline finished!";
    }
    
  • Log Pipeline State: Insert tap() for debugging:
    ->tap(fn($x) => logger()->debug("Value: $x"))
    

Extension Points

  1. Custom Operators: Create reusable pipeline methods:

    $pipeline->customOp(fn($carry, $item) => $carry + $item);
    
  2. Integrate with Laravel:

    • Use Pipeline in Artisan commands or Console kernels:
      Amp\Loop::run(function() {
          $this->handle(Pipeline::fromIterable($data));
      });
      
    • Combine with Amp\Promise for async HTTP requests:
      $pipeline->map(fn($url) => Amp\Promise\wait(Http::getAsync($url)));
      
  3. Testing:

    • Mock Queue for unit tests:
      $queue = $this->createMock(Queue::class);
      $queue->method('push')->willReturnCallback(...);
      

Config Quirks

  • Buffer Size: Default buffer(0) applies back-pressure immediately. Increase for bursty workloads (e.g., buffer(100)).
  • Concurrency Limits: Set concurrent(N) where N matches your system’s CPU cores or I/O capacity.

Performance Gotchas

  • Memory Leaks: Avoid infinite generators without take() or limit().
  • Blocking Operations: Ensure async operations (e.g., delay()) are used in tap() to prevent pipeline stalls.
Weaver

How can I help you explore Laravel packages today?

Conversation history is not saved when not logged in.
Prompt
Add packages to context
No packages found.
hamzi/corewatch
minionfactory/raw-hydrator
hexters/coinpayment
rjcodes/rjcms
act-training/laravel-permissions-manager
alimarchal/laravel-chart-of-accounts
babenkoivan/elastic-scout-driver
mkwebdesign/filament-watchdog-v5
renatomarinho/laravel-page-speed
zedmagdy/filament-business-hours
renatovdemoura/blade-elements-ui
devgeek/beacon-admin
benjamin-rqt/data-watcher-bundle
atriumphp/atrium
sandermuller/package-boost-laravel
sandermuller/boost-skills
redaxo/core
yusufgenc/filament-api-forge
l3aro/rating-star-for-filament
leek/filament-subtenant-scope