sanmai/pipeline
sanmai/pipeline is a lightweight PHP pipeline library to process data through a chain of stages. Compose reusable, testable transformations with clear input/output flow, and plug in custom middleware-like steps for flexible processing in any app.
About This Documentation: This guide, primarily authored by an LLM with human oversight, is tailored to help both developers and LLMs grasp the library's design, best practices, and idiomatic usage. If you find any inconsistencies, please open an issue.
sanmai/pipeline is a PHP library for functional-style data processing, featuring lazy evaluation and a fluent, chainable interface. It brings the power of streaming pipelines, similar to the pipe operator (|>) in functional languages, to PHP.
The library is designed around lazy evaluation using PHP generators. This allows you to process datasets of any size—from small arrays to multi-gigabyte files or even infinite data streams—with minimal and predictable memory usage.
For optimal performance and memory safety, the recommended approach is to use iterable, streaming data sources, such as SplFileObject or custom generators. While the library includes convenience optimizations for in-memory arrays, these are best reserved for smaller datasets.
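For example, a custom generator can feed the pipeline one record at a time (a minimal sketch; readRows() and records.csv are illustrative names, and take() is the helper function introduced below):

use function Pipeline\take;

// A generator yields one parsed row at a time, keeping memory usage
// flat regardless of file size.
function readRows(string $path): \Generator
{
    $file = new \SplFileObject($path);
    foreach ($file as $line) {
        $line = (string) $line;
        if (trim($line) !== '') {
            yield str_getcsv($line);
        }
    }
}

$rows = take(readRows('records.csv'));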
The Pipeline\Standard Class

The Pipeline\Standard class is the heart of the library, representing a data processing pipeline. All transformation methods return the same instance, making the pipeline inherently mutable. This design choice allows for a fluent, chainable interface where operations modify the pipeline in place, and subsequent operations act on the modified state. It also means that if you store a reference to a pipeline instance, any operations performed through that instance (or through any other reference to it) affect the same underlying pipeline.
Furthermore, a Pipeline\Standard instance can be initialized with another Pipeline\Standard instance, allowing for powerful chaining of complex data processing flows. This enables building modular pipelines where the output of one pipeline can seamlessly become the input of another.
$pipelineA = take([1, 2, 3]);
$pipelineB = $pipelineA; // $pipelineB now references the same pipeline as $pipelineA
$pipelineA->map(fn($x) => $x * 2); // This modifies the shared pipeline
// Both $pipelineA and $pipelineB will now yield [2, 4, 6]
var_dump($pipelineB->toList()); // Output: [2, 4, 6]
// Example of chaining pipelines
$firstPipeline = take(range(1, 5))->map(fn($n) => $n * 10);
$secondPipeline = new \Pipeline\Standard($firstPipeline);
$secondPipeline->filter(fn($n) => $n > 20);
// The second pipeline now processes the output of the first
var_dump($secondPipeline->toList()); // Output: [30, 40, 50]
The library employs a hybrid execution model to balance performance and memory efficiency:
- Lazy by default: Nothing is executed until a terminal operation (such as toList() or each()) is called.
- Eager fast paths for arrays: When the pipeline wraps a plain array, several operations (filter(), cast(), chunk(), slice()) have an eager "fast path" that operates on the entire array at once. This can be faster for small arrays but may consume significant memory with larger ones.
- The stream() method: To ensure memory safety with large arrays, use stream() to convert an array into a generator, forcing all subsequent operations to be lazy (see the stream() sketch after the usage examples below).

use function Pipeline\take;
// This pipeline will:
// 1. Take numbers from 1 to 100
// 2. Filter for even numbers
// 3. Square each number
// 4. Take the first 10 results
// 5. Sum the results
$result = take(range(1, 100))
->filter(fn($n) => $n % 2 === 0)
->map(fn($n) => $n ** 2)
->slice(0, 10)
->reduce(fn($a, $b) => $a + $b); // Returns 220
// Example with multiple data sources
$pipeline = take([1, 2, 3])
->append([4, 5, 6])
->prepend([0])
->map(fn($x) => $x * 2)
->toList(); // Returns [0, 2, 4, 6, 8, 10, 12]
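To force lazy evaluation on a large array, call stream() first. Here is the first example again with the array converted to a generator up front, so filter() and slice() cannot take the eager array path (the input size is illustrative):

$result = take(range(1, 100_000))
    ->stream()                       // all subsequent operations are lazy
    ->filter(fn($n) => $n % 2 === 0)
    ->map(fn($n) => $n ** 2)
    ->slice(0, 10)
    ->reduce(fn($a, $b) => $a + $b);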
Install via Composer:

composer require sanmai/pipeline
use Pipeline\Standard;
use function Pipeline\take;
use function Pipeline\map;
// From a variable
$pipeline = new Standard($data);
// Using the helper function
$pipeline = take($data);
// From a generator
$pipeline = map(function() {
yield 1;
yield 2;
yield 3;
});
// Chaining operations
$result = $pipeline
->filter($predicate)
->map($transformer)
->fold($initial, $reducer);
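For example, with concrete callbacks (values chosen for illustration; this assumes fold() passes the accumulator as the first callback argument, as array_reduce() does):

$total = take([1, 2, 3, 4, 5])
    ->filter(fn($n) => $n % 2 !== 0)          // keep odd numbers: 1, 3, 5
    ->map(fn($n) => $n * $n)                  // square them: 1, 9, 25
    ->fold(0, fn($carry, $n) => $carry + $n); // 1 + 9 + 25 = 35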
Process large files with minimal memory footprint:
$count = 0;
take(new SplFileObject('huge.log'))
    ->filter(fn($line) => strpos($line, 'ERROR') !== false)
    ->runningCount($count) // tallies matching lines into $count by reference
    ->each(fn($line) => error_log($line));
echo "Processed $count error lines\n";
All non-terminal operations return $this, allowing for fluent method chaining:
$pipeline
->filter() // Returns $this
->map() // Returns $this
->chunk(100) // Returns $this
->flatten() // Returns $this
->reduce(); // Terminal operation, returns a value
- Non-terminal operations: map(), filter(), slice(), and the like return the pipeline itself.
- Terminal operations: reduce(), fold(), toList(), each() consume the pipeline and return a final value.

The library is designed to be robust and fault-tolerant:
- Empty pipelines are safe: map() callbacks are not executed on an empty pipeline (see the first sketch below).
- Prefer stream() for large arrays: For optimal memory efficiency, especially with large arrays, use the stream() method to explicitly convert an array into a generator. This forces all subsequent operations to be lazy, preventing the entire dataset from being loaded into memory.
- Converting to arrays: Prefer toList() or toAssoc() over iterator_to_array(), as they handle key preservation and ensure all values are returned. Use toList() to get an array of all values (keys ignored) or toAssoc() to get an associative array (keys preserved); see the second sketch below. The toArray() method now requires a boolean parameter, and toArrayPreservingKeys() has been removed.
- Counting without consuming: Use runningCount() to count items within the pipeline without triggering a full consumption of the stream, which would terminate the pipeline.
- Array-optimized paths: filter(), cast(), slice(), and chunk() have array-optimized paths. While faster for small arrays, these can consume significant memory if the pipeline contains a large array and stream() has not been called.
- slice() with negative offsets/lengths: When slice() is given a negative offset or length on a generator-based pipeline, the library must buffer elements to determine the starting or ending point. For very large streams, this can increase memory consumption. If memory is a concern, avoid negative offset or length values, or ensure the input is of a finite, manageable size (see the final sketch below).
- The each() method: each() eagerly iterates over the entire pipeline and, by default, discards the pipeline's internal iterator after consumption. To re-iterate the pipeline, explicitly set the $discard parameter of each() to false, or re-initialize the pipeline.
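A minimal sketch of the empty-pipeline guarantee; the callback below never runs and no error is raised:

$result = take([])
    ->map(fn($x) => $x * 2) // never invoked
    ->toList();             // []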
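A sketch of the difference between toList() and toAssoc():

var_dump(take(['a' => 1, 'b' => 2])->toList());  // [1, 2] (keys ignored)
var_dump(take(['a' => 1, 'b' => 2])->toAssoc()); // ['a' => 1, 'b' => 2] (keys preserved)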
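Finally, a sketch of slice() with a negative offset on a generator-backed pipeline; this assumes slice(-3) means "keep the last three elements", which forces the internal buffering described above:

$lastThree = take(range(1, 1000))
    ->stream()   // generator-backed, so slice() must buffer
    ->slice(-3)  // keep only the final three elements
    ->toList();  // [998, 999, 1000]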