Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ protected void shardOperation(
SubscribableListener.<IndexShard>newForked(l -> {
IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex())
.getShard(shardRouting.shardId().id());
indexShard.ensureMutable(l.map(unused -> indexShard));
indexShard.ensureMutable(l.map(unused -> indexShard), false);
}).<EmptyResult>andThen((l, indexShard) -> {
threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(ActionRunnable.supply(l, () -> {
indexShard.forceMerge(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ protected void shardOperationOnPrimary(
IndexShard primary,
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
) {
primary.ensureMutable(listener.delegateFailure((l, ignored) -> super.shardOperationOnPrimary(request, primary, l)));
primary.ensureMutable(listener.delegateFailure((l, ignored) -> super.shardOperationOnPrimary(request, primary, l)), true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
var executor = executor(indexService);
assert ThreadPool.assertCurrentThreadPool(Names.SYSTEM_WRITE, Names.WRITE);

SubscribableListener.newForked(indexShard::ensureMutable)
SubscribableListener.<Void>newForked((l) -> indexShard.ensureMutable(l, false))
// Make sure to fork back to a `write` thread pool if necessary
.<UpdateHelper.Result>andThen(executor, threadPool.getThreadContext(), (l, unused) -> ActionListener.completeWith(l, () -> {
assert ThreadPool.assertCurrentThreadPool(Names.SYSTEM_WRITE, Names.WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,22 @@ public void indexShardStateChanged(
}

@Override
public void beforeIndexShardMutableOperation(IndexShard indexShard, ActionListener<Void> listener) {
iterateBeforeIndexShardMutableOperation(indexShard, listener.delegateResponse((l, e) -> {
public void beforeIndexShardMutableOperation(IndexShard indexShard, boolean permitAcquired, ActionListener<Void> listener) {
iterateBeforeIndexShardMutableOperation(indexShard, permitAcquired, listener.delegateResponse((l, e) -> {
logger.warn(() -> format("%s failed to invoke the listener before ensuring shard mutability", indexShard.shardId()), e);
l.onFailure(e);
}));
}

private void iterateBeforeIndexShardMutableOperation(IndexShard indexShard, ActionListener<Void> outerListener) {
private void iterateBeforeIndexShardMutableOperation(
IndexShard indexShard,
boolean permitAcquired,
ActionListener<Void> outerListener
) {
callListeners(
indexShard,
listeners.stream()
.map(iel -> (Consumer<ActionListener<Void>>) (l) -> iel.beforeIndexShardMutableOperation(indexShard, l))
.map(iel -> (Consumer<ActionListener<Void>>) (l) -> iel.beforeIndexShardMutableOperation(indexShard, permitAcquired, l))
.iterator(),
outerListener
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,19 @@ default void indexShardStateChanged(
* <ul>
* <li>Indexing operations</li>
* <li>Force merges</li>
* <li>Pre-updates</li>
* </ul>
*
* This method ensures that the shard is ready to accept mutating operations. This is particularly useful in cases
* where the shard initializes its internal {@link org.elasticsearch.index.engine.Engine} lazily, which may take some time.
* The provided listener should be notified once the shard is prepared to proceed with the operation.
* This can be called from a transport thread and therefore the function should be lightweight and not block the thread.
*
* @param indexShard the shard where the mutable operation will be performed
* @param listener the listener to be notified when the shard is ready to proceed
* @param indexShard the shard where the mutable operation will be performed
* @param permitAcquired whether the operation has acquired an operation permit on the shard
* @param listener the listener to be notified when the shard is ready to proceed
*/
default void beforeIndexShardMutableOperation(IndexShard indexShard, ActionListener<Void> listener) {
default void beforeIndexShardMutableOperation(IndexShard indexShard, boolean permitAcquired, ActionListener<Void> listener) {
listener.onResponse(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4532,8 +4532,8 @@ public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGenera
*
* @param listener the listener to be notified when the shard is mutable
*/
public void ensureMutable(ActionListener<Void> listener) {
indexEventListener.beforeIndexShardMutableOperation(this, listener.delegateFailure((l, unused) -> {
public void ensureMutable(ActionListener<Void> listener, boolean permitAcquired) {
indexEventListener.beforeIndexShardMutableOperation(this, permitAcquired, listener.delegateFailure((l, unused) -> {
// TODO ES-10826: Acquire ref to engine and retry if it's immutable again?
l.onResponse(null);
}));
Expand Down