Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/125520.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 125520
summary: Add `FailedShardEntry` info to shard-failed task source string
area: Allocation
type: enhancement
issues:
- 102606
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,21 @@
*/
package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.concurrent.CyclicBarrier;

import static org.hamcrest.Matchers.equalTo;

public class ShardStateIT extends ESIntegTestCase {
Expand Down Expand Up @@ -76,4 +83,58 @@ protected void assertPrimaryTerms(long shard0Term, long shard1Term) {
}
}
}

public void testGetPendingTasksSourceStringDataForFailedShard() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(1);
prepareCreate("test").setSettings(indexSettings(1, 0)).get();
ensureGreen();

final var masterNodeClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
final var barrier = new CyclicBarrier(2);

// Used to block the master service task processing so we have a chance to get the pending shard-failed task.
masterNodeClusterService.createTaskQueue("initial-block", Priority.NORMAL, batchExecutionContext -> {
safeAwait(barrier);
safeAwait(barrier);
batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {}));
return batchExecutionContext.initialState();
}).submitTask("initial-block", e -> fail(e, "unexpected"), null);

// Sync up with the blocking executor.
safeAwait(barrier);

// Obtain a reference to the IndexShard for shard 0.
final var state = masterNodeClusterService.state();
final var shard0RoutingTable = state.routingTable(Metadata.DEFAULT_PROJECT_ID).index("test").shard(0);
assertNotNull(shard0RoutingTable);
final var nodeId = shard0RoutingTable.primaryShard().currentNodeId();
final var node = state.nodes().get(nodeId).getName();
final var indicesService = internalCluster().getInstance(IndicesService.class, node);
final var shard0 = indicesService.indexService(resolveIndex("test")).getShard(0);
assertNotNull(shard0);

// Create a failed shard state action for shard 0.
final var shardFailedReason = "simulated test failure";
final var shardFailedException = new ElasticsearchException("simulated exception");
shard0.failShard(shardFailedReason, shardFailedException);

// Get the pending tasks and verify we see the shard-failed state action and expected source string components.
final var masterService = masterNodeClusterService.getMasterService();
assertBusy(() -> {
assertTrue(masterService.pendingTasks().stream().anyMatch(task -> {
final var src = task.getSource().string();
// We expect the failure reason and exception message, but not the stack trace.
return src.startsWith("shard-failed ")
&& src.contains("[test][0]")
&& src.contains(shardFailedReason)
&& src.contains(shardFailedException.getMessage())
&& src.contains(ExceptionsHelper.stackTrace(shardFailedException)) == false;
}));
});

// Unblock the master service from the executor above.
safeAwait(barrier);
// Wait for the failed shard task to get processed and then for the shard and cluster to recover.
ensureGreen();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,16 +283,14 @@ private static class ShardFailedTransportHandler implements TransportRequestHand
ClusterService clusterService,
ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor
) {
taskQueue = clusterService.createTaskQueue(TASK_SOURCE, Priority.HIGH, shardFailedClusterStateTaskExecutor);
taskQueue = clusterService.createTaskQueue("shard-failed", Priority.HIGH, shardFailedClusterStateTaskExecutor);
}

private static final String TASK_SOURCE = "shard-failed";

@Override
public void messageReceived(FailedShardEntry request, TransportChannel channel, Task task) {
logger.debug(() -> format("%s received shard failed for [%s]", request.getShardId(), request), request.failure);
taskQueue.submitTask(
TASK_SOURCE,
"shard-failed " + request.toStringNoFailureStackTrace(),
new FailedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> TransportResponse.Empty.INSTANCE)),
null
);
Expand Down Expand Up @@ -501,14 +499,22 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public String toString() {
return toString(true);
}

public String toStringNoFailureStackTrace() {
return toString(false);
}

private String toString(boolean includeStackTrace) {
return Strings.format(
"FailedShardEntry{shardId [%s], allocationId [%s], primary term [%d], message [%s], markAsStale [%b], failure [%s]}",
shardId,
allocationId,
primaryTerm,
message,
markAsStale,
failure != null ? ExceptionsHelper.stackTrace(failure) : null
failure == null ? null : (includeStackTrace ? ExceptionsHelper.stackTrace(failure) : failure.getMessage())
);
}

Expand Down
Loading