diff --git a/src/Builder/Accumulator/AvgAccumulator.php b/src/Builder/Accumulator/AvgAccumulator.php index afbb1747b..0d783d563 100644 --- a/src/Builder/Accumulator/AvgAccumulator.php +++ b/src/Builder/Accumulator/AvgAccumulator.php @@ -47,4 +47,24 @@ public function __construct(Decimal128|Int64|ResolvesToNumber|float|int|string $ $this->expression = $expression; } + + /** + * Accumulate the average for the current group state using the provided document. + * + * @param array $state Reference to the group state + * @param array $doc The current document + */ + public function accumulate(array &$state, array $doc): void + { + $expr = $this->expression; + $val = 0; + if (is_string($expr) && str_starts_with($expr, '$')) { + $field = substr($expr, 1); + $val = $doc[$field] ?? 0; + } elseif (is_numeric($expr)) { + $val = $expr; + } + $state['sum'] = ($state['sum'] ?? 0) + $val; + $state['count'] = ($state['count'] ?? 0) + 1; + } } diff --git a/src/Builder/Accumulator/CountAccumulator.php b/src/Builder/Accumulator/CountAccumulator.php index 933d86c6d..9c1287c12 100644 --- a/src/Builder/Accumulator/CountAccumulator.php +++ b/src/Builder/Accumulator/CountAccumulator.php @@ -29,4 +29,15 @@ final class CountAccumulator implements AccumulatorInterface, WindowInterface, O public function __construct() { } + + /** + * Accumulate the count for the current group state using the provided document. + * + * @param array $state Reference to the group state + * @param array $doc The current document + */ + public function accumulate(array &$state, array $doc): void + { + $state['value'] = ($state['value'] ?? 0) + 1; + } } diff --git a/src/Builder/Accumulator/MaxAccumulator.php b/src/Builder/Accumulator/MaxAccumulator.php index 7870591d2..bf8cb8700 100644 --- a/src/Builder/Accumulator/MaxAccumulator.php +++ b/src/Builder/Accumulator/MaxAccumulator.php @@ -41,4 +41,25 @@ public function __construct( ) { $this->expression = $expression; } + + /** + * Accumulate the maximum for the current group state using the provided document. + * + * @param array $state Reference to the group state + * @param array $doc The current document + */ + public function accumulate(array &$state, array $doc): void + { + $expr = $this->expression; + $val = null; + if (is_string($expr) && str_starts_with($expr, '$')) { + $field = substr($expr, 1); + $val = $doc[$field] ?? null; + } elseif (is_numeric($expr)) { + $val = $expr; + } + if (!isset($state['value']) || ($val !== null && $val > $state['value'])) { + $state['value'] = $val; + } + } } diff --git a/src/Builder/Accumulator/MinAccumulator.php b/src/Builder/Accumulator/MinAccumulator.php index 3b8f5a05b..0c1332392 100644 --- a/src/Builder/Accumulator/MinAccumulator.php +++ b/src/Builder/Accumulator/MinAccumulator.php @@ -41,4 +41,25 @@ public function __construct( ) { $this->expression = $expression; } + + /** + * Accumulate the minimum for the current group state using the provided document. + * + * @param array $state Reference to the group state + * @param array $doc The current document + */ + public function accumulate(array &$state, array $doc): void + { + $expr = $this->expression; + $val = null; + if (is_string($expr) && str_starts_with($expr, '$')) { + $field = substr($expr, 1); + $val = $doc[$field] ?? null; + } elseif (is_numeric($expr)) { + $val = $expr; + } + if (!isset($state['value']) || ($val !== null && $val < $state['value'])) { + $state['value'] = $val; + } + } } diff --git a/src/Builder/Accumulator/OutputWindowAccumulator.php b/src/Builder/Accumulator/OutputWindowAccumulator.php new file mode 100644 index 000000000..43191fde3 --- /dev/null +++ b/src/Builder/Accumulator/OutputWindowAccumulator.php @@ -0,0 +1,57 @@ +accumulator = $accumulator; + $this->documents = $documents; + } + + /** + * Accumulate the window for the given documents. + * + * @param array $docs Window documents + * @return mixed + */ + public function accumulateWindow(array $docs): mixed + { + if ($this->accumulator instanceof SumAccumulator) { + $sum = 0; + foreach ($docs as $doc) { + $state = []; + $this->accumulator->accumulate($state, $doc); + $sum += $state['value'] ?? 0; + } + return $sum; + } + if ($this->accumulator instanceof MaxAccumulator) { + $max = null; + foreach ($docs as $doc) { + $state = []; + $this->accumulator->accumulate($state, $doc); + if ($max === null || ($state['value'] ?? null) > $max) { + $max = $state['value'] ?? null; + } + } + return $max; + } + // Add more accumulator support as needed + return null; + } +} + diff --git a/src/Builder/Accumulator/SumAccumulator.php b/src/Builder/Accumulator/SumAccumulator.php index fa8cdb642..44bbf23d9 100644 --- a/src/Builder/Accumulator/SumAccumulator.php +++ b/src/Builder/Accumulator/SumAccumulator.php @@ -47,4 +47,23 @@ public function __construct(Decimal128|Int64|ResolvesToNumber|float|int|string $ $this->expression = $expression; } + + /** + * Accumulate the sum for the current group state using the provided document. + * + * @param array $state Reference to the group state + * @param array $doc The current document + */ + public function accumulate(array &$state, array $doc): void + { + $expr = $this->expression; + $val = 0; + if (is_string($expr) && str_starts_with($expr, '$')) { + $field = substr($expr, 1); + $val = $doc[$field] ?? 0; + } elseif (is_numeric($expr)) { + $val = $expr; + } + $state['value'] = ($state['value'] ?? 0) + $val; + } } diff --git a/src/Builder/Pipeline.php b/src/Builder/Pipeline.php index e481277a9..07a4daa7f 100644 --- a/src/Builder/Pipeline.php +++ b/src/Builder/Pipeline.php @@ -57,4 +57,24 @@ public function getIterator(): ArrayIterator { return new ArrayIterator($this->stages); } + + /** + * Executes the pipeline locally on the provided documents. + * Only for test/local execution purposes. + * + * @param array $documents Input documents + * @return array Resulting documents after pipeline execution + */ + public function processLocally(array $documents): array + { + $result = $documents; + foreach ($this->stages as $stage) { + if (method_exists($stage, 'processLocally')) { + $result = $stage->processLocally($result); + } else { + throw new \RuntimeException('Stage does not support local execution: ' . (is_object($stage) ? get_class($stage) : gettype($stage))); + } + } + return $result; + } } diff --git a/src/Builder/Stage/FacetStage.php b/src/Builder/Stage/FacetStage.php index 6dc36bebd..44a2fcd26 100644 --- a/src/Builder/Stage/FacetStage.php +++ b/src/Builder/Stage/FacetStage.php @@ -52,4 +52,28 @@ public function __construct(PackedArray|Pipeline|BSONArray|array ...$facet) $facet = (object) $facet; $this->facet = $facet; } + + /** + * Executes the $facet stage locally on the provided documents. + * Only for test/local execution purposes. + * + * @param array $documents Input documents + * @return array Resulting documents after facet pipelines + */ + public function processLocally(array $documents): array + { + $result = []; + foreach ($this->facet as $facetName => $pipeline) { + $facetDocs = $documents; + foreach ($pipeline as $stage) { + if (method_exists($stage, 'processLocally')) { + $facetDocs = $stage->processLocally($facetDocs); + } else { + throw new \RuntimeException("Stage does not support local execution: " . get_class($stage)); + } + } + $result[$facetName] = array_values($facetDocs); + } + return [$result]; + } } diff --git a/src/Builder/Stage/GroupStage.php b/src/Builder/Stage/GroupStage.php index 8577b4ad1..2aad5aecf 100644 --- a/src/Builder/Stage/GroupStage.php +++ b/src/Builder/Stage/GroupStage.php @@ -58,4 +58,60 @@ public function __construct( $field = (object) $field; $this->field = $field; } + + /** + * Executes the $group stage locally on the provided documents. + * Only for test/local execution purposes. + * Supports grouping by _id and $sum accumulator. + * + * @param array $documents Input documents + * @return array Grouped documents + */ + public function processLocally(array $documents): array + { + $groups = []; + $states = []; + foreach ($documents as $doc) { + $groupKey = is_string($this->_id) ? ($doc[$this->_id] ?? null) : $this->_id; + if (!isset($groups[$groupKey])) { + $groups[$groupKey] = ['_id' => $groupKey]; + $states[$groupKey] = []; + } + foreach ($this->field as $fieldName => $accumulator) { + if ($accumulator instanceof AccumulatorInterface) { + if (!isset($states[$groupKey][$fieldName])) { + $states[$groupKey][$fieldName] = []; + } + $accumulator->accumulate($states[$groupKey][$fieldName], $doc); + } elseif (is_array($accumulator) && isset($accumulator['$sum'])) { + // Legacy array accumulator for $sum + $sumField = $accumulator['$sum']; + $states[$groupKey][$fieldName]['value'] = ($states[$groupKey][$fieldName]['value'] ?? 0) + ($doc[$sumField] ?? 0); + } + } + } + // Finalize results + foreach ($groups as $groupKey => &$result) { + foreach ($this->field as $fieldName => $accumulator) { + if ($accumulator instanceof AccumulatorInterface) { + $state = $states[$groupKey][$fieldName] ?? null; + if ($state === null) { + $result[$fieldName] = null; + continue; + } + // If avg, compute average if sum/count present + if (isset($state['sum']) && isset($state['count'])) { + $result[$fieldName] = $state['count'] ? $state['sum'] / $state['count'] : null; + } elseif (isset($state['value'])) { + $result[$fieldName] = $state['value']; + } else { + $result[$fieldName] = $state; + } + } elseif (is_array($accumulator) && isset($accumulator['$sum'])) { + $result[$fieldName] = $states[$groupKey][$fieldName]['value'] ?? null; + } + } + } + return array_values($groups); + } } diff --git a/src/Builder/Stage/LimitStage.php b/src/Builder/Stage/LimitStage.php index ec765fa8a..d24338497 100644 --- a/src/Builder/Stage/LimitStage.php +++ b/src/Builder/Stage/LimitStage.php @@ -34,4 +34,16 @@ public function __construct(int $limit) { $this->limit = $limit; } + + /** + * Executes the $limit stage locally on the provided documents. + * Only for test/local execution purposes. + * + * @param array $documents Input documents + * @return array Limited documents + */ + public function processLocally(array $documents): array + { + return array_slice($documents, 0, $this->limit); + } } diff --git a/src/Builder/Stage/MatchStage.php b/src/Builder/Stage/MatchStage.php index 0c88db2cf..54d6a673a 100644 --- a/src/Builder/Stage/MatchStage.php +++ b/src/Builder/Stage/MatchStage.php @@ -42,4 +42,33 @@ public function __construct(QueryInterface|array $query) $this->query = $query; } + + /** + * Executes the $match stage locally on the provided documents. + * Only for test/local execution purposes. + * + * @param array $documents Input documents + * @return array Filtered documents + */ + public function processLocally(array $documents): array + { + // Safely convert query to array for local matching + if (is_array($this->query)) { + $query = $this->query; + } elseif ($this->query instanceof \MongoDB\Builder\Type\QueryObject) { + $query = $this->query->queries; + } elseif (method_exists($this->query, 'getArrayCopy')) { + $query = $this->query->getArrayCopy(); + } else { + $query = []; + } + return array_values(array_filter($documents, function ($doc) use ($query) { + foreach ($query as $field => $value) { + if (!isset($doc[$field]) || $doc[$field] !== $value) { + return false; + } + } + return true; + })); + } } diff --git a/src/Builder/Stage/ProjectStage.php b/src/Builder/Stage/ProjectStage.php index e2f6c4977..526c392de 100644 --- a/src/Builder/Stage/ProjectStage.php +++ b/src/Builder/Stage/ProjectStage.php @@ -53,4 +53,25 @@ public function __construct( $specification = (object) $specification; $this->specification = $specification; } + + /** + * Executes the $project stage locally on the provided documents. + * Only for test/local execution purposes. + * + * @param array $documents Input documents + * @return array Projected documents + */ + public function processLocally(array $documents): array + { + $spec = (array) $this->specification; + return array_map(function ($doc) use ($spec) { + $result = []; + foreach ($spec as $field => $include) { + if ($include && isset($doc[$field])) { + $result[$field] = $doc[$field]; + } + } + return $result; + }, $documents); + } } diff --git a/src/Builder/Stage/SetWindowFieldsStage.php b/src/Builder/Stage/SetWindowFieldsStage.php index 7d45bc5aa..27666b3c6 100644 --- a/src/Builder/Stage/SetWindowFieldsStage.php +++ b/src/Builder/Stage/SetWindowFieldsStage.php @@ -59,4 +59,81 @@ public function __construct( $this->output = $output; $this->partitionBy = $partitionBy; } + + /** + * Executes the $setWindowFields stage locally on the provided documents. + * Only for test/local execution purposes. + * Supports partitioning, sorting, and window accumulators (sum, max). + * + * @param array $documents Input documents + * @return array Documents with window fields + */ + public function processLocally(array $documents): array + { + // Partition documents generically + $partitions = []; + foreach ($documents as $doc) { + $key = null; + if (is_callable($this->partitionBy)) { + $key = ($this->partitionBy)($doc); + } elseif (is_object($this->partitionBy) && method_exists($this->partitionBy, 'extract')) { + $key = $this->partitionBy->extract($doc); + } elseif (is_object($this->partitionBy) && method_exists($this->partitionBy, '__toString')) { + $field = (string) $this->partitionBy; + $key = $doc[$field] ?? null; + } elseif (is_string($this->partitionBy)) { + $key = $doc[$this->partitionBy] ?? null; + } elseif ($this->partitionBy instanceof Optional && $this->partitionBy === Optional::Undefined) { + $key = null; + } + $partitions[$key][] = $doc; + } + // Sort partitions + $sortSpec = (array) $this->sortBy; + foreach ($partitions as &$docs) { + usort($docs, function ($a, $b) use ($sortSpec) { + foreach ($sortSpec as $field => $direction) { + $dir = ($direction === -1 || $direction === 'desc' || $direction === \MongoDB\Builder\Type\Sort::Desc) ? -1 : 1; + $aVal = $a[$field] ?? null; + $bVal = $b[$field] ?? null; + if ($aVal === $bVal) { + continue; + } + return ($aVal < $bVal ? -1 : 1) * $dir; + } + return 0; + }); + } + // Apply window accumulators + $outputSpec = (array) $this->output; + $result = []; + foreach ($partitions as $docs) { + $count = count($docs); + for ($i = 0; $i < $count; $i++) { + $doc = $docs[$i]; + $newDoc = $doc; + foreach ($outputSpec as $field => $windowOp) { + // Only support outputWindow for now + if (method_exists($windowOp, 'accumulateWindow')) { + // Determine window bounds + $windowDocs = []; + if (isset($windowOp->documents)) { + $bounds = $windowOp->documents; + // ['unbounded', 'current'] means from 0 to $i + $start = ($bounds[0] === 'unbounded') ? 0 : max(0, $i + $bounds[0]); + $end = ($bounds[1] === 'current') ? $i : (($bounds[1] === 'unbounded') ? $count - 1 : min($count - 1, $i + $bounds[1])); + for ($j = $start; $j <= $end; $j++) { + $windowDocs[] = $docs[$j]; + } + } else { + $windowDocs = $docs; + } + $newDoc[$field] = $windowOp->accumulateWindow($windowDocs); + } + } + $result[] = $newDoc; + } + } + return $result; + } } diff --git a/src/Builder/Stage/SkipStage.php b/src/Builder/Stage/SkipStage.php index aab0242d8..af890afd2 100644 --- a/src/Builder/Stage/SkipStage.php +++ b/src/Builder/Stage/SkipStage.php @@ -34,4 +34,16 @@ public function __construct(int $skip) { $this->skip = $skip; } + + /** + * Executes the $skip stage locally on the provided documents. + * Only for test/local execution purposes. + * + * @param array $documents Input documents + * @return array Skipped documents + */ + public function processLocally(array $documents): array + { + return array_slice($documents, $this->skip); + } } diff --git a/src/Builder/Stage/SortStage.php b/src/Builder/Stage/SortStage.php index fc8899a4c..0161f1a5e 100644 --- a/src/Builder/Stage/SortStage.php +++ b/src/Builder/Stage/SortStage.php @@ -54,4 +54,29 @@ public function __construct( $sort = (object) $sort; $this->sort = $sort; } + + /** + * Executes the $sort stage locally on the provided documents. + * Only for test/local execution purposes. + * + * @param array $documents Input documents + * @return array Sorted documents + */ + public function processLocally(array $documents): array + { + $sortSpec = (array) $this->sort; + usort($documents, function ($a, $b) use ($sortSpec) { + foreach ($sortSpec as $field => $direction) { + $dir = ($direction === -1 || $direction === 'desc' || $direction === Sort::Desc) ? -1 : 1; + $aVal = $a[$field] ?? null; + $bVal = $b[$field] ?? null; + if ($aVal === $bVal) { + continue; + } + return ($aVal < $bVal ? -1 : 1) * $dir; + } + return 0; + }); + return $documents; + } } diff --git a/tests/Builder/Stage/LocalExecutionTest.php b/tests/Builder/Stage/LocalExecutionTest.php new file mode 100644 index 000000000..1c9836d9f --- /dev/null +++ b/tests/Builder/Stage/LocalExecutionTest.php @@ -0,0 +1,55 @@ + 'dave', 'score' => 80], + ['author' => 'john', 'score' => 90], + ]; + $stage = new MatchStage(['author' => 'dave']); + $result = $stage->processLocally($docs); + $this->assertSame([ + ['author' => 'dave', 'score' => 80], + ], $result); + } + + public function testProjectStageProcessLocally(): void + { + $docs = [ + ['author' => 'dave', 'score' => 80], + ['author' => 'john', 'score' => 90], + ]; + $stage = new ProjectStage(...['author' => 1]); + $result = $stage->processLocally($docs); + $this->assertSame([ + ['author' => 'dave'], + ['author' => 'john'], + ], $result); + } + + public function testGroupStageProcessLocallySum(): void + { + $docs = [ + ['category' => 'A', 'value' => 10], + ['category' => 'A', 'value' => 5], + ['category' => 'B', 'value' => 7], + ]; + $stage = new GroupStage('category', ...['total' => ['$sum' => 'value']]); + $result = $stage->processLocally($docs); + $this->assertSame([ + ['_id' => 'A', 'total' => 15], + ['_id' => 'B', 'total' => 7], + ], $result); + } +} diff --git a/tests/Builder/Stage/SetWindowFieldsLocalTest.php b/tests/Builder/Stage/SetWindowFieldsLocalTest.php new file mode 100644 index 000000000..2aae4b750 --- /dev/null +++ b/tests/Builder/Stage/SetWindowFieldsLocalTest.php @@ -0,0 +1,59 @@ + new \DateTime('2022-01-01'), 'quantity' => 5], + ['orderDate' => new \DateTime('2022-02-01'), 'quantity' => 7], + ['orderDate' => new \DateTime('2023-01-01'), 'quantity' => 3], + ['orderDate' => new \DateTime('2023-02-01'), 'quantity' => 8], + ]; + $pipeline = new Pipeline( + new SetWindowFieldsStage( + sortBy: ['orderDate' => Sort::Asc], + output: [ + 'cumulativeQuantityForYear' => new OutputWindowAccumulator( + new SumAccumulator('$quantity'), + ['documents' => ['unbounded', 'current']] + ), + 'maximumQuantityForYear' => new OutputWindowAccumulator( + new MaxAccumulator('$quantity'), + ['documents' => ['unbounded', 'unbounded']] + ), + ], + partitionBy: new class { + // Mimic YearExpression for partitioning + public function __toString() { return 'year'; } + }, + ) + ); + $result = $pipeline->processLocally($docs); + $expected = [ + ['orderDate' => new \DateTime('2022-01-01'), 'quantity' => 5, 'cumulativeQuantityForYear' => 5, 'maximumQuantityForYear' => 7], + ['orderDate' => new \DateTime('2022-02-01'), 'quantity' => 7, 'cumulativeQuantityForYear' => 12, 'maximumQuantityForYear' => 7], + ['orderDate' => new \DateTime('2023-01-01'), 'quantity' => 3, 'cumulativeQuantityForYear' => 3, 'maximumQuantityForYear' => 8], + ['orderDate' => new \DateTime('2023-02-01'), 'quantity' => 8, 'cumulativeQuantityForYear' => 11, 'maximumQuantityForYear' => 8], + ]; + foreach ($result as $i => $doc) { + $this->assertEquals($expected[$i]['orderDate'], $doc['orderDate']); + $this->assertEquals($expected[$i]['quantity'], $doc['quantity']); + $this->assertEquals($expected[$i]['cumulativeQuantityForYear'], $doc['cumulativeQuantityForYear']); + $this->assertEquals($expected[$i]['maximumQuantityForYear'], $doc['maximumQuantityForYear']); + } + } +} + diff --git a/tests/Builder/Stage/SortLimitSkipLocalTest.php b/tests/Builder/Stage/SortLimitSkipLocalTest.php new file mode 100644 index 000000000..7deb6f685 --- /dev/null +++ b/tests/Builder/Stage/SortLimitSkipLocalTest.php @@ -0,0 +1,86 @@ + 2, 'y' => 'b'], + ['x' => 1, 'y' => 'a'], + ['x' => 3, 'y' => 'c'], + ]; + $pipelineAsc = new Pipeline( + new SortStage(...['x' => 1]) + ); + $resultAsc = $pipelineAsc->processLocally($docs); + $this->assertSame([ + ['x' => 1, 'y' => 'a'], + ['x' => 2, 'y' => 'b'], + ['x' => 3, 'y' => 'c'], + ], $resultAsc); + + $pipelineDesc = new Pipeline( + new SortStage(...['x' => -1]) + ); + $resultDesc = $pipelineDesc->processLocally($docs); + $this->assertSame([ + ['x' => 3, 'y' => 'c'], + ['x' => 2, 'y' => 'b'], + ['x' => 1, 'y' => 'a'], + ], $resultDesc); + } + + public function testLimitStageProcessLocally(): void + { + $docs = [ + ['x' => 1], ['x' => 2], ['x' => 3], ['x' => 4], + ]; + $pipeline = new Pipeline( + new LimitStage(2) + ); + $result = $pipeline->processLocally($docs); + $this->assertSame([ + ['x' => 1], ['x' => 2], + ], $result); + } + + public function testSkipStageProcessLocally(): void + { + $docs = [ + ['x' => 1], ['x' => 2], ['x' => 3], ['x' => 4], + ]; + $pipeline = new Pipeline( + new SkipStage(2) + ); + $result = $pipeline->processLocally($docs); + $this->assertSame([ + ['x' => 3], ['x' => 4], + ], $result); + } + + public function testSortLimitSkipCombined(): void + { + $docs = [ + ['x' => 5], ['x' => 2], ['x' => 8], ['x' => 1], + ]; + $pipeline = new Pipeline( + new SortStage(...['x' => 1]), + new LimitStage(2), + new SkipStage(1) + ); + $result = $pipeline->processLocally($docs); + $this->assertSame([ + ['x' => 2], + ], $result); + } +}