29
29
import com .apple .foundationdb .record .logging .KeyValueLogMessage ;
30
30
import com .apple .foundationdb .record .logging .LogMessageKeys ;
31
31
import com .apple .foundationdb .record .provider .foundationdb .runners .ExponentialDelay ;
32
+ import com .apple .foundationdb .record .provider .foundationdb .runners .FutureAutoClose ;
32
33
import com .apple .foundationdb .record .provider .foundationdb .runners .TransactionalRunner ;
33
34
import com .apple .foundationdb .record .provider .foundationdb .synchronizedsession .SynchronizedSessionRunner ;
34
35
import com .apple .foundationdb .record .util .Result ;
38
39
39
40
import javax .annotation .Nonnull ;
40
41
import javax .annotation .Nullable ;
41
- import java .util .ArrayList ;
42
42
import java .util .List ;
43
43
import java .util .Map ;
44
44
import java .util .UUID ;
@@ -57,6 +57,7 @@ public class FDBDatabaseRunnerImpl implements FDBDatabaseRunner {
57
57
@ Nonnull
58
58
private final FDBDatabase database ;
59
59
private final TransactionalRunner transactionalRunner ;
60
+ private final FutureAutoClose futureManager ;
60
61
@ Nonnull
61
62
private FDBRecordContextConfig .Builder contextConfigBuilder ;
62
63
@ Nonnull
@@ -67,22 +68,19 @@ public class FDBDatabaseRunnerImpl implements FDBDatabaseRunner {
67
68
private long initialDelayMillis ;
68
69
69
70
private boolean closed ;
70
- @ Nonnull
71
- private final List <CompletableFuture <?>> futuresToCompleteExceptionally ;
72
71
73
72
@ API (API .Status .INTERNAL )
74
73
FDBDatabaseRunnerImpl (@ Nonnull FDBDatabase database , FDBRecordContextConfig .Builder contextConfigBuilder ) {
75
74
this .database = database ;
76
75
this .contextConfigBuilder = contextConfigBuilder ;
77
76
this .executor = database .newContextExecutor (contextConfigBuilder .getMdcContext ());
78
77
this .transactionalRunner = new TransactionalRunner (database , contextConfigBuilder );
78
+ this .futureManager = new FutureAutoClose ();
79
79
80
80
final FDBDatabaseFactory factory = database .getFactory ();
81
81
this .maxAttempts = factory .getMaxAttempts ();
82
82
this .maxDelayMillis = factory .getMaxDelayMillis ();
83
83
this .initialDelayMillis = factory .getInitialDelayMillis ();
84
-
85
- futuresToCompleteExceptionally = new ArrayList <>();
86
84
}
87
85
88
86
@ Override
@@ -225,7 +223,7 @@ LogMessageKeys.MAX_ATTEMPTS, getMaxAttempts(),
225
223
if (getTimer () != null ) {
226
224
future = getTimer ().instrument (FDBStoreTimer .Events .RETRY_DELAY , future , executor );
227
225
}
228
- addFutureToCompleteExceptionally (future );
226
+ futureManager . registerFuture (future );
229
227
return future .thenApply (vignore -> {
230
228
currAttempt ++;
231
229
return true ;
@@ -240,8 +238,7 @@ LogMessageKeys.MAX_ATTEMPTS, getMaxAttempts(),
240
238
@ SuppressWarnings ("squid:S1181" )
241
239
public CompletableFuture <T > runAsync (@ Nonnull final Function <? super FDBRecordContext , CompletableFuture <? extends T >> retriable ,
242
240
@ Nonnull final BiFunction <? super T , Throwable , Result <? extends T , ? extends Throwable >> handlePostTransaction ) {
243
- CompletableFuture <T > future = new CompletableFuture <>();
244
- addFutureToCompleteExceptionally (future );
241
+ CompletableFuture <T > future = futureManager .newFuture ();
245
242
AsyncUtil .whileTrue (() -> {
246
243
try {
247
244
return transactionalRunner .runAsync (currAttempt != 0 , retriable )
@@ -314,13 +311,25 @@ public synchronized void close() {
314
311
return ;
315
312
}
316
313
closed = true ;
317
- if (!futuresToCompleteExceptionally .stream ().allMatch (CompletableFuture ::isDone )) {
318
- final Exception exception = new RunnerClosed ();
319
- for (CompletableFuture <?> future : futuresToCompleteExceptionally ) {
320
- future .completeExceptionally (exception );
314
+ // Ensure we call both close() methods, capturing all exceptions
315
+ RuntimeException caught = null ;
316
+ try {
317
+ futureManager .close ();
318
+ } catch (RuntimeException e ) {
319
+ caught = e ;
320
+ }
321
+ try {
322
+ transactionalRunner .close ();
323
+ } catch (RuntimeException e ) {
324
+ if (caught != null ) {
325
+ caught .addSuppressed (e );
326
+ } else {
327
+ caught = e ;
321
328
}
322
329
}
323
- transactionalRunner .close ();
330
+ if (caught != null ) {
331
+ throw caught ;
332
+ }
324
333
}
325
334
326
335
@ Override
@@ -337,15 +346,4 @@ public SynchronizedSessionRunner startSynchronizedSession(@Nonnull Subspace lock
337
346
public SynchronizedSessionRunner joinSynchronizedSession (@ Nonnull Subspace lockSubspace , @ Nonnull UUID sessionId , long leaseLengthMillis ) {
338
347
return SynchronizedSessionRunner .joinSession (lockSubspace , sessionId , leaseLengthMillis , this );
339
348
}
340
-
341
- private synchronized void addFutureToCompleteExceptionally (@ Nonnull CompletableFuture <?> future ) {
342
- if (closed ) {
343
- final RunnerClosed exception = new RunnerClosed ();
344
- future .completeExceptionally (exception );
345
- throw exception ;
346
- }
347
- futuresToCompleteExceptionally .removeIf (CompletableFuture ::isDone );
348
- futuresToCompleteExceptionally .add (future );
349
- }
350
-
351
349
}
0 commit comments