amphp/parallel
True parallel processing for PHP with AMPHP: run blocking work in worker processes or threads without blocking the event loop and without extensions. Includes non-blocking concurrency tools plus an opinionated worker pool API for submitting tasks and awaiting results.
Installation:
composer require amphp/parallel
Ensure PHP 8.1+ is installed. For threads, PHP 8.2+ with ZTS and ext-parallel is required.
First Use Case: Offload blocking operations (e.g., file I/O, HTTP requests) to worker processes/threads. Example: Parallelize fetching multiple URLs without blocking the event loop.
use Amp\Parallel\Worker;
use Amp\Future;
$urls = ['https://example.com', 'https://laravel.com'];
$executions = [];
foreach ($urls as $url) {
    $executions[] = Worker\submit(new FetchTask($url));
}
$results = Future\await(array_map(fn ($e) => $e->getFuture(), $executions));
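The example above uses FetchTask without defining it. A minimal sketch of what such a task might look like follows, assuming amphp/parallel v2 and its Task interface; the class body is illustrative rather than part of the library. Task classes must be autoloadable (for example through Composer), because the worker process has to load the class itself.

```php
<?php

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;

// Hypothetical task: fetches one URL with a blocking call.
final class FetchTask implements Task
{
    public function __construct(private readonly string $url)
    {
    }

    public function run(Channel $channel, Cancellation $cancellation): string
    {
        // Blocking is acceptable here: this runs in the worker, not in the
        // parent's event loop.
        $body = file_get_contents($this->url);

        if ($body === false) {
            throw new \RuntimeException("Failed to fetch {$this->url}");
        }

        // The return value must be serializable; a string is.
        return $body;
    }
}
```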
Key Files to Review:
Worker for task submission.
Task interface for defining tasks.
WorkerPool for managing multiple workers.
Task Submission:
Define a Task class implementing Amp\Parallel\Worker\Task with a run() method.
Submit it to a Worker or WorkerPool using submit().
Await the result via Execution::getFuture() or await().
use Amp\Cancellation;
use Amp\Parallel\Worker;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
class ProcessDataTask implements Task {
    public function run(Channel $channel, Cancellation $cancellation): string {
        // Receive the value sent by the parent and return it upper-cased.
        return strtoupper($channel->receive());
    }
}
$worker = Worker\createWorker();
$execution = $worker->submit(new ProcessDataTask());
$execution->getChannel()->send('hello'); // Send data to the task through its channel
$result = $execution->await(); // 'HELLO'
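Because a submitted task object is serialized before it is handed to the worker, a quick round-trip check can catch problems before anything is submitted. The sketch below is plain PHP and independent of amphp; isSerializable() and UrlPayload are hypothetical names introduced for illustration. It only detects values that PHP refuses to serialize, such as closures.

```php
<?php

// Hypothetical helper: reports whether a value survives PHP's native
// serialize()/unserialize() round trip without throwing.
function isSerializable(mixed $value): bool
{
    try {
        unserialize(serialize($value));
        return true;
    } catch (Throwable) {
        // For example: "Serialization of 'Closure' is not allowed".
        return false;
    }
}

// Hypothetical payload holding only scalar data.
final class UrlPayload
{
    public function __construct(public string $url)
    {
    }
}

var_dump(isSerializable(new UrlPayload('https://example.com'))); // bool(true)
var_dump(isSerializable(fn () => 42));                           // bool(false)
```

A check like this fits naturally in a unit test for each task class.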
Worker Pools:
Use a WorkerPool for concurrent task execution across multiple workers.
WorkerPool is an interface; create a pool with ContextWorkerPool:
$pool = new ContextWorkerPool(4); // At most 4 workers
$executions = [];
foreach ($urls as $url) {
    $executions[] = $pool->submit(new FetchTask($url));
}
Data Sharing:
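Store a LocalCache or AtomicCache in a static property to share state between tasks that run in the same worker (each worker has its own copy):
final class SharedTask implements Task {
    private static ?LocalCache $cache = null;
    public function run(Channel $channel, Cancellation $cancellation): mixed {
        // Initialize the cache lazily inside run(), so it is created in the worker.
        $cache = self::$cache ??= new LocalCache();
        $cache->set('key', 'value');
        return $cache->get('key');
    }
}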
Cancellation:
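Pass a Cancellation to submit() to cancel tasks. Amp\Cancellation is an interface, so create one through DeferredCancellation:
$deferred = new DeferredCancellation();
$execution = $worker->submit(new LongRunningTask(), $deferred->getCancellation());
$deferred->cancel(); // Request cancellation of the task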
Contexts for Custom Logic:
Use a Context for bidirectional IPC (e.g., streaming data):
$context = contextFactory()->start(__DIR__ . '/child_script.php');
$context->send('data');
$reply = $context->receive();
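The parent-side snippet above starts child_script.php but does not show it. A minimal sketch of what that script might contain follows, assuming the amphp/parallel v2 convention that a context script returns a callable receiving the Channel; the upper-casing logic and the return value are purely illustrative.

```php
<?php
// child_script.php (hypothetical contents for the script started above)

use Amp\Sync\Channel;

return function (Channel $channel): mixed {
    // Waits until the parent calls $context->send().
    $data = $channel->receive();

    // Answers the parent's pending $context->receive().
    $channel->send(strtoupper((string) $data));

    // The return value is handed to the parent when it calls $context->join().
    return 'done';
};
```

Each send() in one process is paired with a receive() in the other, which keeps the exchange from stalling.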
Laravel Integration:
Run tasks from queued jobs under App\Jobs:
class GeneratePdfJob implements ShouldQueue {
    public function handle() {
        $worker = Worker\createWorker();
        $execution = $worker->submit(new PdfTask($this->data));
        $pdf = $execution->await();
        Storage::put('pdf.pdf', $pdf);
    }
}
Schedule batch processing from Laravel's app/Console/Kernel.php and create the WorkerPool inside the command itself:
protected function schedule(Schedule $schedule) {
    // Laravel's scheduler has no pool() method; the worker limit belongs in the command.
    $schedule->command('process:batch')->everyMinute();
}
Event-Driven Workflows:
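Combine with the Revolt event loop (revolt/event-loop, used by AMPHP v3) for reactive pipelines:
$pool = new ContextWorkerPool(2);
$pool->submit(new AsyncTask())->getFuture()
    ->map(fn ($result) => Log::info($result)) // Runs once the task completes
    ->ignore(); // The mapped future needs no further handling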
Testing:
Mock the WorkerPool in tests:
$mockPool = Mockery::mock(WorkerPool::class);
$mockPool->shouldReceive('submit')->andReturn(new MockExecution());
$this->app->instance(WorkerPool::class, $mockPool);
Serialization Issues:
Tasks and the data they exchange must be serializable (avoid, e.g., stdClass with non-serializable properties).
Blocking the Event Loop:
Avoid calling Worker::submit() from within a Task::run(). This can create a deadlock.
Use Future\await() carefully.
Worker Pool Exhaustion:
Use WorkerPool::getWorker() to reserve a worker or increase pool size.
Thread-Specific Quirks:
Threads (ext-parallel) share memory but have limitations (e.g., no shared file handles).
Cancellation Ignored:
Tasks must check the Cancellation status themselves:
if ($cancellation->isRequested()) {
    throw new CancelledException();
}
Global State in Workers:
Use LocalCache or AtomicCache for shared state.
Worker Logs:
$worker = Worker\createWorker(); // createWorker() accepts no options; write logs from inside the task instead
Task Timeouts:
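Pass a TimeoutCancellation to Execution::await():
try {
    $result = $execution->await(new TimeoutCancellation(5.0)); // 5s timeout
} catch (CancelledException) {
    // Only the wait was cancelled. To stop the task itself, pass a Cancellation to submit().
}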
Memory Leaks:
Channel Deadlocks:
Ensure send() and receive() are balanced. Unmatched calls block indefinitely.
Use Channel::isClosed() to check state.
Custom Context Factories:
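Pass a configured ContextFactory to customize worker bootstrapping:
// Assumes amphp/parallel v2, where ProcessContextFactory is configured through
// constructor arguments (including an `environment` array) rather than subclassing.
// Verify the parameter names against your installed version.
$contextFactory = new ProcessContextFactory(environment: ['APP_ENV' => 'worker']);
$workerFactory = new ContextWorkerFactory(contextFactory: $contextFactory);
$pool = new ContextWorkerPool(4, $workerFactory);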
Task Middleware:
$execution = $worker->submit(new LoggingTaskMiddleware(new FetchTask($url)));
Dynamic Worker Pools:
// ContextWorkerPool has a fixed limit, so choose it from the expected load at construction.
$pool = new ContextWorkerPool($load > 100 ? 8 : 2);
Error Handling:
try {
return $this->run($channel, $cancellation);
} catch (Exception $e) {
$channel->send(['error' => $e->getMessage()]);
throw $e;
}
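The snippet above sends only the exception message over the channel. If more detail is useful on the receiving side, the exception can be flattened into plain scalars first, which keeps the payload serializable. This is a sketch in plain PHP; describeFailure() and the array keys are hypothetical choices, not part of amphp/parallel.

```php
<?php

// Hypothetical helper: reduces a Throwable to scalar values that are safe
// to send over a channel.
function describeFailure(Throwable $e): array
{
    return [
        'class' => $e::class,
        'message' => $e->getMessage(),
        'code' => $e->getCode(),
    ];
}

try {
    throw new RuntimeException('Disk full', 28);
} catch (Throwable $e) {
    $payload = describeFailure($e);
}

echo json_encode($payload), PHP_EOL;
```

Inside a task, the resulting array could be passed to $channel->send() in place of the single 'error' key.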
Laravel Service Providers:
Bind a WorkerPool to the container:
$this->app->singleton(WorkerPool::class, fn () => new ContextWorkerPool(4));
$pool = app(WorkerPool::class);