Weave Code
Code Weaver
Helps Laravel developers discover, compare, and choose open-source packages. See popularity, security, maintainers, and scores at a glance to make better decisions.
Feedback
Share your thoughts, report bugs, or suggest improvements.
Subject
Message

Etl Core Bundle Laravel Package

boutdecode/etl-core-bundle

View on GitHub
Deep Wiki
Context7

Getting Started

Minimal Setup

  1. Install the Bundle

    composer require boutdecode/etl-core-bundle
    

    Register the bundle in config/bundles.php if not using Symfony Flex.

  2. Define Entities Extend the provided abstract classes for Workflow, Step, Pipeline, StepHistory, and PipelineHistory (see README.md for examples). Run migrations:

    php bin/console doctrine:migrations:diff
    php bin/console doctrine:migrations:migrate
    
  3. Create a Custom Step Implement a step (e.g., CsvExtractorStep) by extending AbstractExtractorStep and tagging it with #[AsExecutableStep]:

    use BoutDeCode\ETLCoreBundle\ETL\Domain\Attribute\AsExecutableStep;
    use BoutDeCode\ETLCoreBundle\ETL\Domain\Model\AbstractExtractorStep;
    
    #[AsExecutableStep(code: 'csv_extractor', configurationDescription: [
        'file_path' => 'Path to the CSV file',
    ])]
    class CsvExtractorStep extends AbstractExtractorStep
    {
        public function process(Context $context): void
        {
            $filePath = $context->getConfigurationValue('file_path');
            $data = $this->extractCsvData($filePath);
            $context->setResult($data);
        }
    }
    
  4. Define a Workflow Create a Workflow entity with steps (e.g., csv_extractordata_transformerdatabase_loader). Example:

    $workflow = new Workflow('import_csv');
    $workflow->addStep(new Step('csv_extractor', $workflow, ['file_path' => '/path/to/file.csv']));
    $workflow->addStep(new Step('data_transformer', $workflow));
    $workflow->addStep(new Step('database_loader', $workflow));
    $entityManager->persist($workflow);
    $entityManager->flush();
    
  5. Run a Pipeline Dispatch a pipeline from a command/query handler:

    use BoutDeCode\ETLCoreBundle\Run\Application\Operation\Command\RunPipelineCommand;
    
    $command = new RunPipelineCommand(
        workflowId: $workflow->getId(),
        input: ['file_path' => '/path/to/file.csv']
    );
    $this->commandBus->dispatch($command);
    

Implementation Patterns

Workflow Design

  1. Modular Steps Break ETL logic into small, reusable steps (e.g., CsvExtractorStep, DataValidatorStep). Each step should handle one responsibility and communicate via Context.

  2. Configuration-Driven Use configurationDescription in #[AsExecutableStep] to document step parameters (e.g., file_path, timeout). Example:

    #[AsExecutableStep(code: 'api_loader', configurationDescription: [
        'api_url' => 'API endpoint URL',
        'auth_token' => 'Authentication token',
    ])]
    
  3. Step Chaining Define step order in Workflow::stepConfiguration (array of Step objects with order property). Example:

    $workflow->setStepConfiguration([
        ['code' => 'csv_extractor', 'configuration' => ['file_path' => '/data.csv']],
        ['code' => 'data_transformer'],
        ['code' => 'database_loader'],
    ]);
    

Execution Patterns

  1. Synchronous vs. Asynchronous

    • Synchronous: Dispatch RunPipelineCommand directly in a controller or service.
    • Asynchronous: Use AsyncCommand (e.g., RunPipelineAsyncCommand) for background processing via Symfony Messenger.
  2. Middleware Integration Add pipeline/step middleware to log, validate, or retry steps. Example:

    use BoutDeCode\ETLCoreBundle\Run\Domain\Middleware\Middleware;
    
    class LoggingMiddleware implements Middleware
    {
        public function __invoke(Context $context, callable $next): void
        {
            \Log::info('Step started', ['step' => $context->getStepCode()]);
            $next($context);
            \Log::info('Step completed', ['step' => $context->getStepCode()]);
        }
    }
    

    Tag the middleware in services.yaml:

    services:
        App\Middleware\LoggingMiddleware:
            tags:
                - { name: boutdecode_etl_core.step_middleware, priority: 50 }
    
  3. Error Handling Use StepMiddleware to catch exceptions and update StepHistory status:

    class ErrorHandlingMiddleware implements Middleware
    {
        public function __invoke(Context $context, callable $next): void
        {
            try {
                $next($context);
            } catch (\Throwable $e) {
                $context->setError($e);
                $context->setStatus(StepHistoryStatusEnum::FAILED);
            }
        }
    }
    

Scheduling

  1. Symfony Scheduler Schedule pipelines using Symfony’s scheduler component. Example:

    # config/packages/scheduler.yaml
    framework:
        scheduler:
            cron:
                'import_csv_daily':
                    command: 'app:run-pipeline {workflow_id} {input}'
                    schedule: '0 0 * * *'
                    arguments:
                        workflow_id: '%env(CSV_WORKFLOW_ID)%'
                        input: '{"file_path": "/data/daily.csv"}'
    
  2. Dynamic Scheduling Use PipelineScheduler to trigger pipelines programmatically:

    $scheduler = $this->container->get('boutdecode_etl_core.scheduler');
    $scheduler->schedule(
        workflowId: $workflow->getId(),
        input: ['file_path' => '/data/export.csv'],
        scheduledAt: new \DateTimeImmutable('+1 hour')
    );
    

Gotchas and Tips

Common Pitfalls

  1. Step Resolution Failures

    • Issue: StepResolver cannot find an ExecutableStep for a given code.
    • Fix: Ensure the step class is tagged with #[AsExecutableStep] and the code matches exactly. Verify the service is autowired:
      php bin/console debug:container boutdecode_etl_core.executable_step
      
  2. Context Data Loss

    • Issue: Data passed via Context is lost between steps.
    • Fix: Always call setResult() in the current step and getInput()/getResult() in the next. Example:
      // Step 1: Extract
      $context->setResult($extractedData);
      
      // Step 2: Transform
      $data = $context->getInput(); // or $context->getResult() if chained
      
  3. Circular Dependencies

    • Issue: Steps depend on each other directly (e.g., StepA calls StepB).
    • Fix: Avoid direct step-to-step calls. Use Context for data passing and let the workflow manage the order.
  4. Middleware Ordering

    • Issue: Middleware runs in unexpected order.
    • Fix: Explicitly set priority in the tag (higher = earlier). Default range: -100 to 100.
  5. Workflow Configuration Overrides

    • Issue: Pipeline-level step configuration overrides workflow defaults incorrectly.
    • Fix: Use Pipeline::overrideStepConfiguration() to merge or replace step configs:
      $pipeline->overrideStepConfiguration([
          'csv_extractor' => ['file_path' => '/new/path.csv'],
      ]);
      

Debugging Tips

  1. Check Pipeline Status Query the Pipeline entity for status:

    $pipeline = $entityManager->getRepository(Pipeline::class)
        ->findOneBy(['id' => $pipelineId]);
    echo $pipeline->getStatus()->value; // PENDING, IN_PROGRESS, COMPLETED, FAILED
    
  2. Inspect Step History Review StepHistory for errors:

    $stepHistory = $entityManager->getRepository(StepHistory::class)
        ->findOneBy(['pipelineHistory' => $pipelineHistory]);
    if ($stepHistory->getError()) {
        echo $stepHistory->getError()->getMessage();
    }
    
  3. Enable Messenger Debugging Log dispatched messages:

    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async: '%env(MESSENGER_TRANSPORT_DSN)%'
            routing:
                'BoutDeCode\ETLCoreBundle\Run\Application\Operation\Command\RunPipelineCommand': async
            failure_transport: failed
            transports:
                failed: 'doctrine://default?queue_name=failed'
    
  4. Validate Workflow Configuration Use `

Weaver

How can I help you explore Laravel packages today?

Conversation history is not saved when not logged in.
Prompt
Add packages to context
No packages found.
emuniq/filament-browser-notifications
syriable/filament-translator
hungnm28/livewire-form
wenprise/eloquent
crudly/encrypted
fadion/bouncy
cuci/prototurk-sdk
gos/pubsub-router-bundle
cuci/prototurk-sdk-symfony
clementtalleu/easyadmin-markdown-bundle
codeflextech/permission-manager
karnoweb/livewire-datepicker
sayedenam/sayed-dashboard
milito/query-filter
apiboxsym/user-bundle
apiboxsym/health-check-bundle
jayeshmepani/jpl-moshier-ephemeris-php
elnasnato/laraliveui
labrodev/rest-sdk
sampaui/sampaui