Skip to content

Commit 89467b8

Browse files
Add FailedShardEntry info to shard-failed task source string (#125520)
Appends the FailedShardEntry request to the 'shard-failed' task source string in ShardFailedTransportHandler.messageReceived(). The appended entry includes the failure's message rather than its full stack trace. This information will now be available in the 'source' string for shard-failed task entries in the Cluster Pending Tasks API response. This source string change matches what is done in the ShardStartedTransportHandler. Closes #102606.
1 parent: 9b041af; commit: 89467b8

File tree

3 files changed: +78 −5 lines changed

docs/changelog/125520.yaml

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,6 @@
1+
pr: 125520
2+
summary: Add `FailedShardEntry` info to shard-failed task source string
3+
area: Allocation
4+
type: enhancement
5+
issues:
6+
- 102606

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java

Lines changed: 61 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,14 +8,21 @@
88
*/
99
package org.elasticsearch.cluster.routing.allocation;
1010

11+
import org.elasticsearch.ElasticsearchException;
12+
import org.elasticsearch.ExceptionsHelper;
1113
import org.elasticsearch.cluster.ClusterState;
1214
import org.elasticsearch.cluster.health.ClusterHealthStatus;
1315
import org.elasticsearch.cluster.metadata.IndexMetadata;
16+
import org.elasticsearch.cluster.metadata.Metadata;
17+
import org.elasticsearch.cluster.service.ClusterService;
18+
import org.elasticsearch.common.Priority;
1419
import org.elasticsearch.index.IndexService;
1520
import org.elasticsearch.index.shard.IndexShard;
1621
import org.elasticsearch.indices.IndicesService;
1722
import org.elasticsearch.test.ESIntegTestCase;
1823

24+
import java.util.concurrent.CyclicBarrier;
25+
1926
import static org.hamcrest.Matchers.equalTo;
2027

2128
public class ShardStateIT extends ESIntegTestCase {
@@ -76,4 +83,58 @@ protected void assertPrimaryTerms(long shard0Term, long shard1Term) {
7683
}
7784
}
7885
}
86+
87+
public void testGetPendingTasksSourceStringDataForFailedShard() throws Exception {
88+
internalCluster().ensureAtLeastNumDataNodes(1);
89+
prepareCreate("test").setSettings(indexSettings(1, 0)).get();
90+
ensureGreen();
91+
92+
final var masterNodeClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
93+
final var barrier = new CyclicBarrier(2);
94+
95+
// Used to block the master service task processing so we have a chance to get the pending shard-failed task.
96+
masterNodeClusterService.createTaskQueue("initial-block", Priority.NORMAL, batchExecutionContext -> {
97+
safeAwait(barrier);
98+
safeAwait(barrier);
99+
batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {}));
100+
return batchExecutionContext.initialState();
101+
}).submitTask("initial-block", e -> fail(e, "unexpected"), null);
102+
103+
// Sync up with the blocking executor.
104+
safeAwait(barrier);
105+
106+
// Obtain a reference to the IndexShard for shard 0.
107+
final var state = masterNodeClusterService.state();
108+
final var shard0RoutingTable = state.routingTable(Metadata.DEFAULT_PROJECT_ID).index("test").shard(0);
109+
assertNotNull(shard0RoutingTable);
110+
final var nodeId = shard0RoutingTable.primaryShard().currentNodeId();
111+
final var node = state.nodes().get(nodeId).getName();
112+
final var indicesService = internalCluster().getInstance(IndicesService.class, node);
113+
final var shard0 = indicesService.indexService(resolveIndex("test")).getShard(0);
114+
assertNotNull(shard0);
115+
116+
// Create a failed shard state action for shard 0.
117+
final var shardFailedReason = "simulated test failure";
118+
final var shardFailedException = new ElasticsearchException("simulated exception");
119+
shard0.failShard(shardFailedReason, shardFailedException);
120+
121+
// Get the pending tasks and verify we see the shard-failed state action and expected source string components.
122+
final var masterService = masterNodeClusterService.getMasterService();
123+
assertBusy(() -> {
124+
assertTrue(masterService.pendingTasks().stream().anyMatch(task -> {
125+
final var src = task.getSource().string();
126+
// We expect the failure reason and exception message, but not the stack trace.
127+
return src.startsWith("shard-failed ")
128+
&& src.contains("[test][0]")
129+
&& src.contains(shardFailedReason)
130+
&& src.contains(shardFailedException.getMessage())
131+
&& src.contains(ExceptionsHelper.stackTrace(shardFailedException)) == false;
132+
}));
133+
});
134+
135+
// Unblock the master service from the executor above.
136+
safeAwait(barrier);
137+
// Wait for the failed shard task to get processed and then for the shard and cluster to recover.
138+
ensureGreen();
139+
}
79140
}

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -283,16 +283,14 @@ private static class ShardFailedTransportHandler implements TransportRequestHand
283283
ClusterService clusterService,
284284
ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor
285285
) {
286-
taskQueue = clusterService.createTaskQueue(TASK_SOURCE, Priority.HIGH, shardFailedClusterStateTaskExecutor);
286+
taskQueue = clusterService.createTaskQueue("shard-failed", Priority.HIGH, shardFailedClusterStateTaskExecutor);
287287
}
288288

289-
private static final String TASK_SOURCE = "shard-failed";
290-
291289
@Override
292290
public void messageReceived(FailedShardEntry request, TransportChannel channel, Task task) {
293291
logger.debug(() -> format("%s received shard failed for [%s]", request.getShardId(), request), request.failure);
294292
taskQueue.submitTask(
295-
TASK_SOURCE,
293+
"shard-failed " + request.toStringNoFailureStackTrace(),
296294
new FailedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> TransportResponse.Empty.INSTANCE)),
297295
null
298296
);
@@ -501,14 +499,22 @@ public void writeTo(StreamOutput out) throws IOException {
501499

502500
@Override
503501
public String toString() {
502+
return toString(true);
503+
}
504+
505+
public String toStringNoFailureStackTrace() {
506+
return toString(false);
507+
}
508+
509+
private String toString(boolean includeStackTrace) {
504510
return Strings.format(
505511
"FailedShardEntry{shardId [%s], allocationId [%s], primary term [%d], message [%s], markAsStale [%b], failure [%s]}",
506512
shardId,
507513
allocationId,
508514
primaryTerm,
509515
message,
510516
markAsStale,
511-
failure != null ? ExceptionsHelper.stackTrace(failure) : null
517+
failure == null ? null : (includeStackTrace ? ExceptionsHelper.stackTrace(failure) : failure.getMessage())
512518
);
513519
}
514520

0 commit comments

Comments (0)