Skip to content

Commit 68f805a

Browse files
authored
Create FutureManagerRunner and convert FDBDatabaseRunnerImpl to use it. (FoundationDB#3347)
This refactoring moves the future management (completing of futures once the runner closes) out of the FDBDatabaseRunnerImpl into a subclass of TransactionalRunner. This allows the new subclass of TransactionalRunner to be used (e.g in the case where we want another throttling implementation). Resolves FoundationDB#3346
1 parent a3b55bf commit 68f805a

File tree

4 files changed

+315
-25
lines changed

4 files changed

+315
-25
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBDatabaseRunnerImpl.java

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.apple.foundationdb.record.logging.KeyValueLogMessage;
3030
import com.apple.foundationdb.record.logging.LogMessageKeys;
3131
import com.apple.foundationdb.record.provider.foundationdb.runners.ExponentialDelay;
32+
import com.apple.foundationdb.record.provider.foundationdb.runners.FutureAutoClose;
3233
import com.apple.foundationdb.record.provider.foundationdb.runners.TransactionalRunner;
3334
import com.apple.foundationdb.record.provider.foundationdb.synchronizedsession.SynchronizedSessionRunner;
3435
import com.apple.foundationdb.record.util.Result;
@@ -38,7 +39,6 @@
3839

3940
import javax.annotation.Nonnull;
4041
import javax.annotation.Nullable;
41-
import java.util.ArrayList;
4242
import java.util.List;
4343
import java.util.Map;
4444
import java.util.UUID;
@@ -57,6 +57,7 @@ public class FDBDatabaseRunnerImpl implements FDBDatabaseRunner {
5757
@Nonnull
5858
private final FDBDatabase database;
5959
private final TransactionalRunner transactionalRunner;
60+
private final FutureAutoClose futureManager;
6061
@Nonnull
6162
private FDBRecordContextConfig.Builder contextConfigBuilder;
6263
@Nonnull
@@ -67,22 +68,19 @@ public class FDBDatabaseRunnerImpl implements FDBDatabaseRunner {
6768
private long initialDelayMillis;
6869

6970
private boolean closed;
70-
@Nonnull
71-
private final List<CompletableFuture<?>> futuresToCompleteExceptionally;
7271

7372
@API(API.Status.INTERNAL)
7473
FDBDatabaseRunnerImpl(@Nonnull FDBDatabase database, FDBRecordContextConfig.Builder contextConfigBuilder) {
7574
this.database = database;
7675
this.contextConfigBuilder = contextConfigBuilder;
7776
this.executor = database.newContextExecutor(contextConfigBuilder.getMdcContext());
7877
this.transactionalRunner = new TransactionalRunner(database, contextConfigBuilder);
78+
this.futureManager = new FutureAutoClose();
7979

8080
final FDBDatabaseFactory factory = database.getFactory();
8181
this.maxAttempts = factory.getMaxAttempts();
8282
this.maxDelayMillis = factory.getMaxDelayMillis();
8383
this.initialDelayMillis = factory.getInitialDelayMillis();
84-
85-
futuresToCompleteExceptionally = new ArrayList<>();
8684
}
8785

8886
@Override
@@ -225,7 +223,7 @@ LogMessageKeys.MAX_ATTEMPTS, getMaxAttempts(),
225223
if (getTimer() != null) {
226224
future = getTimer().instrument(FDBStoreTimer.Events.RETRY_DELAY, future, executor);
227225
}
228-
addFutureToCompleteExceptionally(future);
226+
futureManager.registerFuture(future);
229227
return future.thenApply(vignore -> {
230228
currAttempt++;
231229
return true;
@@ -240,8 +238,7 @@ LogMessageKeys.MAX_ATTEMPTS, getMaxAttempts(),
240238
@SuppressWarnings("squid:S1181")
241239
public CompletableFuture<T> runAsync(@Nonnull final Function<? super FDBRecordContext, CompletableFuture<? extends T>> retriable,
242240
@Nonnull final BiFunction<? super T, Throwable, Result<? extends T, ? extends Throwable>> handlePostTransaction) {
243-
CompletableFuture<T> future = new CompletableFuture<>();
244-
addFutureToCompleteExceptionally(future);
241+
CompletableFuture<T> future = futureManager.newFuture();
245242
AsyncUtil.whileTrue(() -> {
246243
try {
247244
return transactionalRunner.runAsync(currAttempt != 0, retriable)
@@ -314,13 +311,25 @@ public synchronized void close() {
314311
return;
315312
}
316313
closed = true;
317-
if (!futuresToCompleteExceptionally.stream().allMatch(CompletableFuture::isDone)) {
318-
final Exception exception = new RunnerClosed();
319-
for (CompletableFuture<?> future : futuresToCompleteExceptionally) {
320-
future.completeExceptionally(exception);
314+
// Ensure we call both close() methods, capturing all exceptions
315+
RuntimeException caught = null;
316+
try {
317+
futureManager.close();
318+
} catch (RuntimeException e) {
319+
caught = e;
320+
}
321+
try {
322+
transactionalRunner.close();
323+
} catch (RuntimeException e) {
324+
if (caught != null) {
325+
caught.addSuppressed(e);
326+
} else {
327+
caught = e;
321328
}
322329
}
323-
transactionalRunner.close();
330+
if (caught != null) {
331+
throw caught;
332+
}
324333
}
325334

326335
@Override
@@ -337,15 +346,4 @@ public SynchronizedSessionRunner startSynchronizedSession(@Nonnull Subspace lock
337346
public SynchronizedSessionRunner joinSynchronizedSession(@Nonnull Subspace lockSubspace, @Nonnull UUID sessionId, long leaseLengthMillis) {
338347
return SynchronizedSessionRunner.joinSession(lockSubspace, sessionId, leaseLengthMillis, this);
339348
}
340-
341-
private synchronized void addFutureToCompleteExceptionally(@Nonnull CompletableFuture<?> future) {
342-
if (closed) {
343-
final RunnerClosed exception = new RunnerClosed();
344-
future.completeExceptionally(exception);
345-
throw exception;
346-
}
347-
futuresToCompleteExceptionally.removeIf(CompletableFuture::isDone);
348-
futuresToCompleteExceptionally.add(future);
349-
}
350-
351349
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* FutureAutoClose.java
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package com.apple.foundationdb.record.provider.foundationdb.runners;
22+
23+
import com.apple.foundationdb.annotation.API;
24+
import com.apple.foundationdb.record.provider.foundationdb.FDBDatabaseRunner;
25+
26+
import javax.annotation.Nonnull;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.concurrent.CompletableFuture;
30+
31+
/**
32+
* A helper that completes futures once closed.
33+
* <p>
34+
* This is meant to be used together with other {@link FDBDatabaseRunner}s and help manage any {@link CompletableFuture} instances
35+
* once the runner closes. This helper should be lifecycle coupled to the runners it accompanies.
36+
* <p>
37+
* Futures created externally can be registered with this class. This class can also create new futures (and register them).
38+
* When {@link #close()} is called, any registered {@link CompletableFuture} which is still incomplete will be completed exceptionally
39+
* with a {@link FDBDatabaseRunner.RunnerClosed} exception.
40+
* <p>
41+
* It is the responsibility of the users of the runner to ensure futures that it creates are registered.
42+
* <p>
43+
* Normally, only the top level (root) of the future chain needs registration. Completing this future
44+
* will cause the completion to trickle down to the dependent futures.
45+
*/
46+
@API(API.Status.INTERNAL)
47+
public class FutureAutoClose implements AutoCloseable {
48+
@Nonnull
49+
private final List<CompletableFuture<?>> futuresToClose;
50+
private boolean closed;
51+
52+
public FutureAutoClose() {
53+
futuresToClose = new ArrayList<>();
54+
closed = false;
55+
}
56+
57+
public <T> CompletableFuture<T> newFuture() {
58+
return registerFuture(new CompletableFuture<>());
59+
}
60+
61+
public synchronized <T> CompletableFuture<T> registerFuture(CompletableFuture<T> future) {
62+
if (isClosed()) {
63+
final FDBDatabaseRunner.RunnerClosed exception = new FDBDatabaseRunner.RunnerClosed();
64+
future.completeExceptionally(exception);
65+
throw exception;
66+
}
67+
futuresToClose.removeIf(CompletableFuture::isDone);
68+
futuresToClose.add(future);
69+
return future;
70+
}
71+
72+
@Override
73+
public synchronized void close() {
74+
if (!isClosed()) {
75+
closed = true;
76+
77+
if (!futuresToClose.stream().allMatch(CompletableFuture::isDone)) {
78+
final Exception exception = new FDBDatabaseRunner.RunnerClosed();
79+
for (CompletableFuture<?> future : futuresToClose) {
80+
future.completeExceptionally(exception);
81+
}
82+
}
83+
futuresToClose.clear();
84+
}
85+
}
86+
87+
public boolean isClosed() {
88+
return closed;
89+
}
90+
}

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/TransactionalRunner.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,5 +175,4 @@ public synchronized void close() {
175175
contextsToClose.forEach(FDBRecordContext::close);
176176
this.closed = true;
177177
}
178-
179178
}

0 commit comments

Comments
 (0)