Weave Code
Code Weaver
Helps Laravel developers discover, compare, and choose open-source packages. See popularity, security, maintainers, and scores at a glance to make better decisions.
Feedback
Share your thoughts, report bugs, or suggest improvements.
Subject
Message

Pipeline Laravel Package

amphp/pipeline

Fiber-safe concurrent iterators and collection operators for PHP 8.1+ using AMPHP. Build pipelines from iterables and consume them safely from multiple fibers with foreach or manual iteration, supporting async sets and concurrent processing.

View on GitHub
Deep Wiki
Context7

Technical Evaluation

Architecture Fit

  • Event-Driven & Fiber-Based: Aligns perfectly with Laravel’s async/await (via spatie/async or amphp/amp) and fiber-based concurrency models. Enables non-blocking data pipelines for high-throughput tasks (e.g., batch processing, real-time streams).
  • Reactive Streams: Complements Laravel’s event system (e.g., Illuminate\Events) by enabling reactive pipelines for event-driven workflows (e.g., processing queued jobs, webhook payloads).
  • Functional Composition: Supports Laravel’s service container and dependency injection via fluent pipeline methods (e.g., map(), filter()), mirroring Laravel’s collection methods.
  • Back-Pressure: Critical for Laravel’s queue workers (e.g., Illuminate\Queue) to avoid memory overload when consuming high-volume async sources (e.g., Kafka, database streams).

Integration Feasibility

  • PHP 8.1+: Laravel 10+ (PHP 8.1+) is required, ensuring compatibility. No breaking changes for modern Laravel apps.
  • AMPHP Ecosystem: Requires amphp/amp (for fibers) or spatie/async (for async/await). Laravel’s built-in async support (e.g., SynchronousQueue) is insufficient; this package bridges the gap.
  • Laravel Collections: Pipelines can replace or extend Laravel’s Collection for async operations (e.g., Pipeline::fromCollection($laravelCollection)->map(...)).
  • Queue Workers: Directly integrates with Laravel’s queue system (e.g., Queue::push()Pipeline::fromQueue()) for async job processing.

Technical Risk

  • Fiber Adoption: Laravel’s core is not fiber-aware; mixing fibers with synchronous code (e.g., Eloquent ORM) risks deadlocks. Mitigation: Isolate pipelines to fiber-safe contexts (e.g., queue workers, HTTP middleware with Amp\run()).
  • Learning Curve: Fibers and async iterators are unfamiliar to most Laravel devs. Requires documentation/examples for Laravel-specific use cases (e.g., async Eloquent queries).
  • Error Handling: Async exceptions (e.g., DisposedException) may propagate unpredictably in Laravel’s sync stack. Requires middleware to catch and rethrow fiber errors as HTTP exceptions or queue failures.
  • Performance Overhead: Concurrent iterators add latency for small datasets. Benchmark against synchronous alternatives (e.g., Laravel Collections) to justify use.

Key Questions

  1. Async Stack: Will Laravel’s async/await (e.g., async helper) or AMPHP’s fibers be the primary runtime? This affects pipeline initialization (e.g., Amp\run() vs. async()).
  2. Data Sources: What async sources will feed pipelines? (e.g., database cursors, HTTP streams, queue consumers). Custom ConcurrentIterator wrappers may be needed.
  3. Error Recovery: How will pipeline failures (e.g., DisposedException) map to Laravel’s error handling (e.g., try/catch in middleware vs. queue retries)?
  4. Testing: How will pipelines be tested? Mocking ConcurrentIterator requires custom test doubles (e.g., MockConcurrentIterator).
  5. Scaling: Will pipelines run in Laravel’s request lifecycle (risking timeouts) or in long-running processes (e.g., workers, cron jobs)?

Integration Approach

Stack Fit

  • Primary Use Cases:
    • Async Data Processing: Replace synchronous loops in queue workers (e.g., processing 10K records with Pipeline::fromDatabaseCursor()->concurrent(100)).
    • Real-Time Streams: Handle WebSocket/HTTP streams (e.g., Pipeline::fromRequestStream()->map(fn($chunk) => decode($chunk))).
    • Event-Driven Workflows: Chain event listeners with pipelines (e.g., Event::listen(fn($event) => Pipeline::from($event->payload)->process())).
  • Laravel-Specific Integrations:
    • Queue Workers: Wrap Illuminate\Queue\Jobs in pipelines for parallel processing.
    • HTTP Middleware: Use pipelines for async request/response transformations (e.g., Pipeline::fromRequest()->tap(fn($req) => $req->mergeAsyncData())).
    • Eloquent: Create async query builders (e.g., Pipeline::fromModel()->chunkBy(100)->map(fn($chunk) => $chunk->load('relations'))).

Migration Path

  1. Phase 1: Proof of Concept
    • Isolate a single high-throughput use case (e.g., a queue worker processing files).
    • Replace synchronous loops with Pipeline::fromIterable($files)->concurrent(50)->map(...).
    • Measure performance vs. memory usage.
  2. Phase 2: Core Integration
    • Add a Pipeline facade to Laravel (e.g., use Illuminate\Support\Facades\Pipeline;).
    • Create Laravel-specific pipeline builders (e.g., Pipeline::fromQueueJob(), Pipeline::fromDatabase()).
    • Integrate with Laravel’s service container for dependency injection (e.g., app(Pipeline::class)->map(...)).
  3. Phase 3: Ecosystem Adoption
    • Publish packages for common integrations (e.g., laravel-pipeline-eloquent, laravel-pipeline-queue).
    • Document fiber-safe patterns for Laravel devs (e.g., "Always run pipelines in workers, not routes").

Compatibility

  • Laravel Collections: Pipelines can wrap collections for async operations:
    $asyncResults = Pipeline::fromCollection($laravelCollection)
        ->concurrent(10)
        ->map(fn($item) => asyncFn($item));
    
  • Queue System: Extend Illuminate\Queue\Jobs to support pipeline processing:
    class ProcessInPipelineJob implements ShouldQueue {
        public function handle(): void {
            Pipeline::from($this->data)
                ->concurrent(20)
                ->map(fn($item) => $this->processItem($item))
                ->forEach(fn($result) => $this->storeResult($result));
        }
    }
    
  • HTTP: Use pipelines in middleware for async request/response handling:
    public function handle(Request $request, Closure $next): Response {
        $response = Pipeline::from($request->stream())
            ->map(fn($chunk) => transform($chunk))
            ->reduce(fn($carry, $chunk) => $carry . $chunk)
            ->then(fn($body) => response($body));
        return $next($request)->then(fn($res) => $response);
    }
    
  • Events: Process event payloads asynchronously:
    Event::listen(OrderCreated::class, function (OrderCreated $event) {
        Pipeline::from($event->order->items)
            ->concurrent(5)
            ->map(fn($item) => validateInventory($item))
            ->forEach(fn($result) => log($result));
    });
    

Sequencing

  1. Initialization: Pipelines must be initialized in a fiber context (e.g., Amp\run() or async()).
  2. Data Flow:
    • Producers: Use Queue::push() for async data sources (e.g., database streams, HTTP clients).
    • Consumers: Use foreach or forEach() to consume results synchronously or asynchronously.
  3. Cleanup: Always call complete() or error() on Queue instances to avoid hanging consumers.
  4. Error Handling: Wrap pipeline execution in try/catch to handle DisposedException or async errors.

Operational Impact

Maintenance

  • Dependency Management: Add amphp/pipeline and amphp/amp to composer.json. Monitor for breaking changes in the AMPHP ecosystem.
  • Documentation: Maintain Laravel-specific guides for:
    • Fiber safety (e.g., "Avoid mixing fibers with sync code").
    • Common patterns (e.g., "Using pipelines in queue workers").
    • Debugging (e.g., "How to inspect ConcurrentIterator state").
  • Testing: Write integration tests for:
    • Pipeline + Laravel Collections.
    • Pipeline + Queue Workers.
    • Error scenarios (e.g., disposed iterators, async exceptions).

Support

  • Common Issues:
    • Deadlocks: Caused by mixing fibers and sync code (e.g., Eloquent in a pipeline). Solution: Isolate pipelines to fiber-only contexts.
    • Memory Leaks: Unclosed Queue instances or unbounded concurrency. Solution: Use buffer() and complete().
    • Timeouts: Pipelines in HTTP routes may hit Laravel’s max execution time. Solution: Offload to workers.
  • Debugging Tools:
    • Log ConcurrentIterator state (e.g., isComplete(), getPosition()).
    • Use Xdebug to inspect fiber stacks.
    • Add health checks for long-running pipelines (e.g., Pipeline::healthCheck()).

Scaling

  • Horizontal Scaling: Pipelines are stateless; scale by adding more workers (e
Weaver

How can I help you explore Laravel packages today?

Conversation history is not saved when not logged in.
Prompt
Add packages to context
No packages found.
apiboxsym/user-bundle
apiboxsym/health-check-bundle
jayeshmepani/jpl-moshier-ephemeris-php
elnasnato/laraliveui
labrodev/rest-sdk
sampaui/sampaui
babelqueue/php-sdk
facebook/capi-param-builder-php
babelqueue/symfony
hamzi/corewatch
minionfactory/raw-hydrator
hexters/coinpayment
rjcodes/rjcms
act-training/laravel-permissions-manager
alimarchal/laravel-chart-of-accounts
babenkoivan/elastic-scout-driver
mkwebdesign/filament-watchdog-v5
renatomarinho/laravel-page-speed
zedmagdy/filament-business-hours
renatovdemoura/blade-elements-ui