amphp/pipeline
Fiber-safe concurrent iterators and collection operators for PHP 8.1+ using AMPHP. Build pipelines from iterables and consume them safely from multiple fibers with foreach or manual iteration, supporting async sets and concurrent processing.
Installation:
composer require amphp/pipeline
Requires PHP 8.1+.
First Use Case: Transform a simple iterable into a concurrent pipeline:
use Amp\Pipeline\Pipeline;
$pipeline = Pipeline::fromIterable([1, 2, 3, 4, 5])
->map(fn($x) => $x * 2)
->filter(fn($x) => $x % 3 === 0);
foreach ($pipeline as $value) {
echo $value; // Outputs: 6
}
Key Entry Points:
Pipeline::fromIterable(): Convert arrays/generators to pipelines.Pipeline::generate(): Create infinite streams (e.g., Pipeline::generate(fn() => rand(1, 100))).Queue: For producer-consumer patterns with back-pressure.Data Processing Pipelines:
Pipeline::fromIterable($data)
->concurrent(5) // Process 5 items concurrently
->tap(fn($x) => log($x)) // Side effects
->map(fn($x) => $x->process())
->filter(fn($x) => $x->isValid())
->forEach(fn($x) => $x->save());
Producer-Consumer with Back-Pressure:
$queue = new Queue();
$consumer = Amp\async(function() use ($queue) {
foreach ($queue->iterate() as $item) {
process($item); // Blocks if queue is full
}
});
// Producer (runs in background)
Amp\async(function() use ($queue) {
foreach ($data as $item) {
$queue->push($item); // Waits if consumer is slow
}
$queue->complete();
});
Merging Streams:
$pipeline = Pipeline::merge([
Pipeline::fromIterable($stream1),
Pipeline::fromIterable($stream2),
])->take(10); // First 10 items from either stream
Laravel Compatibility:
Use with Amp\Loop for async tasks (e.g., in Laravel queues):
Amp\Loop::run(function() {
$pipeline = Pipeline::fromIterable($jobs)
->concurrent(10)
->tap(fn($job) => $job->dispatch())
->forEach(fn($result) => log($result));
});
Error Handling:
Wrap pipelines in try-catch blocks to handle DisposedException or upstream errors:
try {
foreach ($pipeline as $item) { ... }
} catch (DisposedException) {
// Consumer was canceled
}
Performance:
buffer() to control memory usage:
$pipeline->buffer(100); // Buffer 100 items before back-pressure
unordered() for CPU-bound tasks to maximize concurrency.Forgetting complete():
Queue consumers hang if complete() is omitted.complete() or error() after pushing all items.Concurrent Iterator State:
getValue()/getPosition() require continue() to be called first.foreach instead of manual iteration unless necessary.Race Conditions:
Queue while values are pending may cause DisposedException in producers.DisposedException in producers or ensure graceful shutdown.Ordered vs. Unordered:
concurrent() emits results out of order by default.unordered() explicitly or avoid concurrency for ordered results.isComplete():
if ($iterator->isComplete()) {
echo "Pipeline finished!";
}
tap() for debugging:
->tap(fn($x) => logger()->debug("Value: $x"))
Custom Operators: Create reusable pipeline methods:
$pipeline->customOp(fn($carry, $item) => $carry + $item);
Integrate with Laravel:
Pipeline in Artisan commands or Console kernels:
Amp\Loop::run(function() {
$this->handle(Pipeline::fromIterable($data));
});
Amp\Promise for async HTTP requests:
$pipeline->map(fn($url) => Amp\Promise\wait(Http::getAsync($url)));
Testing:
Queue for unit tests:
$queue = $this->createMock(Queue::class);
$queue->method('push')->willReturnCallback(...);
buffer(0) applies back-pressure immediately. Increase for bursty workloads (e.g., buffer(100)).concurrent(N) where N matches your system’s CPU cores or I/O capacity.take() or limit().delay()) are used in tap() to prevent pipeline stalls.How can I help you explore Laravel packages today?