Skip to content

Commit 67509c0

Browse files
authored
Remove CompletableFuture from RecoverySourceHandler (#93665)
In #93290 we added an assertion to verify that `IndexShard#acquirePrimaryOperationPermit` never completes its listener more than once. We have not seen this assertion trip, nor can we see a way for a double-completion to happen, which means it is ok to replace this use of a `CompletableFuture` with something safer. This commit does so.
1 parent 2702b77 commit 67509c0

File tree

2 files changed

+13
-45
lines changed

2 files changed

+13
-45
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 12 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -79,7 +79,6 @@
7979
import java.util.Objects;
8080
import java.util.Set;
8181
import java.util.concurrent.BlockingQueue;
82-
import java.util.concurrent.CompletableFuture;
8382
import java.util.concurrent.ConcurrentLinkedDeque;
8483
import java.util.concurrent.CopyOnWriteArrayList;
8584
import java.util.concurrent.LinkedBlockingQueue;
@@ -200,12 +199,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
200199
retentionLeaseRef.set(
201200
shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting))
202201
);
203-
},
204-
shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ",
205-
shard,
206-
cancellableThreads,
207-
logger
208-
);
202+
}, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads);
209203
final Closeable retentionLock = shard.acquireHistoryRetentionLock();
210204
resources.add(retentionLock);
211205
final long startingSeqNo;
@@ -292,7 +286,7 @@ && isTargetSameHistory()
292286
logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
293287
deleteRetentionLeaseStep.onResponse(null);
294288
}
295-
}, shardId + " removing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, logger);
289+
}, shardId + " removing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads);
296290

297291
deleteRetentionLeaseStep.whenComplete(ignored -> {
298292
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");
@@ -323,8 +317,7 @@ && isTargetSameHistory()
323317
() -> shard.initiateTracking(request.targetAllocationId()),
324318
shardId + " initiating tracking of " + request.targetAllocationId(),
325319
shard,
326-
cancellableThreads,
327-
logger
320+
cancellableThreads
328321
);
329322

330323
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
@@ -405,44 +398,21 @@ static void runUnderPrimaryPermit(
405398
CancellableThreads.Interruptible runnable,
406399
String reason,
407400
IndexShard primary,
408-
CancellableThreads cancellableThreads,
409-
Logger logger
401+
CancellableThreads cancellableThreads
410402
) {
411403
cancellableThreads.execute(() -> {
412-
CompletableFuture<Releasable> permit = new CompletableFuture<>();
413-
414-
// this wrapping looks unnecessary necessary, see #93290; TODO remove it
415-
final ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
416-
@Override
417-
public void onResponse(Releasable releasable) {
418-
if (permit.complete(releasable) == false) {
419-
releasable.close();
420-
}
421-
}
422-
423-
@Override
424-
public void onFailure(Exception e) {
425-
permit.completeExceptionally(e);
426-
}
427-
};
428-
primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
429-
try (Releasable ignored = FutureUtils.get(permit)) {
404+
final var permit = new ListenableActionFuture<Releasable>();
405+
primary.acquirePrimaryOperationPermit(permit, ThreadPool.Names.SAME, reason);
406+
try (var ignored = FutureUtils.get(permit)) {
430407
// check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent
431408
// races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated()
432409
if (primary.isRelocatedPrimary()) {
433410
throw new IndexShardRelocatedException(primary.shardId());
434411
}
435412
runnable.run();
436413
} finally {
437-
// just in case we got an exception (likely interrupted) while waiting for the get
438-
permit.whenComplete((r, e) -> {
439-
if (r != null) {
440-
r.close();
441-
}
442-
if (e != null) {
443-
logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e);
444-
}
445-
});
414+
// add a listener to release the permit because we might have been interrupted while waiting (double-releasing is ok)
415+
permit.addListener(ActionListener.wrap(Releasable::close, e -> {}));
446416
}
447417
});
448418
}
@@ -1010,7 +980,7 @@ void createRetentionLease(final long startingSeqNo, ActionListener<RetentionLeas
1010980
addRetentionLeaseStep.addListener(listener.map(rr -> newLease));
1011981
logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint);
1012982
}
1013-
}, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, logger);
983+
}, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads);
1014984
}
1015985

1016986
boolean hasSameLegacySyncId(Store.MetadataSnapshot source, Store.MetadataSnapshot target) {
@@ -1251,8 +1221,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
12511221
() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
12521222
shardId + " marking " + request.targetAllocationId() + " as in sync",
12531223
shard,
1254-
cancellableThreads,
1255-
logger
1224+
cancellableThreads
12561225
);
12571226
final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); // this global checkpoint is persisted in finalizeRecovery
12581227
final StepListener<Void> finalizeListener = new StepListener<>();
@@ -1263,8 +1232,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
12631232
() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
12641233
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint",
12651234
shard,
1266-
cancellableThreads,
1267-
logger
1235+
cancellableThreads
12681236
);
12691237

12701238
if (request.isPrimaryRelocation()) {

server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -798,7 +798,7 @@ public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception {
798798
Thread cancelingThread = new Thread(() -> cancellableThreads.cancel("test"));
799799
cancelingThread.start();
800800
try {
801-
RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, "test", shard, cancellableThreads, logger);
801+
RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, "test", shard, cancellableThreads);
802802
} catch (CancellableThreads.ExecutionCancelledException e) {
803803
// expected.
804804
}

0 commit comments

Comments
 (0)