amphp/pipeline
Fiber-safe concurrent iterators and collection operators for PHP 8.1+ using AMPHP. Build pipelines from iterables and consume them safely from multiple fibers with foreach or manual iteration, supporting async sets and concurrent processing.
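As a rough illustration of the description above, the following sketch builds a pipeline from a plain array and consumes it once with foreach and once by manual iteration. It assumes the package was installed with Composer (`composer require amphp/pipeline`) and that the autoloader is at the path shown; the input values are arbitrary.

```php
<?php
// Assumption: dependencies were installed with Composer in this directory.
require __DIR__ . '/vendor/autoload.php';

use Amp\Pipeline\Pipeline;

// Build a pipeline from any iterable, then chain operators.
$pipeline = Pipeline::fromIterable([1, 2, 3, 4, 5])
    ->filter(fn (int $n): bool => $n % 2 === 1) // keep odd values
    ->map(fn (int $n): int => $n * 10);         // transform each remaining value

// A Pipeline can be consumed directly with foreach.
$results = [];
foreach ($pipeline as $value) {
    $results[] = $value;
}

// Manual iteration: continue() advances, getValue() reads the current value.
$iterator = Pipeline::fromIterable(['a', 'b'])->getIterator();
$manual = [];
while ($iterator->continue()) {
    $manual[$iterator->getPosition()] = $iterator->getValue();
}

var_dump($results, $manual);
```

A pipeline can only be consumed once, which is why the manual-iteration part builds a second pipeline instead of reusing the first.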
… spatie/async or amphp/amp) and fiber-based concurrency models. Enables non-blocking data pipelines for high-throughput tasks (e.g., batch processing, real-time streams).
… Illuminate\Events) by enabling reactive pipelines for event-driven workflows (e.g., processing queued jobs, webhook payloads).
… map(), filter()), mirroring Laravel’s collection methods.
… Illuminate\Queue) to avoid memory overload when consuming high-volume async sources (e.g., Kafka, database streams).
… amphp/amp (for fibers) or spatie/async (for async/await). Laravel’s built-in async support (e.g., SynchronousQueue) is insufficient; this package bridges the gap.
… Collection for async operations (e.g., Pipeline::fromCollection($laravelCollection)->map(...)).
… Queue::push() → Pipeline::fromQueue()) for async job processing.
… Amp\run()).
… DisposedException) may propagate unpredictably in Laravel’s sync stack. Requires middleware to catch and rethrow fiber errors as HTTP exceptions or queue failures.
… async helper) or AMPHP’s fibers be the primary runtime? This affects pipeline initialization (e.g., Amp\run() vs. async()).
… ConcurrentIterator wrappers may be needed.
… DisposedException) map to Laravel’s error handling (e.g., try/catch in middleware vs. queue retries)?
… ConcurrentIterator requires custom test doubles (e.g., MockConcurrentIterator).
… Pipeline::fromDatabaseCursor()->concurrent(100)).
… Pipeline::fromRequestStream()->map(fn($chunk) => decode($chunk))).
… Event::listen(fn($event) => Pipeline::from($event->payload)->process())).
… Illuminate\Queue\Jobs in pipelines for parallel processing.
… Pipeline::fromRequest()->tap(fn($req) => $req->mergeAsyncData())).
… Pipeline::fromModel()->chunkBy(100)->map(fn($chunk) => $chunk->load('relations'))).
… Pipeline::fromIterable($files)->concurrent(50)->map(...).
… Pipeline facade to Laravel (e.g., use Illuminate\Support\Facades\Pipeline;).
… Pipeline::fromQueueJob(), Pipeline::fromDatabase()).
… app(Pipeline::class)->map(...)).
… laravel-pipeline-eloquent, laravel-pipeline-queue).

    $asyncResults = Pipeline::fromCollection($laravelCollection)
        ->concurrent(10)
        ->map(fn($item) => asyncFn($item));
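The snippet above relies on a Laravel-specific fromCollection() helper that amphp/pipeline itself does not ship. A minimal sketch of the same idea with the library's own API might look like this, assuming Composer's autoloader is available. The ArrayIterator is only a stand-in for a Laravel Collection (any iterable works), and delay() simulates non-blocking I/O.

```php
<?php
// Assumption: amphp/pipeline (which pulls in amphp/amp) was installed with Composer.
require __DIR__ . '/vendor/autoload.php';

use Amp\Pipeline\Pipeline;
use function Amp\delay;

// Stand-in for a Laravel Collection; fromIterable() accepts any iterable.
$items = new ArrayIterator([1, 2, 3, 4]);

$start = microtime(true);

$results = Pipeline::fromIterable($items)
    ->concurrent(4) // allow up to four map callbacks to run at once
    ->map(function (int $n): int {
        delay(0.1); // simulated async work; suspends this fiber without blocking others
        return $n * 2;
    })
    ->toArray(); // collect every emitted value into an array

$elapsed = microtime(true) - $start;

var_dump($results, $elapsed);
```

Because the four callbacks overlap, the elapsed time should sit well below the 0.4 seconds a sequential run would need.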
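… Illuminate\Queue\Jobs to support pipeline processing:

    use Amp\Pipeline\Pipeline;
    use Illuminate\Contracts\Queue\ShouldQueue;

    class ProcessInPipelineJob implements ShouldQueue {
        public function handle(): void {
            // amphp/pipeline builds a pipeline from an iterable with fromIterable()
            Pipeline::fromIterable($this->data)
                ->concurrent(20) // process up to 20 items at a time
                ->map(fn($item) => $this->processItem($item))
                ->forEach(fn($result) => $this->storeResult($result));
        }
    }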
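    public function handle(Request $request, Closure $next): Response {
        // reduce() returns the accumulated value directly, so no ->then() chaining is needed.
        // $request->stream() and transform() are placeholders carried over from this sketch.
        $body = Pipeline::fromIterable($request->stream())
            ->map(fn($chunk) => transform($chunk))
            ->reduce(fn($carry, $chunk) => $carry . $chunk, '');
        $next($request); // still run the rest of the middleware stack
        return response($body);
    }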
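    Event::listen(OrderCreated::class, function (OrderCreated $event) {
        Pipeline::fromIterable($event->order->items)
            ->concurrent(5)
            ->map(fn($item) => validateInventory($item))
            // PHP's built-in log() is the natural logarithm; use Laravel's logger instead
            ->forEach(fn($result) => logger()->info('Inventory check', ['result' => $result]));
    });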
… Amp\run() or async()).
… Queue::push() for async data sources (e.g., database streams, HTTP clients).
… foreach or forEach() to consume results synchronously or asynchronously.
… complete() or error() on Queue instances to avoid hanging consumers.
… try/catch to handle DisposedException or async errors.
… amphp/pipeline and amphp/amp to composer.json. Monitor for breaking changes in the AMPHP ecosystem.
… ConcurrentIterator state").
… Queue instances or unbounded concurrency. Solution: Use buffer() and complete().
… ConcurrentIterator state (e.g., isComplete(), getPosition()).
… Xdebug to inspect fiber stacks.
… Pipeline::healthCheck()).
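The notes on Queue::push(), complete(), and DisposedException can be sketched as follows. This is a minimal example under assumptions, not a recommended production pattern: it assumes Composer's autoloader is available, the pushed values are arbitrary, and the producer runs in a fiber started with Amp's async().

```php
<?php
// Assumption: amphp/pipeline (which pulls in amphp/amp) was installed with Composer.
require __DIR__ . '/vendor/autoload.php';

use Amp\Pipeline\DisposedException;
use Amp\Pipeline\Queue;
use function Amp\async;

// Default buffer size of 0: push() waits until the consumer has taken the value.
$queue = new Queue();

// The producer runs in its own fiber so it can wait on the consumer.
$producer = async(function () use ($queue): void {
    try {
        foreach (['a', 'b', 'c'] as $value) {
            $queue->push($value); // suspends until consumed (backpressure)
        }
        $queue->complete(); // without this, the consumer loop below would never end
    } catch (DisposedException) {
        // The consumer stopped iterating early; nothing more to deliver.
    }
});

// The consumer iterates in the main context until the queue is completed.
$received = [];
foreach ($queue->iterate() as $value) {
    $received[] = $value;
}

$producer->await(); // surface any unexpected producer error

var_dump($received);
```

If the producer can fail partway through, calling $queue->error($exception) instead of complete() passes the failure on to the consumer so it does not wait indefinitely.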