Skip to content

Commit dbc2119

Browse files
authored
Effort to fix testDataStreamLifecycleDownsampleRollingRestart #123769 (#125478) (#125857)
1 parent 0722687 commit dbc2119

File tree

2 files changed

+55
-37
lines changed

2 files changed

+55
-37
lines changed

test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2449,9 +2449,19 @@ public static void safeAcquire(int permits, Semaphore semaphore) {
24492449
* @return The value with which the {@code listener} was completed.
24502450
*/
24512451
public static <T> T safeAwait(SubscribableListener<T> listener) {
2452+
return safeAwait(listener, SAFE_AWAIT_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
2453+
}
2454+
2455+
/**
2456+
* Wait for the successful completion of the given {@link SubscribableListener}, respecting the provided timeout,
2457+
* preserving the thread's interrupt status flag and converting all exceptions into an {@link AssertionError} to trigger a test failure.
2458+
*
2459+
* @return The value with which the {@code listener} was completed.
2460+
*/
2461+
public static <T> T safeAwait(SubscribableListener<T> listener, long timeout, TimeUnit unit) {
24522462
final var future = new TestPlainActionFuture<T>();
24532463
listener.addListener(future);
2454-
return safeGet(future);
2464+
return safeGet(future, timeout, unit);
24552465
}
24562466

24572467
/**
@@ -2481,8 +2491,18 @@ public static <T extends ActionResponse> T safeExecute(ElasticsearchClient clien
24812491
* @return The value with which the {@code future} was completed.
24822492
*/
24832493
public static <T> T safeGet(Future<T> future) {
2494+
return safeGet(future, SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS);
2495+
}
2496+
2497+
/**
2498+
* Wait for the successful completion of the given {@link Future}, respecting the provided timeout, preserving the
2499+
* thread's interrupt status flag and converting all exceptions into an {@link AssertionError} to trigger a test failure.
2500+
*
2501+
* @return The value with which the {@code future} was completed.
2502+
*/
2503+
public static <T> T safeGet(Future<T> future, long timeout, TimeUnit unit) {
24842504
try {
2485-
return future.get(SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS);
2505+
return future.get(timeout, unit);
24862506
} catch (InterruptedException e) {
24872507
Thread.currentThread().interrupt();
24882508
throw new AssertionError("safeGet: interrupted waiting for SubscribableListener", e);

x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,37 +11,34 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
1313
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
14-
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
15-
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
1614
import org.elasticsearch.action.downsample.DownsampleConfig;
17-
import org.elasticsearch.action.support.IndicesOptions;
1815
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
1916
import org.elasticsearch.cluster.metadata.IndexMetadata;
17+
import org.elasticsearch.cluster.service.ClusterService;
2018
import org.elasticsearch.common.settings.Settings;
2119
import org.elasticsearch.core.TimeValue;
2220
import org.elasticsearch.datastreams.DataStreamsPlugin;
2321
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
2422
import org.elasticsearch.plugins.Plugin;
2523
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
24+
import org.elasticsearch.test.ClusterServiceUtils;
2625
import org.elasticsearch.test.ESIntegTestCase;
2726
import org.elasticsearch.test.InternalTestCluster;
28-
import org.elasticsearch.test.junit.annotations.TestLogging;
2927
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
3028
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
3129

3230
import java.util.Collection;
3331
import java.util.List;
3432
import java.util.concurrent.TimeUnit;
3533

34+
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_DOWNSAMPLE_STATUS;
3635
import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.getBackingIndices;
3736
import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.putTSDBIndexTemplate;
38-
import static org.hamcrest.Matchers.is;
39-
import static org.hamcrest.Matchers.notNullValue;
4037

4138
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 4)
4239
public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase {
4340
private static final Logger logger = LogManager.getLogger(DataStreamLifecycleDownsampleDisruptionIT.class);
44-
public static final int DOC_COUNT = 50_000;
41+
public static final int DOC_COUNT = 25_000;
4542

4643
@Override
4744
protected Collection<Class<? extends Plugin>> nodePlugins() {
@@ -55,7 +52,6 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
5552
return settings.build();
5653
}
5754

58-
@TestLogging(value = "org.elasticsearch.datastreams.lifecycle:TRACE", reason = "debugging")
5955
public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {
6056
final InternalTestCluster cluster = internalCluster();
6157
cluster.startMasterOnlyNodes(1);
@@ -90,36 +86,38 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {
9086
// testing so DSL doesn't have to wait for the end_time to lapse)
9187
putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle);
9288
client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();
89+
String sourceIndex = getBackingIndices(client(), dataStreamName).get(0);
90+
final String targetIndex = "downsample-5m-" + sourceIndex;
9391

94-
// DSL runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts
95-
// downsampling. This sleep here tries to get as close as possible to having disruption during the downsample execution.
96-
long sleepTime = randomLongBetween(3000, 4500);
97-
logger.info("-> giving data stream lifecycle [{}] millis to make some progress before starting the disruption", sleepTime);
98-
Thread.sleep(sleepTime);
99-
List<String> backingIndices = getBackingIndices(client(), dataStreamName);
100-
// first generation index
101-
String sourceIndex = backingIndices.get(0);
92+
/**
93+
* DSL runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2 seconds) before it starts
94+
* downsampling. We detect that downsampling has started by waiting for the downsample status of the target index to become STARTED.
95+
*/
96+
logger.info("-> Waiting for the data stream lifecycle to start the downsampling operation before starting the disruption.");
97+
ensureDownsamplingStatus(targetIndex, IndexMetadata.DownsampleTaskStatus.STARTED, TimeValue.timeValueSeconds(8));
10298

103-
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() {
104-
});
99+
logger.info("-> Starting the disruption.");
100+
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback());
105101

106-
// if the source index has already been downsampled and moved into the data stream just use its name directly
107-
final String targetIndex = sourceIndex.startsWith("downsample-5m-") ? sourceIndex : "downsample-5m-" + sourceIndex;
108-
assertBusy(() -> {
109-
try {
110-
GetSettingsResponse getSettingsResponse = cluster.client()
111-
.admin()
112-
.indices()
113-
.getSettings(new GetSettingsRequest().indices(targetIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN))
114-
.actionGet();
115-
Settings indexSettings = getSettingsResponse.getIndexToSettings().get(targetIndex);
116-
assertThat(indexSettings, is(notNullValue()));
117-
assertThat(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexSettings), is(IndexMetadata.DownsampleTaskStatus.SUCCESS));
118-
assertEquals("5m", IndexMetadata.INDEX_DOWNSAMPLE_INTERVAL.get(indexSettings));
119-
} catch (Exception e) {
120-
throw new AssertionError(e);
121-
}
122-
}, 120, TimeUnit.SECONDS);
102+
ensureDownsamplingStatus(targetIndex, IndexMetadata.DownsampleTaskStatus.SUCCESS, TimeValue.timeValueSeconds(120));
123103
ensureGreen(targetIndex);
104+
logger.info("-> Relocation has finished");
105+
}
106+
107+
private void ensureDownsamplingStatus(String downsampledIndex, IndexMetadata.DownsampleTaskStatus expectedStatus, TimeValue timeout) {
108+
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
109+
final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
110+
final var indexMetadata = clusterState.metadata().index(downsampledIndex);
111+
if (indexMetadata == null) {
112+
return false;
113+
}
114+
var downsamplingStatus = INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
115+
if (expectedStatus == downsamplingStatus) {
116+
logger.info("-> Downsampling status for index [{}] is [{}]", downsampledIndex, downsamplingStatus);
117+
return true;
118+
}
119+
return false;
120+
});
121+
safeAwait(listener, timeout.millis(), TimeUnit.MILLISECONDS);
124122
}
125123
}

0 commit comments

Comments
 (0)