1111import org .apache .logging .log4j .Logger ;
1212import org .elasticsearch .action .admin .indices .rollover .RolloverAction ;
1313import org .elasticsearch .action .admin .indices .rollover .RolloverRequest ;
14- import org .elasticsearch .action .admin .indices .settings .get .GetSettingsRequest ;
15- import org .elasticsearch .action .admin .indices .settings .get .GetSettingsResponse ;
1614import org .elasticsearch .action .downsample .DownsampleConfig ;
17- import org .elasticsearch .action .support .IndicesOptions ;
15+ import org .elasticsearch .action .support .TestPlainActionFuture ;
1816import org .elasticsearch .cluster .metadata .DataStreamLifecycle ;
1917import org .elasticsearch .cluster .metadata .IndexMetadata ;
18+ import org .elasticsearch .cluster .service .ClusterService ;
2019import org .elasticsearch .common .settings .Settings ;
2120import org .elasticsearch .core .TimeValue ;
2221import org .elasticsearch .datastreams .DataStreamsPlugin ;
2322import org .elasticsearch .datastreams .lifecycle .DataStreamLifecycleService ;
2423import org .elasticsearch .plugins .Plugin ;
2524import org .elasticsearch .search .aggregations .bucket .histogram .DateHistogramInterval ;
25+ import org .elasticsearch .test .ClusterServiceUtils ;
2626import org .elasticsearch .test .ESIntegTestCase ;
2727import org .elasticsearch .test .InternalTestCluster ;
28- import org .elasticsearch .test .junit .annotations .TestLogging ;
2928import org .elasticsearch .xpack .aggregatemetric .AggregateMetricMapperPlugin ;
3029import org .elasticsearch .xpack .core .LocalStateCompositeXPackPlugin ;
3130
3231import java .util .Collection ;
3332import java .util .List ;
33+ import java .util .Set ;
3434import java .util .concurrent .TimeUnit ;
3535
36+ import static org .elasticsearch .cluster .metadata .IndexMetadata .INDEX_DOWNSAMPLE_STATUS ;
3637import static org .elasticsearch .xpack .downsample .DataStreamLifecycleDriver .getBackingIndices ;
3738import static org .elasticsearch .xpack .downsample .DataStreamLifecycleDriver .putTSDBIndexTemplate ;
38- import static org .hamcrest .Matchers .is ;
39- import static org .hamcrest .Matchers .notNullValue ;
4039
4140@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST , numDataNodes = 0 , numClientNodes = 4 )
4241public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase {
@@ -55,7 +54,6 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
5554 return settings .build ();
5655 }
5756
58- @ TestLogging (value = "org.elasticsearch.datastreams.lifecycle:TRACE" , reason = "debugging" )
5957 public void testDataStreamLifecycleDownsampleRollingRestart () throws Exception {
6058 final InternalTestCluster cluster = internalCluster ();
6159 cluster .startMasterOnlyNodes (1 );
@@ -88,38 +86,57 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {
8886 // testing so DSL doesn't have to wait for the end_time to lapse)
8987 putTSDBIndexTemplate (client (), dataStreamName , null , null , lifecycle );
9088 client ().execute (RolloverAction .INSTANCE , new RolloverRequest (dataStreamName , null )).actionGet ();
89+ String sourceIndex = getBackingIndices (client (), dataStreamName ).get (0 );
90+ final String targetIndex = "downsample-5m-" + sourceIndex ;
9191
92- // DSL runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts
93- // downsampling. This sleep here tries to get as close as possible to having disruption during the downsample execution.
94- long sleepTime = randomLongBetween (3000 , 4500 );
95- logger .info ("-> giving data stream lifecycle [{}] millis to make some progress before starting the disruption" , sleepTime );
96- Thread .sleep (sleepTime );
97- List <String > backingIndices = getBackingIndices (client (), dataStreamName );
98- // first generation index
99- String sourceIndex = backingIndices .get (0 );
92+ /**
93+ * Data stream lifecycle (DSL) runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts
94+ * downsampling. We try to detect if the downsampling has started by checking the downsample status in the target index.
95+ */
96+ logger .info ("-> Waiting for the data stream lifecycle to start the downsampling operation before starting the disruption." );
97+ ensureDownsamplingStatus (
98+ targetIndex ,
99+ Set .of (IndexMetadata .DownsampleTaskStatus .STARTED , IndexMetadata .DownsampleTaskStatus .SUCCESS ),
100+ TimeValue .timeValueSeconds (5 )
101+ );
100102
103+ logger .info ("-> Starting the disruption." );
101104 internalCluster ().rollingRestart (new InternalTestCluster .RestartCallback () {
102105 });
103106
104- // if the source index has already been downsampled and moved into the data stream just use its name directly
105- final String targetIndex = sourceIndex .startsWith ("downsample-5m-" ) ? sourceIndex : "downsample-5m-" + sourceIndex ;
106- assertBusy (() -> {
107- try {
108- GetSettingsResponse getSettingsResponse = cluster .client ()
109- .admin ()
110- .indices ()
111- .getSettings (
112- new GetSettingsRequest (TEST_REQUEST_TIMEOUT ).indices (targetIndex ).indicesOptions (IndicesOptions .LENIENT_EXPAND_OPEN )
113- )
114- .actionGet ();
115- Settings indexSettings = getSettingsResponse .getIndexToSettings ().get (targetIndex );
116- assertThat (indexSettings , is (notNullValue ()));
117- assertThat (IndexMetadata .INDEX_DOWNSAMPLE_STATUS .get (indexSettings ), is (IndexMetadata .DownsampleTaskStatus .SUCCESS ));
118- assertEquals ("5m" , IndexMetadata .INDEX_DOWNSAMPLE_INTERVAL .get (indexSettings ));
119- } catch (Exception e ) {
120- throw new AssertionError (e );
121- }
122- }, 120 , TimeUnit .SECONDS );
107+ ensureDownsamplingStatus (targetIndex , Set .of (IndexMetadata .DownsampleTaskStatus .SUCCESS ), TimeValue .timeValueMinutes (2 ));
123108 ensureGreen (targetIndex );
109+ logger .info ("-> Relocation has finished" );
110+ }
111+
112+ private void ensureDownsamplingStatus (
113+ String downsampledIndex ,
114+ Set <IndexMetadata .DownsampleTaskStatus > expectedStatuses ,
115+ TimeValue timeout
116+ ) {
117+ final var clusterService = internalCluster ().getCurrentMasterNodeInstance (ClusterService .class );
118+ final IndexMetadata .DownsampleTaskStatus [] downsamplingStatus = new IndexMetadata .DownsampleTaskStatus [1 ];
119+ final var listener = ClusterServiceUtils .addTemporaryStateListener (clusterService , clusterState -> {
120+ final var indexMetadata = clusterState .metadata ().getProject ().index (downsampledIndex );
121+ if (indexMetadata == null ) {
122+ return false ;
123+ }
124+ downsamplingStatus [0 ] = INDEX_DOWNSAMPLE_STATUS .get (indexMetadata .getSettings ());
125+ return expectedStatuses .contains (downsamplingStatus [0 ]);
126+ });
127+ try {
128+ final var future = new TestPlainActionFuture <Void >();
129+ listener .addListener (future );
130+ future .get (timeout .getMillis (), TimeUnit .MILLISECONDS );
131+ logger .info ("-> Downsampling status for index [{}] is [{}]" , downsampledIndex , downsamplingStatus [0 ]);
132+ } catch (Exception e ) {
133+ if (e instanceof InterruptedException ) {
134+ Thread .currentThread ().interrupt ();
135+ }
136+ throw new AssertionError (
137+ "Error while waiting for " + expectedStatuses + " but found '" + downsamplingStatus [0 ] + "'. " + e .getMessage (),
138+ e
139+ );
140+ }
124141 }
125142}
0 commit comments