Install the Bundle
composer require boutdecode/etl-core-bundle
Register the bundle in config/bundles.php if not using Symfony Flex.
Define Entities
Extend the provided abstract classes for Workflow, Step, Pipeline, StepHistory, and PipelineHistory (see README.md for examples). Run migrations:
php bin/console doctrine:migrations:diff
php bin/console doctrine:migrations:migrate
Create a Custom Step
Implement a step (e.g., CsvExtractorStep) by extending AbstractExtractorStep and tagging it with #[AsExecutableStep]:
use BoutDeCode\ETLCoreBundle\ETL\Domain\Attribute\AsExecutableStep;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Model\AbstractExtractorStep;
// Context must be imported from the bundle as well (namespace not shown here).
#[AsExecutableStep(code: 'csv_extractor', configurationDescription: [
    'file_path' => 'Path to the CSV file',
])]
class CsvExtractorStep extends AbstractExtractorStep
{
    public function process(Context $context): void
    {
        // Read the path from the step configuration.
        $filePath = $context->getConfigurationValue('file_path');
        // extractCsvData() is a helper you implement on this class.
        $data = $this->extractCsvData($filePath);
        // Expose the extracted rows to the next step.
        $context->setResult($data);
    }
}
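The step above calls extractCsvData() without showing it. A minimal sketch of such a helper follows, written as a standalone function so it can be tried in isolation. The function is hypothetical and not part of the bundle. The header-row handling, the comma separator, and the strict column-count check are assumptions you may want to change.

```php
<?php
declare(strict_types=1);

/**
 * Reads a CSV file and returns one associative array per data row,
 * keyed by the values of the header row.
 * Hypothetical helper, not provided by the bundle.
 *
 * @return list<array<string, string>>
 */
function extractCsvData(string $filePath): array
{
    $handle = @fopen($filePath, 'r');
    if ($handle === false) {
        throw new RuntimeException("Cannot open CSV file: $filePath");
    }

    try {
        // The first line is assumed to hold the column names.
        $header = fgetcsv($handle, null, ',', '"', '\\');
        if ($header === false || $header === [null]) {
            return []; // empty file
        }

        $rows = [];
        while (($row = fgetcsv($handle, null, ',', '"', '\\')) !== false) {
            if ($row === [null]) {
                continue; // fgetcsv() reports a blank line as [null]
            }
            if (count($row) !== count($header)) {
                throw new UnexpectedValueException('Row does not match the header width');
            }
            $rows[] = array_combine($header, $row);
        }

        return $rows;
    } finally {
        fclose($handle);
    }
}
```

Inside CsvExtractorStep the same body could live in a private method. All values come back as strings, so type conversion belongs in a later transformer step.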
Define a Workflow
Create a Workflow entity with steps (e.g., csv_extractor → data_transformer → database_loader). Example:
$workflow = new Workflow('import_csv');
$workflow->addStep(new Step('csv_extractor', $workflow, ['file_path' => '/path/to/file.csv']));
$workflow->addStep(new Step('data_transformer', $workflow));
$workflow->addStep(new Step('database_loader', $workflow));
$entityManager->persist($workflow);
$entityManager->flush();
Run a Pipeline
Dispatch a pipeline from a command/query handler:
use BoutDeCode\ETLCoreBundle\Run\Application\Operation\Command\RunPipelineCommand;
$command = new RunPipelineCommand(
workflowId: $workflow->getId(),
input: ['file_path' => '/path/to/file.csv']
);
$this->commandBus->dispatch($command);
Modular Steps
Break ETL logic into small, reusable steps (e.g., CsvExtractorStep, DataValidatorStep). Each step should handle one responsibility and communicate via Context.
Configuration-Driven
Use configurationDescription in #[AsExecutableStep] to document step parameters (e.g., file_path, timeout). Example:
#[AsExecutableStep(code: 'api_loader', configurationDescription: [
'api_url' => 'API endpoint URL',
'auth_token' => 'Authentication token',
])]
Step Chaining
Define step order in Workflow::stepConfiguration (array of Step objects with order property). Example:
$workflow->setStepConfiguration([
['code' => 'csv_extractor', 'configuration' => ['file_path' => '/data.csv']],
['code' => 'data_transformer'],
['code' => 'database_loader'],
]);
Synchronous vs. Asynchronous
Synchronous: dispatch RunPipelineCommand directly in a controller or service.
Asynchronous: use an AsyncCommand (e.g., RunPipelineAsyncCommand) for background processing via Symfony Messenger.
Middleware Integration
Add pipeline/step middleware to log, validate, or retry steps. Example:
use BoutDeCode\ETLCoreBundle\Run\Domain\Middleware\Middleware;
use Psr\Log\LoggerInterface;
class LoggingMiddleware implements Middleware
{
    // PSR-3 logger, autowired by Symfony.
    public function __construct(private LoggerInterface $logger)
    {
    }
    public function __invoke(Context $context, callable $next): void
    {
        $this->logger->info('Step started', ['step' => $context->getStepCode()]);
        $next($context);
        $this->logger->info('Step completed', ['step' => $context->getStepCode()]);
    }
}
Tag the middleware in services.yaml:
services:
    App\Middleware\LoggingMiddleware:
        tags:
            - { name: boutdecode_etl_core.step_middleware, priority: 50 }
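To make the effect of the priority value concrete, here is a small sketch of how prioritized middleware can be composed into one call chain, with higher priorities wrapping lower ones. It shows the general pattern only. buildChain(), the tracer closures, and the use of ArrayObject as a context are all hypothetical and do not reflect the bundle's internals.

```php
<?php
declare(strict_types=1);

/**
 * Composes middleware into one callable. Entries with a higher priority
 * wrap those with a lower one, so they run first on the way in.
 *
 * @param list<array{priority: int, middleware: callable}> $entries
 */
function buildChain(array $entries, callable $core): callable
{
    // Lowest priority first, so the last wrapper applied is the highest.
    usort($entries, fn (array $a, array $b): int => $a['priority'] <=> $b['priority']);

    $next = $core;
    foreach ($entries as $entry) {
        $middleware = $entry['middleware'];
        // Arrow functions capture $middleware and the current $next by value.
        $next = fn (ArrayObject $context) => $middleware($context, $next);
    }

    return $next;
}

// Demo middleware that records when it is entered and left.
$tracer = fn (string $name): callable =>
    function (ArrayObject $context, callable $next) use ($name): void {
        $context->append("$name:before");
        $next($context);
        $context->append("$name:after");
    };

$chain = buildChain(
    [
        ['priority' => 10, 'middleware' => $tracer('retry')],
        ['priority' => 50, 'middleware' => $tracer('logging')],
    ],
    function (ArrayObject $context): void {
        $context->append('step'); // stands in for the real step
    }
);

$trace = new ArrayObject();
$chain($trace);
print_r($trace->getArrayCopy());
```

The recorded order is logging:before, retry:before, step, retry:after, logging:after. The priority-50 middleware therefore sees the step both first and last.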
Error Handling
Use StepMiddleware to catch exceptions and update StepHistory status:
class ErrorHandlingMiddleware implements Middleware
{
    public function __invoke(Context $context, callable $next): void
    {
        try {
            $next($context);
        } catch (\Throwable $e) {
            $context->setError($e);
            $context->setStatus(StepHistoryStatusEnum::FAILED);
        }
    }
}
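Retrying is mentioned above as another use for middleware. The following sketch shows one way a retry wrapper could look. It is written against a plain object context so it runs without the bundle. RetryMiddleware and its maxAttempts parameter are hypothetical names, and a real version would implement the bundle's Middleware interface and type-hint Context.

```php
<?php
declare(strict_types=1);

/**
 * Re-runs the rest of the chain until it succeeds or the attempt limit
 * is reached. Hypothetical example, not shipped with the bundle.
 */
final class RetryMiddleware
{
    public function __construct(private int $maxAttempts = 3)
    {
    }

    public function __invoke(object $context, callable $next): void
    {
        for ($attempt = 1; ; $attempt++) {
            try {
                $next($context);
                return; // success, stop retrying
            } catch (\Throwable $e) {
                if ($attempt >= $this->maxAttempts) {
                    // Give up and let an outer middleware record the failure.
                    throw $e;
                }
            }
        }
    }
}

// A step that fails twice before succeeding.
$calls = 0;
$flakyStep = function (object $context) use (&$calls): void {
    $calls++;
    if ($calls < 3) {
        throw new RuntimeException('temporary failure');
    }
};

(new RetryMiddleware(maxAttempts: 3))(new stdClass(), $flakyStep);
```

For the final exception to reach an error-handling middleware, the retry middleware needs the lower priority of the two so that it sits inside it.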
Symfony Scheduler
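Schedule pipelines using Symfony’s Scheduler component by declaring a recurring message in a schedule provider. Example (assumes RunPipelineCommand is handled through Messenger and the workflow ID is injected from CSV_WORKFLOW_ID):
use BoutDeCode\ETLCoreBundle\Run\Application\Operation\Command\RunPipelineCommand;
use Symfony\Component\Scheduler\Attribute\AsSchedule;
use Symfony\Component\Scheduler\RecurringMessage;
use Symfony\Component\Scheduler\Schedule;
use Symfony\Component\Scheduler\ScheduleProviderInterface;
// Consume with: php bin/console messenger:consume scheduler_etl
#[AsSchedule('etl')]
class EtlSchedule implements ScheduleProviderInterface
{
    public function __construct(private string $csvWorkflowId) {}
    public function getSchedule(): Schedule
    {
        // Run the CSV import every day at midnight.
        return (new Schedule())->add(RecurringMessage::cron('0 0 * * *', new RunPipelineCommand(
            workflowId: $this->csvWorkflowId,
            input: ['file_path' => '/data/daily.csv'],
        )));
    }
}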
Dynamic Scheduling
Use PipelineScheduler to trigger pipelines programmatically:
$scheduler = $this->container->get('boutdecode_etl_core.scheduler');
$scheduler->schedule(
workflowId: $workflow->getId(),
input: ['file_path' => '/data/export.csv'],
scheduledAt: new \DateTimeImmutable('+1 hour')
);
Step Resolution Failures
StepResolver cannot find an ExecutableStep for a given code. Ensure the step class carries #[AsExecutableStep] and that the code matches exactly. Verify the service is autowired:
php bin/console debug:container boutdecode_etl_core.executable_step
Context Data Loss
Context data is lost between steps when a step never stores its output. Call setResult() in the current step and read the data with getInput()/getResult() in the next. Example:
// Step 1: Extract
$context->setResult($extractedData);
// Step 2: Transform
$data = $context->getInput(); // or $context->getResult() if chained
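The hand-off can be reproduced with a stand-in context to see where data goes missing. Everything in this sketch is hypothetical. DemoContext only mimics the three methods named above. runSteps() assumes each step receives the previous step's result as its input, which may differ from how the bundle wires contexts.

```php
<?php
declare(strict_types=1);

/** Stand-in for the bundle's Context, limited to the methods used here. */
final class DemoContext
{
    private mixed $result = null;

    public function __construct(private mixed $input = null)
    {
    }

    public function getInput(): mixed
    {
        return $this->input;
    }

    public function getResult(): mixed
    {
        return $this->result;
    }

    public function setResult(mixed $result): void
    {
        $this->result = $result;
    }
}

/**
 * Hypothetical runner: the result of one step becomes the input of the next.
 *
 * @param list<callable(DemoContext): void> $steps
 */
function runSteps(array $steps, mixed $initialInput = null): mixed
{
    $carry = $initialInput;
    foreach ($steps as $step) {
        $context = new DemoContext($carry);
        $step($context);
        $carry = $context->getResult();
    }

    return $carry;
}

$extract = fn (DemoContext $c) => $c->setResult([' a ', ' b ']);
$transform = fn (DemoContext $c) => $c->setResult(array_map('trim', $c->getInput()));
$forgetful = function (DemoContext $c): void {
    // Reads nothing and never calls setResult().
};

$ok = runSteps([$extract, $transform]);   // trimmed rows reach the end
$lost = runSteps([$extract, $forgetful]); // null: the data stops here
```

A step that does not call setResult() hands null to whatever follows it, which is the usual cause of an "empty" downstream step.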
Circular Dependencies
Avoid steps that depend on each other directly (e.g., StepA calls StepB). Use Context for data passing and let the workflow manage the order.
Middleware Ordering
Control middleware order with priority in the tag (higher = earlier). Default range: -100 to 100.
Workflow Configuration Overrides
Use Pipeline::overrideStepConfiguration() to merge or replace step configs:
$pipeline->overrideStepConfiguration([
'csv_extractor' => ['file_path' => '/new/path.csv'],
]);
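The difference between merging and replacing a step's configuration is easy to miss, so here is a small plain-PHP illustration. applyStepOverrides() and the delimiter key are hypothetical. The sketch makes no claim about which of the two behaviours Pipeline::overrideStepConfiguration() applies by default.

```php
<?php
declare(strict_types=1);

/**
 * Applies per-step overrides to a base configuration.
 *
 * @param array<string, array<string, mixed>> $base      step code => configuration
 * @param array<string, array<string, mixed>> $overrides step code => configuration
 * @return array<string, array<string, mixed>>
 */
function applyStepOverrides(array $base, array $overrides, bool $merge = true): array
{
    foreach ($overrides as $code => $config) {
        $base[$code] = $merge
            ? array_replace($base[$code] ?? [], $config) // keep keys that are not overridden
            : $config;                                    // drop the previous configuration
    }

    return $base;
}

$base = ['csv_extractor' => ['file_path' => '/data.csv', 'delimiter' => ';']];
$override = ['csv_extractor' => ['file_path' => '/new/path.csv']];

$merged = applyStepOverrides($base, $override);
$replaced = applyStepOverrides($base, $override, merge: false);
```

After merging, csv_extractor still carries its delimiter alongside the new file_path. After replacing, only file_path remains.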
Check Pipeline Status
Query the Pipeline entity for status:
$pipeline = $entityManager->getRepository(Pipeline::class)
->findOneBy(['id' => $pipelineId]);
echo $pipeline->getStatus()->value; // PENDING, IN_PROGRESS, COMPLETED, FAILED
Inspect Step History
Review StepHistory for errors:
$stepHistory = $entityManager->getRepository(StepHistory::class)
    ->findOneBy(['pipelineHistory' => $pipelineHistory]);
// findOneBy() returns null when no row matches.
if ($stepHistory?->getError()) {
    echo $stepHistory->getError()->getMessage();
}
Enable Messenger Debugging
Log dispatched messages:
# config/packages/messenger.yaml
framework:
    messenger:
        failure_transport: failed
        transports:
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
            failed: 'doctrine://default?queue_name=failed'
        routing:
            'BoutDeCode\ETLCoreBundle\Run\Application\Operation\Command\RunPipelineCommand': async
Validate Workflow Configuration Use `