diff --git a/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/snapshots/RestGetSnapshotsIT.java b/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/snapshots/RestGetSnapshotsIT.java index b1e28de1a5264..9d17ce9a7b9cd 100644 --- a/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/snapshots/RestGetSnapshotsIT.java +++ b/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/snapshots/RestGetSnapshotsIT.java @@ -206,7 +206,7 @@ public void testSortAndPaginateWithInProgress() throws Exception { inProgressSnapshots.add(AbstractSnapshotIntegTestCase.startFullSnapshot(logger, repoName, snapshotName, false)); } AbstractSnapshotIntegTestCase.awaitNumberOfSnapshotsInProgress(logger, inProgressCount); - AbstractSnapshotIntegTestCase.awaitClusterState(logger, state -> { + AbstractSnapshotIntegTestCase.awaitClusterState(state -> { final var snapshotsInProgress = SnapshotsInProgress.get(state); boolean firstIndexSuccessfullySnapshot = snapshotsInProgress.asStream() .flatMap(s -> s.shards().entrySet().stream()) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 0d359300bbdc1..99514b5412118 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -1072,7 +1072,6 @@ public void onRequestSent( final ActionFuture deleteResponse = startDeleteSnapshot(repoName, snapshotName); awaitClusterState( - logger, otherDataNode, state -> SnapshotsInProgress.get(state) .forRepo(repoName) diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index 9becc4af7710e..94485b587a622 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -557,7 +557,7 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map wait for [{}] deletions to show up in the cluster state", count); awaitClusterState(state -> SnapshotDeletionsInProgress.get(state).getEntries().size() == count); } @@ -569,7 +569,6 @@ protected void awaitNoMoreRunningOperations() throws Exception { protected void awaitNoMoreRunningOperations(String viaNode) throws Exception { logger.info("--> verify no more operations in the cluster state"); awaitClusterState( - logger, viaNode, state -> SnapshotsInProgress.get(state).isEmpty() && SnapshotDeletionsInProgress.get(state).hasDeletionsInProgress() == false ); @@ -604,13 +603,13 @@ public static ActionFuture startFullSnapshot( .execute(); } - protected void awaitNumberOfSnapshotsInProgress(int count) throws Exception { + protected void awaitNumberOfSnapshotsInProgress(int count) { awaitNumberOfSnapshotsInProgress(logger, count); } - public static void awaitNumberOfSnapshotsInProgress(Logger logger, int count) throws Exception { + public static void awaitNumberOfSnapshotsInProgress(Logger logger, int count) { logger.info("--> wait for [{}] snapshots to show up in the cluster state", count); - awaitClusterState(logger, state -> SnapshotsInProgress.get(state).count() == count); + awaitClusterState(state -> SnapshotsInProgress.get(state).count() == count); } protected SnapshotInfo assertSuccessful(ActionFuture future) throws Exception { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index becb6aaa7de19..b0c046cc161f0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -8,17 +8,14 @@ */ package org.elasticsearch.test; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.util.Throwables; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStatePublicationEvent; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NodeConnectionsService; @@ -37,14 +34,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; @@ -210,33 +205,8 @@ public static void setAllElapsedMillis(ClusterStatePublicationEvent clusterState clusterStatePublicationEvent.setMasterApplyElapsedMillis(0L); } - public static void awaitClusterState(Logger logger, Predicate statePredicate, ClusterService clusterService) - throws Exception { - final PlainActionFuture future = new PlainActionFuture<>(); - ClusterStateObserver.waitForState( - clusterService, - clusterService.getClusterApplierService().threadPool().getThreadContext(), - new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - future.onResponse(null); - } - - @Override - public void onClusterServiceClose() { - future.onFailure(new NodeClosedException(clusterService.localNode())); - } - - @Override - public void onTimeout(TimeValue timeout) { - assert false : "onTimeout called with no timeout set"; - } - }, - statePredicate, - null, - logger - ); - future.get(30L, TimeUnit.SECONDS); + public static void awaitClusterState(Predicate statePredicate, ClusterService clusterService) { + ESTestCase.safeAwait(addTemporaryStateListener(clusterService, statePredicate, TimeValue.THIRTY_SECONDS), TimeValue.THIRTY_SECONDS); } public static void awaitNoPendingTasks(ClusterService clusterService) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 88b608d5aed01..fe3ca0b2ba7c7 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -17,7 +17,6 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.apache.http.HttpHost; -import org.apache.logging.log4j.Logger; import org.apache.lucene.search.Sort; import org.apache.lucene.search.TotalHits; import org.apache.lucene.tests.util.LuceneTestCase; @@ -1140,16 +1139,12 @@ public static PendingClusterTasksResponse getClusterPendingTasks(Client client) } } - protected void awaitClusterState(Predicate statePredicate) throws Exception { - awaitClusterState(logger, internalCluster().getMasterName(), statePredicate); + public static void awaitClusterState(Predicate statePredicate) { + awaitClusterState(internalCluster().getMasterName(), statePredicate); } - public static void awaitClusterState(Logger logger, Predicate statePredicate) throws Exception { - awaitClusterState(logger, internalCluster().getMasterName(), statePredicate); - } - - public static void awaitClusterState(Logger logger, String viaNode, Predicate statePredicate) throws Exception { - ClusterServiceUtils.awaitClusterState(logger, statePredicate, internalCluster().getInstance(ClusterService.class, viaNode)); + public static void awaitClusterState(String viaNode, Predicate statePredicate) { + ClusterServiceUtils.awaitClusterState(statePredicate, internalCluster().getInstance(ClusterService.class, viaNode)); } public static String getNodeId(String nodeName) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java index 8898cac495706..c8c78e13f5b09 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java @@ -121,11 +121,7 @@ public void tearDown() throws Exception { protected void waitForMlTemplates() throws Exception { // block until the templates are installed - ClusterServiceUtils.awaitClusterState( - logger, - MachineLearning::criticalTemplatesInstalled, - getInstanceFromNode(ClusterService.class) - ); + ClusterServiceUtils.awaitClusterState(MachineLearning::criticalTemplatesInstalled, getInstanceFromNode(ClusterService.class)); } protected void blockingCall(Consumer> function, AtomicReference response, AtomicReference error) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 5cf15454e47f2..c8a8187af8acb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -170,8 +170,8 @@ protected Collection> getMockPlugins() { } @Before - public void ensureTemplatesArePresent() throws Exception { - awaitClusterState(logger, MachineLearning::criticalTemplatesInstalled); + public void ensureTemplatesArePresent() { + awaitClusterState(MachineLearning::criticalTemplatesInstalled); } protected Job.Builder createJob(String id) { diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java index de249f7f07e58..fb3bce0d11012 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java @@ -1322,7 +1322,7 @@ private static IndexMetadata getIndexMetadata(String indexName) { .index(indexName); } - private void waitUntilAllShardsAreUnassigned(Index index) throws Exception { + private void waitUntilAllShardsAreUnassigned(Index index) { awaitClusterState(state -> state.getRoutingTable().index(index).allPrimaryShardsUnassigned()); }