Skip to content

Commit 3a153b0

Browse files
committed
test work
1 parent 1ae9d4d commit 3a153b0

File tree

3 files changed

+145
-56
lines changed

3 files changed

+145
-56
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222

2323
import com.apple.foundationdb.annotation.API;
2424
import com.apple.foundationdb.async.AsyncUtil;
25+
import com.apple.foundationdb.record.ByteArrayContinuation;
2526
import com.apple.foundationdb.record.RecordCursor;
2627
import com.apple.foundationdb.record.RecordCursorContinuation;
2728
import com.apple.foundationdb.record.RecordCursorEndContinuation;
2829
import com.apple.foundationdb.record.RecordCursorResult;
2930
import com.apple.foundationdb.record.RecordCursorStartContinuation;
3031
import com.apple.foundationdb.record.RecordCursorVisitor;
32+
import com.apple.foundationdb.record.provider.foundationdb.KeyValueCursorBase;
3133
import com.apple.foundationdb.record.query.plan.plans.QueryResult;
3234
import com.google.common.base.Verify;
3335
import com.google.protobuf.Message;
@@ -58,13 +60,19 @@ public class AggregateCursor<M extends Message> implements RecordCursor<QueryRes
5860
// Previous non-empty record processed by this cursor
5961
@Nullable
6062
private RecordCursorResult<QueryResult> previousValidResult;
63+
// Last row of the previous group; null while the current group is the first group of this scan
64+
@Nullable
65+
private RecordCursorResult<QueryResult> lastInLastGroup;
66+
byte[] continuation;
6167

6268
public AggregateCursor(@Nonnull RecordCursor<QueryResult> inner,
6369
@Nonnull final StreamGrouping<M> streamGrouping,
64-
boolean isCreateDefaultOnEmpty) {
70+
boolean isCreateDefaultOnEmpty,
71+
byte[] continuation) {
6572
this.inner = inner;
6673
this.streamGrouping = streamGrouping;
6774
this.isCreateDefaultOnEmpty = isCreateDefaultOnEmpty;
75+
this.continuation = continuation;
6876
}
6977

7078
@Nonnull
@@ -74,37 +82,55 @@ public CompletableFuture<RecordCursorResult<QueryResult>> onNext() {
7482
// we are done
7583
return CompletableFuture.completedFuture(RecordCursorResult.withoutNextValue(previousResult.getContinuation(),
7684
previousResult.getNoNextReason()));
77-
// return CompletableFuture.completedFuture(RecordCursorResult.exhausted());
7885
}
7986

8087
return AsyncUtil.whileTrue(() -> inner.onNext().thenApply(innerResult -> {
8188
previousResult = innerResult;
8289
if (!innerResult.hasNext()) {
8390
if (!isNoRecords() || (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty())) {
91+
// the method streamGrouping.finalizeGroup() computes previousCompleteResult and resets the accumulator
8492
streamGrouping.finalizeGroup();
8593
}
8694
return false;
8795
} else {
8896
final QueryResult queryResult = Objects.requireNonNull(innerResult.get());
8997
boolean groupBreak = streamGrouping.apply(queryResult);
90-
if (!groupBreak) {
98+
if (groupBreak) {
99+
lastInLastGroup = previousValidResult;
100+
} else {
91101
previousValidResult = innerResult;
92102
}
93103
return (!groupBreak);
94104
}
95105
}), getExecutor()).thenApply(vignore -> {
96-
if (isNoRecords()) {
97-
// Edge case where there are no records at all
98-
if (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty()) {
99-
return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), RecordCursorStartContinuation.START);
106+
// either innerResult.hasNext() = false; or groupBreak = true
107+
if (Verify.verifyNotNull(previousResult).hasNext()) {
108+
// in this case groupBreak = true, return aggregated result and continuation
109+
RecordCursorContinuation continuation = Verify.verifyNotNull(previousValidResult).getContinuation();
110+
previousValidResult = previousResult;
111+
return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), continuation);
112+
} else {
113+
if (Verify.verifyNotNull(previousResult).getNoNextReason() == NoNextReason.SOURCE_EXHAUSTED) {
114+
if (previousValidResult == null) {
115+
return RecordCursorResult.exhausted();
116+
} else {
117+
RecordCursorContinuation continuation =previousValidResult.getContinuation();
118+
previousValidResult = previousResult;
119+
return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), continuation);
120+
}
100121
} else {
101-
return RecordCursorResult.exhausted();
122+
RecordCursorContinuation currentContinuation;
123+
// in the current scan, if current group is the first group, set the continuation to the start of the current scan
124+
// otherwise set the continuation to the last row in the last group
125+
if (lastInLastGroup == null) {
126+
currentContinuation = continuation == null ? RecordCursorStartContinuation.START : ByteArrayContinuation.fromNullable(continuation);
127+
} else {
128+
currentContinuation = lastInLastGroup.getContinuation();
129+
}
130+
previousValidResult = lastInLastGroup;
131+
return RecordCursorResult.withoutNextValue(currentContinuation, Verify.verifyNotNull(previousResult).getNoNextReason());
102132
}
103133
}
104-
// Use the last valid result for the continuation as we need non-terminal one here.
105-
RecordCursorContinuation continuation = Verify.verifyNotNull(previousValidResult).getContinuation();
106-
previousValidResult = previousResult;
107-
return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), continuation);
108134
});
109135
}
110136

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public <M extends Message> RecordCursor<QueryResult> executePlan(@Nonnull FDBRec
148148
(FDBRecordStoreBase<Message>)store,
149149
context,
150150
inner.getAlias());
151-
return new AggregateCursor<>(innerCursor, streamGrouping, isCreateDefaultOnEmpty)
151+
return new AggregateCursor<>(innerCursor, streamGrouping, isCreateDefaultOnEmpty, continuation)
152152
.skipThenLimit(executeProperties.getSkip(),
153153
executeProperties.getReturnedRowLimit());
154154
}

fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/query/FDBStreamAggregationTest.java

Lines changed: 106 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,15 @@
2020

2121
package com.apple.foundationdb.record.provider.foundationdb.query;
2222

23+
import com.apple.foundationdb.record.ByteScanLimiterFactory;
2324
import com.apple.foundationdb.record.EvaluationContext;
2425
import com.apple.foundationdb.record.ExecuteProperties;
2526
import com.apple.foundationdb.record.ExecuteState;
2627
import com.apple.foundationdb.record.RecordCursor;
28+
import com.apple.foundationdb.record.RecordCursorContinuation;
29+
import com.apple.foundationdb.record.RecordCursorEndContinuation;
2730
import com.apple.foundationdb.record.RecordCursorResult;
31+
import com.apple.foundationdb.record.RecordCursorStartContinuation;
2832
import com.apple.foundationdb.record.RecordMetaData;
2933
import com.apple.foundationdb.record.RecordScanLimiterFactory;
3034
import com.apple.foundationdb.record.TestRecords1Proto;
@@ -52,16 +56,21 @@
5256
import org.junit.jupiter.api.Assertions;
5357
import org.junit.jupiter.api.BeforeEach;
5458
import org.junit.jupiter.api.Tag;
59+
import org.junit.jupiter.api.Test;
5560
import org.junit.jupiter.params.ParameterizedTest;
61+
import org.junit.jupiter.params.provider.Arguments;
62+
import org.junit.jupiter.params.provider.MethodSource;
5663

5764
import javax.annotation.Nonnull;
65+
import javax.annotation.Nullable;
5866
import java.util.ArrayList;
5967
import java.util.Arrays;
6068
import java.util.Collections;
6169
import java.util.LinkedList;
6270
import java.util.List;
6371
import java.util.function.BiConsumer;
6472
import java.util.function.Function;
73+
import java.util.stream.Stream;
6574

6675
/**
6776
* Tests related to planning and executing queries with streaming aggregation.
@@ -218,6 +227,26 @@ void aggregateThreeGroupByTwo(final boolean useNestedResult) {
218227
}
219228
}
220229

230+
@ParameterizedTest(name = "[{displayName}-{index}] {0}")
231+
@MethodSource("provideArguments")
232+
void aggregateOneGroupByThree(final boolean useNestedResult, final int rowLimit) {
233+
// each group only has one row
234+
try (final var context = openContext()) {
235+
openSimpleRecordStore(context, NO_HOOK);
236+
237+
final var plan =
238+
new AggregationPlanBuilder(recordStore.getRecordMetaData(), "MySimpleRecord")
239+
.withAggregateValue("num_value_2", value -> new NumericAggregationValue.Sum(NumericAggregationValue.PhysicalOperator.SUM_I, value))
240+
.withGroupCriterion("num_value_3_indexed")
241+
.withGroupCriterion("str_value_indexed")
242+
.withGroupCriterion("num_value_unique")
243+
.build(useNestedResult);
244+
245+
final var result = executePlanWithRowLimit(plan, rowLimit);
246+
assertResults(useNestedResult ? this::assertResultNested : this::assertResultFlattened, result, resultOf(0, "0", 0, 0), resultOf(0, "0", 1, 1), resultOf(1, "0", 2, 2), resultOf(1, "1", 3, 3), resultOf(2, "1", 4, 4), resultOf(2, "1", 5, 5));
247+
}
248+
}
249+
221250
@ParameterizedTest(name = "[{displayName}-{index}] {0}")
222251
@BooleanSource
223252
void aggregateNoRecords(final boolean useNestedResult) {
@@ -285,65 +314,64 @@ void aggregateNoRecordsNoGroupNoAggregate(final boolean useNestedResult) {
285314
}
286315
}
287316

288-
@ParameterizedTest(name = "[{displayName}-{index}] {0}")
289-
@BooleanSource
290-
void test(final boolean useNestedResult) {
317+
@Test
318+
void aggregateHitScanLimitReached() {
291319
try (final var context = openContext()) {
292320
openSimpleRecordStore(context, NO_HOOK);
293321

294322
final var plan =
295323
new AggregationPlanBuilder(recordStore.getRecordMetaData(), "MySimpleRecord")
296324
.withAggregateValue("num_value_2", value -> new NumericAggregationValue.Sum(NumericAggregationValue.PhysicalOperator.SUM_I, value))
297325
.withGroupCriterion("str_value_indexed")
298-
.build(useNestedResult);
299-
300-
/*
301-
num_value_2 str_value_indexed
302-
0 "0"
303-
1 "0"
304-
2 "0"
305-
*/
306-
/*
307-
RecordCursorResult<QueryResult> result1 = executePlanWithRecordScanLimit(plan, 1, null, useNestedResult,
308-
List.of(resultOf("0", 0)));
309-
RecordCursorResult<QueryResult> result2 = executePlanWithRecordScanLimit(plan, 2, null, useNestedResult,
310-
List.of(resultOf("0", 1)));
311-
// only scanned 2 rows?
312-
RecordCursorResult<QueryResult> result3 = executePlanWithRecordScanLimit(plan, 3, null, useNestedResult,
313-
List.of(resultOf("0", 1)));
314-
315-
*/
316-
// only scanned 3 rows?
317-
RecordCursorResult<QueryResult> result4 = executePlanWithRecordScanLimit(plan, 4, null, useNestedResult,
318-
List.of(resultOf("0", 3)));
319-
/*
320-
RecordCursorResult<QueryResult> result5 = executePlanWithRecordScanLimit(plan, 5, null, useNestedResult,
321-
List.of(resultOf("0", 3), resultOf("1", 3)));
322-
RecordCursorResult<QueryResult> result6 = executePlanWithRecordScanLimit(plan, 6, null, useNestedResult,
323-
List.of(resultOf("0", 3), resultOf("1", 7)));
324-
RecordCursorResult<QueryResult> result7 = executePlanWithRecordScanLimit(plan, 7, null, useNestedResult,
325-
List.of(resultOf("0", 3), resultOf("1", 12)));
326-
327-
*/
326+
.build(false);
327+
328+
// In the testing data, there are 2 groups, each group has 3 rows.
329+
// recordScanLimit = 5: scans 3 rows, and the 4th scan hits SCAN_LIMIT_REACHED
330+
// Although the first group contains exactly 3 rows, we cannot know the group is complete until we reach the 4th row, so nothing is returned and the continuation goes back to START.
331+
RecordCursorContinuation continuation1 = executePlanWithRecordScanLimit(plan, 5, null, null);
332+
Assertions.assertEquals(RecordCursorStartContinuation.START, continuation1);
333+
// recordScanLimit = 6: scans 4 rows, and the 5th scan hits SCAN_LIMIT_REACHED, we know that we've finished the 1st group, aggregated result is returned
334+
RecordCursorContinuation continuation2 = executePlanWithRecordScanLimit(plan, 6, continuation1.toBytes(), resultOf("0", 3));
335+
// continue with recordScanLimit = 5, scans 3 rows and hits SCAN_LIMIT_REACHED
336+
// Again, we cannot know that the 2nd group is finished, so nothing is returned and the continuation points back to where this scan started.
337+
RecordCursorContinuation continuation3 = executePlanWithRecordScanLimit(plan, 5, continuation2.toBytes(), null);
338+
Assertions.assertTrue(Arrays.equals(continuation2.toBytes(), continuation3.toBytes()));
339+
// finish the 2nd group, aggregated result is returned, exhausted the source
340+
RecordCursorContinuation continuation4 = executePlanWithRecordScanLimit(plan, 6, continuation3.toBytes(), resultOf("1", 12));
341+
Assertions.assertEquals(RecordCursorEndContinuation.END, continuation4);
328342
}
329343
}
330344

331-
private RecordCursorResult<QueryResult> executePlanWithRecordScanLimit(final RecordQueryPlan plan, final int recordScanLimit, byte[] continuation, final boolean useNestedResult, List<?> expectedResult) {
345+
private RecordCursorContinuation executePlanWithRecordScanLimit(final RecordQueryPlan plan, final int recordScanLimit, byte[] continuation, @Nullable List<?> expectedResult) {
346+
List<QueryResult> queryResults = new LinkedList<>();
347+
RecordCursor<QueryResult> currentCursor = executePlan(plan, 0, recordScanLimit, continuation);
332348
RecordCursorResult<QueryResult> currentCursorResult;
333-
List<QueryResult> currentQueryResults = new LinkedList<>();
349+
RecordCursorContinuation cursorContinuation;
334350
while (true) {
335-
RecordCursor<QueryResult> currentCursor = executePlan(plan, 0, recordScanLimit, continuation);
336351
currentCursorResult = currentCursor.getNext();
337-
continuation = currentCursorResult.getContinuation().toBytes();
352+
cursorContinuation = currentCursorResult.getContinuation();
338353
if (!currentCursorResult.hasNext()) {
339354
break;
340355
}
341-
currentQueryResults.add(currentCursorResult.get());
342-
System.out.println("current result:" + currentCursorResult.get().getMessage());
356+
queryResults.add(currentCursorResult.get());
343357
}
344-
assertResults(useNestedResult ? this::assertResultNested : this::assertResultFlattened, currentQueryResults, expectedResult.toArray(new List<?>[0]));
345-
System.out.println("getNoNextReson:" + currentCursorResult.getNoNextReason());
346-
return currentCursorResult;
358+
if (expectedResult == null) {
359+
Assertions.assertTrue(queryResults.isEmpty());
360+
} else {
361+
assertResults(this::assertResultFlattened, queryResults, expectedResult);
362+
}
363+
return cursorContinuation;
364+
}
365+
366+
private static Stream<Arguments> provideArguments() {
367+
// (boolean, rowLimit)
368+
// setting rowLimit = 0 is equivalent to no limit
369+
List<Arguments> arguments = new LinkedList<>();
370+
for (int i = 0; i <= 4; i++) {
371+
arguments.add(Arguments.of(false, i));
372+
arguments.add(Arguments.of(true, i));
373+
}
374+
return arguments.stream();
347375
}
348376

349377
private void populateDB(final int numRecords) throws Exception {
@@ -356,6 +384,7 @@ private void populateDB(final int numRecords) throws Exception {
356384
recBuilder.setNumValue2(i);
357385
recBuilder.setNumValue3Indexed(i / 2); // some field that changes every 2nd record
358386
recBuilder.setStrValueIndexed(Integer.toString(i / 3)); // some field that changes every 3rd record
387+
recBuilder.setNumValueUnique(i);
359388
recordStore.saveRecord(recBuilder.build());
360389
}
361390
commit(context);
@@ -368,7 +397,7 @@ private RecordCursor<QueryResult> executePlan(final RecordQueryPlan plan, final
368397
final var typeRepository = TypeRepository.newBuilder().addAllTypes(types).build();
369398
ExecuteState executeState;
370399
if (recordScanLimit > 0) {
371-
executeState = new ExecuteState(RecordScanLimiterFactory.enforce(recordScanLimit), null);
400+
executeState = new ExecuteState(RecordScanLimiterFactory.enforce(recordScanLimit), ByteScanLimiterFactory.tracking());
372401
} else {
373402
executeState = ExecuteState.NO_LIMITS;
374403
}
@@ -392,6 +421,40 @@ private List<QueryResult> executePlan(final RecordQueryPlan plan) {
392421
}
393422
}
394423

424+
@Nonnull
425+
private RecordCursor<QueryResult> executePlan(final RecordQueryPlan plan, final int rowLimit, final byte[] continuation) {
426+
final var types = plan.getDynamicTypes();
427+
final var typeRepository = TypeRepository.newBuilder().addAllTypes(types).build();
428+
ExecuteProperties executeProperties = ExecuteProperties.SERIAL_EXECUTE;
429+
executeProperties = executeProperties.setReturnedRowLimit(rowLimit);
430+
try {
431+
return plan.executePlan(recordStore, EvaluationContext.forTypeRepository(typeRepository), continuation, executeProperties);
432+
} catch (final Throwable t) {
433+
throw Assertions.<RuntimeException>fail(t);
434+
}
435+
}
436+
437+
private List<QueryResult> executePlanWithRowLimit(final RecordQueryPlan plan, final int rowLimit) {
438+
byte[] continuation = null;
439+
List<QueryResult> queryResults = new LinkedList<>();
440+
while (true) {
441+
RecordCursor<QueryResult> currentCursor = executePlan(plan, rowLimit, continuation);
442+
RecordCursorResult<QueryResult> currentCursorResult;
443+
while (true) {
444+
currentCursorResult = currentCursor.getNext();
445+
continuation = currentCursorResult.getContinuation().toBytes();
446+
if (!currentCursorResult.hasNext()) {
447+
break;
448+
}
449+
queryResults.add(currentCursorResult.get());
450+
}
451+
if (currentCursorResult.getNoNextReason() == RecordCursor.NoNextReason.SOURCE_EXHAUSTED) {
452+
break;
453+
}
454+
}
455+
return queryResults;
456+
}
457+
395458
private void assertResults(@Nonnull final BiConsumer<QueryResult, List<?>> checkConsumer, @Nonnull final List<QueryResult> actual, @Nonnull final List<?>... expected) {
396459
Assertions.assertEquals(expected.length, actual.size());
397460
for (var i = 0 ; i < actual.size() ; i++) {

0 commit comments

Comments
 (0)