1515import org .elasticsearch .cluster .metadata .DataStreamLifecycle .Downsampling ;
1616import org .elasticsearch .common .settings .Settings ;
1717import org .elasticsearch .core .TimeValue ;
18- import org .elasticsearch .datastreams .DataStreamsPlugin ;
1918import org .elasticsearch .datastreams .lifecycle .DataStreamLifecycleService ;
20- import org .elasticsearch .plugins .Plugin ;
2119import org .elasticsearch .search .aggregations .bucket .histogram .DateHistogramInterval ;
22- import org .elasticsearch .test .ESIntegTestCase ;
2320import org .elasticsearch .test .junit .annotations .TestLogging ;
24- import org .elasticsearch .xpack .aggregatemetric .AggregateMetricMapperPlugin ;
25- import org .elasticsearch .xpack .core .LocalStateCompositeXPackPlugin ;
2621
27- import java .util .Collection ;
2822import java .util .HashSet ;
2923import java .util .List ;
3024import java .util .Set ;
3125import java .util .concurrent .TimeUnit ;
3226
3327import static org .elasticsearch .cluster .metadata .ClusterChangedEventUtils .indicesCreated ;
3428import static org .elasticsearch .cluster .metadata .DataStreamTestHelper .backingIndexEqualTo ;
35- import static org .elasticsearch .xpack .downsample .DataStreamLifecycleDriver .getBackingIndices ;
36- import static org .elasticsearch .xpack .downsample .DataStreamLifecycleDriver .putTSDBIndexTemplate ;
29+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
3730import static org .hamcrest .Matchers .is ;
3831
39- public class DataStreamLifecycleDownsampleIT extends ESIntegTestCase {
32+ public class DataStreamLifecycleDownsampleIT extends DownsamplingIntegTestCase {
4033 public static final int DOC_COUNT = 50_000 ;
4134
42- @ Override
43- protected Collection <Class <? extends Plugin >> nodePlugins () {
44- return List .of (DataStreamsPlugin .class , LocalStateCompositeXPackPlugin .class , Downsample .class , AggregateMetricMapperPlugin .class );
45- }
46-
4735 @ Override
4836 protected Settings nodeSettings (int nodeOrdinal , Settings otherSettings ) {
4937 Settings .Builder settings = Settings .builder ().put (super .nodeSettings (nodeOrdinal , otherSettings ));
@@ -66,8 +54,7 @@ public void testDownsampling() throws Exception {
6654 )
6755 .build ();
6856
69- DataStreamLifecycleDriver .setupTSDBDataStreamAndIngestDocs (
70- client (),
57+ setupTSDBDataStreamAndIngestDocs (
7158 dataStreamName ,
7259 "1986-01-08T23:40:53.384Z" ,
7360 "2022-01-08T23:40:53.384Z" ,
@@ -76,7 +63,7 @@ public void testDownsampling() throws Exception {
7663 "1990-09-09T18:00:00"
7764 );
7865
79- List <String > backingIndices = getBackingIndices ( client (), dataStreamName );
66+ List <String > backingIndices = getDataStreamBackingIndexNames ( dataStreamName );
8067 String firstGenerationBackingIndex = backingIndices .get (0 );
8168 String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex ;
8269 String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex ;
@@ -93,7 +80,7 @@ public void testDownsampling() throws Exception {
9380 });
9481 // before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with
9582 // testing so DSL doesn't have to wait for the end_time to lapse)
96- putTSDBIndexTemplate (client (), dataStreamName , null , null , lifecycle );
83+ putTSDBIndexTemplate (dataStreamName , null , null , lifecycle );
9784
9885 client ().execute (RolloverAction .INSTANCE , new RolloverRequest (dataStreamName , null )).actionGet ();
9986
@@ -109,7 +96,7 @@ public void testDownsampling() throws Exception {
10996 }, 30 , TimeUnit .SECONDS );
11097
11198 assertBusy (() -> {
112- List <String > dsBackingIndices = getBackingIndices ( client (), dataStreamName );
99+ List <String > dsBackingIndices = getDataStreamBackingIndexNames ( dataStreamName );
113100
114101 assertThat (dsBackingIndices .size (), is (2 ));
115102 String writeIndex = dsBackingIndices .get (1 );
@@ -136,8 +123,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
136123 )
137124 )
138125 .build ();
139- DataStreamLifecycleDriver .setupTSDBDataStreamAndIngestDocs (
140- client (),
126+ setupTSDBDataStreamAndIngestDocs (
141127 dataStreamName ,
142128 "1986-01-08T23:40:53.384Z" ,
143129 "2022-01-08T23:40:53.384Z" ,
@@ -146,7 +132,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
146132 "1990-09-09T18:00:00"
147133 );
148134
149- List <String > backingIndices = getBackingIndices ( client (), dataStreamName );
135+ List <String > backingIndices = getDataStreamBackingIndexNames ( dataStreamName );
150136 String firstGenerationBackingIndex = backingIndices .get (0 );
151137 String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex ;
152138 String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex ;
@@ -163,7 +149,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
163149 });
164150 // before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with
165151 // testing so DSL doesn't have to wait for the end_time to lapse)
166- putTSDBIndexTemplate (client (), dataStreamName , null , null , lifecycle );
152+ putTSDBIndexTemplate (dataStreamName , null , null , lifecycle );
167153 client ().execute (RolloverAction .INSTANCE , new RolloverRequest (dataStreamName , null )).actionGet ();
168154
169155 assertBusy (() -> {
@@ -173,7 +159,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
173159 }, 30 , TimeUnit .SECONDS );
174160
175161 assertBusy (() -> {
176- List <String > dsBackingIndices = getBackingIndices ( client (), dataStreamName );
162+ List <String > dsBackingIndices = getDataStreamBackingIndexNames ( dataStreamName );
177163
178164 assertThat (dsBackingIndices .size (), is (2 ));
179165 String writeIndex = dsBackingIndices .get (1 );
@@ -201,8 +187,7 @@ public void testUpdateDownsampleRound() throws Exception {
201187 )
202188 .build ();
203189
204- DataStreamLifecycleDriver .setupTSDBDataStreamAndIngestDocs (
205- client (),
190+ setupTSDBDataStreamAndIngestDocs (
206191 dataStreamName ,
207192 "1986-01-08T23:40:53.384Z" ,
208193 "2022-01-08T23:40:53.384Z" ,
@@ -211,7 +196,7 @@ public void testUpdateDownsampleRound() throws Exception {
211196 "1990-09-09T18:00:00"
212197 );
213198
214- List <String > backingIndices = getBackingIndices ( client (), dataStreamName );
199+ List <String > backingIndices = getDataStreamBackingIndexNames ( dataStreamName );
215200 String firstGenerationBackingIndex = backingIndices .get (0 );
216201 String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex ;
217202 String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex ;
@@ -228,8 +213,8 @@ public void testUpdateDownsampleRound() throws Exception {
228213 });
229214 // before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with
230215 // testing so DSL doesn't have to wait for the end_time to lapse)
231- putTSDBIndexTemplate (client (), dataStreamName , null , null , lifecycle );
232- client ().execute (RolloverAction .INSTANCE , new RolloverRequest (dataStreamName , null )). actionGet ( );
216+ putTSDBIndexTemplate (dataStreamName , null , null , lifecycle );
217+ safeGet ( client ().execute (RolloverAction .INSTANCE , new RolloverRequest (dataStreamName , null )));
233218
234219 assertBusy (() -> {
235220 assertThat (witnessedDownsamplingIndices .size (), is (1 ));
@@ -238,7 +223,7 @@ public void testUpdateDownsampleRound() throws Exception {
238223 }, 30 , TimeUnit .SECONDS );
239224
240225 assertBusy (() -> {
241- List <String > dsBackingIndices = getBackingIndices ( client (), dataStreamName );
226+ List <String > dsBackingIndices = getDataStreamBackingIndexNames ( dataStreamName );
242227 assertThat (dsBackingIndices .size (), is (2 ));
243228 String writeIndex = dsBackingIndices .get (1 );
244229 assertThat (writeIndex , backingIndexEqualTo (dataStreamName , 2 ));
@@ -247,22 +232,23 @@ public void testUpdateDownsampleRound() throws Exception {
247232
248233 // update the lifecycle so that it only has one round, for the same `after` parameter as before, but a different interval
249234 // the different interval should yield a different downsample index name so we expect the data stream lifecycle to get the previous
250- // `10s` interval downsample index, downsample it to `30s ` and replace it in the data stream instead of the `10s` one.
235+ // `10s` interval downsample index, downsample it to `20m ` and replace it in the data stream instead of the `10s` one.
251236 DataStreamLifecycle updatedLifecycle = DataStreamLifecycle .newBuilder ()
252237 .downsampling (
253238 new Downsampling (
254239 List .of (new Downsampling .Round (TimeValue .timeValueMillis (10 ), new DownsampleConfig (new DateHistogramInterval ("20m" ))))
255240 )
256241 )
257242 .build ();
258-
259- client ().execute (
260- PutDataStreamLifecycleAction .INSTANCE ,
261- new PutDataStreamLifecycleAction .Request (
262- TEST_REQUEST_TIMEOUT ,
263- TEST_REQUEST_TIMEOUT ,
264- new String [] { dataStreamName },
265- updatedLifecycle
243+ assertAcked (
244+ client ().execute (
245+ PutDataStreamLifecycleAction .INSTANCE ,
246+ new PutDataStreamLifecycleAction .Request (
247+ TEST_REQUEST_TIMEOUT ,
248+ TEST_REQUEST_TIMEOUT ,
249+ new String [] { dataStreamName },
250+ updatedLifecycle
251+ )
266252 )
267253 );
268254
@@ -271,7 +257,7 @@ public void testUpdateDownsampleRound() throws Exception {
271257 assertBusy (() -> {
272258 assertThat (indexExists (tenSecondsDownsampleIndex ), is (false ));
273259
274- List <String > dsBackingIndices = getBackingIndices ( client (), dataStreamName );
260+ List <String > dsBackingIndices = getDataStreamBackingIndexNames ( dataStreamName );
275261 assertThat (dsBackingIndices .size (), is (2 ));
276262 String writeIndex = dsBackingIndices .get (1 );
277263 assertThat (writeIndex , backingIndexEqualTo (dataStreamName , 2 ));
0 commit comments