Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,6 @@ tests:
- class: org.elasticsearch.xpack.core.common.notifications.AbstractAuditorTests
method: testRecreateTemplateWhenDeleted
issue: https://github.com/elastic/elasticsearch/issues/123232
- class: org.elasticsearch.xpack.downsample.DataStreamLifecycleDownsampleDisruptionIT
method: testDataStreamLifecycleDownsampleRollingRestart
issue: https://github.com/elastic/elasticsearch/issues/123769
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=ml/start_data_frame_analytics/Test start given dest index is not empty}
issue: https://github.com/elastic/elasticsearch/issues/125909
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,23 @@ public void clusterStateProcessed(ClusterState initialState, ClusterState newSta
* completed exceptionally on the scheduler thread that belongs to {@code clusterService}.
*/
public static SubscribableListener<Void> addTemporaryStateListener(ClusterService clusterService, Predicate<ClusterState> predicate) {
return addTemporaryStateListener(clusterService, predicate, ESTestCase.SAFE_AWAIT_TIMEOUT);
}

/**
* Creates a {@link ClusterStateListener} which subscribes to the given {@link ClusterService} and waits for it to apply a cluster state
* that satisfies {@code predicate}, at which point it unsubscribes itself.
*
* @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
* given {@code clusterService}. If the current cluster state already matches {@code predicate} then the returned listener is
* already complete. If no matching cluster state is seen within the provided {@code timeout} then the listener is
* completed exceptionally on the scheduler thread that belongs to {@code clusterService}.
*/
public static SubscribableListener<Void> addTemporaryStateListener(
ClusterService clusterService,
Predicate<ClusterState> predicate,
TimeValue timeout
) {
final var listener = new SubscribableListener<Void>();
final ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
Expand All @@ -296,7 +313,7 @@ public String toString() {
if (predicate.test(clusterService.state())) {
listener.onResponse(null);
} else {
listener.addTimeout(ESTestCase.SAFE_AWAIT_TIMEOUT, clusterService.threadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
listener.addTimeout(timeout, clusterService.threadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
}
return listener;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private void ensureDownsamplingStatus(String downsampledIndex, IndexMetadata.Dow
return true;
}
return false;
});
}, timeout);
safeAwait(listener, timeout.millis(), TimeUnit.MILLISECONDS);
}
}