Skip to content

Commit 5eb5eae

Browse files
authored
Fork computation in TransportGetShutdownStatusAction (#99490) (#99492)
This action does O(#shards) work, so it must not run on a transport worker. It can also take minutes to complete in a huge cluster, so it should react to cancellation properly. Closes #99487.
1 parent ccd9b83 commit 5eb5eae

File tree

6 files changed

+82
-9
lines changed

6 files changed

+82
-9
lines changed

docs/changelog/99490.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 99490
2+
summary: Fork computation in `TransportGetShutdownStatusAction`
3+
area: Infra/Node Lifecycle
4+
type: bug
5+
issues:
6+
- 99487

server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,9 @@ protected abstract void masterOperation(Task task, Request request, ClusterState
118118

119119
private void executeMasterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)
120120
throws Exception {
121-
if (task instanceof CancellableTask && ((CancellableTask) task).isCancelled()) {
121+
if (task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled()) {
122122
throw new TaskCancelledException("Task was cancelled");
123123
}
124-
125124
masterOperation(task, request, state, listener);
126125
}
127126

x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/GetShutdownStatusAction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,16 @@
1414
import org.elasticsearch.common.Strings;
1515
import org.elasticsearch.common.io.stream.StreamInput;
1616
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.tasks.CancellableTask;
18+
import org.elasticsearch.tasks.Task;
19+
import org.elasticsearch.tasks.TaskId;
1720
import org.elasticsearch.xcontent.ToXContentObject;
1821
import org.elasticsearch.xcontent.XContentBuilder;
1922

2023
import java.io.IOException;
2124
import java.util.Arrays;
2225
import java.util.List;
26+
import java.util.Map;
2327
import java.util.Objects;
2428

2529
public class GetShutdownStatusAction extends ActionType<GetShutdownStatusAction.Response> {
@@ -69,6 +73,11 @@ public boolean equals(Object o) {
6973
public int hashCode() {
7074
return Arrays.hashCode(nodeIds);
7175
}
76+
77+
@Override
78+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
79+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
80+
}
7281
}
7382

7483
public static class Response extends ActionResponse implements ToXContentObject {

x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/RestGetShutdownStatusAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.rest.RestRequest;
1414
import org.elasticsearch.rest.Scope;
1515
import org.elasticsearch.rest.ServerlessScope;
16+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
1617
import org.elasticsearch.rest.action.RestToXContentListener;
1718

1819
import java.util.List;
@@ -36,7 +37,7 @@ public List<Route> routes() {
3637
@Override
3738
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
3839
String[] nodeIds = Strings.commaDelimitedListToStringArray(request.param("nodeId"));
39-
return channel -> client.execute(
40+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
4041
GetShutdownStatusAction.INSTANCE,
4142
new GetShutdownStatusAction.Request(nodeIds),
4243
new RestToXContentListener<>(channel)

x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@
3636
import org.elasticsearch.core.Tuple;
3737
import org.elasticsearch.shutdown.PluginShutdownService;
3838
import org.elasticsearch.snapshots.SnapshotsInfoService;
39+
import org.elasticsearch.tasks.CancellableTask;
3940
import org.elasticsearch.tasks.Task;
4041
import org.elasticsearch.threadpool.ThreadPool;
4142
import org.elasticsearch.transport.TransportService;
43+
import org.elasticsearch.transport.Transports;
4244
import org.elasticsearch.xpack.core.ilm.ErrorStep;
4345
import org.elasticsearch.xpack.core.ilm.OperationMode;
4446
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
@@ -89,7 +91,7 @@ public TransportGetShutdownStatusAction(
8991
GetShutdownStatusAction.Request::readFrom,
9092
indexNameExpressionResolver,
9193
GetShutdownStatusAction.Response::new,
92-
ThreadPool.Names.SAME
94+
ThreadPool.Names.MANAGEMENT
9395
);
9496
this.allocationService = allocationService;
9597
this.allocationDeciders = allocationDeciders;
@@ -104,7 +106,8 @@ protected void masterOperation(
104106
GetShutdownStatusAction.Request request,
105107
ClusterState state,
106108
ActionListener<GetShutdownStatusAction.Response> listener
107-
) throws Exception {
109+
) {
110+
CancellableTask cancellableTask = (CancellableTask) task;
108111
NodesShutdownMetadata nodesShutdownMetadata = state.metadata().custom(NodesShutdownMetadata.TYPE);
109112

110113
GetShutdownStatusAction.Response response;
@@ -118,6 +121,7 @@ protected void masterOperation(
118121
ns -> new SingleNodeShutdownStatus(
119122
ns,
120123
shardMigrationStatus(
124+
cancellableTask,
121125
state,
122126
ns.getNodeId(),
123127
ns.getType(),
@@ -142,6 +146,7 @@ protected void masterOperation(
142146
ns -> new SingleNodeShutdownStatus(
143147
ns,
144148
shardMigrationStatus(
149+
cancellableTask,
145150
state,
146151
ns.getNodeId(),
147152
ns.getType(),
@@ -165,6 +170,7 @@ protected void masterOperation(
165170

166171
// pkg-private for testing
167172
static ShutdownShardMigrationStatus shardMigrationStatus(
173+
CancellableTask cancellableTask,
168174
ClusterState currentState,
169175
String nodeId,
170176
SingleNodeShutdownMetadata.Type shutdownType,
@@ -174,6 +180,8 @@ static ShutdownShardMigrationStatus shardMigrationStatus(
174180
AllocationService allocationService,
175181
AllocationDeciders allocationDeciders
176182
) {
183+
assert Transports.assertNotTransportThread("doing O(#shards) work must be forked");
184+
177185
// Only REMOVE-type shutdowns will try to move shards, so RESTART-type shutdowns should immediately complete
178186
if (SingleNodeShutdownMetadata.Type.RESTART.equals(shutdownType)) {
179187
return new ShutdownShardMigrationStatus(
@@ -208,6 +216,7 @@ static ShutdownShardMigrationStatus shardMigrationStatus(
208216
var unassignedShards = currentState.getRoutingNodes()
209217
.unassigned()
210218
.stream()
219+
.peek(s -> cancellableTask.ensureNotCancelled())
211220
.filter(s -> Objects.equals(s.unassignedInfo().getLastAllocatedNodeId(), nodeId))
212221
.filter(s -> s.primary() || hasShardCopyOnAnotherNode(currentState, s, shuttingDownNodes) == false)
213222
.toList();
@@ -264,6 +273,7 @@ static ShutdownShardMigrationStatus shardMigrationStatus(
264273
Optional<Tuple<ShardRouting, ShardAllocationDecision>> unmovableShard = currentState.getRoutingNodes()
265274
.node(nodeId)
266275
.shardsWithState(ShardRoutingState.STARTED)
276+
.peek(s -> cancellableTask.ensureNotCancelled())
267277
.map(shardRouting -> new Tuple<>(shardRouting, allocationService.explainShardAllocation(shardRouting, allocation)))
268278
// Given that we're checking the status of a node that's shutting down, no shards should be allowed to remain
269279
.filter(pair -> {
@@ -285,17 +295,17 @@ static ShutdownShardMigrationStatus shardMigrationStatus(
285295
})
286296
// If ILM is shrinking the index this shard is part of, it'll look like it's unmovable, but we can just wait for ILM to finish
287297
.filter(pair -> isIlmRestrictingShardMovement(currentState, pair.v1()) == false)
288-
.peek(pair -> {
289-
logger.debug(
298+
.peek(
299+
pair -> logger.debug(
290300
"node [{}] shutdown of type [{}] stalled: found shard [{}][{}] from index [{}] with negative decision: [{}]",
291301
nodeId,
292302
shutdownType,
293303
pair.v1().getId(),
294304
pair.v1().primary() ? "primary" : "replica",
295305
pair.v1().shardId().getIndexName(),
296306
Strings.toString(pair.v2())
297-
);
298-
})
307+
)
308+
)
299309
.findFirst();
300310

301311
if (totalRemainingShards == shardsToIgnoreForFinalStatus.get() && unmovableShard.isEmpty()) {

x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@
4343
import org.elasticsearch.node.Node;
4444
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
4545
import org.elasticsearch.snapshots.SnapshotsInfoService;
46+
import org.elasticsearch.tasks.CancellableTask;
47+
import org.elasticsearch.tasks.TaskCancelHelper;
48+
import org.elasticsearch.tasks.TaskCancelledException;
49+
import org.elasticsearch.tasks.TaskId;
4650
import org.elasticsearch.test.ESTestCase;
4751
import org.elasticsearch.test.gateway.TestGatewayAllocator;
4852
import org.elasticsearch.xpack.core.ilm.ErrorStep;
@@ -152,6 +156,7 @@ public void testEmptyCluster() {
152156
ClusterState state = createTestClusterState(routingTable, List.of(), SingleNodeShutdownMetadata.Type.REMOVE);
153157

154158
ShutdownShardMigrationStatus status = TransportGetShutdownStatusAction.shardMigrationStatus(
159+
new CancellableTask(1, "direct", GetShutdownStatusAction.NAME, "", TaskId.EMPTY_TASK_ID, Map.of()),
155160
state,
156161
SHUTTING_DOWN_NODE_ID,
157162
SingleNodeShutdownMetadata.Type.REMOVE,
@@ -182,6 +187,7 @@ public void testRestartAlwaysComplete() {
182187
ClusterState state = createTestClusterState(routingTable.build(), List.of(imd), SingleNodeShutdownMetadata.Type.RESTART);
183188

184189
ShutdownShardMigrationStatus status = TransportGetShutdownStatusAction.shardMigrationStatus(
190+
new CancellableTask(1, "direct", GetShutdownStatusAction.NAME, "", TaskId.EMPTY_TASK_ID, Map.of()),
185191
state,
186192
SHUTTING_DOWN_NODE_ID,
187193
SingleNodeShutdownMetadata.Type.RESTART,
@@ -218,6 +224,7 @@ public void testComplete() {
218224
ClusterState state = createTestClusterState(routingTable.build(), List.of(imd), SingleNodeShutdownMetadata.Type.REMOVE);
219225

220226
ShutdownShardMigrationStatus status = TransportGetShutdownStatusAction.shardMigrationStatus(
227+
new CancellableTask(1, "direct", GetShutdownStatusAction.NAME, "", TaskId.EMPTY_TASK_ID, Map.of()),
221228
state,
222229
SHUTTING_DOWN_NODE_ID,
223230
SingleNodeShutdownMetadata.Type.REMOVE,
@@ -231,6 +238,39 @@ public void testComplete() {
231238
assertShardMigration(status, SingleNodeShutdownMetadata.Status.COMPLETE, 0, nullValue());
232239
}
233240

241+
/**
242+
* Ensures we check whether the task is cancelled during the computation
243+
*/
244+
public void testCancelled() {
245+
Index index = new Index(randomAlphaOfLength(5), randomAlphaOfLengthBetween(1, 20));
246+
IndexMetadata imd = generateIndexMetadata(index, 1, 0);
247+
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(index)
248+
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), SHUTTING_DOWN_NODE_ID, true, ShardRoutingState.STARTED))
249+
.build();
250+
251+
RoutingTable.Builder routingTable = RoutingTable.builder();
252+
routingTable.add(indexRoutingTable);
253+
ClusterState state = createTestClusterState(routingTable.build(), List.of(imd), SingleNodeShutdownMetadata.Type.REMOVE);
254+
255+
final var task = new CancellableTask(1, "direct", GetShutdownStatusAction.NAME, "", TaskId.EMPTY_TASK_ID, Map.of());
256+
TaskCancelHelper.cancel(task, "test");
257+
258+
expectThrows(
259+
TaskCancelledException.class,
260+
() -> TransportGetShutdownStatusAction.shardMigrationStatus(
261+
task,
262+
state,
263+
SHUTTING_DOWN_NODE_ID,
264+
SingleNodeShutdownMetadata.Type.REMOVE,
265+
true,
266+
clusterInfoService,
267+
snapshotsInfoService,
268+
allocationService,
269+
allocationDeciders
270+
)
271+
);
272+
}
273+
234274
/**
235275
* Ensures that we properly detect "in progress" migrations while there are shards relocating off the node that's shutting down.
236276
*/
@@ -263,6 +303,7 @@ public void testInProgressWithRelocatingShards() {
263303
ClusterState state = createTestClusterState(routingTable.build(), List.of(imd), SingleNodeShutdownMetadata.Type.REMOVE);
264304

265305
ShutdownShardMigrationStatus status = TransportGetShutdownStatusAction.shardMigrationStatus(
306+
new CancellableTask(1, "direct", GetShutdownStatusAction.NAME, "", TaskId.EMPTY_TASK_ID, Map.of()),
266307
state,
267308
SHUTTING_DOWN_NODE_ID,
268309
SingleNodeShutdownMetadata.Type.REMOVE,
@@ -315,6 +356,7 @@ public void testInProgressWithShardsMovingBetweenOtherNodes() {
315356
ClusterState state = createTestClusterState(routingTable.build(), List.of(imd), SingleNodeShutdownMetadata.Type.REMOVE);
316357

317358
ShutdownShardMigrationStatus status = TransportGetShutdownStatusAction.shardMigrationStatus(
359+
new CancellableTask(1, "direct", GetShutdownStatusAction.NAME, "", TaskId.EMPTY_TASK_ID, Map.of()),
318360
state,
319361
SHUTTING_DOWN_NODE_ID,
320362
SingleNodeShutdownMetadata.Type.REMOVE,
@@ -351,6 +393,7 @@ public void testStalled() {
351393
ClusterState state = createTestClusterState(routingTable.build(), List.of(imd), SingleNodeShutdownMetadata.Type.REMOVE);
352394

353395
ShutdownShardMigrationStatus status = TransportGetShutdownStatusAction.shardMigrationStatus(
396+
new CancellableTask(1, "direct", GetShutdownStatusAction.NAME, "", TaskId.EMPTY_TASK_ID, Map.of()),
354397
state,
355398
SHUTTING_DOWN_NODE_ID,
356399
SingleNodeShutdownMetadata.Type.REMOVE,
@@ -441,6 +484,7 @@ public void testNotStalledIfAllShardsHaveACopyOnAnotherNode() {
441484
ClusterState state = createTestClusterState(routingTable.build(), List.of(imd), SingleNodeShutdownMetadata.Type.REMOVE);
442485

443486
ShutdownShardMigrationStatus status = TransportGetShutdownStatusAction.shardMigrationStatus(
487+
new CancellableTask(1, "direct", GetShutdownStatusAction.NAME, "", TaskId.EMPTY_TASK_ID, Map.of()),
444488
state,
445489
SHUTTING_DOWN_NODE_ID,
446490
SingleNodeShutdownMetadata.Type.REMOVE,
@@ -473,6 +517,7 @@ public void testOnlyInitializingShardsRemaining() {
473517
ClusterState state = createTestClusterState(routingTable.build(), List.of(imd), SingleNodeShutdownMetadata.Type.REMOVE);
474518

475519
ShutdownShardMigrationStatus status = TransportGetShutdownStatusAction.shardMigrationStatus(
520+
new CancellableTask(1, "direct", GetShutdownStatusAction.NAME, "", TaskId.EMPTY_TASK_ID, Map.of()),
476521
state,
477522
SHUTTING_DOWN_NODE_ID,
478523
SingleNodeShutdownMetadata.Type.REMOVE,
@@ -548,6 +593,7 @@ public void testNodeNotInCluster() {
548593
.build();
549594

550595
ShutdownShardMigrationStatus status = TransportGetShutdownStatusAction.shardMigrationStatus(
596+
new CancellableTask(1, "direct", GetShutdownStatusAction.NAME, "", TaskId.EMPTY_TASK_ID, Map.of()),
551597
state,
552598
bogusNodeId,
553599
SingleNodeShutdownMetadata.Type.REMOVE,
@@ -642,6 +688,7 @@ private void checkStalledShardWithIlmState(
642688
state = setIlmOperationMode(state, operationMode);
643689

644690
ShutdownShardMigrationStatus status = TransportGetShutdownStatusAction.shardMigrationStatus(
691+
new CancellableTask(1, "direct", GetShutdownStatusAction.NAME, "", TaskId.EMPTY_TASK_ID, Map.of()),
645692
state,
646693
SHUTTING_DOWN_NODE_ID,
647694
SingleNodeShutdownMetadata.Type.REMOVE,
@@ -810,6 +857,7 @@ private ShutdownShardMigrationStatus getUnassignedShutdownStatus(Index index, In
810857
);
811858

812859
return TransportGetShutdownStatusAction.shardMigrationStatus(
860+
new CancellableTask(1, "direct", GetShutdownStatusAction.NAME, "", TaskId.EMPTY_TASK_ID, Map.of()),
813861
state,
814862
SHUTTING_DOWN_NODE_ID,
815863
SingleNodeShutdownMetadata.Type.REMOVE,

0 commit comments

Comments
 (0)