stancl/jobpipeline
Turn any series of Laravel jobs into an event listener. Build pipelines that pull data from events, run jobs sequentially, and choose sync or queued execution (with optional queue name). Ideal for workflows like tenant setup, migrations, and seeding.
composer require stancl/jobpipeline
EventServiceProvider or a dedicated service class):
use Stancl\JobPipeline\JobPipeline;
use App\Jobs\ProcessOrder;
use App\Jobs\SendNotification;
use App\Jobs\UpdateInventory;
JobPipeline::make([
ProcessOrder::class,
SendNotification::class,
UpdateInventory::class,
]);
OrderPlaced):
Event::listen(OrderPlaced::class, JobPipeline::make([
ProcessOrder::class,
SendNotification::class,
UpdateInventory::class,
])->send(function ($event) {
return $event->order; // Pass event data to jobs
})->toListener());
Trigger a pipeline when a tenant is created:
// In EventServiceProvider
protected $listen = [
TenantCreated::class => [
JobPipeline::make([
CreateDatabase::class,
MigrateDatabase::class,
SeedDatabase::class,
])->send(fn ($event) => $event->tenant)
->shouldBeQueued(true)
->toListener(),
],
];
Use pipelines to replace nested dispatch() calls in event handlers:
// Before (monolithic handler)
public function handle(OrderPlaced $event) {
ProcessOrder::dispatch($event->order);
SendNotification::dispatch($event->order);
UpdateInventory::dispatch($event->order);
}
// After (declarative pipeline)
JobPipeline::make([
ProcessOrder::class,
SendNotification::class,
UpdateInventory::class,
])->send(fn ($event) => $event->order)
->toListener();
Pass event-specific data to jobs via send():
JobPipeline::make([JobA::class, JobB::class])
->send(function ($event) {
return [
'user_id' => $event->user->id,
'metadata' => $event->metadata,
];
});
Cancel remaining jobs if a job returns false:
// In JobA::handle()
if ($this->order->is_cancelled) {
return false; // Stops JobB and JobC
}
\Stancl\JobPipeline\JobPipeline::$shouldBeQueuedByDefault = true;
->shouldBeQueued(true, 'high-priority') // Queue + custom queue
->shouldBeQueued('low-priority') // Shorthand for queued=true
Encapsulate pipelines in a service class for reuse:
// app/Services/TenantPipeline.php
class TenantPipeline {
public static function createTenantPipeline(): JobPipeline {
return JobPipeline::make([
CreateDatabase::class,
MigrateDatabase::class,
])->send(fn ($event) => $event->tenant);
}
}
// Usage in EventServiceProvider
Event::listen(TenantCreated::class, TenantPipeline::createTenantPipeline()->toListener());
false is returned).try-catch to prevent pipeline crashes (see Gotchas).Synchronous by Default:
shouldBeQueued(true) for long-running jobs or high-load scenarios.Closure Errors:
send() closure will crash the pipeline unless caught.try-catch in the closure or ensure the closure is robust:
->send(function ($event) {
try {
return $event->tenant->load('roles')->first();
} catch (\Exception $e) {
Log::error("Pipeline data prep failed: " . $e->getMessage());
return null; // Pipeline will fail gracefully
}
})
Data Serialization:
send() must be serializable for queued jobs.->send(fn ($event) => [
'tenant_id' => $event->tenant->id,
'name' => $event->tenant->name,
])
Job Dependencies:
JobB depends on JobA's output, ensure data is passed via send() or shared storage (e.g., database, cache).bus to share data between jobs:
// JobA
bus()->dispatch(new JobB($this->processedData));
// JobB
public function handle($data) { ... }
Queue Deadlocks:
shouldBeQueued() with a dedicated queue and monitor with Laravel Horizon.Log Pipeline Execution: Add logging to jobs or use Laravel’s queue worker logging:
// In JobA::handle()
Log::debug("JobA executed for tenant: {$this->tenant->id}");
Test Pipelines: Mock events and pipelines in tests:
public function test_pipeline_execution() {
$event = new TenantCreated(...);
$pipeline = JobPipeline::make([JobA::class, JobB::class])
->send(fn () => $event->tenant);
// Simulate execution
$pipeline->toListener()($event);
}
Check Queue Workers: Ensure queue workers are running:
php artisan queue:work --queue=high-priority
Custom Pipeline Classes:
Extend JobPipeline to add pre/post-processing:
class CustomPipeline extends JobPipeline {
public function withRetryLogic(): static {
$this->onFailure(fn ($job, $exception) => retry($job, 3));
return $this;
}
}
Dynamic Job Lists: Build pipelines dynamically based on conditions:
$jobs = [];
if ($event->is_premium) {
$jobs[] = PremiumSetupJob::class;
}
$jobs[] = CommonSetupJob::class;
JobPipeline::make($jobs)->send(...);
Middleware for Jobs: Use Laravel’s job middleware to add cross-cutting concerns:
// app/Console/Kernel.php
protected function schedule(Schedule $schedule) {
$schedule->job(new ProcessOrder(...))
->middleware([LogExecution::class]);
}
PHP 8.4 Compatibility:
shouldBeQueued(?string $queue = null)).Laravel Version:
v1.x branches.Static Defaults:
\Stancl\JobPipeline\JobPipeline::$shouldBeQueuedByDefault affects all pipelines globally. Reset it if needed:
\Stancl\JobPipeline\JobPipeline::$shouldBeQueuedByDefault = false;
How can I help you explore Laravel packages today?