diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/support/master/TransportMasterNodeActionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/support/master/TransportMasterNodeActionIT.java index d391bace56489..2c6ccb434b801 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/support/master/TransportMasterNodeActionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/support/master/TransportMasterNodeActionIT.java @@ -214,7 +214,6 @@ private CountDownLatch configureElectionLatch(String newMaster, List */ private static String ensureSufficientMasterEligibleNodes() { final var votingConfigSizeListener = ClusterServiceUtils.addTemporaryStateListener( - internalCluster().getAnyMasterNodeInstance(ClusterService.class), cs -> 5 <= cs.coordinationMetadata().getLastCommittedConfiguration().getNodeIds().size() ); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java index 8dd16caedff31..96f3c3b7b569c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ListenableFuture; @@ -2182,7 +2181,6 @@ public void testDeleteIndexWithOutOfOrderFinalization() { // ensure each snapshot has really started before moving on to the next one safeAwait( ClusterServiceUtils.addTemporaryStateListener( - internalCluster().getInstance(ClusterService.class), cs -> SnapshotsInProgress.get(cs) .forRepo(repoName) .stream() @@ -2202,7 +2200,6 @@ public void testDeleteIndexWithOutOfOrderFinalization() { final var indexRecreatedListener = ClusterServiceUtils // wait until the snapshot has entered finalization .addTemporaryStateListener( - internalCluster().getInstance(ClusterService.class), cs -> SnapshotsInProgress.get(cs) .forRepo(repoName) .stream() diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java index eea60ce13af2f..badc7f47b1a0a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java @@ -19,7 +19,6 @@ import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -848,12 +847,7 @@ public void testSnapshotWithMissingShardLevelIndexFile() throws Exception { clusterAdmin().prepareCloneSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snap-1", "test-snap-2") .setIndices("test-idx-1") .get(); - safeAwait( - ClusterServiceUtils.addTemporaryStateListener( - internalCluster().getInstance(ClusterService.class), - cs -> SnapshotsInProgress.get(cs).isEmpty() - ) - ); + safeAwait(ClusterServiceUtils.addTemporaryStateListener(cs -> SnapshotsInProgress.get(cs).isEmpty())); assertThat( clusterAdmin().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, "test-repo") .setSnapshots("test-snap-2") diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 11f2607191c2e..4dd63915467c3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -1484,7 +1484,6 @@ public void run() { indexShard.failShard("simulated", new ElasticsearchException("simulated")); safeAwait( ClusterServiceUtils.addTemporaryStateListener( - internalCluster().getInstance(ClusterService.class), cs -> cs.metadata().getProject().index(indexName).primaryTerm(0) > primaryTerm ) ); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotsServiceIT.java index b86cae1c2fb60..24539a270c03e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotsServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotsServiceIT.java @@ -16,7 +16,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.snapshots.mockstore.MockRepository; @@ -170,22 +169,19 @@ public void testDeleteSnapshotWhenWaitingForCompletion() throws Exception { */ private SubscribableListener createSnapshotDeletionListener(String repositoryName) { AtomicBoolean deleteHasStarted = new AtomicBoolean(false); - return ClusterServiceUtils.addTemporaryStateListener( - internalCluster().getCurrentMasterNodeInstance(ClusterService.class), - state -> { - SnapshotDeletionsInProgress deletionsInProgress = (SnapshotDeletionsInProgress) state.getCustoms() - .get(SnapshotDeletionsInProgress.TYPE); - if (deletionsInProgress == null) { - return false; - } - if (deleteHasStarted.get() == false) { - deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(repositoryName)); - return false; - } else { - return deletionsInProgress.hasExecutingDeletion(repositoryName) == false; - } + return ClusterServiceUtils.addMasterTemporaryStateListener(state -> { + SnapshotDeletionsInProgress deletionsInProgress = (SnapshotDeletionsInProgress) state.getCustoms() + .get(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress == null) { + return false; } - ); + if (deleteHasStarted.get() == false) { + deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(repositoryName)); + return false; + } else { + return deletionsInProgress.hasExecutingDeletion(repositoryName) == false; + } + }); } public void testRerouteWhenShardSnapshotsCompleted() throws Exception { @@ -209,13 +205,10 @@ public void testRerouteWhenShardSnapshotsCompleted() throws Exception { .put(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", originalNode) ); - final var shardMovedListener = ClusterServiceUtils.addTemporaryStateListener( - internalCluster().getCurrentMasterNodeInstance(ClusterService.class), - state -> { - final var primaryShard = state.routingTable().index(indexName).shard(0).primaryShard(); - return primaryShard.started() && originalNode.equals(state.nodes().get(primaryShard.currentNodeId()).getName()) == false; - } - ); + final var shardMovedListener = ClusterServiceUtils.addMasterTemporaryStateListener(state -> { + final var primaryShard = state.routingTable().index(indexName).shard(0).primaryShard(); + return primaryShard.started() && originalNode.equals(state.nodes().get(primaryShard.currentNodeId()).getName()) == false; + }); assertFalse(shardMovedListener.isDone()); unblockAllDataNodes(repoName); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index 8c6058b47cf0c..52296f1d896cc 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -263,6 +263,15 @@ public void clusterStateProcessed(ClusterState initialState, ClusterState newSta ); } + /** + * Creates a {@link ClusterStateListener} which subscribes to the given {@link ClusterService} and waits for it to apply a cluster state + * that satisfies {@code predicate}, at which point it unsubscribes itself. + * + * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the + * given {@code clusterService}. If the current cluster state already matches {@code predicate} then the returned listener is + * already complete. If no matching cluster state is seen within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is + * completed exceptionally on the scheduler thread that belongs to {@code clusterService}. + */ public static SubscribableListener addTemporaryStateListener(ClusterService clusterService, Predicate predicate) { final var listener = new SubscribableListener(); final ClusterStateListener clusterStateListener = new ClusterStateListener() { @@ -291,4 +300,35 @@ public String toString() { } return listener; } + + /** + * Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of one of the nodes in the + * {@link ESIntegTestCase#internalCluster()}. When the chosen {@link ClusterService} applies a state that satisfies {@code predicate} + * the listener unsubscribes itself. + * + * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the + * {@link ClusterService} belonging to one of the nodes in the {@link ESIntegTestCase#internalCluster()}. If the current cluster + * state already matches {@code predicate} then the returned listener is already complete. If no matching cluster state is seen + * within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that + * belongs to the chosen node's {@link ClusterService}. + */ + public static SubscribableListener addTemporaryStateListener(Predicate predicate) { + return addTemporaryStateListener(ESIntegTestCase.internalCluster().clusterService(), predicate); + } + + /** + * Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of the current elected master node in the + * {@link ESIntegTestCase#internalCluster()}. When this node's {@link ClusterService} applies a state that satisfies {@code predicate} + * the listener unsubscribes itself. + * + * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the + * {@link ClusterService} belonging to the node that was the elected master node in the + * {@link ESIntegTestCase#internalCluster()} when this method was first called. If the current cluster state already matches + * {@code predicate} then the returned listener is already complete. If no matching cluster state is seen within + * {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that belongs to + * the elected master node's {@link ClusterService}. + */ + public static SubscribableListener addMasterTemporaryStateListener(Predicate predicate) { + return addTemporaryStateListener(ESIntegTestCase.internalCluster().getCurrentMasterNodeInstance(ClusterService.class), predicate); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 3b648e5f8da06..5c033558daaee 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -867,8 +867,7 @@ public static List waitForDataStreamBackingIndices(String dataStreamName public static List waitForDataStreamIndices(String dataStreamName, int expectedSize, boolean failureStore) { // We listen to the cluster state on the master node to ensure all other nodes have already acked the new cluster state. // This avoids inconsistencies in subsequent API calls which might hit a non-master node. - final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); - final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> { + final var listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { final var dataStream = clusterState.metadata().getProject().dataStreams().get(dataStreamName); if (dataStream == null) { return false;