1111import org .apache .logging .log4j .Logger ;
1212import org .elasticsearch .action .admin .indices .rollover .RolloverAction ;
1313import org .elasticsearch .action .admin .indices .rollover .RolloverRequest ;
14- import org .elasticsearch .action .admin .indices .settings .get .GetSettingsRequest ;
15- import org .elasticsearch .action .admin .indices .settings .get .GetSettingsResponse ;
1614import org .elasticsearch .action .downsample .DownsampleConfig ;
17- import org .elasticsearch .action .support .IndicesOptions ;
1815import org .elasticsearch .cluster .metadata .DataStreamLifecycle ;
1916import org .elasticsearch .cluster .metadata .IndexMetadata ;
17+ import org .elasticsearch .cluster .service .ClusterService ;
2018import org .elasticsearch .common .settings .Settings ;
2119import org .elasticsearch .core .TimeValue ;
2220import org .elasticsearch .datastreams .DataStreamsPlugin ;
2321import org .elasticsearch .datastreams .lifecycle .DataStreamLifecycleService ;
2422import org .elasticsearch .plugins .Plugin ;
2523import org .elasticsearch .search .aggregations .bucket .histogram .DateHistogramInterval ;
24+ import org .elasticsearch .test .ClusterServiceUtils ;
2625import org .elasticsearch .test .ESIntegTestCase ;
2726import org .elasticsearch .test .InternalTestCluster ;
28- import org .elasticsearch .test .junit .annotations .TestLogging ;
2927import org .elasticsearch .xpack .aggregatemetric .AggregateMetricMapperPlugin ;
3028import org .elasticsearch .xpack .core .LocalStateCompositeXPackPlugin ;
3129
3230import java .util .Collection ;
3331import java .util .List ;
3432import java .util .concurrent .TimeUnit ;
3533
34+ import static org .elasticsearch .cluster .metadata .IndexMetadata .INDEX_DOWNSAMPLE_STATUS ;
3635import static org .elasticsearch .xpack .downsample .DataStreamLifecycleDriver .getBackingIndices ;
3736import static org .elasticsearch .xpack .downsample .DataStreamLifecycleDriver .putTSDBIndexTemplate ;
38- import static org .hamcrest .Matchers .is ;
39- import static org .hamcrest .Matchers .notNullValue ;
4037
4138@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST , numDataNodes = 0 , numClientNodes = 4 )
4239public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase {
4340 private static final Logger logger = LogManager .getLogger (DataStreamLifecycleDownsampleDisruptionIT .class );
44- public static final int DOC_COUNT = 50_000 ;
41+ public static final int DOC_COUNT = 25_000 ;
4542
4643 @ Override
4744 protected Collection <Class <? extends Plugin >> nodePlugins () {
@@ -55,7 +52,6 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
5552 return settings .build ();
5653 }
5754
58- @ TestLogging (value = "org.elasticsearch.datastreams.lifecycle:TRACE" , reason = "debugging" )
5955 public void testDataStreamLifecycleDownsampleRollingRestart () throws Exception {
6056 final InternalTestCluster cluster = internalCluster ();
6157 cluster .startMasterOnlyNodes (1 );
@@ -90,36 +86,38 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {
9086 // testing so DSL doesn't have to wait for the end_time to lapse)
9187 putTSDBIndexTemplate (client (), dataStreamName , null , null , lifecycle );
9288 client ().execute (RolloverAction .INSTANCE , new RolloverRequest (dataStreamName , null )).actionGet ();
89+ String sourceIndex = getBackingIndices (client (), dataStreamName ).get (0 );
90+ final String targetIndex = "downsample-5m-" + sourceIndex ;
9391
94- // DSL runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts
95- // downsampling. This sleep here tries to get as close as possible to having disruption during the downsample execution.
96- long sleepTime = randomLongBetween (3000 , 4500 );
97- logger .info ("-> giving data stream lifecycle [{}] millis to make some progress before starting the disruption" , sleepTime );
98- Thread .sleep (sleepTime );
99- List <String > backingIndices = getBackingIndices (client (), dataStreamName );
100- // first generation index
101- String sourceIndex = backingIndices .get (0 );
92+ /**
93+ * DSL runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts
94+ * downsampling. We try to detect if the downsampling has started by checking the downsample status in the target index.
95+ */
96+ logger .info ("-> Waiting for the data stream lifecycle to start the downsampling operation before starting the disruption." );
97+ ensureDownsamplingStatus (targetIndex , IndexMetadata .DownsampleTaskStatus .STARTED , TimeValue .timeValueSeconds (8 ));
10298
103- internalCluster (). rollingRestart ( new InternalTestCluster . RestartCallback () {
104- } );
99+ logger . info ( "-> Starting the disruption." );
100+ internalCluster (). rollingRestart ( new InternalTestCluster . RestartCallback () );
105101
106- // if the source index has already been downsampled and moved into the data stream just use its name directly
107- final String targetIndex = sourceIndex .startsWith ("downsample-5m-" ) ? sourceIndex : "downsample-5m-" + sourceIndex ;
108- assertBusy (() -> {
109- try {
110- GetSettingsResponse getSettingsResponse = cluster .client ()
111- .admin ()
112- .indices ()
113- .getSettings (new GetSettingsRequest ().indices (targetIndex ).indicesOptions (IndicesOptions .LENIENT_EXPAND_OPEN ))
114- .actionGet ();
115- Settings indexSettings = getSettingsResponse .getIndexToSettings ().get (targetIndex );
116- assertThat (indexSettings , is (notNullValue ()));
117- assertThat (IndexMetadata .INDEX_DOWNSAMPLE_STATUS .get (indexSettings ), is (IndexMetadata .DownsampleTaskStatus .SUCCESS ));
118- assertEquals ("5m" , IndexMetadata .INDEX_DOWNSAMPLE_INTERVAL .get (indexSettings ));
119- } catch (Exception e ) {
120- throw new AssertionError (e );
121- }
122- }, 120 , TimeUnit .SECONDS );
102+ ensureDownsamplingStatus (targetIndex , IndexMetadata .DownsampleTaskStatus .SUCCESS , TimeValue .timeValueSeconds (120 ));
123103 ensureGreen (targetIndex );
104+ logger .info ("-> Relocation has finished" );
105+ }
106+
107+ private void ensureDownsamplingStatus (String downsampledIndex , IndexMetadata .DownsampleTaskStatus expectedStatus , TimeValue timeout ) {
108+ final var clusterService = internalCluster ().getCurrentMasterNodeInstance (ClusterService .class );
109+ final var listener = ClusterServiceUtils .addTemporaryStateListener (clusterService , clusterState -> {
110+ final var indexMetadata = clusterState .metadata ().index (downsampledIndex );
111+ if (indexMetadata == null ) {
112+ return false ;
113+ }
114+ var downsamplingStatus = INDEX_DOWNSAMPLE_STATUS .get (indexMetadata .getSettings ());
115+ if (expectedStatus == downsamplingStatus ) {
116+ logger .info ("-> Downsampling status for index [{}] is [{}]" , downsampledIndex , downsamplingStatus );
117+ return true ;
118+ }
119+ return false ;
120+ });
121+ safeAwait (listener , timeout .millis (), TimeUnit .MILLISECONDS );
124122 }
125123}
0 commit comments