A Symfony Bundle providing a configurable ETL (Extract / Transform / Load) pipeline engine built on top of Domain-Driven Design, CQS, Symfony Messenger, Symfony Workflow and Flow-PHP.
| Dependency | Version |
|---|---|
| PHP | >= 8.2 |
| Symfony | ^6.4 \|\| ^7.0 |
| Doctrine ORM | ^3.6 |
| Flow-PHP ETL | ~0.25 |
ETL is a data processing pattern split into three sequential stages:
| Stage | Role |
|---|---|
| Extract | Read raw data from a source (CSV file, API, database, …) |
| Transform | Filter, map, enrich or validate the extracted data |
| Load | Write the processed data to a destination (database, JSON file, …) |
Each stage is implemented as a Step — a single, focused unit of work. Steps are chained together so the output of one becomes the input of the next, flowing through a shared Context object.
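The sketch below makes the chaining concrete in standalone PHP. It is not the bundle's API: `runSteps()` and the array-based context are hypothetical stand-ins for the bundle's steps and `Context` object. Each callable receives the previous step's output, and every intermediate result is kept under the step's name.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: run named steps in order, feeding each step the
 * previous step's output and recording every result in a shared context.
 *
 * @param array<string, callable(mixed, array<string, mixed>): mixed> $steps
 * @return array{results: array<string, mixed>, result: mixed}
 */
function runSteps(array $steps, mixed $input): array
{
    $context = ['results' => [], 'result' => null];
    $data = $input;

    foreach ($steps as $name => $step) {
        $data = $step($data, $context);     // the output of one step is the input of the next
        $context['results'][$name] = $data; // keep each intermediate result, keyed by step name
    }

    $context['result'] = $data;             // the last result produced by the chain

    return $context;
}

$context = runSteps(
    [
        // A real extractor would read $source; this one returns fixed rows.
        'extract' => fn (mixed $source, array $ctx): array => [['name' => 'ada'], ['name' => 'linus']],
        'transform' => fn (mixed $rows, array $ctx): array => array_map(
            fn (array $row): array => ['name' => strtoupper($row['name'])],
            $rows,
        ),
        // Stands in for "write the rows somewhere and report how many".
        'load' => fn (mixed $rows, array $ctx): int => count($rows),
    ],
    '/data/customers.csv',
);

var_dump($context['result']); // int(2)
```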
Workflow and Pipeline look similar but represent fundamentally different things in this bundle:
Workflow — the reusable template
A Workflow is a named, static definition that describes what should happen:

- the steps to run (`stepConfiguration`), each identified by a `code` that maps to a registered `ExecutableStep` service;
- a workflow-level `configuration`.

A Workflow has no notion of time, data, or execution state. It never runs by itself. Think of it as a class or a recipe.
Pipeline — the execution instance
A Pipeline is a concrete, time-bound instance created from a Workflow. It represents one specific run:

- a lifecycle status (`pending` → `in_progress` → `completed` / `failed`) managed by a Symfony Workflow state machine;
- execution timestamps (`scheduledAt`, `startedAt`, `finishedAt`).

Think of it as an object instantiated from a class, or a ticket raised against a recipe.
```text
Workflow ──createFromWorkflowId()──► Pipeline ──dispatch()──► execution
(template, reusable)                 (instance, stateful)     (runtime)
```
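The lifecycle listed above can be modelled as a small state table. This is a standalone sketch: `RunStatus` is a hypothetical enum, not the bundle's `PipelineStatus`, and it only encodes the status changes named above (`pending` → `in_progress` → `completed` / `failed`). The real transitions, and their names, are defined by the bundle's `pipeline_lifecycle` Symfony Workflow.

```php
<?php

declare(strict_types=1);

// Hypothetical model of the pipeline lifecycle; the bundle drives the real
// transitions through its `pipeline_lifecycle` state machine.
enum RunStatus: string
{
    case PENDING = 'pending';
    case IN_PROGRESS = 'in_progress';
    case COMPLETED = 'completed';
    case FAILED = 'failed';

    /** @return list<self> statuses reachable from this one */
    public function next(): array
    {
        return match ($this) {
            self::PENDING => [self::IN_PROGRESS],
            self::IN_PROGRESS => [self::COMPLETED, self::FAILED],
            self::COMPLETED, self::FAILED => [], // terminal states
        };
    }

    public function canMoveTo(self $target): bool
    {
        return in_array($target, $this->next(), true);
    }
}

var_dump(RunStatus::PENDING->canMoveTo(RunStatus::IN_PROGRESS)); // bool(true)
var_dump(RunStatus::PENDING->canMoveTo(RunStatus::COMPLETED));   // bool(false)
```

Keeping the table on the enum makes terminal states explicit: a completed or failed run has nowhere left to go.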
Step — configuration vs execution
The same word "step" covers two distinct things:

| Concept | Where | Role |
|---|---|---|
| `Step` (config) | Stored with the Pipeline | Carries `code`, `order`, and per-step `configuration`. A value object with no logic. |
| `ExecutableStep` (service) | Symfony DI container | Implements the actual ETL logic in `process(Context)`. Tagged `boutdecode_etl_core.executable_step`. |

At runtime the `StepResolver` bridges the two: it looks up the `ExecutableStep` service registered under the code returned by `Step::getCode()`, clones it, applies the step configuration, and hands it to the execution chain.
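The clone-then-configure idea can be sketched in standalone PHP, assuming a plain array indexed by step code stands in for the DI container. `StepPrototype` and `resolveStep()` are hypothetical names, not the bundle's `StepResolver` API.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for an ExecutableStep service.
final class StepPrototype
{
    /** @var array<string, mixed> */
    public array $configuration = [];

    public function __construct(public readonly string $code) {}
}

/**
 * Hypothetical resolver: find the prototype registered under $code, clone it
 * and give the clone its own per-step configuration.
 *
 * @param array<string, StepPrototype> $registry prototypes indexed by step code
 * @param array<string, mixed> $configuration
 */
function resolveStep(array $registry, string $code, array $configuration): StepPrototype
{
    if (!isset($registry[$code])) {
        throw new InvalidArgumentException(sprintf('No executable step registered for code "%s".', $code));
    }

    // Cloning keeps the shared service untouched, so two steps with the same
    // code in one pipeline never overwrite each other's configuration.
    $step = clone $registry[$code];
    $step->configuration = $configuration;

    return $step;
}

$registry = ['app.extractor.my_csv' => new StepPrototype('app.extractor.my_csv')];

$first = resolveStep($registry, 'app.extractor.my_csv', ['delimiter' => ';']);
$second = resolveStep($registry, 'app.extractor.my_csv', ['delimiter' => ',']);
// $first and $second are distinct objects; the registered prototype keeps an empty configuration.
```

Cloning matters because container services are shared by default: configuring the service in place would leak one step's settings into every other step using the same code.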
```bash
composer require boutdecode/etl-core-bundle
```
If you are not using Symfony Flex, register the bundle manually:
```php
// config/bundles.php
return [
    // ...
    BoutDeCode\ETLCoreBundle\BoutDeCodeETLCoreBundle::class => ['all' => true],
];
```
```yaml
# config/packages/boutdecode_etl_core.yaml
imports:
    - { resource: "@BoutDeCodeETLCoreBundle/Resources/config/config.yaml" }
```
No configuration is required: the bundle works out of the box.

The bundle exposes no configurable keys under `boutdecode_etl_core:`. All service IDs, tags, and bus names are fixed constants defined by the bundle itself:
| Constant | Value |
|---|---|
| Command bus | boutdecode_etl_core.command.bus |
| Query bus | boutdecode_etl_core.query.bus |
| Executable step tag | boutdecode_etl_core.executable_step |
| Step middleware tag | boutdecode_etl_core.step_middleware |
| Pipeline middleware tag | boutdecode_etl_core.pipeline_middleware |
The bundle does not ship Doctrine entities. You must create them in your application and then generate the migrations.
The bundle provides abstract base classes to extend and interfaces to implement:

| What to create | Extends | Implements |
|---|---|---|
| Workflow entity | `AbstractWorkflow` | — |
| Step entity | `AbstractStep` | — |
| Pipeline entity | `AbstractPipeline` | — |
| StepHistory entity | `AbstractStepHistory` | — |
| PipelineHistory entity | `AbstractPipelineHistory` | — |

Each abstract class holds all the typed properties and method implementations. The only things left to add in the concrete entity are:

- the `#[ORM\Entity]` / `#[ORM\Table]` mapping;
- the `$id` property with its getter (`getId(): string`), except for Step and history entities, where you may choose any PK strategy;
- the column mapping of the inherited properties (re-declare them with `#[ORM\Column]` etc. directly in the child class).

```php
// src/Entity/Workflow.php
namespace App\Entity;

use BoutDeCode\ETLCoreBundle\Core\Domain\Model\AbstractWorkflow;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Uid\Uuid;

#[ORM\Entity]
#[ORM\Table(name: 'workflow')]
class Workflow extends AbstractWorkflow
{
    #[ORM\Id]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    #[ORM\Column]
    protected string $name;

    #[ORM\Column(nullable: true)]
    protected ?string $description = null;

    #[ORM\Column(type: 'json')]
    protected array $stepConfiguration = [];

    #[ORM\Column(type: 'json')]
    protected array $configuration = [];

    #[ORM\Column]
    protected \DateTimeImmutable $createdAt;

    #[ORM\Column(nullable: true)]
    protected ?\DateTimeImmutable $updatedAt = null;

    public function __construct(string $name)
    {
        $this->id = (string) Uuid::v7();
        $this->name = $name;
        $this->createdAt = new \DateTimeImmutable();
        $this->stepConfiguration = [];
        $this->configuration = [];
    }

    public function getId(): string
    {
        return $this->id;
    }
}
```
```php
// src/Entity/Step.php
namespace App\Entity;

use BoutDeCode\ETLCoreBundle\Core\Domain\Model\AbstractStep;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Uid\Uuid;

#[ORM\Entity]
#[ORM\Table(name: 'step')]
class Step extends AbstractStep
{
    #[ORM\Id]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    // Owning side of Pipeline::$steps (mappedBy: 'pipeline'): steps are stored with the Pipeline
    #[ORM\ManyToOne(targetEntity: Pipeline::class, inversedBy: 'steps')]
    #[ORM\JoinColumn(nullable: false)]
    private Pipeline $pipeline;

    #[ORM\Column(nullable: true)]
    protected ?string $name = null;

    #[ORM\Column]
    protected string $code;

    #[ORM\Column(type: 'json')]
    protected array $configuration = [];

    // "order" is a reserved SQL keyword, so the column gets a different name
    #[ORM\Column(name: 'step_order')]
    protected int $order = 0;

    public function __construct(string $code, Pipeline $pipeline)
    {
        $this->id = (string) Uuid::v7();
        $this->code = $code;
        $this->pipeline = $pipeline;
    }

    public function getId(): string
    {
        return $this->id;
    }
}
```
```php
// src/Entity/Pipeline.php
namespace App\Entity;

use BoutDeCode\ETLCoreBundle\Core\Domain\Model\AbstractPipeline;
use BoutDeCode\ETLCoreBundle\Core\Domain\Enum\PipelineStatus;
use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\Common\Collections\Collection;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Uid\Uuid;

#[ORM\Entity]
#[ORM\Table(name: 'pipeline')]
class Pipeline extends AbstractPipeline
{
    #[ORM\Id]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    #[ORM\ManyToOne(targetEntity: Workflow::class)]
    #[ORM\JoinColumn(nullable: false)]
    protected Workflow $workflow;

    #[ORM\OneToMany(targetEntity: Step::class, mappedBy: 'pipeline', cascade: ['persist'])]
    #[ORM\OrderBy(['order' => 'ASC'])]
    protected iterable $steps;

    #[ORM\Column(type: 'json')]
    protected array $configuration = [];

    #[ORM\Column(type: 'json')]
    protected array $input = [];

    #[ORM\Column(enumType: PipelineStatus::class)]
    protected PipelineStatus $status;

    #[ORM\Column]
    protected \DateTimeImmutable $createdAt;

    #[ORM\Column(nullable: true)]
    protected ?\DateTimeImmutable $scheduledAt = null;

    #[ORM\Column(nullable: true)]
    protected ?\DateTimeImmutable $startedAt = null;

    #[ORM\Column(nullable: true)]
    protected ?\DateTimeImmutable $finishedAt = null;

    public function __construct(Workflow $workflow)
    {
        $this->id = (string) Uuid::v7();
        $this->workflow = $workflow;
        $this->status = PipelineStatus::PENDING;
        $this->createdAt = new \DateTimeImmutable();
        $this->steps = new ArrayCollection();
        $this->runnableSteps = new ArrayCollection();
    }

    public function getId(): string
    {
        return $this->id;
    }
}
```
```php
// src/Entity/StepHistory.php
namespace App\Entity;

use BoutDeCode\ETLCoreBundle\Run\Domain\Model\AbstractStepHistory;
use BoutDeCode\ETLCoreBundle\Run\Domain\Enum\StepHistoryStatusEnum;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Uid\Uuid;

#[ORM\Entity]
#[ORM\Table(name: 'step_history')]
class StepHistory extends AbstractStepHistory
{
    #[ORM\Id]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    // Owning side of PipelineHistory::$stepHistories (mappedBy: 'pipelineHistory')
    #[ORM\ManyToOne(targetEntity: PipelineHistory::class, inversedBy: 'stepHistories')]
    #[ORM\JoinColumn(nullable: false)]
    private ?PipelineHistory $pipelineHistory = null;

    #[ORM\Column(enumType: StepHistoryStatusEnum::class)]
    protected StepHistoryStatusEnum $status;

    #[ORM\Column]
    protected \DateTimeImmutable $createdAt;

    #[ORM\Column(type: 'json', nullable: true)]
    protected mixed $input = null;

    #[ORM\Column(type: 'json', nullable: true)]
    protected mixed $result = null;

    public function __construct(StepHistoryStatusEnum $status, mixed $input, mixed $result)
    {
        $this->id = (string) Uuid::v7();
        $this->status = $status;
        $this->createdAt = new \DateTimeImmutable();
        $this->input = $input;
        $this->result = $result;
    }

    public function getId(): string
    {
        return $this->id;
    }

    public function setPipelineHistory(PipelineHistory $pipelineHistory): void
    {
        $this->pipelineHistory = $pipelineHistory;
    }
}
```
```php
// src/Entity/PipelineHistory.php
namespace App\Entity;

use BoutDeCode\ETLCoreBundle\Run\Domain\Model\AbstractPipelineHistory;
use BoutDeCode\ETLCoreBundle\Run\Domain\Enum\PipelineHistoryStatusEnum;
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\Pipeline as PipelineInterface;
use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Uid\Uuid;

#[ORM\Entity]
#[ORM\Table(name: 'pipeline_history')]
class PipelineHistory extends AbstractPipelineHistory
{
    #[ORM\Id]
    #[ORM\Column(type: 'uuid', unique: true)]
    private string $id;

    #[ORM\ManyToOne(targetEntity: Pipeline::class)]
    #[ORM\JoinColumn(nullable: false)]
    protected PipelineInterface $pipeline;

    #[ORM\Column(enumType: PipelineHistoryStatusEnum::class)]
    protected PipelineHistoryStatusEnum $status;

    #[ORM\Column]
    protected \DateTimeImmutable $createdAt;

    #[ORM\Column(type: 'json', nullable: true)]
    protected mixed $input = null;

    #[ORM\Column(type: 'json', nullable: true)]
    protected mixed $result = null;

    #[ORM\OneToMany(targetEntity: StepHistory::class, mappedBy: 'pipelineHistory', cascade: ['persist'])]
    protected iterable $stepHistories;

    public function __construct(PipelineInterface $pipeline, PipelineHistoryStatusEnum $status, mixed $input, mixed $result)
    {
        $this->id = (string) Uuid::v7();
        $this->pipeline = $pipeline;
        $this->status = $status;
        $this->createdAt = new \DateTimeImmutable();
        $this->input = $input;
        $this->result = $result;
        $this->stepHistories = new ArrayCollection();
    }

    public function getId(): string
    {
        return $this->id;
    }
}
```
Once all entities are created, generate and run the Doctrine migrations:

```bash
php bin/console doctrine:migrations:diff
php bin/console doctrine:migrations:migrate
```
```text
src/
├── ETLCoreBundle.php              # Bundle entry point
├── DependencyInjection/
│   ├── ETLCoreExtension.php       # Loads services, exposes config parameters
│   └── Configuration.php          # Config tree (boutdecode_etl_core:)
├── Resources/config/
│   ├── services.yaml              # Service definitions & tagged iterators
│   ├── config.yaml                # Root import (messenger + workflow)
│   └── packages/
│       ├── messenger.yaml         # Buses & routing
│       └── workflow.yaml          # pipeline_lifecycle state machine
├── Core/                          # Central domain (Pipeline, Step, Context)
├── ETL/                           # ETL logic (Extract, Transform, Load)
├── Run/                           # Execution engine & middleware
└── CQS/                           # Command / Query Separation
```
| Pattern | Where |
|---|---|
| Domain-Driven Design | */Domain/ layers |
| CQS (Command / Query Separation) | src/CQS/ |
| Middleware chain | Run/Domain/Middleware/ |
| Strategy (pluggable steps) | ETL/Domain/Model/ + ExecutableStep tag |
| State machine | pipeline_lifecycle Symfony Workflow |
`#[AsExecutableStep]`

Every step class must carry the `#[AsExecutableStep]` attribute. It takes two arguments:

- `code`: the unique machine identifier used to resolve the step at runtime (e.g. when a Workflow references a step by its code). It must be unique across the whole application.
- `configurationDescription` (optional): a map of configuration key to human-readable description, returned by `getConfigurationDescription()`. Useful for documentation and introspection.

```php
use BoutDeCode\ETLCoreBundle\ETL\Domain\Attribute\AsExecutableStep;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Model\AbstractExtractorStep;

#[AsExecutableStep(
    code: 'app.extractor.my_csv',
    configurationDescription: [
        'source' => 'Absolute path to the CSV file',
        'delimiter' => 'Field delimiter character (default: ",")',
    ],
)]
final class MyCsvExtractorStep extends AbstractExtractorStep
{
    // …
}
```
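The uniqueness rule can be illustrated with PHP's Reflection API, which reads class attributes at runtime. This is a standalone sketch: `StepCode`, `indexByCode()` and the two empty classes are hypothetical, and it does not claim the bundle discovers steps this way (it may resolve codes at container compile time instead).

```php
<?php

declare(strict_types=1);

// Hypothetical attribute with the same shape as #[AsExecutableStep].
#[Attribute(Attribute::TARGET_CLASS)]
final class StepCode
{
    /** @param array<string, string> $configurationDescription */
    public function __construct(
        public readonly string $code,
        public readonly array $configurationDescription = [],
    ) {}
}

#[StepCode(code: 'app.extractor.my_csv', configurationDescription: ['source' => 'Absolute path to the CSV file'])]
final class CsvExtractor {}

#[StepCode(code: 'app.loader.csv_file')]
final class CsvLoader {}

/**
 * Index classes by the code declared in their attribute, rejecting duplicates.
 *
 * @param list<class-string> $classes
 * @return array<string, class-string>
 */
function indexByCode(array $classes): array
{
    $index = [];
    foreach ($classes as $class) {
        foreach ((new ReflectionClass($class))->getAttributes(StepCode::class) as $attribute) {
            $code = $attribute->newInstance()->code;
            if (isset($index[$code])) {
                throw new LogicException(sprintf('Step code "%s" is used by both %s and %s.', $code, $index[$code], $class));
            }
            $index[$code] = $class;
        }
    }

    return $index;
}

$index = indexByCode([CsvExtractor::class, CsvLoader::class]);
// ['app.extractor.my_csv' => 'CsvExtractor', 'app.loader.csv_file' => 'CsvLoader']
```

Failing loudly on a duplicate is the point: with two classes claiming one code, a lookup by code would silently pick one of them.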
Extend one of the three abstract base classes and implement the corresponding method. The `Context $context` argument is always passed in by the framework; you do not need to call `process()` yourself.

| Base class | Method to implement | Signature |
|---|---|---|
| `AbstractExtractorStep` | `extract()` | `extract(mixed $source, Context $context, array $configuration = []): mixed` |
| `AbstractTransformerStep` | `transform()` | `transform(mixed $data, Context $context, array $configuration = []): mixed` |
| `AbstractLoaderStep` | `load()` | `load(mixed $data, mixed $destination, Context $context, array $configuration = []): mixed` |

The `$configuration` array is automatically populated from the step's entry in the pipeline configuration. Default values can also be injected via the constructor, either stored in `$this->configuration` or, as in the extractor below, kept in a dedicated property used as a fallback.
```php
use BoutDeCode\ETLCoreBundle\Core\Domain\DTO\Context;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Attribute\AsExecutableStep;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Model\AbstractExtractorStep;

#[AsExecutableStep(
    code: 'app.extractor.my_csv',
    configurationDescription: [
        'source' => 'Absolute path to the CSV file',
        'delimiter' => 'Field delimiter character (default: ",")',
    ],
)]
final class MyCsvExtractorStep extends AbstractExtractorStep
{
    public function __construct(
        private readonly string $delimiter = ',',
    ) {}

    public function extract(mixed $source, Context $context, array $configuration = []): array
    {
        // The explicit source wins; otherwise fall back to the step configuration
        $filePath = is_string($source) ? $source : ($configuration['source'] ?? '');
        // Per-step configuration overrides the constructor default
        $delimiter = $configuration['delimiter'] ?? $this->delimiter;

        // … read and return rows …

        return [];
    }
}
```
```php
use BoutDeCode\ETLCoreBundle\Core\Domain\DTO\Context;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Attribute\AsExecutableStep;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Model\AbstractTransformerStep;

#[AsExecutableStep(
    code: 'app.transformer.uppercase_name',
    configurationDescription: [
        'field' => 'Name of the field to uppercase (default: "name")',
    ],
)]
final class UppercaseNameTransformStep extends AbstractTransformerStep
{
    public function transform(mixed $data, Context $context, array $configuration = []): mixed
    {
        // Pass non-array input through untouched
        if (! is_array($data)) {
            return $data;
        }

        $field = $configuration['field'] ?? 'name';

        return array_map(static function (array $row) use ($field): array {
            if (isset($row[$field]) && is_string($row[$field])) {
                $row[$field] = strtoupper($row[$field]);
            }

            return $row;
        }, $data);
    }
}
```
```php
use BoutDeCode\ETLCoreBundle\Core\Domain\DTO\Context;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Attribute\AsExecutableStep;
use BoutDeCode\ETLCoreBundle\ETL\Domain\Model\AbstractLoaderStep;

#[AsExecutableStep(
    code: 'app.loader.csv_file',
    configurationDescription: [
        'destination' => 'Absolute path to the output CSV file',
    ],
)]
final class CsvFileLoadStep extends AbstractLoaderStep
{
    public function load(mixed $data, mixed $destination, Context $context, array $configuration = []): bool
    {
        if (! is_string($destination)) {
            throw new \InvalidArgumentException('Destination must be a file path string.');
        }

        // … write $data to $destination …

        return true;
    }
}
```
All classes that implement `ExecutableStep` (which all three abstract base classes do) are automatically tagged `boutdecode_etl_core.executable_step` through autoconfiguration. No manual service configuration is required as long as your class is registered as a service with `autoconfigure` enabled, as Symfony's default service autodiscovery does.
```yaml
# config/services.yaml (standard Symfony service autodiscovery, nothing extra needed)
services:
    App\ETL\Step\:
        resource: '../src/ETL/Step/'
        autowire: true
        autoconfigure: true
```
If for any reason you need to register a step explicitly:
```yaml
# config/services.yaml
services:
    App\ETL\Step\MyCsvExtractorStep:
        tags:
            - { name: boutdecode_etl_core.executable_step }
```
Use the `code` declared in `#[AsExecutableStep]` as the step identifier in your Workflow's `stepConfiguration`:
```php
$workflow->setStepConfiguration([
    [
        'code' => 'app.extractor.my_csv',
        'name' => 'extract_customers',
        'order' => 1,
        'configuration' => [
            'source' => '/data/customers.csv',
            'delimiter' => ';',
        ],
    ],
    [
        'code' => 'app.transformer.uppercase_name',
        'name' => 'normalize_names',
        'order' => 2,
        'configuration' => [
            'field' => 'name',
        ],
    ],
    [
        'code' => 'app.loader.csv_file',
        'name' => 'save_result',
        'order' => 3,
        'configuration' => [
            'destination' => '/output/result.csv',
        ],
    ],
]);
```
At runtime, the `StepResolver` reads each step's `code`, finds the matching tagged service in the container, and injects the per-step `configuration` before execution.
Inject `CommandBus` and call `dispatch()` with any object implementing `Command`:
```php
use BoutDeCode\ETLCoreBundle\CQS\Application\Operation\Command\CommandBus;

class MyService
{
    public function __construct(private readonly CommandBus $commandBus) {}

    public function doSomething(): void
    {
        // MyCommand is your own class implementing Command
        $this->commandBus->dispatch(new MyCommand(/* ... */));
    }
}
```
The bundle ships one built-in command: `ExecuteWorkflowCommand`. It takes the ID of a persisted pipeline and triggers the full middleware chain, asynchronously if an async Messenger transport is configured and synchronously otherwise.
`PipelineFactory`

The bundle provides the `PipelineFactory` interface but no concrete implementation; you must provide one (typically a Doctrine-backed service):

```php
// src/Factory/PipelineFactory.php
namespace App\Factory;

use BoutDeCode\ETLCoreBundle\Core\Domain\Factory\PipelineFactory as PipelineFactoryInterface;
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\Pipeline;
use BoutDeCode\ETLCoreBundle\Core\Domain\Model\Step;
use BoutDeCode\ETLCoreBundle\Core\Domain\Data\Provider\WorkflowProvider;
use BoutDeCode\ETLCoreBundle\Core\Domain\Data\Persister\PipelinePersister;

final class PipelineFactory implements PipelineFactoryInterface
{
    public function __construct(
        private readonly WorkflowProvider $workflowProvider,
        private readonly PipelinePersister $pipelinePersister,
    ) {}

    public function create(array $steps = [], array $configuration = []): Pipeline
    {
        // build a Pipeline from a list of Step objects
        // ...
    }

    /**
     * @param array<string, mixed> $overrideConfiguration
     * @param array<string, mixed> $input
     */
    public function createFromWorkflowId(
        string $workflowId,
        array $overrideConfiguration = [],
        array $input = [],
    ): Pipeline {
        $workflow = $this->workflowProvider->findWorkflowByIdentifier($workflowId);

        // build Pipeline from Workflow steps & config, then persist it
        $pipeline = new \App\Entity\Pipeline($workflow);
        // ... populate steps, configuration, input ...

        return $this->pipelinePersister->create($pipeline);
    }
}
```
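The `// ... populate steps, configuration, input ...` part is left to you. One possible approach, sketched here as plain PHP without Doctrine or bundle dependencies, is to copy the Workflow's `stepConfiguration`, apply `overrideConfiguration` entries keyed by step `name` (an assumption based on the `'extract_step' => [...]` override in the `StartImportHandler` example), and sort the result by `order`. `mergeStepOverrides()` is a hypothetical helper.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: apply per-step overrides (keyed by step name) on top of
 * a Workflow's stepConfiguration, then sort the steps by their `order`.
 *
 * @param list<array{code: string, name: string, order: int, configuration: array<string, mixed>}> $stepConfiguration
 * @param array<string, array<string, mixed>> $overrideConfiguration
 * @return list<array{code: string, name: string, order: int, configuration: array<string, mixed>}>
 */
function mergeStepOverrides(array $stepConfiguration, array $overrideConfiguration): array
{
    $steps = array_map(
        static function (array $step) use ($overrideConfiguration): array {
            // Override keys win; keys the override does not mention keep the Workflow value.
            $step['configuration'] = array_replace(
                $step['configuration'],
                $overrideConfiguration[$step['name']] ?? [],
            );

            return $step;
        },
        $stepConfiguration,
    );

    usort($steps, static fn (array $a, array $b): int => $a['order'] <=> $b['order']);

    return $steps;
}

$steps = mergeStepOverrides(
    [
        ['code' => 'app.loader.csv_file', 'name' => 'save_result', 'order' => 2, 'configuration' => ['destination' => '/output/result.csv']],
        ['code' => 'app.extractor.my_csv', 'name' => 'extract_customers', 'order' => 1, 'configuration' => ['source' => '/data/customers.csv', 'delimiter' => ';']],
    ],
    ['extract_customers' => ['source' => '/data/import.csv']],
);
// $steps[0] is extract_customers with source '/data/import.csv' and delimiter ';'
```

`array_replace()` is used rather than `array_merge_recursive()` so that an override replaces a scalar value instead of turning it into an array.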
The bundle's `DataInterfaceAliasPass` compiler pass automatically aliases the interface to your implementation as soon as your class is registered as a service, so no manual wiring is needed.
```php
use BoutDeCode\ETLCoreBundle\Core\Domain\Factory\PipelineFactory;

final class StartImportHandler
{
    public function __construct(
        private readonly PipelineFactory $pipelineFactory,
    ) {}

    public function handle(string $workflowId): string
    {
        $pipeline = $this->pipelineFactory->createFromWorkflowId(
            workflowId: $workflowId,
            overrideConfiguration: [
                'extract_step' => ['file' => '/data/import.csv'],
            ],
            input: ['source' => 'manual'],
        );

        // Pipeline is now persisted with PipelineStatus::PENDING
        return $pipeline->getId();
    }
}
```
```php
use BoutDeCode\ETLCoreBundle\CQS\Application\Operation\Command\CommandBus;
use BoutDeCode\ETLCoreBundle\Core\Domain\Factory\PipelineFactory;
use BoutDeCode\ETLCoreBundle\Run\Application\Operation\Command\ExecuteWorkflowCommand;

final class StartImportHandler
{
    public function __construct(
        private readonly PipelineFactory $pipelineFactory,
        private readonly CommandBus $commandBus,
    ) {}

    public function handle(string $workflowId): void
    {
        $pipeline = $this->pipelineFactory->createFromWorkflowId(
            workflowId: $workflowId,
            input: ['source' => 'manual'],
        );

        // ExecuteWorkflowCommand implements AsyncCommand:
        // routed to an async Messenger transport if one is configured,
        // otherwise handled synchronously.
        $this->commandBus->dispatch(
            new ExecuteWorkflowCommand(pipelineId: $pipeline->getId())
        );
    }
}
```
> **Note:** `ExecuteWorkflowCommand` implements `AsyncCommand`. If you configure a Symfony Messenger transport for the `async` routing key, the execution is deferred to a worker. The pipeline must be in `PipelineStatus::PENDING`; if it is already `IN_PROGRESS`, `COMPLETED`, or `FAILED`, the handler returns silently without re-running it.
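The re-run guard described in the note can be sketched in standalone PHP. `Status` and `GuardedExecutor` are hypothetical stand-ins, not the bundle's `PipelineStatus` or its command handler; only the early return for non-`PENDING` pipelines mirrors the documented behavior.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the bundle's PipelineStatus enum.
enum Status: string
{
    case PENDING = 'pending';
    case IN_PROGRESS = 'in_progress';
    case COMPLETED = 'completed';
    case FAILED = 'failed';
}

// Hypothetical stand-in for the command handler: only the guard is the point here.
final class GuardedExecutor
{
    /** @var list<string> ids of the pipelines that were actually executed */
    public array $executed = [];

    /** @param array<string, Status> $statuses pipeline id => current status */
    public function __construct(private array $statuses) {}

    public function execute(string $pipelineId): void
    {
        // Unknown, running, or finished pipelines are skipped without an error,
        // so dispatching the same command twice cannot run a pipeline twice.
        if (($this->statuses[$pipelineId] ?? null) !== Status::PENDING) {
            return;
        }

        $this->statuses[$pipelineId] = Status::IN_PROGRESS;
        $this->executed[] = $pipelineId; // a real handler would run the middleware chain here
        $this->statuses[$pipelineId] = Status::COMPLETED;
    }
}

$executor = new GuardedExecutor(['a' => Status::PENDING, 'b' => Status::COMPLETED]);
$executor->execute('a');
$executor->execute('a'); // 'a' is COMPLETED now, so this call returns early
$executor->execute('b'); // already COMPLETED: skipped
```

A silent return makes redelivery harmless, which matters once the command goes through an async transport that may retry messages.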
`CommandBus::dispatch()` returns the value produced by the handler (a `Context`). When the command runs synchronously, you can inspect the results directly:
```php
use BoutDeCode\ETLCoreBundle\Core\Domain\DTO\Context;

/** @var Context $context */
$context = $this->commandBus->dispatch(
    new ExecuteWorkflowCommand(pipelineId: $pipeline->getId())
);

// Last result produced by the pipeline
$result = $context->getResult();

// Result keyed by step name
$extracted = $context->getResultByKey('extract_step');

// Check for step failures
$errors = $context->getErrors(); // array<string, mixed>
```
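A custom pipeline middleware implements `Middleware` and delegates to the rest of the chain through `$next`:

```php
use BoutDeCode\ETLCoreBundle\Run\Domain\Middleware\Middleware;
use BoutDeCode\ETLCoreBundle\Core\Domain\DTO\Context;

final class AuditPipelineMiddleware implements Middleware
{
    public function process(Context $context, callable $next): Context
    {
        // before: runs ahead of the lower-priority middleware

        $result = $next($context);

        // after: runs once the rest of the chain has returned

        return $result;
    }
}
```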
```yaml
# config/services.yaml
services:
    App\Middleware\AuditPipelineMiddleware:
        tags:
            - { name: boutdecode_etl_core.pipeline_middleware, priority: 50 }
```
Step middleware follows the same pattern, tagged `boutdecode_etl_core.step_middleware`.
Built-in middleware priority reference:

| Middleware | Tag | Priority |
|---|---|---|
| `PipelineStartMiddleware` | pipeline | 100 |
| `PipelineFailureMiddleware` | pipeline | 1 |
| `PipelineProcessMiddleware` | pipeline | 0 |
| `PipelineHistoryMiddleware` | pipeline | -50 |
| `PipelineSuccessMiddleware` | pipeline | -100 |
| `StepStartMiddleware` | step | 100 |
| `StepFailureMiddleware` | step | 1 |
| `StepProcessMiddleware` | step | 0 |
| `StepHistoryMiddleware` | step | -50 |
| `StepSuccessMiddleware` | step | -100 |

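To picture how these priorities combine, here is a standalone onion-style chain in plain PHP. It assumes, as with Symfony tagged iterators, that higher-priority middleware comes first and therefore wraps everything with a lower priority. `buildChain()` and the array context are hypothetical, not the bundle's `Middleware` interface.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical chain builder: wraps $core in every middleware so that the
 * highest priority ends up outermost (runs first, finishes last).
 *
 * @param array<string, array{0: int, 1: callable(array, callable): array}> $middlewares name => [priority, middleware]
 */
function buildChain(array $middlewares, callable $core): callable
{
    // Ascending sort: the lowest priority is wrapped first, i.e. sits innermost.
    uasort($middlewares, static fn (array $a, array $b): int => $a[0] <=> $b[0]);

    $next = $core;
    foreach ($middlewares as [$priority, $middleware]) {
        $inner = $next;
        $next = static fn (array $context): array => $middleware($context, $inner);
    }

    return $next;
}

// Records when each middleware runs before and after delegating to $next.
$trace = static fn (string $name): callable => static function (array $context, callable $next) use ($name): array {
    $context['log'][] = "{$name}:before";
    $context = $next($context);
    $context['log'][] = "{$name}:after";

    return $context;
};

$chain = buildChain(
    [
        'success' => [-100, $trace('success')],
        'start' => [100, $trace('start')],
        'history' => [-50, $trace('history')],
    ],
    static fn (array $context): array => $context + ['processed' => true],
);

$result = $chain(['log' => []]);
echo implode(' ', $result['log']), PHP_EOL;
// start:before history:before success:before success:after history:after start:after
```

Under that assumption, a custom middleware tagged with priority 50 (as in the YAML above) would run inside the start middleware but outside the failure, process, history, and success ones.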
```bash
# All tests
composer test

# Unit tests only
composer test:unit

# Integration tests only
composer test:integration
```
Current status: 394 unit tests, 3 integration tests — all passing.
MIT — see LICENSE.
Built with ❤️ by Boutdecode