Skip to content

Commit 134f51b

Browse files
authored
Small cleanup in PRTS#doRecovery (#93549)
Renames `failureHandler` to `cleanupOnly` and makes use of `ActionListener#run` where appropriate.
1 parent b8c9dc9 commit 134f51b

File tree

1 file changed

+34
-42
lines changed

1 file changed

+34
-42
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 34 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,8 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
222222
final IndexShard indexShard = recoveryTarget.indexShard();
223223
final Releasable onCompletion = Releasables.wrap(recoveryTarget.disableRecoveryMonitor(), recoveryRef);
224224

225-
final var failureHandler = ActionListener.notifyOnce(ActionListener.runBefore(ActionListener.noop().delegateResponse((l, e) -> {
225+
// async version of the catch/finally structure we need, but this does nothing with successes so needs further modification below
226+
final var cleanupOnly = ActionListener.notifyOnce(ActionListener.runBefore(ActionListener.noop().delegateResponse((l, e) -> {
226227
// this will be logged as warning later on...
227228
logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
228229
onGoingRecoveries.failRecovery(
@@ -235,28 +236,23 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
235236
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
236237
assert preExistingRequest == null;
237238
assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false;
238-
try {
239-
indexShard.preRecovery(failureHandler.map(v -> {
240-
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
241-
indexShard.prepareForIndexRecovery();
242-
// Skip unnecessary intermediate stages
243-
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
244-
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
245-
indexShard.openEngineAndSkipTranslogRecovery();
246-
recoveryState.getIndex().setFileDetailsComplete();
247-
recoveryState.setStage(RecoveryState.Stage.FINALIZE);
248-
onGoingRecoveries.markRecoveryAsDone(recoveryId);
249-
return null;
250-
}));
251-
} catch (Exception e) {
252-
failureHandler.onFailure(e);
253-
}
254-
239+
ActionListener.run(cleanupOnly.map(v -> {
240+
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
241+
indexShard.prepareForIndexRecovery();
242+
// Skip unnecessary intermediate stages
243+
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
244+
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
245+
indexShard.openEngineAndSkipTranslogRecovery();
246+
recoveryState.getIndex().setFileDetailsComplete();
247+
recoveryState.setStage(RecoveryState.Stage.FINALIZE);
248+
onGoingRecoveries.markRecoveryAsDone(recoveryId);
249+
return null;
250+
}), indexShard::preRecovery);
255251
return;
256252
}
257253

258254
record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, String actionName, TransportRequest requestToSend) {}
259-
final ActionListener<StartRecoveryRequestToSend> toSendListener = failureHandler.map(r -> {
255+
final ActionListener<StartRecoveryRequestToSend> toSendListener = cleanupOnly.map(r -> {
260256
logger.trace(
261257
"{} [{}]: recovery from {}",
262258
r.startRecoveryRequest().shardId(),
@@ -273,30 +269,26 @@ record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, Str
273269
});
274270

275271
if (preExistingRequest == null) {
276-
try {
277-
indexShard.preRecovery(toSendListener.delegateFailure((l, v) -> ActionListener.completeWith(l, () -> {
278-
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
279-
indexShard.prepareForIndexRecovery();
280-
if (indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot()) {
281-
// for searchable snapshots, peer recovery is treated similarly to recovery from snapshot
282-
indexShard.getIndexEventListener().afterFilesRestoredFromRepository(indexShard);
283-
final Store store = indexShard.store();
284-
store.incRef();
285-
try {
286-
StoreRecovery.bootstrap(indexShard, store);
287-
} finally {
288-
store.decRef();
289-
}
272+
ActionListener.run(toSendListener.map(v -> {
273+
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
274+
indexShard.prepareForIndexRecovery();
275+
if (indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot()) {
276+
// for searchable snapshots, peer recovery is treated similarly to recovery from snapshot
277+
indexShard.getIndexEventListener().afterFilesRestoredFromRepository(indexShard);
278+
final Store store = indexShard.store();
279+
store.incRef();
280+
try {
281+
StoreRecovery.bootstrap(indexShard, store);
282+
} finally {
283+
store.decRef();
290284
}
291-
final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
292-
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
293-
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
294-
final var startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
295-
return new StartRecoveryRequestToSend(startRequest, PeerRecoverySourceService.Actions.START_RECOVERY, startRequest);
296-
})));
297-
} catch (Exception e) {
298-
toSendListener.onFailure(e);
299-
}
285+
}
286+
final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
287+
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
288+
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
289+
final var startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
290+
return new StartRecoveryRequestToSend(startRequest, PeerRecoverySourceService.Actions.START_RECOVERY, startRequest);
291+
}), indexShard::preRecovery);
300292
} else {
301293
toSendListener.onResponse(
302294
new StartRecoveryRequestToSend(

0 commit comments

Comments
 (0)