From 4bfa21a56381e7da4c5aa5b99ce34ebf094ade10 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Mon, 24 Mar 2025 13:31:39 -0400 Subject: [PATCH 1/6] Add FailedShardEntry info to shard-failed task source string Appends the FailedShardEntry request to the 'shard-failed' task source string in ShardFailedTransportHandler.messageReceived(). This information will now be available in the 'source' string for shard failed task entries in the Cluster Pending Tasks API response. This source string change matches what is done in the ShardStartedTransportHandler. Closes #102606. --- .../cluster/action/shard/ShardStateAction.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 3fa4685db8d4a..1797d56a4b5b4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -283,16 +283,14 @@ private static class ShardFailedTransportHandler implements TransportRequestHand ClusterService clusterService, ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor ) { - taskQueue = clusterService.createTaskQueue(TASK_SOURCE, Priority.HIGH, shardFailedClusterStateTaskExecutor); + taskQueue = clusterService.createTaskQueue("shard-failed", Priority.HIGH, shardFailedClusterStateTaskExecutor); } - private static final String TASK_SOURCE = "shard-failed"; - @Override public void messageReceived(FailedShardEntry request, TransportChannel channel, Task task) { logger.debug(() -> format("%s received shard failed for [%s]", request.getShardId(), request), request.failure); taskQueue.submitTask( - TASK_SOURCE, + "shard-failed " + request, new FailedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> TransportResponse.Empty.INSTANCE)), null ); From 064b2fe6caff8acfb8c749b1881b83af46a71089 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Mon, 24 Mar 2025 13:38:41 -0400 Subject: [PATCH 2/6] Update docs/changelog/125520.yaml --- docs/changelog/125520.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/125520.yaml diff --git a/docs/changelog/125520.yaml b/docs/changelog/125520.yaml new file mode 100644 index 0000000000000..71172b2137dc0 --- /dev/null +++ b/docs/changelog/125520.yaml @@ -0,0 +1,6 @@ +pr: 125520 +summary: Add `FailedShardEntry` info to shard-failed task source string +area: Allocation +type: enhancement +issues: + - 102606 From 25d46b539586dac545406adbd0b48f7741fb17be Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Tue, 25 Mar 2025 13:02:57 -0400 Subject: [PATCH 3/6] Don't include stack trace in source string, add unit test case --- .../action/shard/ShardStateAction.java | 14 ++- .../action/shard/ShardStateActionTests.java | 109 ++++++++++++++++++ 2 files changed, 120 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 1797d56a4b5b4..173019d8eae10 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -276,7 +276,7 @@ public void onTimeout(TimeValue timeout) { } // TODO: Make this a TransportMasterNodeAction and remove duplication of master failover retrying from upstream code - private static class ShardFailedTransportHandler implements TransportRequestHandler { + static class ShardFailedTransportHandler implements TransportRequestHandler { private final MasterServiceTaskQueue taskQueue; ShardFailedTransportHandler( @@ -290,7 +290,7 @@ private static class ShardFailedTransportHandler implements TransportRequestHand public void messageReceived(FailedShardEntry request, TransportChannel channel, Task task) { logger.debug(() -> format("%s received shard failed for [%s]", request.getShardId(), request), request.failure); taskQueue.submitTask( - "shard-failed " + request, + "shard-failed " + request.toStringNoFailureStackTrace(), new FailedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> TransportResponse.Empty.INSTANCE)), null ); @@ -499,6 +499,14 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { + return toString(true); + } + + public String toStringNoFailureStackTrace() { + return toString(false); + } + + private String toString(boolean includeStackTrace) { return Strings.format( "FailedShardEntry{shardId [%s], allocationId [%s], primary term [%d], message [%s], markAsStale [%b], failure [%s]}", shardId, @@ -506,7 +514,7 @@ public String toString() { primaryTerm, message, markAsStale, - failure != null ? ExceptionsHelper.stackTrace(failure) : null + failure == null ? null : (includeStackTrace ? ExceptionsHelper.stackTrace(failure) : failure.getMessage()) ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 3d534d9a74ad2..aeaaf9ea6bc34 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -11,12 +11,15 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry; import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry; @@ -28,19 +31,27 @@ import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterServiceTaskQueue; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardLongFieldRangeWireTests; +import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.NodeNotConnectedException; +import org.elasticsearch.transport.TestTransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportResponse; @@ -51,7 +62,11 @@ import org.junit.BeforeClass; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; @@ -628,6 +643,77 @@ public void testStartedShardEntrySerializationWithOlderTransportVersion() throws } } + public void testShardFailedTransportHandlerSubmitTaskSourceStringIncludesRequestInfo() { + // Create a modified ClusterService that returns task capturing task queues. + final var taskQueueMap = new HashMap>(); + final var modifiedClusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + THREAD_POOL, + null + ) { + @SuppressWarnings("unchecked") + @Override + public MasterServiceTaskQueue createTaskQueue( + String name, + Priority priority, + ClusterStateTaskExecutor executor + ) { + return (MasterServiceTaskQueue) taskQueueMap.computeIfAbsent( + name, + k -> new TaskCollectingQueue(super.createTaskQueue(name, priority, executor)) + ); + } + }; + + final var simulatedException = new RuntimeException("fake exception"); + final var failedShardEntry = new FailedShardEntry( + new ShardId(new Index("foo-idx", "foo-idx-id"), 0), + "alloc-id", + 0L, + "FAILURE MSG", + simulatedException, + false + ); + final var now = new TimeValue(System.currentTimeMillis()); + final var shardFailedTask = new Task( + 82L, + "transport", + ShardStateAction.SHARD_FAILED_ACTION_NAME, + "", + null, + now.millis(), + now.nanos(), + Map.of() + ); + + final var handler = new ShardStateAction.ShardFailedTransportHandler( + modifiedClusterService, + new ShardStateAction.ShardFailedClusterStateTaskExecutor(null, null) + ); + + // Check that the submitted task's 'source' string doesn't include the exception stack trace. + handler.messageReceived(failedShardEntry, new TestTransportChannel(ActionListener.noop()), shardFailedTask); + final var taskQueue = taskQueueMap.get("shard-failed"); + assertNotNull(taskQueue); + final var tasks = taskQueue.getTasks(); + assertEquals(1, tasks.size()); + final var task = tasks.getFirst(); + final var stackTraceInfo = ExceptionsHelper.stackTrace(simulatedException); + assertEquals("shard-failed " + failedShardEntry.toStringNoFailureStackTrace(), task.source()); + assertNotNull(failedShardEntry.failure); + assertFalse("Shard failed task's source string included the exception stack trace", task.source.contains(stackTraceInfo)); + assertTrue( + "Shard failed task's source string didn't include the exception message", + task.source.contains(simulatedException.getMessage()) + ); + assertTrue( + "FailedShardEntry.toString() didn't include the exception stack trace", + failedShardEntry.toString().contains(stackTraceInfo) + ); + assertTrue(task.task instanceof ShardStateAction.FailedShardUpdateTask); + } + BytesReference serialize(Writeable writeable, TransportVersion version) throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { out.setTransportVersion(version); @@ -663,4 +749,27 @@ void await() throws InterruptedException { latch.await(); } } + + private static class TaskCollectingQueue implements MasterServiceTaskQueue { + + record Entry(String source, T task, TimeValue timeout) {} + + private final List> tasks; + private final MasterServiceTaskQueue taskQueue; + + TaskCollectingQueue(MasterServiceTaskQueue taskQueue) { + this.taskQueue = taskQueue; + tasks = new ArrayList<>(); + } + + @Override + public void submitTask(String source, T task, TimeValue timeout) { + tasks.add(new Entry(source, task, timeout)); + taskQueue.submitTask(source, task, timeout); + } + + List> getTasks() { + return tasks; + } + } } From 0f7b0474757310e37a6ecb5e274a6fd73193cd3e Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Thu, 27 Mar 2025 16:53:58 -0400 Subject: [PATCH 4/6] Switch to adding a new test case in ShardStateIT --- .../routing/allocation/ShardStateIT.java | 78 +++++++++++++ .../action/shard/ShardStateAction.java | 2 +- .../action/shard/ShardStateActionTests.java | 109 ------------------ 3 files changed, 79 insertions(+), 110 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java index 8a21372c9b50a..5bfacd561be7e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java @@ -8,14 +8,20 @@ */ package org.elasticsearch.cluster.routing.allocation; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; +import java.util.concurrent.CyclicBarrier; + import static org.hamcrest.Matchers.equalTo; public class ShardStateIT extends ESIntegTestCase { @@ -76,4 +82,76 @@ protected void assertPrimaryTerms(long shard0Term, long shard1Term) { } } } + + public void testGetPendingTasksSourceStringDataForFailedAndStartedShards() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(1); + prepareCreate("test").setSettings(indexSettings(1, 0)).get(); + ensureGreen(); + + final var masterNodeClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final var barrier = new CyclicBarrier(2); + + final var finalBlockingQueue = masterNodeClusterService.createTaskQueue("final-block", Priority.NORMAL, batchExecutionContext -> { + safeAwait(barrier); + batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {})); + return batchExecutionContext.initialState(); + }); + + masterNodeClusterService.createTaskQueue("initial-block", Priority.NORMAL, batchExecutionContext -> { + safeAwait(barrier); + safeAwait(barrier); + batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {})); + // Submit the final blocking task before exiting so that it will be queued before the expected shard-started task. + finalBlockingQueue.submitTask("final-block", ignored -> {}, null); + return batchExecutionContext.initialState(); + }).submitTask("initial-block", ignored -> {}, null); + + // Sync up with our initial blocking executor. + safeAwait(barrier); + + // Obtain a reference to the IndexShard for shard 0. + final var state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState(); + final var shard0RoutingTable = state.routingTable().index("test").shard(0); + assertNotNull(shard0RoutingTable); + final var nodeId = shard0RoutingTable.primaryShard().currentNodeId(); + final var node = state.nodes().get(nodeId).getName(); + final var indicesService = internalCluster().getInstance(IndicesService.class, node); + final var shard0 = indicesService.indexService(resolveIndex("test")).getShard(0); + assertNotNull(shard0); + + // Create a failed shard state action for shard 0. + final var shardFailedReason = "simulated test failure"; + final var shardFailedException = new ElasticsearchException("simulated exception"); + shard0.failShard(shardFailedReason, shardFailedException); + + // Get the pending tasks and verify we see the shard-failed state action and expected source string components. + final var masterService = masterNodeClusterService.getMasterService(); + assertBusy(() -> { + assertTrue(masterService.pendingTasks().stream().anyMatch(task -> { + final var src = task.getSource().string(); + // We expect the failure reason and exception message, but not the stack trace. + return src.startsWith("shard-failed ") + && src.contains("[test][0]") + && src.contains(shardFailedReason) + && src.contains(shardFailedException.getMessage()) + && src.contains(ExceptionsHelper.stackTrace(shardFailedException)) == false; + })); + }); + + // Unblock the master service from the initial-block executor and allow the failed shard task to get processed. + safeAwait(barrier); + + // Wait for recovery and a shard-started pending task for shard 0. + assertBusy(() -> { + assertTrue(masterService.pendingTasks().stream().anyMatch(task -> { + final var src = task.getSource().string(); + return src.startsWith("shard-started ") && src.contains("[test][0]") && src.contains("after existing store recovery"); + })); + }); + + // Unblock the master service and wait for all tasks to clear and the cluster to complete the recovery. + safeAwait(barrier); + assertBusy(() -> assertTrue(masterService.pendingTasks().isEmpty())); + ensureGreen(); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 173019d8eae10..9e5194dfc479f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -276,7 +276,7 @@ public void onTimeout(TimeValue timeout) { } // TODO: Make this a TransportMasterNodeAction and remove duplication of master failover retrying from upstream code - static class ShardFailedTransportHandler implements TransportRequestHandler { + private static class ShardFailedTransportHandler implements TransportRequestHandler { private final MasterServiceTaskQueue taskQueue; ShardFailedTransportHandler( diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index aeaaf9ea6bc34..3d534d9a74ad2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -11,15 +11,12 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.util.SetOnce; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; -import org.elasticsearch.cluster.ClusterStateTaskExecutor; -import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry; import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry; @@ -31,27 +28,19 @@ import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.cluster.service.MasterServiceTaskQueue; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardLongFieldRangeWireTests; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.NodeNotConnectedException; -import org.elasticsearch.transport.TestTransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportResponse; @@ -62,11 +51,7 @@ import org.junit.BeforeClass; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; @@ -643,77 +628,6 @@ public void testStartedShardEntrySerializationWithOlderTransportVersion() throws } } - public void testShardFailedTransportHandlerSubmitTaskSourceStringIncludesRequestInfo() { - // Create a modified ClusterService that returns task capturing task queues. - final var taskQueueMap = new HashMap>(); - final var modifiedClusterService = new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - THREAD_POOL, - null - ) { - @SuppressWarnings("unchecked") - @Override - public MasterServiceTaskQueue createTaskQueue( - String name, - Priority priority, - ClusterStateTaskExecutor executor - ) { - return (MasterServiceTaskQueue) taskQueueMap.computeIfAbsent( - name, - k -> new TaskCollectingQueue(super.createTaskQueue(name, priority, executor)) - ); - } - }; - - final var simulatedException = new RuntimeException("fake exception"); - final var failedShardEntry = new FailedShardEntry( - new ShardId(new Index("foo-idx", "foo-idx-id"), 0), - "alloc-id", - 0L, - "FAILURE MSG", - simulatedException, - false - ); - final var now = new TimeValue(System.currentTimeMillis()); - final var shardFailedTask = new Task( - 82L, - "transport", - ShardStateAction.SHARD_FAILED_ACTION_NAME, - "", - null, - now.millis(), - now.nanos(), - Map.of() - ); - - final var handler = new ShardStateAction.ShardFailedTransportHandler( - modifiedClusterService, - new ShardStateAction.ShardFailedClusterStateTaskExecutor(null, null) - ); - - // Check that the submitted task's 'source' string doesn't include the exception stack trace. - handler.messageReceived(failedShardEntry, new TestTransportChannel(ActionListener.noop()), shardFailedTask); - final var taskQueue = taskQueueMap.get("shard-failed"); - assertNotNull(taskQueue); - final var tasks = taskQueue.getTasks(); - assertEquals(1, tasks.size()); - final var task = tasks.getFirst(); - final var stackTraceInfo = ExceptionsHelper.stackTrace(simulatedException); - assertEquals("shard-failed " + failedShardEntry.toStringNoFailureStackTrace(), task.source()); - assertNotNull(failedShardEntry.failure); - assertFalse("Shard failed task's source string included the exception stack trace", task.source.contains(stackTraceInfo)); - assertTrue( - "Shard failed task's source string didn't include the exception message", - task.source.contains(simulatedException.getMessage()) - ); - assertTrue( - "FailedShardEntry.toString() didn't include the exception stack trace", - failedShardEntry.toString().contains(stackTraceInfo) - ); - assertTrue(task.task instanceof ShardStateAction.FailedShardUpdateTask); - } - BytesReference serialize(Writeable writeable, TransportVersion version) throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { out.setTransportVersion(version); @@ -749,27 +663,4 @@ void await() throws InterruptedException { latch.await(); } } - - private static class TaskCollectingQueue implements MasterServiceTaskQueue { - - record Entry(String source, T task, TimeValue timeout) {} - - private final List> tasks; - private final MasterServiceTaskQueue taskQueue; - - TaskCollectingQueue(MasterServiceTaskQueue taskQueue) { - this.taskQueue = taskQueue; - tasks = new ArrayList<>(); - } - - @Override - public void submitTask(String source, T task, TimeValue timeout) { - tasks.add(new Entry(source, task, timeout)); - taskQueue.submitTask(source, task, timeout); - } - - List> getTasks() { - return tasks; - } - } } From c3698fa5e5d26cd9781373a2fc79d9371f10aaf9 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Thu, 27 Mar 2025 19:33:34 -0400 Subject: [PATCH 5/6] Simplify integration test case --- .../routing/allocation/ShardStateIT.java | 29 ++++--------------- 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java index 5bfacd561be7e..b4b3fcc6d34ad 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.index.IndexService; @@ -83,7 +84,7 @@ protected void assertPrimaryTerms(long shard0Term, long shard1Term) { } } - public void testGetPendingTasksSourceStringDataForFailedAndStartedShards() throws Exception { + public void testGetPendingTasksSourceStringDataForFailedShard() throws Exception { internalCluster().ensureAtLeastNumDataNodes(1); prepareCreate("test").setSettings(indexSettings(1, 0)).get(); ensureGreen(); @@ -91,27 +92,20 @@ public void testGetPendingTasksSourceStringDataForFailedAndStartedShards() throw final var masterNodeClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); final var barrier = new CyclicBarrier(2); - final var finalBlockingQueue = masterNodeClusterService.createTaskQueue("final-block", Priority.NORMAL, batchExecutionContext -> { - safeAwait(barrier); - batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {})); - return batchExecutionContext.initialState(); - }); - + // Used to block the master service task processing so we have a chance to get the pending shard-failed task. masterNodeClusterService.createTaskQueue("initial-block", Priority.NORMAL, batchExecutionContext -> { safeAwait(barrier); safeAwait(barrier); batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {})); - // Submit the final blocking task before exiting so that it will be queued before the expected shard-started task. - finalBlockingQueue.submitTask("final-block", ignored -> {}, null); return batchExecutionContext.initialState(); }).submitTask("initial-block", ignored -> {}, null); - // Sync up with our initial blocking executor. + // Sync up with the blocking executor. safeAwait(barrier); // Obtain a reference to the IndexShard for shard 0. final var state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState(); - final var shard0RoutingTable = state.routingTable().index("test").shard(0); + final var shard0RoutingTable = state.routingTable(Metadata.DEFAULT_PROJECT_ID).index("test").shard(0); assertNotNull(shard0RoutingTable); final var nodeId = shard0RoutingTable.primaryShard().currentNodeId(); final var node = state.nodes().get(nodeId).getName(); @@ -138,18 +132,7 @@ public void testGetPendingTasksSourceStringDataForFailedAndStartedShards() throw })); }); - // Unblock the master service from the initial-block executor and allow the failed shard task to get processed. - safeAwait(barrier); - - // Wait for recovery and a shard-started pending task for shard 0. - assertBusy(() -> { - assertTrue(masterService.pendingTasks().stream().anyMatch(task -> { - final var src = task.getSource().string(); - return src.startsWith("shard-started ") && src.contains("[test][0]") && src.contains("after existing store recovery"); - })); - }); - - // Unblock the master service and wait for all tasks to clear and the cluster to complete the recovery. + // Unblock the master service from the blocked executor and allow the failed shard task to get processed. safeAwait(barrier); assertBusy(() -> assertTrue(masterService.pendingTasks().isEmpty())); ensureGreen(); From f3b07209e23a5fe14fde8da6c5e328bade21be84 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Fri, 28 Mar 2025 08:46:49 -0400 Subject: [PATCH 6/6] Address code review comments --- .../cluster/routing/allocation/ShardStateIT.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java index b4b3fcc6d34ad..5f1b2d9df6f4e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java @@ -98,13 +98,13 @@ public void testGetPendingTasksSourceStringDataForFailedShard() throws Exception safeAwait(barrier); batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {})); return batchExecutionContext.initialState(); - }).submitTask("initial-block", ignored -> {}, null); + }).submitTask("initial-block", e -> fail(e, "unexpected"), null); // Sync up with the blocking executor. safeAwait(barrier); // Obtain a reference to the IndexShard for shard 0. - final var state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState(); + final var state = masterNodeClusterService.state(); final var shard0RoutingTable = state.routingTable(Metadata.DEFAULT_PROJECT_ID).index("test").shard(0); assertNotNull(shard0RoutingTable); final var nodeId = shard0RoutingTable.primaryShard().currentNodeId(); @@ -132,9 +132,9 @@ public void testGetPendingTasksSourceStringDataForFailedShard() throws Exception })); }); - // Unblock the master service from the blocked executor and allow the failed shard task to get processed. + // Unblock the master service from the executor above. safeAwait(barrier); - assertBusy(() -> assertTrue(masterService.pendingTasks().isEmpty())); + // Wait for the failed shard task to get processed and then for the shard and cluster to recover. ensureGreen(); } }