stancl/jobpipeline
Turn any series of Laravel jobs into an event listener. Build pipelines that pull data from events, run jobs sequentially, and choose sync or queued execution (with optional queue name). Ideal for workflows like tenant setup, migrations, and seeding.
Install the package:
composer require stancl/jobpipeline
Define a job pipeline in your EventServiceProvider or a service class:
use Stancl\JobPipeline\JobPipeline;
use App\Jobs\ProcessOrder;
use App\Jobs\SendConfirmation;
use App\Jobs\UpdateInventory;
// Register pipeline for an event
Event::listen(\App\Events\OrderPlaced::class, function () {
return JobPipeline::make([
ProcessOrder::class,
UpdateInventory::class,
SendConfirmation::class,
])
->send(function (\App\Events\OrderPlaced $event) {
return $event->order; // Pass order data to jobs
})
->shouldBeQueued(true)
->toListener();
});
Trigger the pipeline by dispatching the event:
event(new OrderPlaced($order));
Convert a monolithic job into a pipeline for an OrderCreated event:
// Before: Single bloated job
// After: Modular pipeline
JobPipeline::make([
ValidateOrder::class,
ChargePayment::class,
FulfillOrder::class,
NotifyCustomer::class,
])
->send(fn (OrderCreated $event) => $event->order)
->shouldBeQueued('high-priority')
->toListener();
EventServiceProvider or dynamically via Event::listen().// Dynamic registration (e.g., in a service class)
Event::listen(OrderPlaced::class, JobPipeline::make([
ProcessPayment::class,
UpdateInventory::class,
])->send(fn ($event) => $event->order)->toListener());
$pipeline = JobPipeline::make([
ValidateOrder::class,
ChargePayment::class,
]);
if ($event->order->isPremium()) {
$pipeline->add(ApplyDiscount::class);
}
$pipeline->send(fn ($event) => $event->order)->toListener();
JobPipeline::make([...])
->shouldBeQueued('payments') // High-priority queue
->send(fn ($event) => $event->order)
->toListener();
send() closure.JobPipeline::make([...])
->send(fn (TenantCreated $event) => [
'tenantId' => $event->tenant->id,
'data' => $event->data,
])
->toListener();
false.// In ChargePaymentJob::handle()
if (!$payment->succeeded()) {
return false; // Stops pipeline execution
}
JobPipeline::fake() (if supported) or test individual jobs.public function test_order_pipeline()
{
$order = new Order();
$this->fake(); // Fake queues if using Laravel 10+
event(new OrderPlaced($order));
// Assert jobs were dispatched
ProcessOrder::assertDispatched();
ChargePayment::assertDispatched();
}
tenancy package integration to pass tenant context.JobPipeline::make([...])
->send(fn (TenantCreated $event) => [
'tenant' => $event->tenant,
'central' => CentralTenant::current(), // For multi-tenancy
])
->toListener();
try-catch or use Laravel’s HandleExceptions.try {
event(new OrderPlaced($order));
} catch (\Throwable $e) {
Log::error("Pipeline failed: " . $e->getMessage());
// Notify admin or retry logic
}
Queue Defaults:
JobPipeline::$shouldBeQueuedByDefault = true globally or use shouldBeQueued(true) per pipeline.Context Leakage:
Early Termination:
false from a job stops the entire pipeline. Use sparingly for critical failures.payment-pipeline, fulfillment-pipeline) for granular control.Error Propagation:
false).try-catch in jobs or wrap pipelines in error handlers.Queue Connections:
database, redis) are configured.shouldBeQueued('queue-name').Closure Errors:
send() closure (e.g., accessing undefined properties) will crash the pipeline.Testing Quirks:
fake() natively. Test by dispatching events and asserting job dispatches.assertDispatched() or mock queues manually.Log Pipeline Execution: Add logging in jobs to track progress:
public function handle()
{
Log::info("Executing {$this->job} with data: " . json_encode($this->data));
// Job logic
}
Inspect Queue Jobs:
Use php artisan queue:work --once to manually process jobs and debug failures.
Check Queue Names: Verify queue names with:
dd(config('queue.connections'));
Validate Context:
Ensure the send() closure returns the expected data structure. Use dd() to inspect:
->send(fn ($event) => dd($event->data)) // Debug context
Custom Pipeline Classes:
Extend JobPipeline to add methods (e.g., withRetry(), withTimeout()):
class CustomPipeline extends JobPipeline
{
public function withRetry(int $attempts): self
{
$this->retryAttempts = $attempts;
return $this;
}
}
Dynamic Job Resolution:
Override resolveJobs() to dynamically load jobs from a config or database:
protected function resolveJobs(array $jobClasses): array
{
return collect($jobClasses)->map(fn ($class) => new $class())->all();
}
Middleware for Jobs: Add middleware to jobs in the pipeline (e.g., logging, auth):
JobPipeline::make([...])
->withMiddleware([
new LogJobExecution(),
new ValidateTenant(),
])
->send(...);
Pipeline Events:
Listen for pipeline lifecycle events (e.g., JobPipelineStarting, JobPipelineCompleted) using Laravel’s Event system.
Global Defaults:
Override defaults in bootstrap/app.php or a service provider:
\Stancl\JobPipeline\JobPipeline::$shouldBeQueuedByDefault = true;
Queue Configuration:
Ensure your .env has the correct queue driver:
QUEUE_CONNECTION=redis
Job Dependencies:
If jobs require shared dependencies (e.g., a Tenant resolver), ensure they are available in the pipeline’s context.
job:batch to process jobs in chunks:
How can I help you explore Laravel packages today?