Weave Code
Code Weaver
Helps Laravel developers discover, compare, and choose open-source packages. See popularity, security, maintainers, and scores at a glance to make better decisions.
Feedback
Share your thoughts, report bugs, or suggest improvements.
Subject
Message

Parallel Laravel Package

amphp/parallel

True parallel processing for PHP with AMPHP: run blocking work in worker processes or threads without blocking the event loop and without extensions. Includes non-blocking concurrency tools plus an opinionated worker pool API for submitting tasks and awaiting results.

View on GitHub
Deep Wiki
Context7

Getting Started

Minimal Steps to Begin

  1. Installation:

    composer require amphp/parallel
    

    Ensure PHP 8.1+ is installed. For threads, PHP 8.2+ with ZTS and ext-parallel is required.

  2. First Use Case: Offload blocking operations (e.g., file I/O, HTTP requests) to worker processes/threads. Example: Parallelize fetching multiple URLs without blocking the event loop.

    use Amp\Parallel\Worker;
    use Amp\Future;
    
    $urls = ['https://example.com', 'https://laravel.com'];
    $executions = [];
    
    foreach ($urls as $url) {
        $executions[] = Worker\submit(new FetchTask($url));
    }
    
    $results = Future\await(array_map(fn ($e) => $e->getFuture(), $executions));
    
  3. Key Files to Review:

    • Worker for task submission.
    • Task interface for defining tasks.
    • WorkerPool for managing multiple workers.

Implementation Patterns

Core Workflows

  1. Task Submission:

    • Define a Task class implementing Amp\Parallel\Worker\Task with a run() method.
    • Submit tasks to a Worker or WorkerPool using submit().
    • Retrieve results via Execution::getFuture() or await().
    class ProcessDataTask implements Task {
        public function run(Channel $channel, Cancellation $cancellation): string {
            return strtoupper(file_get_contents($channel->receive()));
        }
    }
    
    $worker = Worker\createWorker();
    $execution = $worker->submit(new ProcessDataTask());
    $execution->send('hello'); // Send data to task
    $result = $execution->await(); // 'HELLO'
    
  2. Worker Pools:

    • Use WorkerPool for concurrent task execution across multiple workers.
    • Configure pool size via WorkerPool::create():
      $pool = WorkerPool::create(4); // 4 workers
      $executions = [];
      foreach ($urls as $url) {
          $executions[] = $pool->submit(new FetchTask($url));
      }
      
  3. Data Sharing:

    • Use static properties with LocalCache or AtomicCache for shared state:
      final class SharedTask implements Task {
          private static ?LocalCache $cache = null;
      
          public function run(Channel $channel, Cancellation $cancellation): mixed {
              $cache = self::$cache ??= new LocalCache();
              $cache->set('key', 'value');
              return $cache->get('key');
          }
      }
      
  4. Cancellation:

    • Pass a Cancellation token to submit() to cancel tasks:
      $cancellation = new Cancellation();
      $execution = $worker->submit(new LongRunningTask(), $cancellation);
      $cancellation->cancel(); // Cancel task
      
  5. Contexts for Custom Logic:

    • Use Context for bidirectional IPC (e.g., streaming data):
      $context = contextFactory()->start(__DIR__ . '/child_script.php');
      $context->send('data');
      $reply = $context->receive();
      

Integration Tips

  • Laravel Integration:

    • Offload long-running jobs (e.g., PDF generation, API calls) to workers in App\Jobs:
      class GeneratePdfJob implements ShouldQueue {
          public function handle() {
              $worker = Worker\createWorker();
              $execution = $worker->submit(new PdfTask($this->data));
              $pdf = $execution->await();
              Storage::put('pdf.pdf', $pdf);
          }
      }
      
    • Use WorkerPool in Laravel's app/Console/Kernel.php for batch processing:
      protected function schedule(Schedule $schedule) {
          $schedule->command('process:batch')->everyMinute()->pool(4);
      }
      
  • Event-Driven Workflows:

    • Combine with amphp/event-loop for reactive pipelines:
      $loop = Reactor::run();
      $pool = WorkerPool::create(2);
      $pool->submit(new AsyncTask())->then(
          fn ($result) => $loop->queue(fn () => Log::info($result))
      );
      
  • Testing:

    • Mock WorkerPool in tests:
      $mockPool = Mockery::mock(WorkerPool::class);
      $mockPool->shouldReceive('submit')->andReturn(new MockExecution());
      $this->app->instance(WorkerPool::class, $mockPool);
      

Gotchas and Tips

Pitfalls

  1. Serialization Issues:

    • Tasks and data must be serializable. Avoid passing:
      • Closures (use class methods instead).
      • Resources (e.g., database connections, file handles).
      • Non-serializable objects (e.g., stdClass with non-serializable properties).
    • Fix: Use DTOs or ensure all properties are serializable.
  2. Blocking the Event Loop:

    • Never call Worker::submit() from within a Task::run(). This creates a deadlock.
    • Fix: Offload further work to a queue or use Future\await() carefully.
  3. Worker Pool Exhaustion:

    • Submitting more tasks than workers can handle may cause timeouts.
    • Fix: Use WorkerPool::getWorker() to reserve a worker or increase pool size.
  4. Thread-Specific Quirks:

    • Threads (ext-parallel) share memory but have limitations (e.g., no shared file handles).
    • Fix: Prefer processes for I/O-bound tasks; use threads for CPU-bound tasks.
  5. Cancellation Ignored:

    • Tasks may ignore cancellation. Always check Cancellation status:
      if ($cancellation->isCancelled()) {
          throw new CancelledException();
      }
      
  6. Global State in Workers:

    • Static variables in tasks are shared across task runs but not between workers.
    • Fix: Use LocalCache or AtomicCache for shared state.

Debugging Tips

  1. Worker Logs:

    • Redirect worker output to a file:
      $worker = Worker\createWorker(['stdout' => fopen('worker.log', 'a')]);
      
  2. Task Timeouts:

    • Set a timeout for Execution::await():
      try {
          $result = $execution->await(new TimeoutException(), 5.0); // 5s timeout
      } catch (TimeoutException) {
          $execution->cancel();
      }
      
  3. Memory Leaks:

    • Monitor memory usage in workers. Large data structures may bloat workers.
    • Fix: Stream data instead of loading it entirely into memory.
  4. Channel Deadlocks:

    • Ensure send() and receive() are balanced. Unmatched calls block indefinitely.
    • Fix: Use Channel::isReadable()/isWritable() to check state.

Extension Points

  1. Custom Context Factories:

    • Extend ContextFactory to customize worker bootstrapping:
      class CustomContextFactory extends ProcessContextFactory {
          protected function createProcess(array $options): Process {
              $process = parent::createProcess($options);
              $process->setEnv(['APP_ENV' => 'worker']);
              return $process;
          }
      }
      
  2. Task Middleware:

    • Wrap tasks in middleware for logging/metrics:
      $execution = $worker->submit(new LoggingTaskMiddleware(new FetchTask($url)));
      
  3. Dynamic Worker Pools:

    • Scale pools dynamically based on load:
      $pool = WorkerPool::create(2);
      if ($load > 100) {
          $pool->resize(8); // Scale up
      }
      
  4. Error Handling:

    • Catch exceptions in tasks and propagate them:
      try {
          return $this->run($channel, $cancellation);
      } catch (Exception $e) {
          $channel->send(['error' => $e->getMessage()]);
          throw $e;
      }
      
  5. Laravel Service Providers:

    • Bind WorkerPool to the container:
      $this->app->singleton(WorkerPool::class, fn () => WorkerPool::create(4));
      
    • Resolve workers in controllers/jobs:
      $pool = app(WorkerPool::class);
      
Weaver

How can I help you explore Laravel packages today?

Conversation history is not saved when not logged in.
Prompt
Add packages to context
No packages found.
elnasnato/laraliveui
labrodev/rest-sdk
sampaui/sampaui
babelqueue/php-sdk
facebook/capi-param-builder-php
babelqueue/symfony
hamzi/corewatch
minionfactory/raw-hydrator
hexters/coinpayment
rjcodes/rjcms
act-training/laravel-permissions-manager
alimarchal/laravel-chart-of-accounts
babenkoivan/elastic-scout-driver
mkwebdesign/filament-watchdog-v5
renatomarinho/laravel-page-speed
zedmagdy/filament-business-hours
renatovdemoura/blade-elements-ui
devgeek/beacon-admin
benjamin-rqt/data-watcher-bundle
atriumphp/atrium