1111import  org .apache .logging .log4j .Logger ;
1212import  org .elasticsearch .action .admin .indices .rollover .RolloverAction ;
1313import  org .elasticsearch .action .admin .indices .rollover .RolloverRequest ;
14- import  org .elasticsearch .action .admin .indices .settings .get .GetSettingsRequest ;
15- import  org .elasticsearch .action .admin .indices .settings .get .GetSettingsResponse ;
1614import  org .elasticsearch .action .downsample .DownsampleConfig ;
17- import  org .elasticsearch .action .support .IndicesOptions ;
1815import  org .elasticsearch .cluster .metadata .DataStreamLifecycle ;
1916import  org .elasticsearch .cluster .metadata .IndexMetadata ;
17+ import  org .elasticsearch .cluster .service .ClusterService ;
2018import  org .elasticsearch .common .settings .Settings ;
2119import  org .elasticsearch .core .TimeValue ;
2220import  org .elasticsearch .datastreams .DataStreamsPlugin ;
2321import  org .elasticsearch .datastreams .lifecycle .DataStreamLifecycleService ;
2422import  org .elasticsearch .plugins .Plugin ;
2523import  org .elasticsearch .search .aggregations .bucket .histogram .DateHistogramInterval ;
24+ import  org .elasticsearch .test .ClusterServiceUtils ;
2625import  org .elasticsearch .test .ESIntegTestCase ;
2726import  org .elasticsearch .test .InternalTestCluster ;
28- import  org .elasticsearch .test .junit .annotations .TestLogging ;
2927import  org .elasticsearch .xpack .aggregatemetric .AggregateMetricMapperPlugin ;
3028import  org .elasticsearch .xpack .core .LocalStateCompositeXPackPlugin ;
3129
3230import  java .util .Collection ;
3331import  java .util .List ;
3432import  java .util .concurrent .TimeUnit ;
3533
34+ import  static  org .elasticsearch .cluster .metadata .IndexMetadata .INDEX_DOWNSAMPLE_STATUS ;
3635import  static  org .elasticsearch .xpack .downsample .DataStreamLifecycleDriver .getBackingIndices ;
3736import  static  org .elasticsearch .xpack .downsample .DataStreamLifecycleDriver .putTSDBIndexTemplate ;
38- import  static  org .hamcrest .Matchers .is ;
39- import  static  org .hamcrest .Matchers .notNullValue ;
4037
4138@ ESIntegTestCase .ClusterScope (scope  = ESIntegTestCase .Scope .TEST , numDataNodes  = 0 , numClientNodes  = 4 )
4239public  class  DataStreamLifecycleDownsampleDisruptionIT  extends  ESIntegTestCase  {
4340    private  static  final  Logger  logger  = LogManager .getLogger (DataStreamLifecycleDownsampleDisruptionIT .class );
44-     public  static  final  int  DOC_COUNT  = 50_000 ;
41+     public  static  final  int  DOC_COUNT  = 25_000 ;
4542
4643    @ Override 
4744    protected  Collection <Class <? extends  Plugin >> nodePlugins () {
@@ -55,7 +52,6 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
5552        return  settings .build ();
5653    }
5754
58-     @ TestLogging (value  = "org.elasticsearch.datastreams.lifecycle:TRACE" , reason  = "debugging" )
5955    public  void  testDataStreamLifecycleDownsampleRollingRestart () throws  Exception  {
6056        final  InternalTestCluster  cluster  = internalCluster ();
6157        cluster .startMasterOnlyNodes (1 );
@@ -88,36 +84,38 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {
8884        // testing so DSL doesn't have to wait for the end_time to lapse) 
8985        putTSDBIndexTemplate (client (), dataStreamName , null , null , lifecycle );
9086        client ().execute (RolloverAction .INSTANCE , new  RolloverRequest (dataStreamName , null )).actionGet ();
87+         String  sourceIndex  = getBackingIndices (client (), dataStreamName ).get (0 );
88+         final  String  targetIndex  = "downsample-5m-"  + sourceIndex ;
9189
92-         // DSL runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts 
93-         // downsampling. This sleep here tries to get as close as possible to having disruption during the downsample execution. 
94-         long  sleepTime  = randomLongBetween (3000 , 4500 );
95-         logger .info ("-> giving data stream lifecycle [{}] millis to make some progress before starting the disruption" , sleepTime );
96-         Thread .sleep (sleepTime );
97-         List <String > backingIndices  = getBackingIndices (client (), dataStreamName );
98-         // first generation index 
99-         String  sourceIndex  = backingIndices .get (0 );
90+         /** 
91+          * DSL runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts 
92+          * downsampling. We try to detect if the downsampling has started by checking the downsample status in the target index. 
93+          */ 
94+         logger .info ("-> Waiting for the data stream lifecycle to start the downsampling operation before starting the disruption." );
95+         ensureDownsamplingStatus (targetIndex , IndexMetadata .DownsampleTaskStatus .STARTED , TimeValue .timeValueSeconds (8 ));
10096
101-         internalCluster (). rollingRestart ( new   InternalTestCluster . RestartCallback () { 
102-         } );
97+         logger . info ( "-> Starting the disruption." ); 
98+         internalCluster (). rollingRestart ( new   InternalTestCluster . RestartCallback () );
10399
104-         // if the source index has already been downsampled and moved into the data stream just use its name directly 
105-         final  String  targetIndex  = sourceIndex .startsWith ("downsample-5m-" ) ? sourceIndex  : "downsample-5m-"  + sourceIndex ;
106-         assertBusy (() -> {
107-             try  {
108-                 GetSettingsResponse  getSettingsResponse  = cluster .client ()
109-                     .admin ()
110-                     .indices ()
111-                     .getSettings (new  GetSettingsRequest ().indices (targetIndex ).indicesOptions (IndicesOptions .LENIENT_EXPAND_OPEN ))
112-                     .actionGet ();
113-                 Settings  indexSettings  = getSettingsResponse .getIndexToSettings ().get (targetIndex );
114-                 assertThat (indexSettings , is (notNullValue ()));
115-                 assertThat (IndexMetadata .INDEX_DOWNSAMPLE_STATUS .get (indexSettings ), is (IndexMetadata .DownsampleTaskStatus .SUCCESS ));
116-                 assertEquals ("5m" , IndexMetadata .INDEX_DOWNSAMPLE_INTERVAL .get (indexSettings ));
117-             } catch  (Exception  e ) {
118-                 throw  new  AssertionError (e );
119-             }
120-         }, 120 , TimeUnit .SECONDS );
100+         ensureDownsamplingStatus (targetIndex , IndexMetadata .DownsampleTaskStatus .SUCCESS , TimeValue .timeValueSeconds (120 ));
121101        ensureGreen (targetIndex );
102+         logger .info ("-> Relocation has finished" );
103+     }
104+ 
105+     private  void  ensureDownsamplingStatus (String  downsampledIndex , IndexMetadata .DownsampleTaskStatus  expectedStatus , TimeValue  timeout ) {
106+         final  var  clusterService  = internalCluster ().getCurrentMasterNodeInstance (ClusterService .class );
107+         final  var  listener  = ClusterServiceUtils .addTemporaryStateListener (clusterService , clusterState  -> {
108+             final  var  indexMetadata  = clusterState .metadata ().index (downsampledIndex );
109+             if  (indexMetadata  == null ) {
110+                 return  false ;
111+             }
112+             var  downsamplingStatus  = INDEX_DOWNSAMPLE_STATUS .get (indexMetadata .getSettings ());
113+             if  (expectedStatus  == downsamplingStatus ) {
114+                 logger .info ("-> Downsampling status for index [{}] is [{}]" , downsampledIndex , downsamplingStatus );
115+                 return  true ;
116+             }
117+             return  false ;
118+         });
119+         safeAwait (listener , timeout .millis (), TimeUnit .MILLISECONDS );
122120    }
123121}
0 commit comments