Skip to content

Commit 1ae9d4d

Browse files
committed
scan limit not working?
1 parent 0ec127b commit 1ae9d4d

File tree

2 files changed

+59
-23
lines changed

2 files changed

+59
-23
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.apple.foundationdb.async.AsyncUtil;
2525
import com.apple.foundationdb.record.RecordCursor;
2626
import com.apple.foundationdb.record.RecordCursorContinuation;
27+
import com.apple.foundationdb.record.RecordCursorEndContinuation;
2728
import com.apple.foundationdb.record.RecordCursorResult;
2829
import com.apple.foundationdb.record.RecordCursorStartContinuation;
2930
import com.apple.foundationdb.record.RecordCursorVisitor;
@@ -71,7 +72,9 @@ public AggregateCursor(@Nonnull RecordCursor<QueryResult> inner,
7172
public CompletableFuture<RecordCursorResult<QueryResult>> onNext() {
7273
if (previousResult != null && !previousResult.hasNext()) {
7374
// we are done
74-
return CompletableFuture.completedFuture(RecordCursorResult.exhausted());
75+
return CompletableFuture.completedFuture(RecordCursorResult.withoutNextValue(previousResult.getContinuation(),
76+
previousResult.getNoNextReason()));
77+
// return CompletableFuture.completedFuture(RecordCursorResult.exhausted());
7578
}
7679

7780
return AsyncUtil.whileTrue(() -> inner.onNext().thenApply(innerResult -> {
@@ -84,7 +87,9 @@ public CompletableFuture<RecordCursorResult<QueryResult>> onNext() {
8487
} else {
8588
final QueryResult queryResult = Objects.requireNonNull(innerResult.get());
8689
boolean groupBreak = streamGrouping.apply(queryResult);
87-
previousValidResult = innerResult;
90+
if (!groupBreak) {
91+
previousValidResult = innerResult;
92+
}
8893
return (!groupBreak);
8994
}
9095
}), getExecutor()).thenApply(vignore -> {
@@ -98,6 +103,7 @@ public CompletableFuture<RecordCursorResult<QueryResult>> onNext() {
98103
}
99104
// Use the last valid result for the continuation as we need non-terminal one here.
100105
RecordCursorContinuation continuation = Verify.verifyNotNull(previousValidResult).getContinuation();
106+
previousValidResult = previousResult;
101107
return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), continuation);
102108
});
103109
}

fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/query/FDBStreamAggregationTest.java

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222

2323
import com.apple.foundationdb.record.EvaluationContext;
2424
import com.apple.foundationdb.record.ExecuteProperties;
25+
import com.apple.foundationdb.record.ExecuteState;
2526
import com.apple.foundationdb.record.RecordCursor;
2627
import com.apple.foundationdb.record.RecordCursorResult;
2728
import com.apple.foundationdb.record.RecordMetaData;
29+
import com.apple.foundationdb.record.RecordScanLimiterFactory;
2830
import com.apple.foundationdb.record.TestRecords1Proto;
2931
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext;
3032
import com.apple.foundationdb.record.query.plan.ScanComparisons;
@@ -292,36 +294,56 @@ void test(final boolean useNestedResult) {
292294
final var plan =
293295
new AggregationPlanBuilder(recordStore.getRecordMetaData(), "MySimpleRecord")
294296
.withAggregateValue("num_value_2", value -> new NumericAggregationValue.Sum(NumericAggregationValue.PhysicalOperator.SUM_I, value))
295-
.withGroupCriterion("num_value_3_indexed")
296297
.withGroupCriterion("str_value_indexed")
297298
.build(useNestedResult);
298299

299-
final var result = executePlanWithRecordScanLimit(plan, 2);
300-
assertResults(useNestedResult ? this::assertResultNested : this::assertResultFlattened, result, resultOf(0, "0", 1), resultOf(1, "0", 2), resultOf(1, "1", 3), resultOf(2, "1", 9));
300+
/*
301+
num_value_2 str_value_indexed
302+
0 "0"
303+
1 "0"
304+
2 "0"
305+
*/
306+
/*
307+
RecordCursorResult<QueryResult> result1 = executePlanWithRecordScanLimit(plan, 1, null, useNestedResult,
308+
List.of(resultOf("0", 0)));
309+
RecordCursorResult<QueryResult> result2 = executePlanWithRecordScanLimit(plan, 2, null, useNestedResult,
310+
List.of(resultOf("0", 1)));
311+
// only scanned 2 rows?
312+
RecordCursorResult<QueryResult> result3 = executePlanWithRecordScanLimit(plan, 3, null, useNestedResult,
313+
List.of(resultOf("0", 1)));
314+
315+
*/
316+
// only scanned 3 rows?
317+
RecordCursorResult<QueryResult> result4 = executePlanWithRecordScanLimit(plan, 4, null, useNestedResult,
318+
List.of(resultOf("0", 3)));
319+
/*
320+
RecordCursorResult<QueryResult> result5 = executePlanWithRecordScanLimit(plan, 5, null, useNestedResult,
321+
List.of(resultOf("0", 3), resultOf("1", 3)));
322+
RecordCursorResult<QueryResult> result6 = executePlanWithRecordScanLimit(plan, 6, null, useNestedResult,
323+
List.of(resultOf("0", 3), resultOf("1", 7)));
324+
RecordCursorResult<QueryResult> result7 = executePlanWithRecordScanLimit(plan, 7, null, useNestedResult,
325+
List.of(resultOf("0", 3), resultOf("1", 12)));
326+
327+
*/
301328
}
302329
}
303330

304-
private List<QueryResult> executePlanWithRecordScanLimit(final RecordQueryPlan plan, final int recordScanLimit) {
305-
byte[] continuation = null;
306-
List<QueryResult> queryResults = new LinkedList<>();
331+
private RecordCursorResult<QueryResult> executePlanWithRecordScanLimit(final RecordQueryPlan plan, final int recordScanLimit, byte[] continuation, final boolean useNestedResult, List<?> expectedResult) {
332+
RecordCursorResult<QueryResult> currentCursorResult;
333+
List<QueryResult> currentQueryResults = new LinkedList<>();
307334
while (true) {
308335
RecordCursor<QueryResult> currentCursor = executePlan(plan, 0, recordScanLimit, continuation);
309-
RecordCursorResult<QueryResult> currentCursorResult;
310-
while (true) {
311-
currentCursorResult = currentCursor.getNext();
312-
continuation = currentCursorResult.getContinuation().toBytes();
313-
if (!currentCursorResult.hasNext()) {
314-
break;
315-
}
316-
queryResults.add(currentCursorResult.get());
317-
System.out.println("current result:" + currentCursorResult.get().getMessage());
318-
}
319-
System.out.println("getNoNextReson:" + currentCursorResult.getNoNextReason());
320-
if (currentCursorResult.getNoNextReason() == RecordCursor.NoNextReason.SOURCE_EXHAUSTED) {
336+
currentCursorResult = currentCursor.getNext();
337+
continuation = currentCursorResult.getContinuation().toBytes();
338+
if (!currentCursorResult.hasNext()) {
321339
break;
322340
}
341+
currentQueryResults.add(currentCursorResult.get());
342+
System.out.println("current result:" + currentCursorResult.get().getMessage());
323343
}
324-
return queryResults;
344+
assertResults(useNestedResult ? this::assertResultNested : this::assertResultFlattened, currentQueryResults, expectedResult.toArray(new List<?>[0]));
345+
System.out.println("getNoNextReson:" + currentCursorResult.getNoNextReason());
346+
return currentCursorResult;
325347
}
326348

327349
private void populateDB(final int numRecords) throws Exception {
@@ -341,11 +363,19 @@ private void populateDB(final int numRecords) throws Exception {
341363
}
342364

343365
@Nonnull
344-
private List<QueryResult> executePlan(final RecordQueryPlan plan) {
366+
private RecordCursor<QueryResult> executePlan(final RecordQueryPlan plan, final int rowLimit, final int recordScanLimit, final byte[] continuation) {
345367
final var types = plan.getDynamicTypes();
346368
final var typeRepository = TypeRepository.newBuilder().addAllTypes(types).build();
369+
ExecuteState executeState;
370+
if (recordScanLimit > 0) {
371+
executeState = new ExecuteState(RecordScanLimiterFactory.enforce(recordScanLimit), null);
372+
} else {
373+
executeState = ExecuteState.NO_LIMITS;
374+
}
375+
ExecuteProperties executeProperties = ExecuteProperties.SERIAL_EXECUTE;
376+
executeProperties = executeProperties.setReturnedRowLimit(rowLimit).setState(executeState);
347377
try {
348-
return plan.executePlan(recordStore, EvaluationContext.forTypeRepository(typeRepository), null, ExecuteProperties.SERIAL_EXECUTE).asList().get();
378+
return plan.executePlan(recordStore, EvaluationContext.forTypeRepository(typeRepository), continuation, executeProperties);
349379
} catch (final Throwable t) {
350380
throw Assertions.<RuntimeException>fail(t);
351381
}

0 commit comments

Comments
 (0)