Skip to content

Commit 363a435

Browse files
authored
EXISTS queries now honor continuation from previous runs to avoid infinite loops (#3221)
The `RecordQueryFirstOrDefaultPlan` constructs a special cursor which has cardinality at most one. The continuation it returns back to the user is not based on the inner cursor but only on whether the cursor has returned any data. Previously, we were taking the `FutureCursor`'s continuation (which is always `\x00`) and giving it back to the `inner` plan, which would generally result in the plan resuming from the beginning. That is not the appropriate way to interpret the continuation, so this updates the logic to instead skip creating the inner cursor if we get a non-beginning continuation, which is in line with what the `FutureCursor`'s continuation means. This fixes #3219.
1 parent 8682131 commit 363a435

File tree

11 files changed

+474
-33
lines changed

11 files changed

+474
-33
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCursor.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.util.function.Consumer;
6060
import java.util.function.Function;
6161
import java.util.function.Predicate;
62+
import java.util.function.Supplier;
6263
import java.util.stream.Stream;
6364
import java.util.stream.StreamSupport;
6465

@@ -884,24 +885,57 @@ static <T> RecordCursor<T> fromFuture(@Nonnull CompletableFuture<T> future) {
884885

885886
@Nonnull
886887
static <T> RecordCursor<T> fromFuture(@Nonnull Executor executor, @Nonnull CompletableFuture<T> future) {
887-
return new FutureCursor<>(executor, future);
888+
return fromFuture(executor, future, null);
888889
}
889890

890891
/**
891892
* Get a new cursor that has the contents of the given future as its only record.
892893
* If the continuation is nonnull, return an empty cursor since a {@link FutureCursor} returns only a single record.
894+
* Note that in general, this can lead to futures being started that are not properly waited on,
895+
* so it is generally advised that the user use {@link #fromFuture(Executor, Supplier, byte[])} instead
896+
* of this one. See also the warning on {@link #fromFuture(Executor, Supplier, byte[])} for a word
897+
* about if the work powering the underlying future has more complicated state that should be
898+
* included in the cursor's continuation.
899+
*
893900
* @param executor an executor for executing the future
894901
* @param future a future that completes to the only element of the cursor
895902
* @param continuation a continuation from a future cursor to determine whether the cursor has a single element or no elements
896903
* @param <T> the result type of the future
897904
* @return a new cursor producing the contents of {@code future} if the continuation is nonnull and an empty cursor otherwise
905+
* @see #fromFuture(Executor, Supplier, byte[])
898906
*/
899907
@Nonnull
900908
static <T> RecordCursor<T> fromFuture(@Nonnull Executor executor, @Nonnull CompletableFuture<T> future, @Nullable byte[] continuation) {
909+
return fromFuture(executor, () -> future, continuation);
910+
}
911+
912+
/**
913+
* Get a new cursor that has the contents of the lazily-evaluated future as its only record.
914+
* If the continuation is nonnull, return an empty cursor since a {@link FutureCursor} returns only a single record.
915+
* In that case, the {@code futureSupplier} will not be evaluated to avoid queuing up asynchronous work
916+
* that is not guaranteed to have anyone wait on. For that reason, this should generally be preferred
917+
* to {@link #fromFuture(Executor, CompletableFuture, byte[])}.
918+
*
919+
* <p>
920+
* <em>Warning:</em> Using a {@link FutureCursor} here may lead to incorrect results if the underlying
921+
* future is backed by something that has some kind of state that should be included in the continuation.
922+
* For example, if the future is backed by a cursor that may stop for an out-of-band {@link NoNextReason},
923+
* it may be desirable to include the progress included in the underlying cursor in the final continuation.
924+
* That is not done by this class, so something more bespoke may be required.
925+
* </p>
926+
*
927+
* @param executor an executor for executing the future
928+
* @param futureSupplier a supplier of a future that completes to the only element of the cursor
929+
* @param continuation a continuation from a future cursor to determine whether the cursor has a single element or no elements
930+
* @param <T> the result type of the future
931+
* @return a new cursor producing the contents of {@code future} if the continuation is nonnull and an empty cursor otherwise
932+
*/
933+
@Nonnull
934+
static <T> RecordCursor<T> fromFuture(@Nonnull Executor executor, @Nonnull Supplier<CompletableFuture<T>> futureSupplier, @Nullable byte[] continuation) {
901935
if (continuation == null) {
902-
return new FutureCursor<>(executor, future);
936+
return new FutureCursor<>(executor, futureSupplier.get());
903937
} else {
904-
return RecordCursor.empty();
938+
return RecordCursor.empty(executor);
905939
}
906940
}
907941

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/FutureCursor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,15 @@ public class FutureCursor<T> implements RecordCursor<T> {
5050
@Nonnull
5151
private static final RecordCursorContinuation notDoneContinuation = ByteArrayContinuation.fromNullable(new byte[] {0});
5252

53+
/**
54+
* Internal constructor. Users should generally call
55+
* {@link RecordCursor#fromFuture(Executor, java.util.function.Supplier, byte[])}
56+
* in order to ensure that continuations from this cursor are properly handled.
57+
*
58+
* @param executor the executor to associate with this cursor
59+
* @param future a future that when completed will provide the single element returned by this cursor
60+
*/
61+
@API(API.Status.INTERNAL)
5362
public FutureCursor(@Nonnull Executor executor, @Nonnull CompletableFuture<T> future) {
5463
this.executor = executor;
5564
this.future = future;

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryFirstOrDefaultPlan.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,10 @@
2828
import com.apple.foundationdb.record.PlanHashable;
2929
import com.apple.foundationdb.record.PlanSerializationContext;
3030
import com.apple.foundationdb.record.RecordCursor;
31-
import com.apple.foundationdb.record.cursors.FutureCursor;
3231
import com.apple.foundationdb.record.planprotos.PRecordQueryFirstOrDefaultPlan;
3332
import com.apple.foundationdb.record.planprotos.PRecordQueryPlan;
3433
import com.apple.foundationdb.record.provider.common.StoreTimer;
3534
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreBase;
36-
import com.apple.foundationdb.record.query.plan.explain.ExplainPlanVisitor;
3735
import com.apple.foundationdb.record.query.plan.cascades.AliasMap;
3836
import com.apple.foundationdb.record.query.plan.cascades.CorrelationIdentifier;
3937
import com.apple.foundationdb.record.query.plan.cascades.Memoizer;
@@ -47,6 +45,7 @@
4745
import com.apple.foundationdb.record.query.plan.cascades.values.DerivedValue;
4846
import com.apple.foundationdb.record.query.plan.cascades.values.Value;
4947
import com.apple.foundationdb.record.query.plan.cascades.values.translation.TranslationMap;
48+
import com.apple.foundationdb.record.query.plan.explain.ExplainPlanVisitor;
5049
import com.google.auto.service.AutoService;
5150
import com.google.common.base.Verify;
5251
import com.google.common.collect.ImmutableList;
@@ -96,10 +95,17 @@ public <M extends Message> RecordCursor<QueryResult> executePlan(@Nonnull final
9695
@Nonnull final EvaluationContext context,
9796
@Nullable final byte[] continuation,
9897
@Nonnull final ExecuteProperties executeProperties) {
99-
return new FutureCursor<>(store.getExecutor(),
100-
getChild().executePlan(store, context, continuation, executeProperties).first()
101-
.thenApply(resultOptional ->
102-
resultOptional.orElseGet(() -> QueryResult.ofComputed(onEmptyResultValue.eval(store, context)))));
98+
// Note that a null child continuation is always handed to the child cursor below.
99+
// This is because the returned FutureCursor only ever returns a single value, and so if
100+
// if that lambda is called, it indicates that the original continuation is null, and we
101+
// are starting the plan from the beginning. That this doesn't handle the inner cursor
102+
// halting with an out-of-band no-next-reasons, which currently is treated the same way
103+
// as the inner cursor being empty.
104+
// See: https://github.com/FoundationDB/fdb-record-layer/issues/3220
105+
return RecordCursor.fromFuture(store.getExecutor(),
106+
() -> getChild().executePlan(store, context, null, executeProperties).first()
107+
.thenApply(resultOptional -> resultOptional.orElseGet(() -> QueryResult.ofComputed(onEmptyResultValue.eval(store, context)))),
108+
continuation);
103109
}
104110

105111
@Override

fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/RecordCursorTest.java

Lines changed: 113 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.apple.foundationdb.async.MoreAsyncUtil;
2525
import com.apple.foundationdb.record.cursors.FilterCursor;
2626
import com.apple.foundationdb.record.cursors.FirableCursor;
27-
import com.apple.foundationdb.record.cursors.FutureCursor;
2827
import com.apple.foundationdb.record.cursors.LazyCursor;
2928
import com.apple.foundationdb.record.cursors.MapResultCursor;
3029
import com.apple.foundationdb.record.cursors.RowLimitedCursor;
@@ -63,6 +62,7 @@
6362
import java.util.TimerTask;
6463
import java.util.concurrent.CancellationException;
6564
import java.util.concurrent.CompletableFuture;
65+
import java.util.concurrent.CompletionException;
6666
import java.util.concurrent.ExecutionException;
6767
import java.util.concurrent.Executor;
6868
import java.util.concurrent.ScheduledExecutorService;
@@ -81,11 +81,13 @@
8181
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
8282
import static org.hamcrest.Matchers.instanceOf;
8383
import static org.hamcrest.Matchers.is;
84+
import static org.hamcrest.Matchers.notNullValue;
8485
import static org.hamcrest.Matchers.oneOf;
8586
import static org.junit.jupiter.api.Assertions.assertEquals;
8687
import static org.junit.jupiter.api.Assertions.assertFalse;
8788
import static org.junit.jupiter.api.Assertions.assertNotNull;
8889
import static org.junit.jupiter.api.Assertions.assertNull;
90+
import static org.junit.jupiter.api.Assertions.assertSame;
8991
import static org.junit.jupiter.api.Assertions.assertThrows;
9092
import static org.junit.jupiter.api.Assertions.assertTrue;
9193
import static org.junit.jupiter.api.Assertions.fail;
@@ -1313,7 +1315,7 @@ private static RecordCursor<Integer> mapPipelinedCursorToClose(int iteration, Co
13131315
private static RecordCursor<String> singletonFlatMapPipelinedCursorToClose(int iteration, CompletableFuture<Void> signal) {
13141316
return RecordCursor.flatMapPipelined(
13151317
outerContinuation -> RecordCursor.fromList(EXECUTOR, IntStream.range(0, iteration % 199).boxed().collect(Collectors.toList()), outerContinuation),
1316-
(outerValue, innerContinuation) -> new FutureCursor<>(EXECUTOR, signal.thenApplyAsync(ignore -> String.valueOf(outerValue), EXECUTOR)),
1318+
(outerValue, innerContinuation) -> RecordCursor.fromFuture(EXECUTOR, signal.thenApplyAsync(ignore -> String.valueOf(outerValue), EXECUTOR), innerContinuation),
13171319
null,
13181320
null,
13191321
iteration % 19 + 2
@@ -1393,4 +1395,113 @@ void pipelinedCursorAfterClosing(@Nonnull BiFunction<Integer, CompletableFuture<
13931395
assertThat(executionException.getCause(), Matchers.instanceOf(CancellationException.class));
13941396
}
13951397
}
1398+
1399+
@Test
1400+
void futureCursorTest() {
1401+
CompletableFuture<Integer> future = new CompletableFuture<>();
1402+
final RecordCursorContinuation continuation;
1403+
try (RecordCursor<Integer> fromFuture = RecordCursor.fromFuture(EXECUTOR, future, null)) {
1404+
assertSame(EXECUTOR, fromFuture.getExecutor());
1405+
future.complete(42);
1406+
RecordCursorResult<Integer> result = fromFuture.getNext();
1407+
assertTrue(result.hasNext(), "first result from future should have value");
1408+
assertEquals(42, result.get());
1409+
assertFalse(result.getContinuation().isEnd());
1410+
continuation = result.getContinuation();
1411+
1412+
result = fromFuture.getNext();
1413+
assertFalse(result.hasNext(), "second result from future should have value");
1414+
assertEquals(RecordCursor.NoNextReason.SOURCE_EXHAUSTED, result.getNoNextReason());
1415+
assertTrue(result.getContinuation().isEnd());
1416+
}
1417+
1418+
CompletableFuture<Integer> secondFuture = new CompletableFuture<>();
1419+
try (RecordCursor<Integer> fromFuture = RecordCursor.fromFuture(EXECUTOR, secondFuture, continuation.toBytes())) {
1420+
assertSame(EXECUTOR, fromFuture.getExecutor());
1421+
1422+
RecordCursorResult<Integer> result = fromFuture.getNext();
1423+
assertFalse(result.hasNext(), "future should not have value when resuming from a continuation");
1424+
assertEquals(RecordCursor.NoNextReason.SOURCE_EXHAUSTED, result.getNoNextReason());
1425+
assertTrue(result.getContinuation().isEnd());
1426+
}
1427+
}
1428+
1429+
@Test
1430+
void futureCursorFromSupplierTest() {
1431+
CompletableFuture<Integer> future = new CompletableFuture<>();
1432+
final RecordCursorContinuation continuation;
1433+
try (RecordCursor<Integer> fromFuture = RecordCursor.fromFuture(EXECUTOR, () -> future, null)) {
1434+
assertSame(EXECUTOR, fromFuture.getExecutor());
1435+
future.complete(1066);
1436+
RecordCursorResult<Integer> result = fromFuture.getNext();
1437+
assertTrue(result.hasNext(), "first result from future should have value");
1438+
assertEquals(1066, result.get());
1439+
assertFalse(result.getContinuation().isEnd());
1440+
continuation = result.getContinuation();
1441+
1442+
result = fromFuture.getNext();
1443+
assertFalse(result.hasNext(), "second result from future should have value");
1444+
assertEquals(RecordCursor.NoNextReason.SOURCE_EXHAUSTED, result.getNoNextReason());
1445+
assertTrue(result.getContinuation().isEnd());
1446+
}
1447+
1448+
try (RecordCursor<Integer> fromFuture = RecordCursor.fromFuture(EXECUTOR, () -> fail("should not be run"), continuation.toBytes())) {
1449+
assertSame(EXECUTOR, fromFuture.getExecutor());
1450+
1451+
RecordCursorResult<Integer> result = fromFuture.getNext();
1452+
assertFalse(result.hasNext(), "future should not have value when resuming from a continuation");
1453+
assertEquals(RecordCursor.NoNextReason.SOURCE_EXHAUSTED, result.getNoNextReason());
1454+
assertTrue(result.getContinuation().isEnd());
1455+
}
1456+
}
1457+
1458+
@Test
1459+
void futureCursorCompletesWhenUnderlyingCompletes() throws Exception {
1460+
final CompletableFuture<Integer> future = new CompletableFuture<>();
1461+
final RecordCursorContinuation continuation;
1462+
try (RecordCursor<Integer> fromFuture = RecordCursor.fromFuture(EXECUTOR, future, null)) {
1463+
assertSame(EXECUTOR, fromFuture.getExecutor());
1464+
CompletableFuture<RecordCursorResult<Integer>> resultFuture = fromFuture.onNext();
1465+
assertFalse(resultFuture.isDone(), "result should not be done until underlying completes");
1466+
future.complete(1819);
1467+
1468+
// The previous result future should complete quickly. The timeout here is
1469+
// a bit of a hedge, but as currently written, the resultFuture should actually complete
1470+
// on this thread, and so the resultFuture should already be completed
1471+
// by now. But that's not necessarily part of the contract of the cursor, so
1472+
// instead, just assert here that we complete in a reasonable amount of time
1473+
RecordCursorResult<Integer> result = resultFuture.get(1, TimeUnit.SECONDS);
1474+
assertTrue(result.hasNext());
1475+
assertEquals(1819, result.get());
1476+
continuation = result.getContinuation();
1477+
assertFalse(continuation.isEnd());
1478+
assertFalse(continuation.toByteString().isEmpty());
1479+
assertThat(continuation.toBytes(), notNullValue());
1480+
}
1481+
1482+
try (RecordCursor<Integer> fromFuture = RecordCursor.fromFuture(EXECUTOR, () -> fail("should not run"), continuation.toBytes())) {
1483+
assertSame(EXECUTOR, fromFuture.getExecutor());
1484+
CompletableFuture<RecordCursorResult<Integer>> resultFuture = fromFuture.onNext();
1485+
assertTrue(resultFuture.isDone(), "empty result should not need to wait");
1486+
RecordCursorResult<Integer> result = resultFuture.get();
1487+
assertFalse(result.hasNext());
1488+
assertEquals(RecordCursor.NoNextReason.SOURCE_EXHAUSTED, result.getNoNextReason());
1489+
assertTrue(result.getContinuation().isEnd());
1490+
}
1491+
}
1492+
1493+
@Test
1494+
void futureCursorPropagatesError() {
1495+
final CompletableFuture<Integer> future = new CompletableFuture<>();
1496+
try (RecordCursor<Integer> fromFuture = RecordCursor.fromFuture(EXECUTOR, future, null)) {
1497+
assertSame(EXECUTOR, fromFuture.getExecutor());
1498+
CompletableFuture<RecordCursorResult<Integer>> resultFuture = fromFuture.onNext();
1499+
assertFalse(resultFuture.isDone(), "result should not be done until underlying completes");
1500+
1501+
final RecordCoreException error = new RecordCoreException("test error");
1502+
future.completeExceptionally(error);
1503+
CompletionException futureError = assertThrows(CompletionException.class, future::join);
1504+
assertSame(error, futureError.getCause(), "result should complete exception while propagating the cause");
1505+
}
1506+
}
13961507
}

0 commit comments

Comments
 (0)