From a969ed0044d95fdd64f60d489e1bbd272e4091b0 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Mon, 24 Mar 2025 11:55:09 +0200 Subject: [PATCH 1/6] Use a cluster state listener to detect the downsampling status to avoid "sleeping" --- muted-tests.yml | 3 - ...StreamLifecycleDownsampleDisruptionIT.java | 85 +++++++++++-------- 2 files changed, 51 insertions(+), 37 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 2cb96c3c42d4e..9efbceb351867 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -254,9 +254,6 @@ tests: - class: org.elasticsearch.test.rest.yaml.RcsCcsCommonYamlTestSuiteIT method: test {p0=search.vectors/41_knn_search_bbq_hnsw/Test knn search} issue: https://github.com/elastic/elasticsearch/issues/123727 -- class: org.elasticsearch.xpack.downsample.DataStreamLifecycleDownsampleDisruptionIT - method: testDataStreamLifecycleDownsampleRollingRestart - issue: https://github.com/elastic/elasticsearch/issues/123769 - class: org.elasticsearch.xpack.searchablesnapshots.FrozenSearchableSnapshotsIntegTests method: testCreateAndRestorePartialSearchableSnapshot issue: https://github.com/elastic/elasticsearch/issues/123773 diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java index f45a602af544d..a38907ed131cb 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java @@ -11,32 +11,31 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.rollover.RolloverAction; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; -import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; -import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.downsample.DownsampleConfig; -import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.TestPlainActionFuture; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.datastreams.DataStreamsPlugin; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_DOWNSAMPLE_STATUS; import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.getBackingIndices; import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.putTSDBIndexTemplate; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 4) public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase { @@ -55,7 +54,6 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { return settings.build(); } - @TestLogging(value = "org.elasticsearch.datastreams.lifecycle:TRACE", reason = "debugging") public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception { final InternalTestCluster cluster = internalCluster(); cluster.startMasterOnlyNodes(1); @@ -88,38 +86,57 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception { // testing so DSL doesn't have to wait for the end_time to lapse) putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle); client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet(); + String sourceIndex = getBackingIndices(client(), dataStreamName).get(0); + final String targetIndex = "downsample-5m-" + sourceIndex; - // DSL runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts - // downsampling. This sleep here tries to get as close as possible to having disruption during the downsample execution. - long sleepTime = randomLongBetween(3000, 4500); - logger.info("-> giving data stream lifecycle [{}] millis to make some progress before starting the disruption", sleepTime); - Thread.sleep(sleepTime); - List backingIndices = getBackingIndices(client(), dataStreamName); - // first generation index - String sourceIndex = backingIndices.get(0); + /** + * DLM runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts + * downsampling. We try to detect if the downsampling has started by checking the downsample status in the target index. + */ + logger.info("-> Waiting for the data stream lifecycle to start the downsampling operation before starting the disruption."); + ensureDownsamplingStatus( + targetIndex, + Set.of(IndexMetadata.DownsampleTaskStatus.STARTED, IndexMetadata.DownsampleTaskStatus.SUCCESS), + TimeValue.timeValueSeconds(5) + ); + logger.info("-> Starting the disruption."); internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() { }); - // if the source index has already been downsampled and moved into the data stream just use its name directly - final String targetIndex = sourceIndex.startsWith("downsample-5m-") ? sourceIndex : "downsample-5m-" + sourceIndex; - assertBusy(() -> { - try { - GetSettingsResponse getSettingsResponse = cluster.client() - .admin() - .indices() - .getSettings( - new GetSettingsRequest(TEST_REQUEST_TIMEOUT).indices(targetIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) - ) - .actionGet(); - Settings indexSettings = getSettingsResponse.getIndexToSettings().get(targetIndex); - assertThat(indexSettings, is(notNullValue())); - assertThat(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexSettings), is(IndexMetadata.DownsampleTaskStatus.SUCCESS)); - assertEquals("5m", IndexMetadata.INDEX_DOWNSAMPLE_INTERVAL.get(indexSettings)); - } catch (Exception e) { - throw new AssertionError(e); - } - }, 120, TimeUnit.SECONDS); + ensureDownsamplingStatus(targetIndex, Set.of(IndexMetadata.DownsampleTaskStatus.SUCCESS), TimeValue.timeValueMinutes(2)); ensureGreen(targetIndex); + logger.info("-> Relocation has finished"); + } + + private void ensureDownsamplingStatus( + String downsampledIndex, + Set expectedStatuses, + TimeValue timeout + ) { + final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final IndexMetadata.DownsampleTaskStatus[] downsamplingStatus = new IndexMetadata.DownsampleTaskStatus[1]; + final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> { + final var indexMetadata = clusterState.metadata().getProject().index(downsampledIndex); + if (indexMetadata == null) { + return false; + } + downsamplingStatus[0] = INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings()); + return expectedStatuses.contains(downsamplingStatus[0]); + }); + try { + final var future = new TestPlainActionFuture(); + listener.addListener(future); + future.get(timeout.getMillis(), TimeUnit.MILLISECONDS); + logger.info("-> Downsampling status for index [{}] is [{}]", downsampledIndex, downsamplingStatus[0]); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new AssertionError( + "Error while waiting for " + expectedStatuses + " but found '" + downsamplingStatus[0] + "'. " + e.getMessage(), + e + ); + } } } From 0caf4dd4fd6c5470bac20759b4650e1f20f07b1c Mon Sep 17 00:00:00 2001 From: gmarouli Date: Mon, 24 Mar 2025 12:09:45 +0200 Subject: [PATCH 2/6] Reduce the amount of docs we are indexing to avoid flakiness --- .../downsample/DataStreamLifecycleDownsampleDisruptionIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java index a38907ed131cb..1cd43fc463cc7 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java @@ -40,7 +40,7 @@ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 4) public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase { private static final Logger logger = LogManager.getLogger(DataStreamLifecycleDownsampleDisruptionIT.class); - public static final int DOC_COUNT = 50_000; + public static final int DOC_COUNT = 25_000; @Override protected Collection> nodePlugins() { From 38c188206e5c1192ed2da72369f593c92daaa52a Mon Sep 17 00:00:00 2001 From: gmarouli Date: Mon, 24 Mar 2025 12:17:09 +0200 Subject: [PATCH 3/6] Bring the timeouts closer to the original --- .../downsample/DataStreamLifecycleDownsampleDisruptionIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java index 1cd43fc463cc7..3c5860b9d4d69 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java @@ -97,14 +97,14 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception { ensureDownsamplingStatus( targetIndex, Set.of(IndexMetadata.DownsampleTaskStatus.STARTED, IndexMetadata.DownsampleTaskStatus.SUCCESS), - TimeValue.timeValueSeconds(5) + TimeValue.timeValueMillis(4500) ); logger.info("-> Starting the disruption."); internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() { }); - ensureDownsamplingStatus(targetIndex, Set.of(IndexMetadata.DownsampleTaskStatus.SUCCESS), TimeValue.timeValueMinutes(2)); + ensureDownsamplingStatus(targetIndex, Set.of(IndexMetadata.DownsampleTaskStatus.SUCCESS), TimeValue.timeValueSeconds(120)); ensureGreen(targetIndex); logger.info("-> Relocation has finished"); } From 4377b72b878feac8c700308008ddf0b986bc1e8c Mon Sep 17 00:00:00 2001 From: gmarouli Date: Mon, 24 Mar 2025 19:41:13 +0200 Subject: [PATCH 4/6] Increase the timeout when waiting for the downsampling to start --- .../downsample/DataStreamLifecycleDownsampleDisruptionIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java index 3c5860b9d4d69..ef52263f36fef 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java @@ -97,7 +97,7 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception { ensureDownsamplingStatus( targetIndex, Set.of(IndexMetadata.DownsampleTaskStatus.STARTED, IndexMetadata.DownsampleTaskStatus.SUCCESS), - TimeValue.timeValueMillis(4500) + TimeValue.timeValueSeconds(6) ); logger.info("-> Starting the disruption."); From 091c227bf73c58c2d87c3a9a9bd617bce9fc57dc Mon Sep 17 00:00:00 2001 From: gmarouli Date: Wed, 26 Mar 2025 09:40:21 +0200 Subject: [PATCH 5/6] Overload `safeGet` and `safeAwait` to also accept timeouts --- .../org/elasticsearch/test/ESTestCase.java | 24 ++++++++++++++-- ...StreamLifecycleDownsampleDisruptionIT.java | 28 ++++++------------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index bdb35ceed7d4e..67445f1f0722c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -2477,9 +2477,19 @@ public static void safeAcquire(int permits, Semaphore semaphore) { * @return The value with which the {@code listener} was completed. */ public static T safeAwait(SubscribableListener listener) { + return safeAwait(listener, SAFE_AWAIT_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS); + } + + /** + * Wait for the successful completion of the given {@link SubscribableListener}, respecting the provided timeout, + * preserving the thread's interrupt status flag and converting all exceptions into an {@link AssertionError} to trigger a test failure. + * + * @return The value with which the {@code listener} was completed. + */ + public static T safeAwait(SubscribableListener listener, long timeout, TimeUnit unit) { final var future = new TestPlainActionFuture(); listener.addListener(future); - return safeGet(future); + return safeGet(future, timeout, unit); } /** @@ -2509,8 +2519,18 @@ public static T safeExecute(ElasticsearchClient clien * @return The value with which the {@code future} was completed. */ public static T safeGet(Future future) { + return safeGet(future, SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS); + } + + /** + * Wait for the successful completion of the given {@link Future}, respecting the provided timeout, preserving the + * thread's interrupt status flag and converting all exceptions into an {@link AssertionError} to trigger a test failure. + * + * @return The value with which the {@code future} was completed. + */ + public static T safeGet(Future future, long timeout, TimeUnit unit) { try { - return future.get(SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS); + return future.get(timeout, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new AssertionError("safeGet: interrupted waiting for SubscribableListener", e); diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java index ef52263f36fef..489ed5d0c3582 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverAction; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.action.downsample.DownsampleConfig; -import org.elasticsearch.action.support.TestPlainActionFuture; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.service.ClusterService; @@ -101,8 +100,7 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception { ); logger.info("-> Starting the disruption."); - internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() { - }); + internalCluster().rollingRestart(new InternalTestCluster.RestartCallback()); ensureDownsamplingStatus(targetIndex, Set.of(IndexMetadata.DownsampleTaskStatus.SUCCESS), TimeValue.timeValueSeconds(120)); ensureGreen(targetIndex); @@ -115,28 +113,18 @@ private void ensureDownsamplingStatus( TimeValue timeout ) { final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); - final IndexMetadata.DownsampleTaskStatus[] downsamplingStatus = new IndexMetadata.DownsampleTaskStatus[1]; final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> { final var indexMetadata = clusterState.metadata().getProject().index(downsampledIndex); if (indexMetadata == null) { return false; } - downsamplingStatus[0] = INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings()); - return expectedStatuses.contains(downsamplingStatus[0]); - }); - try { - final var future = new TestPlainActionFuture(); - listener.addListener(future); - future.get(timeout.getMillis(), TimeUnit.MILLISECONDS); - logger.info("-> Downsampling status for index [{}] is [{}]", downsampledIndex, downsamplingStatus[0]); - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); + var downsamplingStatus = INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings()); + if (expectedStatuses.contains(downsamplingStatus)) { + logger.info("-> Downsampling status for index [{}] is [{}]", downsampledIndex, downsamplingStatus); + return true; } - throw new AssertionError( - "Error while waiting for " + expectedStatuses + " but found '" + downsamplingStatus[0] + "'. " + e.getMessage(), - e - ); - } + return false; + }); + safeAwait(listener, timeout.millis(), TimeUnit.MILLISECONDS); } } From 77ed7ff9bab1b0561b7db722322cd1b3e92eb8fa Mon Sep 17 00:00:00 2001 From: gmarouli Date: Fri, 28 Mar 2025 11:40:44 +0200 Subject: [PATCH 6/6] Make the status check stricter --- ...taStreamLifecycleDownsampleDisruptionIT.java | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java index 489ed5d0c3582..a96a7f20a815f 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java @@ -29,7 +29,6 @@ import java.util.Collection; import java.util.List; -import java.util.Set; import java.util.concurrent.TimeUnit; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_DOWNSAMPLE_STATUS; @@ -93,25 +92,17 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception { * downsampling. We try to detect if the downsampling has started by checking the downsample status in the target index. */ logger.info("-> Waiting for the data stream lifecycle to start the downsampling operation before starting the disruption."); - ensureDownsamplingStatus( - targetIndex, - Set.of(IndexMetadata.DownsampleTaskStatus.STARTED, IndexMetadata.DownsampleTaskStatus.SUCCESS), - TimeValue.timeValueSeconds(6) - ); + ensureDownsamplingStatus(targetIndex, IndexMetadata.DownsampleTaskStatus.STARTED, TimeValue.timeValueSeconds(8)); logger.info("-> Starting the disruption."); internalCluster().rollingRestart(new InternalTestCluster.RestartCallback()); - ensureDownsamplingStatus(targetIndex, Set.of(IndexMetadata.DownsampleTaskStatus.SUCCESS), TimeValue.timeValueSeconds(120)); + ensureDownsamplingStatus(targetIndex, IndexMetadata.DownsampleTaskStatus.SUCCESS, TimeValue.timeValueSeconds(120)); ensureGreen(targetIndex); logger.info("-> Relocation has finished"); } - private void ensureDownsamplingStatus( - String downsampledIndex, - Set expectedStatuses, - TimeValue timeout - ) { + private void ensureDownsamplingStatus(String downsampledIndex, IndexMetadata.DownsampleTaskStatus expectedStatus, TimeValue timeout) { final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> { final var indexMetadata = clusterState.metadata().getProject().index(downsampledIndex); @@ -119,7 +110,7 @@ private void ensureDownsamplingStatus( return false; } var downsamplingStatus = INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings()); - if (expectedStatuses.contains(downsamplingStatus)) { + if (expectedStatus == downsamplingStatus) { logger.info("-> Downsampling status for index [{}] is [{}]", downsampledIndex, downsamplingStatus); return true; }