@@ -779,10 +779,28 @@ public void relocated(
779779 final String targetAllocationId ,
780780 final BiConsumer <ReplicationTracker .PrimaryContext , ActionListener <Void >> consumer ,
781781 final ActionListener <Void > listener
782+ ) throws IllegalIndexShardStateException , IllegalStateException {
783+ relocated (targetNodeId , targetAllocationId , consumer , listener , null );
784+ }
785+
786+ /**
787+ * Provides an variant of {@link IndexShard#relocated(String, String, BiConsumer, ActionListener, Releasable)} with an option
788+ * to relocate the shard under externally acquired primary permits.
789+ *
790+ * @param acquiredPrimaryPermits if null, waits until all the primary permits are acquired, otherwise it calls the consumer immediately
791+ */
792+ public void relocated (
793+ final String targetNodeId ,
794+ final String targetAllocationId ,
795+ final BiConsumer <ReplicationTracker .PrimaryContext , ActionListener <Void >> consumer ,
796+ final ActionListener <Void > listener ,
797+ @ Nullable final Releasable acquiredPrimaryPermits
782798 ) throws IllegalIndexShardStateException , IllegalStateException {
783799 assert shardRouting .primary () : "only primaries can be marked as relocated: " + shardRouting ;
800+ assert acquiredPrimaryPermits == null || indexShardOperationPermits .getActiveOperationsCount () == OPERATIONS_BLOCKED
801+ : "external primary permits are provided but not held by the shard" ;
784802 try (Releasable forceRefreshes = refreshListeners .forceRefreshes ()) {
785- indexShardOperationPermits . blockOperations ( new ActionListener <>() {
803+ ActionListener < Releasable > onAcquired = new ActionListener <>() {
786804 @ Override
787805 public void onResponse (Releasable releasable ) {
788806 boolean success = false ;
@@ -860,8 +878,13 @@ public void onFailure(Exception e) {
860878 listener .onFailure (e );
861879 }
862880 }
863- }, 30L , TimeUnit .MINUTES , EsExecutors .DIRECT_EXECUTOR_SERVICE ); // Wait on current thread because this execution is wrapped by
864- // CancellableThreads and we want to be able to interrupt it
881+ };
882+ if (acquiredPrimaryPermits == null ) {
883+ // Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it
884+ indexShardOperationPermits .blockOperations (onAcquired , 30L , TimeUnit .MINUTES , EsExecutors .DIRECT_EXECUTOR_SERVICE );
885+ } else {
886+ ActionListener .completeWith (onAcquired , () -> acquiredPrimaryPermits );
887+ }
865888 }
866889 }
867890
0 commit comments