Skip to content
Closed
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
28c8b84
Implement window()
sanmai Dec 21, 2025
e10b4b1
display these
sanmai Dec 21, 2025
aac1e9b
tests
sanmai Dec 21, 2025
5149a5f
Merge branch 'main' into feat/window-buffer
sanmai Dec 21, 2025
1f33dad
defer
sanmai Dec 21, 2025
4e8deb4
Merge branch 'feat/window-buffer' of github.com:sanmai/pipeline into …
sanmai Dec 21, 2025
00e36ac
Cleanup
sanmai Dec 21, 2025
f8f3ebb
Improve tests
sanmai Dec 21, 2025
21d382e
lazy initialization only in valid()
sanmai Dec 21, 2025
29a6146
cs
sanmai Dec 21, 2025
71b4ba4
forward
sanmai Dec 21, 2025
8f2c2f8
simplify
sanmai Dec 21, 2025
78beb83
simplify
sanmai Dec 21, 2025
a27c5a0
dirty generator
sanmai Dec 21, 2025
7dd4258
FixedLengthList
sanmai Dec 21, 2025
697e379
Revert "FixedLengthList"
sanmai Dec 21, 2025
9370ff8
simplify
sanmai Dec 21, 2025
46d8373
Document that getIterator() returns only Iterator
sanmai Dec 21, 2025
1481229
Merge branch 'main' into feat/window-buffer
sanmai Dec 21, 2025
210bd1f
add count() and countable
sanmai Dec 22, 2025
f203ad1
remove tailKey
sanmai Dec 22, 2025
09aa3cc
Merge branch 'main' into feat/window-buffer
sanmai Jan 9, 2026
a5fc017
Merge branch 'main' into feat/window-buffer
sanmai Jan 9, 2026
f3a63db
Refactor WindowTest to use shared TestCase helpers
sanmai Jan 9, 2026
9c45d3b
Merge branch 'main' into feat/window-buffer
sanmai Jan 11, 2026
5156ca8
Add WindowIterator test to kill timeout mutations
sanmai Jan 12, 2026
7f1bdca
Merge branch 'main' into feat/window-buffer
sanmai Jan 16, 2026
b7ef96b
Merge branch 'main' into feat/window-buffer
sanmai Jan 16, 2026
33a0e98
Merge branch 'main' into feat/window-buffer
sanmai Jan 23, 2026
5182359
Merge branch 'main' into feat/window-buffer
sanmai Jan 25, 2026
b86b3a4
cs
sanmai Jan 25, 2026
91634a2
cs
sanmai Jan 25, 2026
5e9db4b
Apply suggestion from @sanmai
sanmai Jan 26, 2026
12a9dbd
coverage fix
sanmai Jan 26, 2026
e467548
Merge branch 'main' into feat/window-buffer
sanmai Jan 26, 2026
de91be1
Merge branch 'main' into feat/window-buffer
sanmai Jan 27, 2026
d97d06a
Merge branch 'feat/window-buffer' of github.com:sanmai/pipeline into …
sanmai Jan 29, 2026
d1d0bd7
remove unlimited
sanmai Jan 29, 2026
eb033da
fix
sanmai Jan 29, 2026
ed597ba
docs
sanmai Jan 29, 2026
d495c42
unwind
sanmai Jan 29, 2026
c29a3e7
internal
sanmai Feb 2, 2026
c9fdfa2
SPL
sanmai Feb 2, 2026
eecad8d
simplify
sanmai Feb 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ In general, Pipeline instances are mutable, meaning every Pipeline-returning met
// Exception: Cannot traverse an already closed generator
```

Although there are some cases where a pipeline can be rewound and reused just like a regular array, if you need to pause iteration and continue later, or if you need to iterate strictly once without accidental resets or exceptions, use [`$pipeline->cursor()`](#pipeline-cursor).
Although there are some cases where a pipeline can be rewound and reused just like a regular array, if you need to pause iteration and continue later, or if you need to iterate strictly once without accidental resets or exceptions, use [`$pipeline->cursor()`](#pipeline-cursor). If you need to rewind and replay seen elements, use [`$pipeline->window()`](#pipeline-window).

- Pipeline implements `IteratorAggregate` which is not the same as `Iterator`. Where the latter needed, the pipeline can be wrapped with an `IteratorIterator`:

Expand Down Expand Up @@ -490,6 +490,46 @@ foreach ($cursor as $value) {
$remaining = \Pipeline\take($cursor)->count();
```

## `$pipeline->window()`

Returns a rewindable iterator that caches elements for replay. Unlike `cursor()` which is forward-only, `window()` buffers seen elements allowing rewind within the buffer bounds:

```php
$pipeline = \Pipeline\fromArray([1, 2, 3, 4, 5]);
$window = $pipeline->window();

foreach ($window as $value) {
echo $value; // 1, 2, 3
if ($value === 3) {
break;
}
}

// Rewind and replay from beginning
$window->rewind();
foreach ($window as $value) {
echo $value; // 1, 2, 3, 4, 5 (full replay)
}
```

With a size limit, oldest elements are dropped (sliding window):

```php
$window = $pipeline->window(3); // Keep last 3 elements

foreach ($window as $value) {
if ($value === 4) {
break; // Saw: 1, 2, 3, 4
}
}

// Rewind - oldest element (1) was dropped
$window->rewind();
foreach ($window as $value) {
echo $value; // 2, 3, 4, 5 (buffer had [2,3,4], then continues)
}
```

## `$pipeline->runningVariance()`

Computes online statistics for the sequence: counts, sample mean, sample variance, standard deviation. You can access these numbers on the fly with methods such as `getCount()`, `getMean()`, `getVariance()`, `getStandardDeviation()`.
Expand Down
160 changes: 160 additions & 0 deletions src/Helper/WindowIterator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
<?php

/**
* Copyright 2017, 2018 Alexey Kopytko <alexey@kopytko.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

declare(strict_types=1);

namespace Pipeline\Helper;

use function count;

use Countable;
use Iterator;
use Override;

/**
* A rewindable iterator that caches elements for replay.
*
* Unlike CursorIterator which is forward-only, WindowIterator buffers
* seen elements allowing rewind within the buffer bounds.
*
* @template TKey
* @template TValue
*
* @implements Iterator<TKey, TValue>
*
* @final
*/
class WindowIterator implements Iterator, Countable
{
/** @var array<int, array{TKey, TValue}> */
private array $buffer = [];

private int $headKey = 0;

private int $position = 0;

private bool $innerExhausted = false;

private bool $initialized = false;

/**
* @param Iterator<TKey, TValue> $inner
* @param int<1, max> $maxSize Maximum buffer size
*/
public function __construct(
private readonly Iterator $inner,
private readonly int $maxSize
) {}

#[Override]
public function current(): mixed
{
if (!$this->valid()) {
return null;
}

return $this->buffer[$this->headKey + $this->position][1];
}

#[Override]
public function key(): mixed
{
if (!$this->valid()) {
return null;
}

return $this->buffer[$this->headKey + $this->position][0];
}

#[Override]
public function next(): void
{
++$this->position;

// If still within buffer or inner exhausted, nothing to fetch
if ($this->position < $this->count() || $this->innerExhausted) {
return;
}

$this->fetch();

while ($this->count() > $this->maxSize) {
unset($this->buffer[$this->headKey]);
++$this->headKey;
--$this->position;
}
}

#[Override]
public function rewind(): void
{
$this->position = 0;
}

#[Override]
public function valid(): bool
{
$this->initialize();

return $this->position < $this->count();
}

private function initialize(): void
{
if ($this->initialized) {
return;
}
$this->initialized = true;

// If inner is already pointing at data (e.g. a started generator), capture it
if ($this->inner->valid()) {
$this->pushFromInner();

return;
}

$this->fetch(rewind: true);
}

private function fetch(bool $rewind = false): void
{
match ($rewind) {
false => $this->inner->next(),
true => $this->inner->rewind(),
};

if (!$this->inner->valid()) {
$this->innerExhausted = true;

return;
}

$this->pushFromInner();
}

private function pushFromInner(): void
{
$this->buffer[] = [$this->inner->key(), $this->inner->current()];
}

#[Override]
public function count(): int
{
return count($this->buffer);
}
}
24 changes: 24 additions & 0 deletions src/Standard.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

use Override;
use Pipeline\Helper\CursorIterator;
use Pipeline\Helper\WindowIterator;
use Traversable;

/**
Expand Down Expand Up @@ -738,6 +739,29 @@ public function cursor(): Iterator
return new CursorIterator($iterator);
}

/**
* Returns a rewindable iterator that caches elements for replay.
*
* Unlike cursor() which is forward-only, window() buffers seen elements
* allowing rewind within the buffer bounds.
*
* With a size limit, oldest elements are dropped (sliding window).
*
* @param int<1, max> $size Maximum buffer size
* @return Iterator<TKey, TValue>
*/
public function window(int $size): Iterator
{
if ($this->empty()) {
return new EmptyIterator();
}

/** @var Iterator $iterator */
$iterator = $this->getIterator();

return new WindowIterator($iterator, $size);
}

/**
* By default, returns all values regardless of keys used, discarding all keys in the process. This is a terminal operation.
* @return list<TValue>
Expand Down
66 changes: 66 additions & 0 deletions tests/Helper/WindowIteratorTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

/**
* Copyright 2017, 2018 Alexey Kopytko <alexey@kopytko.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

declare(strict_types=1);

namespace Tests\Pipeline\Helper;

use ArrayIterator;

use function count;

use PHPUnit\Framework\Attributes\CoversClass;
use PHPUnit\Framework\TestCase;
use Pipeline\Helper\WindowIterator;
use ReflectionProperty;
use RuntimeException;

/**
* @internal
*/
#[CoversClass(WindowIterator::class)]
final class WindowIteratorTest extends TestCase
{
public function testTrimLoopTerminatesCorrectly(): void
{
$callCount = 0;

$mock = $this->getMockBuilder(WindowIterator::class)
->setConstructorArgs([new ArrayIterator([1, 2, 3, 4, 5]), 3])
->onlyMethods(['count'])
->getMock();

$mock->method('count')
->willReturnCallback(function () use (&$callCount, $mock): int {
if (++$callCount > 50) {
throw new RuntimeException('count() called >50 times - infinite loop');
}

$buffer = (new ReflectionProperty(WindowIterator::class, 'buffer'))->getValue($mock);

return count($buffer);
});

// Consume all elements - triggers trim operations
foreach ($mock as $_) {
}

// With 5 elements and maxSize 3, count() calls should be bounded
$this->assertLessThan(50, $callCount);
}
}
Loading
Loading