Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/125520.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 125520
summary: Add `FailedShardEntry` info to shard-failed task source string
area: Allocation
type: enhancement
issues:
- 102606
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,21 @@
*/
package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.concurrent.CyclicBarrier;

import static org.hamcrest.Matchers.equalTo;

public class ShardStateIT extends ESIntegTestCase {
Expand Down Expand Up @@ -76,4 +83,58 @@ protected void assertPrimaryTerms(long shard0Term, long shard1Term) {
}
}
}

public void testGetPendingTasksSourceStringDataForFailedShard() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(1);
prepareCreate("test").setSettings(indexSettings(1, 0)).get();
ensureGreen();

final var masterNodeClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
final var barrier = new CyclicBarrier(2);

// Used to block the master service task processing so we have a chance to get the pending shard-failed task.
masterNodeClusterService.createTaskQueue("initial-block", Priority.NORMAL, batchExecutionContext -> {
safeAwait(barrier);
safeAwait(barrier);
batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {}));
return batchExecutionContext.initialState();
}).submitTask("initial-block", ignored -> {}, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think it's useful to assert that the onFailure of a task is never called, e.g.:

Suggested change
}).submitTask("initial-block", ignored -> {}, null);
}).submitTask("initial-block", e -> fail(e, "unexpected"), null);


// Sync up with the blocking executor.
safeAwait(barrier);

// Obtain a reference to the IndexShard for shard 0.
final var state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, you can get the state with masterNodeClusterService.state().

final var shard0RoutingTable = state.routingTable(Metadata.DEFAULT_PROJECT_ID).index("test").shard(0);
assertNotNull(shard0RoutingTable);
final var nodeId = shard0RoutingTable.primaryShard().currentNodeId();
final var node = state.nodes().get(nodeId).getName();
final var indicesService = internalCluster().getInstance(IndicesService.class, node);
final var shard0 = indicesService.indexService(resolveIndex("test")).getShard(0);
assertNotNull(shard0);

// Create a failed shard state action for shard 0.
final var shardFailedReason = "simulated test failure";
final var shardFailedException = new ElasticsearchException("simulated exception");
shard0.failShard(shardFailedReason, shardFailedException);

// Get the pending tasks and verify we see the shard-failed state action and expected source string components.
final var masterService = masterNodeClusterService.getMasterService();
assertBusy(() -> {
assertTrue(masterService.pendingTasks().stream().anyMatch(task -> {
final var src = task.getSource().string();
// We expect the failure reason and exception message, but not the stack trace.
return src.startsWith("shard-failed ")
&& src.contains("[test][0]")
&& src.contains(shardFailedReason)
&& src.contains(shardFailedException.getMessage())
&& src.contains(ExceptionsHelper.stackTrace(shardFailedException)) == false;
}));
});

// Unblock the master service from the blocked executor and allow the failed shard task to get processed.
safeAwait(barrier);
assertBusy(() -> assertTrue(masterService.pendingTasks().isEmpty()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to drop this assertion since it is not really relevant. It may also add to the total test time unnecessarily, and it has a small chance of failing if the CI machine is really slow or the cluster does something rare.

ensureGreen();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,16 +283,14 @@ private static class ShardFailedTransportHandler implements TransportRequestHand
ClusterService clusterService,
ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor
) {
taskQueue = clusterService.createTaskQueue(TASK_SOURCE, Priority.HIGH, shardFailedClusterStateTaskExecutor);
taskQueue = clusterService.createTaskQueue("shard-failed", Priority.HIGH, shardFailedClusterStateTaskExecutor);
}

private static final String TASK_SOURCE = "shard-failed";

@Override
public void messageReceived(FailedShardEntry request, TransportChannel channel, Task task) {
logger.debug(() -> format("%s received shard failed for [%s]", request.getShardId(), request), request.failure);
taskQueue.submitTask(
TASK_SOURCE,
"shard-failed " + request.toStringNoFailureStackTrace(),
new FailedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> TransportResponse.Empty.INSTANCE)),
null
);
Expand Down Expand Up @@ -501,14 +499,22 @@ public void writeTo(StreamOutput out) throws IOException {

/**
 * Returns the full description of this entry. When a failure is present,
 * its complete stack trace is included in the output.
 */
@Override
public String toString() {
return toString(true);
}

/**
 * Returns the same description as {@link #toString()}, except that a present
 * failure is rendered as its message only rather than its full stack trace.
 * Used to build the "shard-failed" task source string.
 */
public String toStringNoFailureStackTrace() {
return toString(false);
}

private String toString(boolean includeStackTrace) {
return Strings.format(
"FailedShardEntry{shardId [%s], allocationId [%s], primary term [%d], message [%s], markAsStale [%b], failure [%s]}",
shardId,
allocationId,
primaryTerm,
message,
markAsStale,
failure != null ? ExceptionsHelper.stackTrace(failure) : null
failure == null ? null : (includeStackTrace ? ExceptionsHelper.stackTrace(failure) : failure.getMessage())
);
}

Expand Down
Loading