diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index 8c6058b47cf0c..4e9924f740702 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -264,6 +264,23 @@ public void clusterStateProcessed(ClusterState initialState, ClusterState newSta } public static SubscribableListener addTemporaryStateListener(ClusterService clusterService, Predicate predicate) { + return addTemporaryStateListener(clusterService, predicate, ESTestCase.SAFE_AWAIT_TIMEOUT); + } + + /** + * Creates a {@link ClusterStateListener} which subscribes to the given {@link ClusterService} and waits for it to apply a cluster state + * that satisfies {@code predicate}, at which point it unsubscribes itself. + * + * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the + * given {@code clusterService}. If the current cluster state already matches {@code predicate} then the returned listener is + * already complete. If no matching cluster state is seen within the provided {@code timeout} then the listener is + * completed exceptionally on the scheduler thread that belongs to {@code clusterService}. + */ + public static SubscribableListener addTemporaryStateListener( + ClusterService clusterService, + Predicate predicate, + TimeValue timeout + ) { final var listener = new SubscribableListener(); final ClusterStateListener clusterStateListener = new ClusterStateListener() { @Override @@ -287,7 +304,7 @@ public String toString() { if (predicate.test(clusterService.state())) { listener.onResponse(null); } else { - listener.addTimeout(ESTestCase.SAFE_AWAIT_TIMEOUT, clusterService.threadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE); + listener.addTimeout(timeout, clusterService.threadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE); } return listener; } diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java index 7ba8f8f4283fd..a396c94dbb17c 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java @@ -117,7 +117,7 @@ private void ensureDownsamplingStatus(String downsampledIndex, IndexMetadata.Dow return true; } return false; - }); + }, timeout); safeAwait(listener, timeout.millis(), TimeUnit.MILLISECONDS); } }