diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java index da08b78d711cf..7a866792d167a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; import org.elasticsearch.cluster.ClusterState; @@ -92,12 +93,16 @@ protected void shardOperation( ActionListener listener ) { assert (task instanceof CancellableTask) == false; // TODO: add cancellation handling here once the task supports it - threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(ActionRunnable.supply(listener, () -> { + SubscribableListener.newForked(l -> { IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()) .getShard(shardRouting.shardId().id()); - indexShard.forceMerge(request); - return EmptyResult.INSTANCE; - })); + indexShard.ensureMutable(l.map(unused -> indexShard)); + }).andThen((l, indexShard) -> { + threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(ActionRunnable.supply(l, () -> { + indexShard.forceMerge(request); + return EmptyResult.INSTANCE; + })); + }).addListener(listener); } /**