diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java index ad000a659a..eef330b8bd 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java @@ -22,6 +22,7 @@ import com.apple.foundationdb.annotation.API; import com.apple.foundationdb.async.AsyncUtil; +import com.apple.foundationdb.record.ByteArrayContinuation; import com.apple.foundationdb.record.RecordCursor; import com.apple.foundationdb.record.RecordCursorContinuation; import com.apple.foundationdb.record.RecordCursorResult; @@ -57,13 +58,20 @@ public class AggregateCursor implements RecordCursor previousValidResult; + // last row in last group, is null if the current group is the first group + @Nullable + private RecordCursorResult lastInLastGroup; + @Nullable + byte[] continuation; public AggregateCursor(@Nonnull RecordCursor inner, @Nonnull final StreamGrouping streamGrouping, - boolean isCreateDefaultOnEmpty) { + boolean isCreateDefaultOnEmpty, + @Nullable byte[] continuation) { this.inner = inner; this.streamGrouping = streamGrouping; this.isCreateDefaultOnEmpty = isCreateDefaultOnEmpty; + this.continuation = continuation; } @Nonnull @@ -71,63 +79,83 @@ public AggregateCursor(@Nonnull RecordCursor inner, public CompletableFuture> onNext() { if (previousResult != null && !previousResult.hasNext()) { // we are done - return CompletableFuture.completedFuture(RecordCursorResult.exhausted()); + return CompletableFuture.completedFuture(RecordCursorResult.withoutNextValue(previousResult.getContinuation(), + previousResult.getNoNextReason())); } return AsyncUtil.whileTrue(() -> inner.onNext().thenApply(innerResult -> { previousResult = innerResult; if (!innerResult.hasNext()) { 
if (!isNoRecords() || (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty())) { + // the method streamGrouping.finalizeGroup() computes previousCompleteResult and resets the accumulator streamGrouping.finalizeGroup(); } return false; } else { final QueryResult queryResult = Objects.requireNonNull(innerResult.get()); boolean groupBreak = streamGrouping.apply(queryResult); - if (!groupBreak) { + if (groupBreak) { + lastInLastGroup = previousValidResult; + } else { // previousValidResult is the last row before group break, it sets the continuation previousValidResult = innerResult; } return (!groupBreak); } }), getExecutor()).thenApply(vignore -> { - if (isNoRecords()) { - // Edge case where there are no records at all - if (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty()) { - return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), RecordCursorStartContinuation.START); + // either innerResult.hasNext() = false; or groupBreak = true + if (Verify.verifyNotNull(previousResult).hasNext()) { + // in this case groupBreak = true, return aggregated result and continuation + RecordCursorContinuation continuation = Verify.verifyNotNull(previousValidResult).getContinuation(); + /* + * Update the previousValidResult to the next continuation even though it hasn't been returned. This is to return the correct continuation when there are single-element groups. 
+ * Below is an example that shows how the continuation (previousValidResult) moves: + * Initial: previousResult = null, previousValidResult = null + row0 groupKey0 groupBreak = False previousValidResult = row0 previousResult = row0 + row1 groupKey0 groupBreak = False previousValidResult = row1 previousResult = row1 + row2 groupKey1 groupBreak = True previousValidResult = row1 previousResult = row2 + * returns result (groupKey0, continuation = row1), and sets previousValidResult = row2 + * + * Now there are 2 scenarios: 1) the current iteration continues; 2) the current iteration stops + * In scenario 1, the iteration continues and gets to row3: + row3 groupKey2 groupBreak = True previousValidResult = row2 previousResult = row3 + * returns result (groupKey1, continuation = row2), and sets previousValidResult = row3 + * + * In scenario 2, a new iteration starts from row2 (because the last returned continuation = row1), with initial previousResult = null, previousValidResult = null: + row2 groupKey1 groupBreak = False previousValidResult = row2 previousResult = row2 + * (Note that because a new iteration starts, groupBreak = False for row2.) + row3 groupKey2 groupBreak = True previousValidResult = row2 previousResult = row3 + * returns result (groupKey1, continuation = row2), and sets previousValidResult = row3 + * + * Both scenarios return the correct result, and in both the continuation is set to row3 in the end; row2 is scanned twice if a new iteration starts. 
+ */ + previousValidResult = previousResult; + return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), continuation); + } else { + if (Verify.verifyNotNull(previousResult).getNoNextReason() == NoNextReason.SOURCE_EXHAUSTED) { + if (previousValidResult == null) { + return RecordCursorResult.exhausted(); + } else { + RecordCursorContinuation continuation = previousValidResult.getContinuation(); + previousValidResult = previousResult; + return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), continuation); + } } else { - return RecordCursorResult.exhausted(); + RecordCursorContinuation currentContinuation; + // in the current scan, if current group is the first group, set the continuation to the start of the current scan + // otherwise set the continuation to the last row in the last group + if (lastInLastGroup == null) { + currentContinuation = continuation == null ? RecordCursorStartContinuation.START : ByteArrayContinuation.fromNullable(continuation); + } else { + currentContinuation = lastInLastGroup.getContinuation(); + } + previousValidResult = lastInLastGroup; + return RecordCursorResult.withoutNextValue(currentContinuation, Verify.verifyNotNull(previousResult).getNoNextReason()); } } - // Use the last valid result for the continuation as we need non-terminal one here. - RecordCursorContinuation continuation = Verify.verifyNotNull(previousValidResult).getContinuation(); - /* - * Update the previousValidResult to the next continuation even though it hasn't been returned. This is to return the correct continuation when there are single-element groups. 
- * Below is an example that shows how continuation(previousValidResult) moves: - * Initial: previousResult = null, previousValidResult = null - row0 groupKey0 groupBreak = False previousValidResult = row0 previousResult = row0 - row1 groupKey0 groupBreak = False previousValidResult = row1 previousResult = row1 - row2 groupKey1 groupBreak = True previousValidResult = row1 previousResult = row2 - * returns result (groupKey0, continuation = row1), and set previousValidResult = row2 - * - * Now there are 2 scenarios, 1) the current iteration continues; 2) the current iteration stops - * In scenario 1, the iteration continues, it gets to row3: - row3 groupKey2 groupBreak = True previousValidResult = row2 previousResult = row3 - * returns result (groupKey1, continuation = row2), and set previousValidResult = row3 - * - * In scenario 2, a new iteration starts from row2 (because the last returned continuation = row1), and set initial previousResult = null, previousValidResult = null: - row2 groupKey1 groupBreak = False previousValidResult = row2 previousResult = row2 - * (Note that because a new iteration starts, groupBreak = False for row2.) - row3 groupKey2 groupBreak = True previousValidResult = row2 previousResult = row3 - * returns result (groupKey1, continuation = row2), and set previousValidResult = row3 - * - * Both scenarios returns the correct result, and continuation are both set to row3 in the end, row2 is scanned twice if a new iteration starts. 
- */ - previousValidResult = previousResult; - return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), continuation); }); } - + private boolean isNoRecords() { return ((previousValidResult == null) && (!Verify.verifyNotNull(previousResult).hasNext())); } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java index 4342b4b65d..323be325ef 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java @@ -148,7 +148,7 @@ public RecordCursor executePlan(@Nonnull FDBRec (FDBRecordStoreBase)store, context, inner.getAlias()); - return new AggregateCursor<>(innerCursor, streamGrouping, isCreateDefaultOnEmpty) + return new AggregateCursor<>(innerCursor, streamGrouping, isCreateDefaultOnEmpty, continuation) .skipThenLimit(executeProperties.getSkip(), executeProperties.getReturnedRowLimit()); } diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/query/FDBStreamAggregationTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/query/FDBStreamAggregationTest.java index 42e4105005..98fda6dba1 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/query/FDBStreamAggregationTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/query/FDBStreamAggregationTest.java @@ -20,11 +20,17 @@ package com.apple.foundationdb.record.provider.foundationdb.query; +import com.apple.foundationdb.record.ByteScanLimiterFactory; import 
com.apple.foundationdb.record.EvaluationContext; import com.apple.foundationdb.record.ExecuteProperties; +import com.apple.foundationdb.record.ExecuteState; import com.apple.foundationdb.record.RecordCursor; +import com.apple.foundationdb.record.RecordCursorContinuation; +import com.apple.foundationdb.record.RecordCursorEndContinuation; import com.apple.foundationdb.record.RecordCursorResult; +import com.apple.foundationdb.record.RecordCursorStartContinuation; import com.apple.foundationdb.record.RecordMetaData; +import com.apple.foundationdb.record.RecordScanLimiterFactory; import com.apple.foundationdb.record.TestRecords1Proto; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext; import com.apple.foundationdb.record.query.plan.ScanComparisons; @@ -49,11 +55,13 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -305,6 +313,34 @@ void aggregateNoRecordsNoGroupNoAggregate(final boolean useNestedResult, final i } } + @Test + void aggregateHitScanLimitReached() { + try (final var context = openContext()) { + openSimpleRecordStore(context, NO_HOOK); + + final var plan = + new AggregationPlanBuilder(recordStore.getRecordMetaData(), "MySimpleRecord") + .withAggregateValue("num_value_2", value -> new NumericAggregationValue.Sum(NumericAggregationValue.PhysicalOperator.SUM_I, value)) + .withGroupCriterion("str_value_indexed") + .build(false); + + // In the testing data, there are 2 groups, each group has 3 rows. 
+ // recordScanLimit = 5: scans 3 rows, and the 4th scan hits SCAN_LIMIT_REACHED + // although the first group contains exactly 3 rows, we don't know we've finished the first group before we get to the 4th row, so nothing is returned, continuation is back to START + RecordCursorContinuation continuation1 = executePlanWithRecordScanLimit(plan, 5, null, null); + Assertions.assertEquals(RecordCursorStartContinuation.START, continuation1); + // recordScanLimit = 6: scans 4 rows, and the 5th scan hits SCAN_LIMIT_REACHED, we know that we've finished the 1st group, aggregated result is returned + RecordCursorContinuation continuation2 = executePlanWithRecordScanLimit(plan, 6, continuation1.toBytes(), resultOf("0", 3)); + // continue with recordScanLimit = 5, scans 3 rows and hits SCAN_LIMIT_REACHED + // again, we don't know that we've finished the 2nd group, nothing is returned, continuation is back to where the scan starts + RecordCursorContinuation continuation3 = executePlanWithRecordScanLimit(plan, 5, continuation2.toBytes(), null); + Assertions.assertArrayEquals(continuation2.toBytes(), continuation3.toBytes()); + // finish the 2nd group, aggregated result is returned, exhausted the source + RecordCursorContinuation continuation4 = executePlanWithRecordScanLimit(plan, 6, continuation3.toBytes(), resultOf("1", 12)); + Assertions.assertEquals(RecordCursorEndContinuation.END, continuation4); + } + } + private static Stream provideArguments() { // (boolean, rowLimit) // setting rowLimit = 0 is equivalent to no limit @@ -334,11 +370,17 @@ private void populateDB(final int numRecords) throws Exception { } @Nonnull - private RecordCursor executePlan(final RecordQueryPlan plan, final int rowLimit, final byte[] continuation) { + private RecordCursor executePlan(final RecordQueryPlan plan, final int rowLimit, final int recordScanLimit, final byte[] continuation) { final var types = plan.getDynamicTypes(); final var typeRepository = 
TypeRepository.newBuilder().addAllTypes(types).build(); + ExecuteState executeState; + if (recordScanLimit > 0) { + executeState = new ExecuteState(RecordScanLimiterFactory.enforce(recordScanLimit), ByteScanLimiterFactory.tracking()); + } else { + executeState = ExecuteState.NO_LIMITS; + } ExecuteProperties executeProperties = ExecuteProperties.SERIAL_EXECUTE; - executeProperties = executeProperties.setReturnedRowLimit(rowLimit); + executeProperties = executeProperties.setReturnedRowLimit(rowLimit).setState(executeState); try { return plan.executePlan(recordStore, EvaluationContext.forTypeRepository(typeRepository), continuation, executeProperties); } catch (final Throwable t) { @@ -346,11 +388,32 @@ private RecordCursor executePlan(final RecordQueryPlan plan, final } } + private RecordCursorContinuation executePlanWithRecordScanLimit(final RecordQueryPlan plan, final int recordScanLimit, byte[] continuation, @Nullable List expectedResult) { + List queryResults = new LinkedList<>(); + RecordCursor currentCursor = executePlan(plan, 0, recordScanLimit, continuation); + RecordCursorResult currentCursorResult; + RecordCursorContinuation cursorContinuation; + while (true) { + currentCursorResult = currentCursor.getNext(); + cursorContinuation = currentCursorResult.getContinuation(); + if (!currentCursorResult.hasNext()) { + break; + } + queryResults.add(currentCursorResult.get()); + } + if (expectedResult == null) { + Assertions.assertTrue(queryResults.isEmpty()); + } else { + assertResults(this::assertResultFlattened, queryResults, expectedResult); + } + return cursorContinuation; + } + private List executePlanWithRowLimit(final RecordQueryPlan plan, final int rowLimit) { byte[] continuation = null; List queryResults = new LinkedList<>(); while (true) { - RecordCursor currentCursor = executePlan(plan, rowLimit, continuation); + RecordCursor currentCursor = executePlan(plan, rowLimit, 0, continuation); RecordCursorResult currentCursorResult; while (true) { 
currentCursorResult = currentCursor.getNext(); @@ -369,7 +432,7 @@ private List executePlanWithRowLimit(final RecordQueryPlan plan, fi private void assertResults(@Nonnull final BiConsumer> checkConsumer, @Nonnull final List actual, @Nonnull final List... expected) { Assertions.assertEquals(expected.length, actual.size()); - for (var i = 0 ; i < actual.size() ; i++) { + for (var i = 0; i < actual.size(); i++) { checkConsumer.accept(actual.get(i), expected[i]); } } @@ -385,7 +448,7 @@ private void assertResultFlattened(final QueryResult actual, final List expec final var resultFields = resultFieldsBuilder.build(); Assertions.assertEquals(resultFields.size(), expected.size()); - for (var i = 0 ; i < resultFields.size() ; i++) { + for (var i = 0; i < resultFields.size(); i++) { Assertions.assertEquals(expected.get(i), resultFields.get(i)); } } @@ -416,7 +479,7 @@ private void assertResultNested(final QueryResult actual, final List expected final var resultFields = resultFieldsBuilder.build(); Assertions.assertEquals(resultFields.size(), expected.size()); - for (var i = 0 ; i < resultFields.size() ; i++) { + for (var i = 0; i < resultFields.size(); i++) { Assertions.assertEquals(expected.get(i), resultFields.get(i)); } }