Skip to content

Commit 1bfb88c

Browse files
committed
Effort to fix testDataStreamLifecycleDownsampleRollingRestart elastic#123769 (elastic#125478)
(cherry picked from commit 1943844) # Conflicts: # muted-tests.yml # x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java
1 parent b8217ab commit 1bfb88c

File tree

2 files changed

+55
-37
lines changed

2 files changed

+55
-37
lines changed

test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2424,9 +2424,19 @@ public static void safeAcquire(int permits, Semaphore semaphore) {
24242424
* @return The value with which the {@code listener} was completed.
24252425
*/
24262426
public static <T> T safeAwait(SubscribableListener<T> listener) {
2427+
return safeAwait(listener, SAFE_AWAIT_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
2428+
}
2429+
2430+
/**
2431+
* Wait for the successful completion of the given {@link SubscribableListener}, respecting the provided timeout,
2432+
* preserving the thread's interrupt status flag and converting all exceptions into an {@link AssertionError} to trigger a test failure.
2433+
*
2434+
* @return The value with which the {@code listener} was completed.
2435+
*/
2436+
public static <T> T safeAwait(SubscribableListener<T> listener, long timeout, TimeUnit unit) {
24272437
final var future = new TestPlainActionFuture<T>();
24282438
listener.addListener(future);
2429-
return safeGet(future);
2439+
return safeGet(future, timeout, unit);
24302440
}
24312441

24322442
/**
@@ -2456,8 +2466,18 @@ public static <T extends ActionResponse> T safeExecute(ElasticsearchClient clien
24562466
* @return The value with which the {@code future} was completed.
24572467
*/
24582468
public static <T> T safeGet(Future<T> future) {
2469+
return safeGet(future, SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS);
2470+
}
2471+
2472+
/**
2473+
* Wait for the successful completion of the given {@link Future}, respecting the provided timeout, preserving the
2474+
* thread's interrupt status flag and converting all exceptions into an {@link AssertionError} to trigger a test failure.
2475+
*
2476+
* @return The value with which the {@code future} was completed.
2477+
*/
2478+
public static <T> T safeGet(Future<T> future, long timeout, TimeUnit unit) {
24592479
try {
2460-
return future.get(SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS);
2480+
return future.get(timeout, unit);
24612481
} catch (InterruptedException e) {
24622482
Thread.currentThread().interrupt();
24632483
throw new AssertionError("safeGet: interrupted waiting for SubscribableListener", e);

x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,37 +11,34 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
1313
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
14-
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
15-
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
1614
import org.elasticsearch.action.downsample.DownsampleConfig;
17-
import org.elasticsearch.action.support.IndicesOptions;
1815
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
1916
import org.elasticsearch.cluster.metadata.IndexMetadata;
17+
import org.elasticsearch.cluster.service.ClusterService;
2018
import org.elasticsearch.common.settings.Settings;
2119
import org.elasticsearch.core.TimeValue;
2220
import org.elasticsearch.datastreams.DataStreamsPlugin;
2321
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
2422
import org.elasticsearch.plugins.Plugin;
2523
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
24+
import org.elasticsearch.test.ClusterServiceUtils;
2625
import org.elasticsearch.test.ESIntegTestCase;
2726
import org.elasticsearch.test.InternalTestCluster;
28-
import org.elasticsearch.test.junit.annotations.TestLogging;
2927
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
3028
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
3129

3230
import java.util.Collection;
3331
import java.util.List;
3432
import java.util.concurrent.TimeUnit;
3533

34+
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_DOWNSAMPLE_STATUS;
3635
import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.getBackingIndices;
3736
import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.putTSDBIndexTemplate;
38-
import static org.hamcrest.Matchers.is;
39-
import static org.hamcrest.Matchers.notNullValue;
4037

4138
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 4)
4239
public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase {
4340
private static final Logger logger = LogManager.getLogger(DataStreamLifecycleDownsampleDisruptionIT.class);
44-
public static final int DOC_COUNT = 50_000;
41+
public static final int DOC_COUNT = 25_000;
4542

4643
@Override
4744
protected Collection<Class<? extends Plugin>> nodePlugins() {
@@ -55,7 +52,6 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
5552
return settings.build();
5653
}
5754

58-
@TestLogging(value = "org.elasticsearch.datastreams.lifecycle:TRACE", reason = "debugging")
5955
public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {
6056
final InternalTestCluster cluster = internalCluster();
6157
cluster.startMasterOnlyNodes(1);
@@ -88,36 +84,38 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {
8884
// testing so DSL doesn't have to wait for the end_time to lapse)
8985
putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle);
9086
client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();
87+
String sourceIndex = getBackingIndices(client(), dataStreamName).get(0);
88+
final String targetIndex = "downsample-5m-" + sourceIndex;
9189

92-
// DSL runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts
93-
// downsampling. This sleep here tries to get as close as possible to having disruption during the downsample execution.
94-
long sleepTime = randomLongBetween(3000, 4500);
95-
logger.info("-> giving data stream lifecycle [{}] millis to make some progress before starting the disruption", sleepTime);
96-
Thread.sleep(sleepTime);
97-
List<String> backingIndices = getBackingIndices(client(), dataStreamName);
98-
// first generation index
99-
String sourceIndex = backingIndices.get(0);
90+
/**
91+
* DSL (the data stream lifecycle) runs every second and has to tail force-merge the index (2 seconds) and mark it as read-only (2 seconds) before it starts
92+
* downsampling. We try to detect if the downsampling has started by checking the downsample status in the target index.
93+
*/
94+
logger.info("-> Waiting for the data stream lifecycle to start the downsampling operation before starting the disruption.");
95+
ensureDownsamplingStatus(targetIndex, IndexMetadata.DownsampleTaskStatus.STARTED, TimeValue.timeValueSeconds(8));
10096

101-
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() {
102-
});
97+
logger.info("-> Starting the disruption.");
98+
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback());
10399

104-
// if the source index has already been downsampled and moved into the data stream just use its name directly
105-
final String targetIndex = sourceIndex.startsWith("downsample-5m-") ? sourceIndex : "downsample-5m-" + sourceIndex;
106-
assertBusy(() -> {
107-
try {
108-
GetSettingsResponse getSettingsResponse = cluster.client()
109-
.admin()
110-
.indices()
111-
.getSettings(new GetSettingsRequest().indices(targetIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN))
112-
.actionGet();
113-
Settings indexSettings = getSettingsResponse.getIndexToSettings().get(targetIndex);
114-
assertThat(indexSettings, is(notNullValue()));
115-
assertThat(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexSettings), is(IndexMetadata.DownsampleTaskStatus.SUCCESS));
116-
assertEquals("5m", IndexMetadata.INDEX_DOWNSAMPLE_INTERVAL.get(indexSettings));
117-
} catch (Exception e) {
118-
throw new AssertionError(e);
119-
}
120-
}, 120, TimeUnit.SECONDS);
100+
ensureDownsamplingStatus(targetIndex, IndexMetadata.DownsampleTaskStatus.SUCCESS, TimeValue.timeValueSeconds(120));
121101
ensureGreen(targetIndex);
102+
logger.info("-> Relocation has finished");
103+
}
104+
105+
private void ensureDownsamplingStatus(String downsampledIndex, IndexMetadata.DownsampleTaskStatus expectedStatus, TimeValue timeout) {
106+
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
107+
final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
108+
final var indexMetadata = clusterState.metadata().index(downsampledIndex);
109+
if (indexMetadata == null) {
110+
return false;
111+
}
112+
var downsamplingStatus = INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
113+
if (expectedStatus == downsamplingStatus) {
114+
logger.info("-> Downsampling status for index [{}] is [{}]", downsampledIndex, downsamplingStatus);
115+
return true;
116+
}
117+
return false;
118+
});
119+
safeAwait(listener, timeout.millis(), TimeUnit.MILLISECONDS);
122120
}
123121
}

0 commit comments

Comments
 (0)