diff --git a/docs/changelog/125520.yaml b/docs/changelog/125520.yaml new file mode 100644 index 0000000000000..71172b2137dc0 --- /dev/null +++ b/docs/changelog/125520.yaml @@ -0,0 +1,6 @@ +pr: 125520 +summary: Add `FailedShardEntry` info to shard-failed task source string +area: Allocation +type: enhancement +issues: + - 102606 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java index 8a21372c9b50a..5f1b2d9df6f4e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java @@ -8,14 +8,21 @@ */ package org.elasticsearch.cluster.routing.allocation; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; +import java.util.concurrent.CyclicBarrier; + import static org.hamcrest.Matchers.equalTo; public class ShardStateIT extends ESIntegTestCase { @@ -76,4 +83,58 @@ protected void assertPrimaryTerms(long shard0Term, long shard1Term) { } } } + + public void testGetPendingTasksSourceStringDataForFailedShard() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(1); + prepareCreate("test").setSettings(indexSettings(1, 0)).get(); + ensureGreen(); + + final var masterNodeClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final var barrier = new CyclicBarrier(2); + + // Used to block the master service task processing so we have a chance to get the pending shard-failed task. + masterNodeClusterService.createTaskQueue("initial-block", Priority.NORMAL, batchExecutionContext -> { + safeAwait(barrier); + safeAwait(barrier); + batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {})); + return batchExecutionContext.initialState(); + }).submitTask("initial-block", e -> fail(e, "unexpected"), null); + + // Sync up with the blocking executor. + safeAwait(barrier); + + // Obtain a reference to the IndexShard for shard 0. + final var state = masterNodeClusterService.state(); + final var shard0RoutingTable = state.routingTable(Metadata.DEFAULT_PROJECT_ID).index("test").shard(0); + assertNotNull(shard0RoutingTable); + final var nodeId = shard0RoutingTable.primaryShard().currentNodeId(); + final var node = state.nodes().get(nodeId).getName(); + final var indicesService = internalCluster().getInstance(IndicesService.class, node); + final var shard0 = indicesService.indexService(resolveIndex("test")).getShard(0); + assertNotNull(shard0); + + // Create a failed shard state action for shard 0. + final var shardFailedReason = "simulated test failure"; + final var shardFailedException = new ElasticsearchException("simulated exception"); + shard0.failShard(shardFailedReason, shardFailedException); + + // Get the pending tasks and verify we see the shard-failed state action and expected source string components. + final var masterService = masterNodeClusterService.getMasterService(); + assertBusy(() -> { + assertTrue(masterService.pendingTasks().stream().anyMatch(task -> { + final var src = task.getSource().string(); + // We expect the failure reason and exception message, but not the stack trace. + return src.startsWith("shard-failed ") + && src.contains("[test][0]") + && src.contains(shardFailedReason) + && src.contains(shardFailedException.getMessage()) + && src.contains(ExceptionsHelper.stackTrace(shardFailedException)) == false; + })); + }); + + // Unblock the master service from the executor above. + safeAwait(barrier); + // Wait for the failed shard task to get processed and then for the shard and cluster to recover. + ensureGreen(); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 3fa4685db8d4a..9e5194dfc479f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -283,16 +283,14 @@ private static class ShardFailedTransportHandler implements TransportRequestHand ClusterService clusterService, ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor ) { - taskQueue = clusterService.createTaskQueue(TASK_SOURCE, Priority.HIGH, shardFailedClusterStateTaskExecutor); + taskQueue = clusterService.createTaskQueue("shard-failed", Priority.HIGH, shardFailedClusterStateTaskExecutor); } - private static final String TASK_SOURCE = "shard-failed"; - @Override public void messageReceived(FailedShardEntry request, TransportChannel channel, Task task) { logger.debug(() -> format("%s received shard failed for [%s]", request.getShardId(), request), request.failure); taskQueue.submitTask( - TASK_SOURCE, + "shard-failed " + request.toStringNoFailureStackTrace(), new FailedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> TransportResponse.Empty.INSTANCE)), null ); @@ -501,6 +499,14 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { + return toString(true); + } + + public String toStringNoFailureStackTrace() { + return toString(false); + } + + private String toString(boolean includeStackTrace) { return Strings.format( "FailedShardEntry{shardId [%s], allocationId [%s], primary term [%d], message [%s], markAsStale [%b], failure [%s]}", shardId, @@ -508,7 +514,7 @@ public String toString() { primaryTerm, message, markAsStale, - failure != null ? ExceptionsHelper.stackTrace(failure) : null + failure == null ? null : (includeStackTrace ? ExceptionsHelper.stackTrace(failure) : failure.getMessage()) ); }