Weave Code
Code Weaver
Helps Laravel developers discover, compare, and choose open-source packages. See popularity, security, maintainers, and scores at a glance to make better decisions.
Feedback
Share your thoughts, report bugs, or suggest improvements.
Subject
Message

Jobpipeline Laravel Package

stancl/jobpipeline

Turn any series of Laravel jobs into an event listener. Build pipelines that pull data from events, run jobs sequentially, and choose sync or queued execution (with optional queue name). Ideal for workflows like tenant setup, migrations, and seeding.

View on GitHub
Deep Wiki
Context7

Getting Started

Minimal Setup

  1. Install the Package:
    composer require stancl/jobpipeline
    
  2. Define a Job Pipeline: Create a pipeline for a series of jobs (e.g., in EventServiceProvider or a dedicated service class):
    use Stancl\JobPipeline\JobPipeline;
    use App\Jobs\ProcessOrder;
    use App\Jobs\SendNotification;
    use App\Jobs\UpdateInventory;
    
    JobPipeline::make([
        ProcessOrder::class,
        SendNotification::class,
        UpdateInventory::class,
    ]);
    
  3. Bind to an Event: Attach the pipeline to an event (e.g., OrderPlaced):
    Event::listen(OrderPlaced::class, JobPipeline::make([
        ProcessOrder::class,
        SendNotification::class,
        UpdateInventory::class,
    ])->send(function ($event) {
        return $event->order; // Pass event data to jobs
    })->toListener());
    

First Use Case: Tenant Onboarding

Trigger a pipeline when a tenant is created:

// In EventServiceProvider
protected $listen = [
    TenantCreated::class => [
        JobPipeline::make([
            CreateDatabase::class,
            MigrateDatabase::class,
            SeedDatabase::class,
        ])->send(fn ($event) => $event->tenant)
         ->shouldBeQueued(true)
         ->toListener(),
    ],
];

Implementation Patterns

1. Event-Driven Job Chaining

Use pipelines to replace nested dispatch() calls in event handlers:

// Before (monolithic handler)
public function handle(OrderPlaced $event) {
    ProcessOrder::dispatch($event->order);
    SendNotification::dispatch($event->order);
    UpdateInventory::dispatch($event->order);
}

// After (declarative pipeline)
JobPipeline::make([
    ProcessOrder::class,
    SendNotification::class,
    UpdateInventory::class,
])->send(fn ($event) => $event->order)
->toListener();

2. Dynamic Data Flow

Pass event-specific data to jobs via send():

JobPipeline::make([JobA::class, JobB::class])
    ->send(function ($event) {
        return [
            'user_id' => $event->user->id,
            'metadata' => $event->metadata,
        ];
    });

3. Conditional Pipeline Execution

Cancel remaining jobs if a job returns false:

// In JobA::handle()
if ($this->order->is_cancelled) {
    return false; // Stops JobB and JobC
}

4. Queue Configuration

  • Default Queuing: Set globally:
    \Stancl\JobPipeline\JobPipeline::$shouldBeQueuedByDefault = true;
    
  • Per-Pipeline Queuing:
    ->shouldBeQueued(true, 'high-priority') // Queue + custom queue
    ->shouldBeQueued('low-priority')       // Shorthand for queued=true
    

5. Reusable Pipeline Services

Encapsulate pipelines in a service class for reuse:

// app/Services/TenantPipeline.php
class TenantPipeline {
    public static function createTenantPipeline(): JobPipeline {
        return JobPipeline::make([
            CreateDatabase::class,
            MigrateDatabase::class,
        ])->send(fn ($event) => $event->tenant);
    }
}

// Usage in EventServiceProvider
Event::listen(TenantCreated::class, TenantPipeline::createTenantPipeline()->toListener());

6. Error Handling

  • Job Failures: Failures in one job do not stop the pipeline by default (unless false is returned).
  • Closure Errors: Wrapped in try-catch to prevent pipeline crashes (see Gotchas).

Gotchas and Tips

Pitfalls

  1. Synchronous by Default:

    • Pipelines run synchronously unless explicitly queued.
    • Fix: Always call shouldBeQueued(true) for long-running jobs or high-load scenarios.
  2. Closure Errors:

    • Errors in the send() closure will crash the pipeline unless caught.
    • Fix: Use try-catch in the closure or ensure the closure is robust:
      ->send(function ($event) {
          try {
              return $event->tenant->load('roles')->first();
          } catch (\Exception $e) {
              Log::error("Pipeline data prep failed: " . $e->getMessage());
              return null; // Pipeline will fail gracefully
          }
      })
      
  3. Data Serialization:

    • Complex objects passed via send() must be serializable for queued jobs.
    • Fix: Use simple arrays or ensure jobs handle deserialization:
      ->send(fn ($event) => [
          'tenant_id' => $event->tenant->id,
          'name' => $event->tenant->name,
      ])
      
  4. Job Dependencies:

    • If JobB depends on JobA's output, ensure data is passed via send() or shared storage (e.g., database, cache).
    • Tip: Use Laravel’s bus to share data between jobs:
      // JobA
      bus()->dispatch(new JobB($this->processedData));
      
      // JobB
      public function handle($data) { ... }
      
  5. Queue Deadlocks:

    • Long-running pipelines may cause queue timeouts.
    • Fix: Use shouldBeQueued() with a dedicated queue and monitor with Laravel Horizon.

Debugging Tips

  1. Log Pipeline Execution: Add logging to jobs or use Laravel’s queue worker logging:

    // In JobA::handle()
    Log::debug("JobA executed for tenant: {$this->tenant->id}");
    
  2. Test Pipelines: Mock events and pipelines in tests:

    public function test_pipeline_execution() {
        $event = new TenantCreated(...);
        $pipeline = JobPipeline::make([JobA::class, JobB::class])
            ->send(fn () => $event->tenant);
    
        // Simulate execution
        $pipeline->toListener()($event);
    }
    
  3. Check Queue Workers: Ensure queue workers are running:

    php artisan queue:work --queue=high-priority
    

Extension Points

  1. Custom Pipeline Classes: Extend JobPipeline to add pre/post-processing:

    class CustomPipeline extends JobPipeline {
        public function withRetryLogic(): static {
            $this->onFailure(fn ($job, $exception) => retry($job, 3));
            return $this;
        }
    }
    
  2. Dynamic Job Lists: Build pipelines dynamically based on conditions:

    $jobs = [];
    if ($event->is_premium) {
        $jobs[] = PremiumSetupJob::class;
    }
    $jobs[] = CommonSetupJob::class;
    
    JobPipeline::make($jobs)->send(...);
    
  3. Middleware for Jobs: Use Laravel’s job middleware to add cross-cutting concerns:

    // app/Console/Kernel.php
    protected function schedule(Schedule $schedule) {
        $schedule->job(new ProcessOrder(...))
                ->middleware([LogExecution::class]);
    }
    

Configuration Quirks

  1. PHP 8.4 Compatibility:

    • The package supports PHP 8.4, but ensure your jobs and closures use nullable types correctly (e.g., shouldBeQueued(?string $queue = null)).
  2. Laravel Version:

    • Tested up to Laravel 13. For older versions, use v1.x branches.
    • Tip: Check the releases for version-specific notes.
  3. Static Defaults:

    • Changing \Stancl\JobPipeline\JobPipeline::$shouldBeQueuedByDefault affects all pipelines globally. Reset it if needed:
      \Stancl\JobPipeline\JobPipeline::$shouldBeQueuedByDefault = false;
      
Weaver

How can I help you explore Laravel packages today?

Conversation history is not saved when not logged in.
Prompt
Add packages to context
No packages found.
hamzi/corewatch
minionfactory/raw-hydrator
hexters/coinpayment
rjcodes/rjcms
act-training/laravel-permissions-manager
alimarchal/laravel-chart-of-accounts
babenkoivan/elastic-scout-driver
mkwebdesign/filament-watchdog-v5
renatomarinho/laravel-page-speed
zedmagdy/filament-business-hours
renatovdemoura/blade-elements-ui
devgeek/beacon-admin
benjamin-rqt/data-watcher-bundle
atriumphp/atrium
sandermuller/package-boost-laravel
sandermuller/boost-skills
redaxo/core
yusufgenc/filament-api-forge
l3aro/rating-star-for-filament
leek/filament-subtenant-scope