Skip to content

Commit 091c227

Browse files
committed
Overload safeGet and safeAwait to also accept timeouts
1 parent 4377b72 commit 091c227

File tree

2 files changed

+30
-22
lines changed

2 files changed

+30
-22
lines changed

test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2477,9 +2477,19 @@ public static void safeAcquire(int permits, Semaphore semaphore) {
24772477
* @return The value with which the {@code listener} was completed.
24782478
*/
24792479
public static <T> T safeAwait(SubscribableListener<T> listener) {
2480+
return safeAwait(listener, SAFE_AWAIT_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
2481+
}
2482+
2483+
/**
2484+
* Wait for the successful completion of the given {@link SubscribableListener}, respecting the provided timeout,
2485+
* preserving the thread's interrupt status flag and converting all exceptions into an {@link AssertionError} to trigger a test failure.
2486+
*
2487+
* @return The value with which the {@code listener} was completed.
2488+
*/
2489+
public static <T> T safeAwait(SubscribableListener<T> listener, long timeout, TimeUnit unit) {
24802490
final var future = new TestPlainActionFuture<T>();
24812491
listener.addListener(future);
2482-
return safeGet(future);
2492+
return safeGet(future, timeout, unit);
24832493
}
24842494

24852495
/**
@@ -2509,8 +2519,18 @@ public static <T extends ActionResponse> T safeExecute(ElasticsearchClient clien
25092519
* @return The value with which the {@code future} was completed.
25102520
*/
25112521
public static <T> T safeGet(Future<T> future) {
2522+
return safeGet(future, SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS);
2523+
}
2524+
2525+
/**
2526+
* Wait for the successful completion of the given {@link Future}, respecting the provided timeout, preserving the
2527+
* thread's interrupt status flag and converting all exceptions into an {@link AssertionError} to trigger a test failure.
2528+
*
2529+
* @return The value with which the {@code future} was completed.
2530+
*/
2531+
public static <T> T safeGet(Future<T> future, long timeout, TimeUnit unit) {
25122532
try {
2513-
return future.get(SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS);
2533+
return future.get(timeout, unit);
25142534
} catch (InterruptedException e) {
25152535
Thread.currentThread().interrupt();
25162536
throw new AssertionError("safeGet: interrupted waiting for SubscribableListener", e);

x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
1313
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
1414
import org.elasticsearch.action.downsample.DownsampleConfig;
15-
import org.elasticsearch.action.support.TestPlainActionFuture;
1615
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
1716
import org.elasticsearch.cluster.metadata.IndexMetadata;
1817
import org.elasticsearch.cluster.service.ClusterService;
@@ -101,8 +100,7 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {
101100
);
102101

103102
logger.info("-> Starting the disruption.");
104-
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() {
105-
});
103+
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback());
106104

107105
ensureDownsamplingStatus(targetIndex, Set.of(IndexMetadata.DownsampleTaskStatus.SUCCESS), TimeValue.timeValueSeconds(120));
108106
ensureGreen(targetIndex);
@@ -115,28 +113,18 @@ private void ensureDownsamplingStatus(
115113
TimeValue timeout
116114
) {
117115
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
118-
final IndexMetadata.DownsampleTaskStatus[] downsamplingStatus = new IndexMetadata.DownsampleTaskStatus[1];
119116
final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
120117
final var indexMetadata = clusterState.metadata().getProject().index(downsampledIndex);
121118
if (indexMetadata == null) {
122119
return false;
123120
}
124-
downsamplingStatus[0] = INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
125-
return expectedStatuses.contains(downsamplingStatus[0]);
126-
});
127-
try {
128-
final var future = new TestPlainActionFuture<Void>();
129-
listener.addListener(future);
130-
future.get(timeout.getMillis(), TimeUnit.MILLISECONDS);
131-
logger.info("-> Downsampling status for index [{}] is [{}]", downsampledIndex, downsamplingStatus[0]);
132-
} catch (Exception e) {
133-
if (e instanceof InterruptedException) {
134-
Thread.currentThread().interrupt();
121+
var downsamplingStatus = INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
122+
if (expectedStatuses.contains(downsamplingStatus)) {
123+
logger.info("-> Downsampling status for index [{}] is [{}]", downsampledIndex, downsamplingStatus);
124+
return true;
135125
}
136-
throw new AssertionError(
137-
"Error while waiting for " + expectedStatuses + " but found '" + downsamplingStatus[0] + "'. " + e.getMessage(),
138-
e
139-
);
140-
}
126+
return false;
127+
});
128+
safeAwait(listener, timeout.millis(), TimeUnit.MILLISECONDS);
141129
}
142130
}

0 commit comments

Comments
 (0)