@@ -274,26 +274,13 @@ && isTargetSameHistory()
274
274
}
275
275
});
276
276
277
- final StepListener <ReplicationResponse > deleteRetentionLeaseStep = new StepListener <>();
278
- runUnderPrimaryPermit (() -> {
279
- try {
280
- // If the target previously had a copy of this shard then a file-based recovery might move its global
281
- // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
282
- // new one later on in the recovery.
283
- shard .removePeerRecoveryRetentionLease (
284
- request .targetNode ().getId (),
285
- new ThreadedActionListener <>(shard .getThreadPool ().generic (), deleteRetentionLeaseStep )
286
- );
287
- } catch (RetentionLeaseNotFoundException e ) {
288
- logger .debug ("no peer-recovery retention lease for " + request .targetAllocationId ());
289
- deleteRetentionLeaseStep .onResponse (null );
290
- }
291
- }, shard , cancellableThreads );
292
-
293
- deleteRetentionLeaseStep .whenComplete (ignored -> {
277
+ // If the target previously had a copy of this shard then a file-based recovery might move its global checkpoint
278
+ // backwards. We must therefore remove any existing retention lease so that we can create a new one later on in the
279
+ // recovery.
280
+ deleteRetentionLease (ActionListener .wrap (ignored -> {
294
281
assert Transports .assertNotTransportThread (RecoverySourceHandler .this + "[phase1]" );
295
282
phase1 (safeCommitRef .getIndexCommit (), startingSeqNo , () -> estimateNumOps , sendFileStep );
296
- }, onFailure );
283
+ }, onFailure )) ;
297
284
298
285
} catch (final Exception e ) {
299
286
throw new RecoveryEngineException (shard .shardId (), 1 , "sendFileStep failed" , e );
@@ -999,6 +986,26 @@ void createRetentionLease(final long startingSeqNo, ActionListener<RetentionLeas
999
986
}), shard , cancellableThreads , leaseListener .delegateResponse ((l , e ) -> outerListener .onFailure (e )));
1000
987
}
1001
988
989
+ private void deleteRetentionLease (ActionListener <Void > outerListener ) { // removes the peer-recovery retention lease keyed by the target node id, then notifies outerListener
990
+ // NB we release the operation permit as soon as we have deleted the lease, but delay the outer listener until it is synced
991
+ final var leaseListener = new SubscribableListener <Void >(); // wrapped and handed to runUnderPrimaryPermit as its last argument below; presumably completed when the permit-guarded action finishes -- TODO confirm
992
+ final var delayedListener = new ThreadedActionListener <>( // receives the outcome of the lease removal; constructed with the generic executor, presumably so completion is dispatched there -- confirm
993
+ shard .getThreadPool ().generic (),
994
+ outerListener .<Void >delegateFailure ((l , ignored ) -> leaseListener .addListener (l )) // on response: subscribe the outer listener to leaseListener; failures presumably pass straight to outerListener -- confirm
995
+ );
996
+
997
+ runUnderPrimaryPermit (permitListener -> ActionListener .completeWith (permitListener , () -> { // permitListener is completed with the supplier's result (null here)
998
+ try {
999
+ shard .removePeerRecoveryRetentionLease (request .targetNode ().getId (), delayedListener .map (ignored -> null )); // the removal response is discarded (mapped to null) before reaching delayedListener
1000
+ } catch (RetentionLeaseNotFoundException e ) {
1001
+ // this counts as success: there was no lease to remove, so the outer listener is subscribed to leaseListener directly below
1002
+ logger .debug ("no peer-recovery retention lease for [{}]" , request .targetAllocationId ()); // NOTE(review): logs targetAllocationId although the lease is looked up by targetNode().getId() -- confirm this is intended
1003
+ leaseListener .addListener (outerListener );
1004
+ }
1005
+ return null ;
1006
+ }), shard , cancellableThreads , leaseListener .delegateResponse ((l , e ) -> outerListener .onFailure (e ))); // an exception reaching this wrapper is forwarded to outerListener.onFailure instead of leaseListener
1007
+ }
1008
+
1002
1009
boolean hasSameLegacySyncId (Store .MetadataSnapshot source , Store .MetadataSnapshot target ) {
1003
1010
if (source .getSyncId () == null || source .getSyncId ().equals (target .getSyncId ()) == false ) {
1004
1011
return false ;
0 commit comments