Skip to content

Commit 6ca7e75

Browse files
authored
Add possibility to acquire permits on primary shards with different checks (#119794)
Since #42241 we check that the shard is in primary mode before acquiring a primary permit on it. We would like to customize this check and have the option to perform different checks before running the `onPermitAcquired` listener. For example, we would like to skip the primary mode check when we acquire primary permits during the recovery of a hollow indexing shard. See ES-10487
1 parent b68fc8e commit 6ca7e75

File tree

2 files changed

+75
-17
lines changed

2 files changed

+75
-17
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@
189189
import static org.elasticsearch.core.Strings.format;
190190
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
191191
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
192+
import static org.elasticsearch.index.shard.IndexShard.PrimaryPermitCheck.CHECK_PRIMARY_MODE;
192193

193194
public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
194195

@@ -3568,58 +3569,100 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
35683569
);
35693570
}
35703571

3572+
/**
3573+
* Check to run before running the primary permit operation
3574+
*/
3575+
public enum PrimaryPermitCheck {
3576+
CHECK_PRIMARY_MODE,
3577+
/**
3578+
* IMPORTANT: Currently intended to be used only for acquiring primary permits during the recovery of hollow shards.
3579+
* Don't disable primary mode checks unless you're really sure.
3580+
*/
3581+
NONE
3582+
}
3583+
35713584
/**
35723585
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
35733586
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
35743587
* ActionListener will then be called using the provided executor.
3575-
*
35763588
*/
35773589
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, Executor executorOnDelay) {
3578-
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false);
3590+
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE);
35793591
}
35803592

35813593
public void acquirePrimaryOperationPermit(
35823594
ActionListener<Releasable> onPermitAcquired,
35833595
Executor executorOnDelay,
35843596
boolean forceExecution
3597+
) {
3598+
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, forceExecution, CHECK_PRIMARY_MODE);
3599+
}
3600+
3601+
public void acquirePrimaryOperationPermit(
3602+
ActionListener<Releasable> onPermitAcquired,
3603+
Executor executorOnDelay,
3604+
boolean forceExecution,
3605+
PrimaryPermitCheck primaryPermitCheck
35853606
) {
35863607
verifyNotClosed();
35873608
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
3588-
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, forceExecution);
3609+
indexShardOperationPermits.acquire(
3610+
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
3611+
executorOnDelay,
3612+
forceExecution
3613+
);
35893614
}
35903615

35913616
public boolean isPrimaryMode() {
35923617
assert indexShardOperationPermits.getActiveOperationsCount() != 0 : "must hold permit to check primary mode";
35933618
return replicationTracker.isPrimaryMode();
35943619
}
35953620

3621+
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
3622+
acquireAllPrimaryOperationsPermits(onPermitAcquired, timeout, CHECK_PRIMARY_MODE);
3623+
}
3624+
35963625
/**
35973626
* Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called.
35983627
* It is the responsibility of the caller to close the {@link Releasable}.
35993628
*/
3600-
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
3629+
public void acquireAllPrimaryOperationsPermits(
3630+
final ActionListener<Releasable> onPermitAcquired,
3631+
final TimeValue timeout,
3632+
final PrimaryPermitCheck primaryPermitCheck
3633+
) {
36013634
verifyNotClosed();
36023635
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;
36033636

3604-
asyncBlockOperations(wrapPrimaryOperationPermitListener(onPermitAcquired), timeout.duration(), timeout.timeUnit());
3637+
asyncBlockOperations(
3638+
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
3639+
timeout.duration(),
3640+
timeout.timeUnit()
3641+
);
36053642
}
36063643

36073644
/**
3608-
* Wraps the action to run on a primary after acquiring permit. This wrapping is used to check if the shard is in primary mode before
3609-
* executing the action.
3645+
* Wraps the action to run on a primary after acquiring permit.
36103646
*
3647+
* @param primaryPermitCheck check to run before the primary mode operation
36113648
* @param listener the listener to wrap
36123649
* @return the wrapped listener
36133650
*/
3614-
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final ActionListener<Releasable> listener) {
3615-
return listener.delegateFailure((l, r) -> {
3616-
if (isPrimaryMode()) {
3617-
l.onResponse(r);
3618-
} else {
3619-
r.close();
3620-
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
3621-
}
3622-
});
3651+
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(
3652+
final PrimaryPermitCheck primaryPermitCheck,
3653+
final ActionListener<Releasable> listener
3654+
) {
3655+
return switch (primaryPermitCheck) {
3656+
case CHECK_PRIMARY_MODE -> listener.delegateFailure((l, r) -> {
3657+
if (isPrimaryMode()) {
3658+
l.onResponse(r);
3659+
} else {
3660+
r.close();
3661+
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
3662+
}
3663+
});
3664+
case NONE -> listener;
3665+
};
36233666
}
36243667

36253668
private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
@@ -3657,7 +3700,7 @@ public void runUnderPrimaryPermit(final Runnable runnable, final Consumer<Except
36573700
runnable.run();
36583701
}
36593702
}, onFailure);
3660-
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay);
3703+
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE);
36613704
}
36623705

36633706
private <E extends Exception> void bumpPrimaryTerm(

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,21 @@ public void onFailure(final Exception e) {
790790
}
791791
}, TimeValue.timeValueSeconds(30));
792792
latch.await();
793+
794+
// It's possible to acquire permits if we skip the primary mode check
795+
var permitAcquiredLatch = new CountDownLatch(1);
796+
indexShard.acquirePrimaryOperationPermit(ActionListener.wrap(r -> {
797+
r.close();
798+
permitAcquiredLatch.countDown();
799+
}, Assert::assertNotNull), EsExecutors.DIRECT_EXECUTOR_SERVICE, false, IndexShard.PrimaryPermitCheck.NONE);
800+
safeAwait(permitAcquiredLatch);
801+
802+
var allPermitsAcquiredLatch = new CountDownLatch(1);
803+
indexShard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> {
804+
r.close();
805+
allPermitsAcquiredLatch.countDown();
806+
}, Assert::assertNotNull), TimeValue.timeValueSeconds(30), IndexShard.PrimaryPermitCheck.NONE);
807+
safeAwait(allPermitsAcquiredLatch);
793808
}
794809

795810
if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) {

0 commit comments

Comments
 (0)