Skip to content

Commit 5bec9ad

Browse files
committed
Revert "Remove CompletableFuture from RecoverySourceHandler (#93665)"
This reverts commit 67509c0.
1 parent 8e3b57a commit 5bec9ad

File tree

2 files changed

+45
-13
lines changed

2 files changed

+45
-13
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import java.util.Objects;
8080
import java.util.Set;
8181
import java.util.concurrent.BlockingQueue;
82+
import java.util.concurrent.CompletableFuture;
8283
import java.util.concurrent.ConcurrentLinkedDeque;
8384
import java.util.concurrent.CopyOnWriteArrayList;
8485
import java.util.concurrent.LinkedBlockingQueue;
@@ -199,7 +200,12 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
199200
retentionLeaseRef.set(
200201
shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting))
201202
);
202-
}, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads);
203+
},
204+
shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ",
205+
shard,
206+
cancellableThreads,
207+
logger
208+
);
203209
final Closeable retentionLock = shard.acquireHistoryRetentionLock();
204210
resources.add(retentionLock);
205211
final long startingSeqNo;
@@ -286,7 +292,7 @@ && isTargetSameHistory()
286292
logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
287293
deleteRetentionLeaseStep.onResponse(null);
288294
}
289-
}, shardId + " removing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads);
295+
}, shardId + " removing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, logger);
290296

291297
deleteRetentionLeaseStep.whenComplete(ignored -> {
292298
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");
@@ -317,7 +323,8 @@ && isTargetSameHistory()
317323
() -> shard.initiateTracking(request.targetAllocationId()),
318324
shardId + " initiating tracking of " + request.targetAllocationId(),
319325
shard,
320-
cancellableThreads
326+
cancellableThreads,
327+
logger
321328
);
322329

323330
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
@@ -398,21 +405,44 @@ static void runUnderPrimaryPermit(
398405
CancellableThreads.Interruptible runnable,
399406
String reason,
400407
IndexShard primary,
401-
CancellableThreads cancellableThreads
408+
CancellableThreads cancellableThreads,
409+
Logger logger
402410
) {
403411
cancellableThreads.execute(() -> {
404-
final var permit = new ListenableActionFuture<Releasable>();
405-
primary.acquirePrimaryOperationPermit(permit, ThreadPool.Names.SAME, reason);
406-
try (var ignored = FutureUtils.get(permit)) {
412+
CompletableFuture<Releasable> permit = new CompletableFuture<>();
413+
414+
// this wrapping looks unnecessary necessary, see #93290; TODO remove it
415+
final ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
416+
@Override
417+
public void onResponse(Releasable releasable) {
418+
if (permit.complete(releasable) == false) {
419+
releasable.close();
420+
}
421+
}
422+
423+
@Override
424+
public void onFailure(Exception e) {
425+
permit.completeExceptionally(e);
426+
}
427+
};
428+
primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
429+
try (Releasable ignored = FutureUtils.get(permit)) {
407430
// check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent
408431
// races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated()
409432
if (primary.isRelocatedPrimary()) {
410433
throw new IndexShardRelocatedException(primary.shardId());
411434
}
412435
runnable.run();
413436
} finally {
414-
// add a listener to release the permit because we might have been interrupted while waiting (double-releasing is ok)
415-
permit.addListener(ActionListener.wrap(Releasable::close, e -> {}));
437+
// just in case we got an exception (likely interrupted) while waiting for the get
438+
permit.whenComplete((r, e) -> {
439+
if (r != null) {
440+
r.close();
441+
}
442+
if (e != null) {
443+
logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e);
444+
}
445+
});
416446
}
417447
});
418448
}
@@ -980,7 +1010,7 @@ void createRetentionLease(final long startingSeqNo, ActionListener<RetentionLeas
9801010
addRetentionLeaseStep.addListener(listener.map(rr -> newLease));
9811011
logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint);
9821012
}
983-
}, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads);
1013+
}, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, logger);
9841014
}
9851015

9861016
boolean hasSameLegacySyncId(Store.MetadataSnapshot source, Store.MetadataSnapshot target) {
@@ -1221,7 +1251,8 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
12211251
() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
12221252
shardId + " marking " + request.targetAllocationId() + " as in sync",
12231253
shard,
1224-
cancellableThreads
1254+
cancellableThreads,
1255+
logger
12251256
);
12261257
final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); // this global checkpoint is persisted in finalizeRecovery
12271258
final StepListener<Void> finalizeListener = new StepListener<>();
@@ -1232,7 +1263,8 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
12321263
() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
12331264
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint",
12341265
shard,
1235-
cancellableThreads
1266+
cancellableThreads,
1267+
logger
12361268
);
12371269

12381270
if (request.isPrimaryRelocation()) {

server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -798,7 +798,7 @@ public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception {
798798
Thread cancelingThread = new Thread(() -> cancellableThreads.cancel("test"));
799799
cancelingThread.start();
800800
try {
801-
RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, "test", shard, cancellableThreads);
801+
RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, "test", shard, cancellableThreads, logger);
802802
} catch (CancellableThreads.ExecutionCancelledException e) {
803803
// expected.
804804
}

0 commit comments

Comments
 (0)