Skip to content

Commit b3b059c

Browse files
authored
Revert primary permits changes and add hook (#120398)
We would like to avoid directly using primary permits for hollow shards, so we revert the relevant changes and add a hook to the function that acquires a primary permit, so that a plugin can extend its behavior. Relates ES-10537
1 parent bf92343 commit b3b059c

File tree

5 files changed

+91
-127
lines changed

5 files changed

+91
-127
lines changed

server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Iterator;
2929
import java.util.List;
3030
import java.util.function.Consumer;
31+
import java.util.function.Supplier;
3132

3233
import static org.elasticsearch.core.Strings.format;
3334

@@ -349,4 +350,16 @@ public void afterFilesRestoredFromRepository(IndexShard indexShard) {
349350
}
350351
}
351352
}
353+
354+
@Override
355+
public void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier<ActionListener<Void>> onPermitAcquiredListenerSupplier) {
356+
for (IndexEventListener listener : listeners) {
357+
try {
358+
listener.onAcquirePrimaryOperationPermit(indexShard, onPermitAcquiredListenerSupplier);
359+
} catch (Exception e) {
360+
logger.warn(() -> "[" + indexShard.shardId() + "] failed to invoke the listener on acquiring a primary permit", e);
361+
throw e;
362+
}
363+
}
364+
}
352365
}

server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import org.elasticsearch.index.IndexSettings;
1818
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
1919

20+
import java.util.function.Supplier;
21+
2022
/**
2123
* An index event listener is the primary extension point for plugins and built-in services
2224
* to react / listen to per-index and per-shard events. These listeners are registered per-index
@@ -190,4 +192,14 @@ default void afterIndexShardRecovery(IndexShard indexShard, ActionListener<Void>
190192
* @param indexShard the shard that is recovering
191193
*/
192194
default void afterFilesRestoredFromRepository(IndexShard indexShard) {}
195+
196+
/**
197+
* Called when a single primary permit is acquired for the given shard (see
198+
* {@link IndexShard#acquirePrimaryOperationPermit(ActionListener, java.util.concurrent.Executor)}).
199+
*
200+
* @param indexShard the shard of which a primary permit is requested
201+
* @param onPermitAcquiredListenerSupplier call this immediately to get a listener when the permit is acquired. The listener must be
202+
* completed in order for the permit to be given to the acquiring operation.
203+
*/
204+
default void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier<ActionListener<Void>> onPermitAcquiredListenerSupplier) {}
193205
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 32 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
3535
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
3636
import org.elasticsearch.action.support.PlainActionFuture;
37+
import org.elasticsearch.action.support.RefCountingListener;
3738
import org.elasticsearch.action.support.SubscribableListener;
3839
import org.elasticsearch.action.support.replication.PendingReplicationActions;
3940
import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -189,7 +190,6 @@
189190
import static org.elasticsearch.core.Strings.format;
190191
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
191192
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
192-
import static org.elasticsearch.index.shard.IndexShard.PrimaryPermitCheck.CHECK_PRIMARY_MODE;
193193

194194
public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
195195

@@ -779,28 +779,10 @@ public void relocated(
779779
final String targetAllocationId,
780780
final BiConsumer<ReplicationTracker.PrimaryContext, ActionListener<Void>> consumer,
781781
final ActionListener<Void> listener
782-
) throws IllegalIndexShardStateException, IllegalStateException {
783-
relocated(targetNodeId, targetAllocationId, consumer, listener, null);
784-
}
785-
786-
/**
787-
* Provides a variant of {@link IndexShard#relocated(String, String, BiConsumer, ActionListener, Releasable)} with an option
788-
* to relocate the shard under externally acquired primary permits.
789-
*
790-
* @param acquiredPrimaryPermits if null, waits until all the primary permits are acquired, otherwise it calls the consumer immediately
791-
*/
792-
public void relocated(
793-
final String targetNodeId,
794-
final String targetAllocationId,
795-
final BiConsumer<ReplicationTracker.PrimaryContext, ActionListener<Void>> consumer,
796-
final ActionListener<Void> listener,
797-
@Nullable final Releasable acquiredPrimaryPermits
798782
) throws IllegalIndexShardStateException, IllegalStateException {
799783
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
800-
assert acquiredPrimaryPermits == null || indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
801-
: "external primary permits are provided but not held by the shard";
802784
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
803-
ActionListener<Releasable> onAcquired = new ActionListener<>() {
785+
indexShardOperationPermits.blockOperations(new ActionListener<>() {
804786
@Override
805787
public void onResponse(Releasable releasable) {
806788
boolean success = false;
@@ -878,13 +860,8 @@ public void onFailure(Exception e) {
878860
listener.onFailure(e);
879861
}
880862
}
881-
};
882-
if (acquiredPrimaryPermits == null) {
883-
// Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it
884-
indexShardOperationPermits.blockOperations(onAcquired, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE);
885-
} else {
886-
ActionListener.completeWith(onAcquired, () -> acquiredPrimaryPermits);
887-
}
863+
}, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by
864+
// CancellableThreads and we want to be able to interrupt it
888865
}
889866
}
890867

@@ -3592,100 +3569,69 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
35923569
);
35933570
}
35943571

3595-
/**
3596-
* Check to run before running the primary permit operation
3597-
*/
3598-
public enum PrimaryPermitCheck {
3599-
CHECK_PRIMARY_MODE,
3600-
/**
3601-
* IMPORTANT: Currently intended to be used only for acquiring primary permits during the recovery of hollow shards.
3602-
* Don't disable primary mode checks unless you're really sure.
3603-
*/
3604-
NONE
3605-
}
3606-
36073572
/**
36083573
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
36093574
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
36103575
* ActionListener will then be called using the provided executor.
36113576
*/
36123577
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, Executor executorOnDelay) {
3613-
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE);
3578+
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false);
36143579
}
36153580

36163581
public void acquirePrimaryOperationPermit(
36173582
ActionListener<Releasable> onPermitAcquired,
36183583
Executor executorOnDelay,
36193584
boolean forceExecution
3620-
) {
3621-
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, forceExecution, CHECK_PRIMARY_MODE);
3622-
}
3623-
3624-
public void acquirePrimaryOperationPermit(
3625-
ActionListener<Releasable> onPermitAcquired,
3626-
Executor executorOnDelay,
3627-
boolean forceExecution,
3628-
PrimaryPermitCheck primaryPermitCheck
36293585
) {
36303586
verifyNotClosed();
36313587
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
3632-
indexShardOperationPermits.acquire(
3633-
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
3634-
executorOnDelay,
3635-
forceExecution
3636-
);
3588+
3589+
ActionListener<Releasable> onPermitAcquiredWrapped = onPermitAcquired.delegateFailureAndWrap((delegate, releasable) -> {
3590+
final ActionListener<Releasable> wrappedListener = indexShardOperationPermits.wrapContextPreservingActionListener(
3591+
delegate,
3592+
executorOnDelay,
3593+
forceExecution
3594+
);
3595+
try (var listeners = new RefCountingListener(wrappedListener.map(unused -> releasable))) {
3596+
indexEventListener.onAcquirePrimaryOperationPermit(this, () -> listeners.acquire());
3597+
}
3598+
});
3599+
3600+
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquiredWrapped), executorOnDelay, forceExecution);
36373601
}
36383602

36393603
public boolean isPrimaryMode() {
36403604
assert indexShardOperationPermits.getActiveOperationsCount() != 0 : "must hold permit to check primary mode";
36413605
return replicationTracker.isPrimaryMode();
36423606
}
36433607

3644-
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
3645-
acquireAllPrimaryOperationsPermits(onPermitAcquired, timeout, CHECK_PRIMARY_MODE);
3646-
}
3647-
36483608
/**
36493609
* Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called.
36503610
* It is the responsibility of the caller to close the {@link Releasable}.
36513611
*/
3652-
public void acquireAllPrimaryOperationsPermits(
3653-
final ActionListener<Releasable> onPermitAcquired,
3654-
final TimeValue timeout,
3655-
final PrimaryPermitCheck primaryPermitCheck
3656-
) {
3612+
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
36573613
verifyNotClosed();
36583614
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;
36593615

3660-
asyncBlockOperations(
3661-
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
3662-
timeout.duration(),
3663-
timeout.timeUnit()
3664-
);
3616+
asyncBlockOperations(wrapPrimaryOperationPermitListener(onPermitAcquired), timeout.duration(), timeout.timeUnit());
36653617
}
36663618

36673619
/**
3668-
* Wraps the action to run on a primary after acquiring permit.
3620+
* Wraps the action to run on a primary after acquiring permit. This wrapping is used to check if the shard is in primary mode before
3621+
* executing the action.
36693622
*
3670-
* @param primaryPermitCheck check to run before the primary mode operation
36713623
* @param listener the listener to wrap
36723624
* @return the wrapped listener
36733625
*/
3674-
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(
3675-
final PrimaryPermitCheck primaryPermitCheck,
3676-
final ActionListener<Releasable> listener
3677-
) {
3678-
return switch (primaryPermitCheck) {
3679-
case CHECK_PRIMARY_MODE -> listener.delegateFailure((l, r) -> {
3680-
if (isPrimaryMode()) {
3681-
l.onResponse(r);
3682-
} else {
3683-
r.close();
3684-
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
3685-
}
3686-
});
3687-
case NONE -> listener;
3688-
};
3626+
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final ActionListener<Releasable> listener) {
3627+
return listener.delegateFailure((l, r) -> {
3628+
if (isPrimaryMode()) {
3629+
l.onResponse(r);
3630+
} else {
3631+
r.close();
3632+
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
3633+
}
3634+
});
36893635
}
36903636

36913637
private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
@@ -3723,7 +3669,7 @@ public void runUnderPrimaryPermit(final Runnable runnable, final Consumer<Except
37233669
runnable.run();
37243670
}
37253671
}, onFailure);
3726-
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE);
3672+
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay);
37273673
}
37283674

37293675
private <E extends Exception> void bumpPrimaryTerm(

server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -216,32 +216,7 @@ private void innerAcquire(
216216
try {
217217
synchronized (this) {
218218
if (queuedBlockOperations > 0) {
219-
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
220-
final ActionListener<Releasable> wrappedListener;
221-
if (executorOnDelay != null) {
222-
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired).delegateFailure(
223-
(l, r) -> executorOnDelay.execute(new ActionRunnable<>(l) {
224-
@Override
225-
public boolean isForceExecution() {
226-
return forceExecution;
227-
}
228-
229-
@Override
230-
protected void doRun() {
231-
listener.onResponse(r);
232-
}
233-
234-
@Override
235-
public void onRejection(Exception e) {
236-
IOUtils.closeWhileHandlingException(r);
237-
super.onRejection(e);
238-
}
239-
})
240-
);
241-
} else {
242-
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
243-
}
244-
delayedOperations.add(wrappedListener);
219+
delayedOperations.add(wrapContextPreservingActionListener(onAcquired, executorOnDelay, forceExecution));
245220
return;
246221
} else {
247222
releasable = acquire();
@@ -255,6 +230,39 @@ public void onRejection(Exception e) {
255230
onAcquired.onResponse(releasable);
256231
}
257232

233+
public <T extends Closeable> ActionListener<T> wrapContextPreservingActionListener(
234+
ActionListener<T> listener,
235+
@Nullable final Executor executorOnDelay,
236+
final boolean forceExecution
237+
) {
238+
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
239+
final ActionListener<T> wrappedListener;
240+
if (executorOnDelay != null) {
241+
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, listener).delegateFailure(
242+
(l, r) -> executorOnDelay.execute(new ActionRunnable<>(l) {
243+
@Override
244+
public boolean isForceExecution() {
245+
return forceExecution;
246+
}
247+
248+
@Override
249+
protected void doRun() {
250+
listener.onResponse(r);
251+
}
252+
253+
@Override
254+
public void onRejection(Exception e) {
255+
IOUtils.closeWhileHandlingException(r);
256+
super.onRejection(e);
257+
}
258+
})
259+
);
260+
} else {
261+
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, listener);
262+
}
263+
return wrappedListener;
264+
}
265+
258266
private Releasable acquire() throws InterruptedException {
259267
assert Thread.holdsLock(this);
260268
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -790,21 +790,6 @@ public void onFailure(final Exception e) {
790790
}
791791
}, TimeValue.timeValueSeconds(30));
792792
latch.await();
793-
794-
// It's possible to acquire permits if we skip the primary mode check
795-
var permitAcquiredLatch = new CountDownLatch(1);
796-
indexShard.acquirePrimaryOperationPermit(ActionListener.wrap(r -> {
797-
r.close();
798-
permitAcquiredLatch.countDown();
799-
}, Assert::assertNotNull), EsExecutors.DIRECT_EXECUTOR_SERVICE, false, IndexShard.PrimaryPermitCheck.NONE);
800-
safeAwait(permitAcquiredLatch);
801-
802-
var allPermitsAcquiredLatch = new CountDownLatch(1);
803-
indexShard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> {
804-
r.close();
805-
allPermitsAcquiredLatch.countDown();
806-
}, Assert::assertNotNull), TimeValue.timeValueSeconds(30), IndexShard.PrimaryPermitCheck.NONE);
807-
safeAwait(allPermitsAcquiredLatch);
808793
}
809794

810795
if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) {

0 commit comments

Comments
 (0)