|
12 | 12 | import org.elasticsearch.action.ActionListener; |
13 | 13 | import org.elasticsearch.action.ActionRunnable; |
14 | 14 | import org.elasticsearch.action.support.ActionFilters; |
| 15 | +import org.elasticsearch.action.support.SubscribableListener; |
15 | 16 | import org.elasticsearch.action.support.broadcast.BroadcastResponse; |
16 | 17 | import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; |
17 | 18 | import org.elasticsearch.cluster.ClusterState; |
@@ -92,12 +93,16 @@ protected void shardOperation( |
92 | 93 | ActionListener<TransportBroadcastByNodeAction.EmptyResult> listener |
93 | 94 | ) { |
94 | 95 | assert (task instanceof CancellableTask) == false; // TODO: add cancellation handling here once the task supports it |
95 | | - threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(ActionRunnable.supply(listener, () -> { |
| 96 | + SubscribableListener.<IndexShard>newForked(l -> { |
96 | 97 | IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()) |
97 | 98 | .getShard(shardRouting.shardId().id()); |
98 | | - indexShard.forceMerge(request); |
99 | | - return EmptyResult.INSTANCE; |
100 | | - })); |
| 99 | + indexShard.ensureMutable(l.map(unused -> indexShard)); |
| 100 | + }).<EmptyResult>andThen((l, indexShard) -> { |
| 101 | + threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(ActionRunnable.supply(l, () -> { |
| 102 | + indexShard.forceMerge(request); |
| 103 | + return EmptyResult.INSTANCE; |
| 104 | + })); |
| 105 | + }).addListener(listener); |
101 | 106 | } |
102 | 107 |
|
103 | 108 | /** |
|
0 commit comments