sanmai/pipeline
sanmai/pipeline is a lightweight PHP pipeline library to process data through a chain of stages. Compose reusable, testable transformations with clear input/output flow, and plug in custom middleware-like steps for flexible processing in any app.
This section explores advanced techniques for building sophisticated, maintainable, and scalable data processing pipelines.
One of the most powerful features of the library is the ability to compose complex pipelines from smaller, reusable components. This is achieved by encapsulating business logic into separate classes or functions, which can then be chained together.
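Plain functions compose the same way. The sketch below assumes a hypothetical `pipe()` helper, which is not part of sanmai/pipeline. It folds several single-argument callables into one, so a multi-step transformation can go into a single `->map()` call. `array_map()` stands in for the pipeline so the snippet runs without the library installed.

```php
<?php

// Hypothetical helper (not part of sanmai/pipeline): chains single-argument
// callables left to right into one callable usable in a single map() step.
function pipe(callable ...$steps): callable
{
    return static function ($value) use ($steps) {
        foreach ($steps as $step) {
            $value = $step($value);
        }
        return $value;
    };
}

// Small building blocks, each testable on its own.
$trim = static fn (string $s): string => trim($s);
$lower = static fn (string $s): string => strtolower($s);

$normalizeEmail = pipe($trim, $lower);

// array_map() stands in for ->map() so the sketch runs without the library.
$emails = array_map($normalizeEmail, ['  Alice@Example.COM ', 'BOB@example.com']);
// $emails === ['alice@example.com', 'bob@example.com']
```

With the library installed, the composed `$normalizeEmail` can be passed to `->map()` like any other callable.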
By creating dedicated classes for pipeline operations, you can build a library of reusable, testable components.
Example: A UserProcessor Class
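```php
namespace App\Pipeline\Components;

// Requires PHP 8.1+ for string-key array unpacking (...$user).
class UserProcessor
{
    // Keep only users whose 'active' flag is strictly true.
    public static function filterActive(array $user): bool
    {
        return ($user['active'] ?? false) === true;
    }

    // Trim and case-normalize name and email, keeping all other fields.
    public static function normalize(array $user): array
    {
        return [
            ...$user,
            'name' => ucwords(strtolower(trim($user['name'] ?? ''))),
            'email' => strtolower(trim($user['email'] ?? '')),
        ];
    }
}
```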
These components can then be used to build a clean and readable pipeline:
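```php
use App\Pipeline\Components\UserProcessor;
use function Pipeline\take;

// $rawUsers: any array or iterable of user records.
// The Class::method(...) first-class callable syntax requires PHP 8.1+.
$processedUsers = take($rawUsers)
    ->filter(UserProcessor::filterActive(...))
    ->map(UserProcessor::normalize(...))
    ->toList();
```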
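This approach keeps each operation reusable across pipelines and testable in isolation, and the pipeline itself reads as a short list of named steps.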
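Because each step is a plain static method, it can be checked without building a pipeline at all. The minimal sketch below repeats `UserProcessor` without its namespace so the snippet runs on its own. It needs PHP 8.1+ for the string-key spread, and the sample user data is invented for illustration.

```php
<?php

// Same UserProcessor as above, minus the namespace, so this file runs standalone.
class UserProcessor
{
    public static function filterActive(array $user): bool
    {
        return ($user['active'] ?? false) === true;
    }

    public static function normalize(array $user): array
    {
        return [
            ...$user,
            'name' => ucwords(strtolower(trim($user['name'] ?? ''))),
            'email' => strtolower(trim($user['email'] ?? '')),
        ];
    }
}

// Each step is exercised directly, with no pipeline involved.
$active = UserProcessor::filterActive(['active' => true]);  // true
$inactive = UserProcessor::filterActive(['active' => 1]);   // false: strict === true
$missing = UserProcessor::filterActive([]);                 // false: defaults to false

$normalized = UserProcessor::normalize([
    'id' => 7,
    'name' => '  aDA lovelace ',
    'email' => ' Ada@Example.org',
]);
// ['id' => 7, 'name' => 'Ada Lovelace', 'email' => 'ada@example.org']
```

Note the strict `=== true` check in `filterActive()`. A truthy value such as `1` or `'yes'` counts as inactive, which is the kind of edge case worth pinning down in a test.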
For operations that require state to be maintained between elements, you can use a class to encapsulate the state.
Example: A ChangeDetector
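```php
use function Pipeline\take;

// Remembers the previous value between calls, so one instance carries
// state across the whole stream.
class ChangeDetector
{
    private $previous = null;

    public function detect($value): ?array
    {
        if ($this->previous === null) {
            // First value: nothing to compare against yet.
            $this->previous = $value;
            return null;
        }
        $change = $value - $this->previous;
        $this->previous = $value;
        return ['value' => $value, 'change' => $change];
    }
}

$detector = new ChangeDetector();
$changes = take($prices)
    ->map($detector->detect(...))
    ->filter() // With no callback, filter() drops falsy values: here, the initial null
    ->toList();
```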
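A worked run on made-up prices shows what the detector emits and why reusing an instance needs care. In this sketch, `array_map()` and `array_filter()` stand in for the pipeline's `map()` and `filter()` so it runs without the library. `$detector->detect(...)` needs PHP 8.1+.

```php
<?php

class ChangeDetector
{
    private $previous = null;

    public function detect($value): ?array
    {
        if ($this->previous === null) {
            $this->previous = $value;
            return null;
        }
        $change = $value - $this->previous;
        $this->previous = $value;
        return ['value' => $value, 'change' => $change];
    }
}

$prices = [100, 105, 103, 103];

$detector = new ChangeDetector();
// array_filter() without a callback removes the leading null, like ->filter().
$changes = array_values(array_filter(array_map($detector->detect(...), $prices)));
// [['value' => 105, 'change' => 5], ['value' => 103, 'change' => -2], ['value' => 103, 'change' => 0]]

// The instance still remembers 103, so a later call continues from there.
$carried = $detector->detect(110);
// ['value' => 110, 'change' => 7]
```

The filter tests the whole `['value' => ..., 'change' => ...]` array rather than the change, so a zero change is kept. Because the instance keeps `$previous` after a run, create a new `ChangeDetector` for each independent series.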
For pipelines that may encounter errors, you can create a wrapper to handle exceptions gracefully.
Example: A SafeProcessor
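```php
use function Pipeline\take;

class SafeProcessor
{
    private array $errors = [];

    // Wraps a transformer so an exception becomes a failure marker instead of
    // aborting the pipeline; the failing item and message are recorded.
    public function transform(callable $transformer): callable
    {
        return function ($item) use ($transformer) {
            try {
                return ['success' => true, 'data' => $transformer($item)];
            } catch (\Exception $e) {
                $this->errors[] = ['item' => $item, 'error' => $e->getMessage()];
                return ['success' => false, 'data' => null];
            }
        };
    }

    public function getErrors(): array
    {
        return $this->errors;
    }
}

$processor = new SafeProcessor();
$results = take($inputs)
    // process() stands in for your own transformation.
    ->map($processor->transform(fn($item) => process($item)))
    ->filter(fn($result) => $result['success'])
    ->map(fn($result) => $result['data'])
    ->toList();

// Errors are recorded as items flow through, so read them only after the
// pipeline has been consumed (toList() does that above).
$errors = $processor->getErrors();
```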
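The wrapper can be exercised directly to confirm what ends up in the results and in `getErrors()`. In this sketch the `$double` transformer and its inputs are hypothetical, and `array_map()`/`array_filter()` replace the pipeline calls so it runs standalone.

```php
<?php

class SafeProcessor
{
    private array $errors = [];

    public function transform(callable $transformer): callable
    {
        return function ($item) use ($transformer) {
            try {
                return ['success' => true, 'data' => $transformer($item)];
            } catch (\Exception $e) {
                $this->errors[] = ['item' => $item, 'error' => $e->getMessage()];
                return ['success' => false, 'data' => null];
            }
        };
    }

    public function getErrors(): array
    {
        return $this->errors;
    }
}

// Hypothetical transformer: rejects negative numbers, doubles the rest.
$double = static function (int $n): int {
    if ($n < 0) {
        throw new InvalidArgumentException("negative: $n");
    }
    return $n * 2;
};

$processor = new SafeProcessor();
$results = array_map($processor->transform($double), [1, -2, 3]);

// Keep successes, then unwrap their data (mirrors the filter()/map() steps).
$data = array_values(array_map(
    fn ($result) => $result['data'],
    array_filter($results, fn ($result) => $result['success'])
));
$errors = $processor->getErrors();
// $data === [2, 6]
// $errors === [['item' => -2, 'error' => 'negative: -2']]
```

The wrapper catches `\Exception` only. Engine errors such as `TypeError` or `DivisionByZeroError` extend `\Error`, not `\Exception`, so they still propagate. Catching `\Throwable` instead would record those as well.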
Here's a real-world example of handling errors when processing API responses:
```php
use function Pipeline\take;

// Simulate API responses with potential failures
$apiResponses = [
    ['url' => '/users/1', 'data' => '{"id":1,"name":"Alice"}'],
    ['url' => '/users/2', 'data' => 'invalid json'],
    ['url' => '/users/3', 'data' => '{"id":3,"name":"Charlie"}'],
    ['url' => '/users/4', 'data' => null], // Failed request
];

// Process with error collection
$validUsers = [];
$errors = [];

take($apiResponses)
    ->map(function ($response) use (&$errors) {
        if ($response['data'] === null) {
            $errors[] = ['url' => $response['url'], 'error' => 'Request failed'];
            return null;
        }
        $decoded = json_decode($response['data'], true);
        if (json_last_error() !== JSON_ERROR_NONE) {
            $errors[] = ['url' => $response['url'], 'error' => 'Invalid JSON'];
            return null;
        }
        return $decoded;
    })
    ->filter() // Remove nulls
    ->each(function ($user) use (&$validUsers) {
        $validUsers[] = $user;
    });

// $validUsers:
// [
//     ['id' => 1, 'name' => 'Alice'],
//     ['id' => 3, 'name' => 'Charlie'],
// ]
//
// $errors:
// [
//     ['url' => '/users/2', 'error' => 'Invalid JSON'],
//     ['url' => '/users/4', 'error' => 'Request failed'],
// ]
```
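Since PHP 7.3, `json_decode()` accepts the `JSON_THROW_ON_ERROR` flag, which throws `JsonException` instead of requiring a `json_last_error()` check. Below is a variant sketch of the decoding step that assumes a hypothetical `decodeResponse()` helper (PHP 8.0+ for the `mixed` return type) and reuses the same sample responses. `array_map()`/`array_filter()` again stand in for `map()`/`filter()`.

```php
<?php

// Hypothetical helper: decodes one response, recording any failure in $errors.
function decodeResponse(array $response, array &$errors): mixed
{
    if ($response['data'] === null) {
        $errors[] = ['url' => $response['url'], 'error' => 'Request failed'];
        return null;
    }
    try {
        // JSON_THROW_ON_ERROR makes json_decode() throw JsonException on bad input.
        return json_decode($response['data'], true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $e) {
        $errors[] = ['url' => $response['url'], 'error' => 'Invalid JSON'];
        return null;
    }
}

$apiResponses = [
    ['url' => '/users/1', 'data' => '{"id":1,"name":"Alice"}'],
    ['url' => '/users/2', 'data' => 'invalid json'],
    ['url' => '/users/3', 'data' => '{"id":3,"name":"Charlie"}'],
    ['url' => '/users/4', 'data' => null],
];

$errors = [];
// The closure captures $errors by reference so the helper's appends survive.
$validUsers = array_values(array_filter(array_map(
    function (array $response) use (&$errors) {
        return decodeResponse($response, $errors);
    },
    $apiResponses
)));
```

The closure uses `use (&$errors)` on purpose. An arrow function (`fn`) captures by value, so errors appended inside it would be lost, and the same applies when the callback is passed to `->map()`.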
For nested data structures, you can use recursion to process the entire tree.
Example: A TreeProcessor
```php
use function Pipeline\take;

class TreeProcessor
{
    // Applies $processor to a node, then recurses into its 'children', if any.
    public static function traverse(array $node, callable $processor): array
    {
        $result = $processor($node);
        if (isset($node['children'])) {
            $result['children'] = take($node['children'])
                ->map(fn($child) => self::traverse($child, $processor))
                ->toList();
        }
        return $result;
    }
}

// $tree: a nested array in which each node may hold a 'children' list.
$processedTree = TreeProcessor::traverse($tree, fn($node) => [
    ...$node,
    'processed' => true,
]);
```
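A small made-up tree makes the traversal concrete. This version swaps `take(...)->map(...)->toList()` for `array_values(array_map(...))`, which also returns a reindexed list, so it runs without the library. The spread with string keys needs PHP 8.1+.

```php
<?php

class TreeProcessor
{
    public static function traverse(array $node, callable $processor): array
    {
        $result = $processor($node);
        if (isset($node['children'])) {
            // array_values(array_map(...)) stands in for take(...)->map(...)->toList().
            $result['children'] = array_values(array_map(
                fn ($child) => self::traverse($child, $processor),
                $node['children']
            ));
        }
        return $result;
    }
}

$tree = [
    'name' => 'root',
    'children' => [
        ['name' => 'a', 'children' => [['name' => 'a1']]],
        ['name' => 'b'],
    ],
];

$processedTree = TreeProcessor::traverse($tree, fn ($node) => [...$node, 'processed' => true]);
// Every node, at every depth, now carries 'processed' => true.
```

The processor sees each node before its children are processed (pre-order), and `traverse()` then overwrites the `children` key. Any change the processor makes to `children` is therefore replaced whenever the original node had children. Each nesting level also adds one PHP call frame, so very deep trees may call for an iterative approach.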