diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java index c59ec12ed4c5a..178ed0eb05ca3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java @@ -100,7 +100,7 @@ protected void shardOperation( SubscribableListener.newForked(l -> { IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()) .getShard(shardRouting.shardId().id()); - indexShard.ensureMutable(l.map(unused -> indexShard)); + indexShard.ensureMutable(l.map(unused -> indexShard), false); }).andThen((l, indexShard) -> { threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(ActionRunnable.supply(l, () -> { indexShard.forceMerge(request); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index a8db6c306280a..01968596db932 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -161,7 +161,7 @@ protected void shardOperationOnPrimary( IndexShard primary, ActionListener> listener ) { - primary.ensureMutable(listener.delegateFailure((l, ignored) -> super.shardOperationOnPrimary(request, primary, l))); + primary.ensureMutable(listener.delegateFailure((l, ignored) -> super.shardOperationOnPrimary(request, primary, l)), true); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index 7449291b4363d..617a3c5c49b3e 100644 --- a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -209,7 +209,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< var executor = executor(indexService); assert ThreadPool.assertCurrentThreadPool(Names.SYSTEM_WRITE, Names.WRITE); - SubscribableListener.newForked(indexShard::ensureMutable) + SubscribableListener.newForked((l) -> indexShard.ensureMutable(l, false)) // Make sure to fork back to a `write` thread pool if necessary .andThen(executor, threadPool.getThreadContext(), (l, unused) -> ActionListener.completeWith(l, () -> { assert ThreadPool.assertCurrentThreadPool(Names.SYSTEM_WRITE, Names.WRITE); diff --git a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java index 1f55b92982abe..03bbf77b66046 100644 --- a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java @@ -138,18 +138,22 @@ public void indexShardStateChanged( } @Override - public void beforeIndexShardMutableOperation(IndexShard indexShard, ActionListener listener) { - iterateBeforeIndexShardMutableOperation(indexShard, listener.delegateResponse((l, e) -> { + public void beforeIndexShardMutableOperation(IndexShard indexShard, boolean permitAcquired, ActionListener listener) { + iterateBeforeIndexShardMutableOperation(indexShard, permitAcquired, listener.delegateResponse((l, e) -> { logger.warn(() -> format("%s failed to invoke the listener before ensuring shard mutability", indexShard.shardId()), e); l.onFailure(e); })); } - private void iterateBeforeIndexShardMutableOperation(IndexShard indexShard, ActionListener outerListener) { + private void iterateBeforeIndexShardMutableOperation( + IndexShard indexShard, + boolean permitAcquired, + ActionListener outerListener + ) { callListeners( indexShard, listeners.stream() - .map(iel -> (Consumer>) (l) -> iel.beforeIndexShardMutableOperation(indexShard, l)) + .map(iel -> (Consumer>) (l) -> iel.beforeIndexShardMutableOperation(indexShard, permitAcquired, l)) .iterator(), outerListener ); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java index 3cd594e646f4e..92af26228948c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java @@ -91,6 +91,7 @@ default void indexShardStateChanged( *
    *
  • Indexing operations
  • *
  • Force merges
  • + *
  • Pre-updates
  • *
* * This method ensures that the shard is ready to accept mutating operations. This is particularly useful in cases @@ -98,10 +99,11 @@ default void indexShardStateChanged( * The provided listener should be notified once the shard is prepared to proceed with the operation. * This can be called from a transport thread and therefore the function should be lightweight and not block the thread. * - * @param indexShard the shard where the mutable operation will be performed - * @param listener the listener to be notified when the shard is ready to proceed + * @param indexShard the shard where the mutable operation will be performed + * @param permitAcquired whether the operation has acquired an operation permit on the shard + * @param listener the listener to be notified when the shard is ready to proceed */ - default void beforeIndexShardMutableOperation(IndexShard indexShard, ActionListener listener) { + default void beforeIndexShardMutableOperation(IndexShard indexShard, boolean permitAcquired, ActionListener listener) { listener.onResponse(null); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 751f07b144d27..bf2e20b70b441 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -4532,8 +4532,8 @@ public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGenera * * @param listener the listener to be notified when the shard is mutable */ - public void ensureMutable(ActionListener listener) { - indexEventListener.beforeIndexShardMutableOperation(this, listener.delegateFailure((l, unused) -> { + public void ensureMutable(ActionListener listener, boolean permitAcquired) { + indexEventListener.beforeIndexShardMutableOperation(this, permitAcquired, listener.delegateFailure((l, unused) -> { // TODO ES-10826: Acquire ref to engine and retry if it's immutable again? l.onResponse(null); }));