15
15
import org .elasticsearch .cluster .metadata .DataStreamLifecycle .Downsampling ;
16
16
import org .elasticsearch .common .settings .Settings ;
17
17
import org .elasticsearch .core .TimeValue ;
18
- import org .elasticsearch .datastreams .DataStreamsPlugin ;
19
18
import org .elasticsearch .datastreams .lifecycle .DataStreamLifecycleService ;
20
- import org .elasticsearch .plugins .Plugin ;
21
19
import org .elasticsearch .search .aggregations .bucket .histogram .DateHistogramInterval ;
22
- import org .elasticsearch .test .ESIntegTestCase ;
23
20
import org .elasticsearch .test .junit .annotations .TestLogging ;
24
- import org .elasticsearch .xpack .aggregatemetric .AggregateMetricMapperPlugin ;
25
- import org .elasticsearch .xpack .core .LocalStateCompositeXPackPlugin ;
26
21
27
- import java .util .Collection ;
28
22
import java .util .HashSet ;
29
23
import java .util .List ;
30
24
import java .util .Set ;
31
25
import java .util .concurrent .TimeUnit ;
32
26
33
27
import static org .elasticsearch .cluster .metadata .ClusterChangedEventUtils .indicesCreated ;
34
28
import static org .elasticsearch .cluster .metadata .DataStreamTestHelper .backingIndexEqualTo ;
35
- import static org .elasticsearch .xpack .downsample .DataStreamLifecycleDriver .getBackingIndices ;
36
- import static org .elasticsearch .xpack .downsample .DataStreamLifecycleDriver .putTSDBIndexTemplate ;
29
+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
37
30
import static org .hamcrest .Matchers .is ;
38
31
39
- public class DataStreamLifecycleDownsampleIT extends ESIntegTestCase {
32
+ public class DataStreamLifecycleDownsampleIT extends DownsamplingIntegTestCase {
40
33
public static final int DOC_COUNT = 50_000 ;
41
34
42
- @ Override
43
- protected Collection <Class <? extends Plugin >> nodePlugins () {
44
- return List .of (DataStreamsPlugin .class , LocalStateCompositeXPackPlugin .class , Downsample .class , AggregateMetricMapperPlugin .class );
45
- }
46
-
47
35
@ Override
48
36
protected Settings nodeSettings (int nodeOrdinal , Settings otherSettings ) {
49
37
Settings .Builder settings = Settings .builder ().put (super .nodeSettings (nodeOrdinal , otherSettings ));
@@ -66,8 +54,7 @@ public void testDownsampling() throws Exception {
66
54
)
67
55
.build ();
68
56
69
- DataStreamLifecycleDriver .setupTSDBDataStreamAndIngestDocs (
70
- client (),
57
+ setupTSDBDataStreamAndIngestDocs (
71
58
dataStreamName ,
72
59
"1986-01-08T23:40:53.384Z" ,
73
60
"2022-01-08T23:40:53.384Z" ,
@@ -76,7 +63,7 @@ public void testDownsampling() throws Exception {
76
63
"1990-09-09T18:00:00"
77
64
);
78
65
79
- List <String > backingIndices = getBackingIndices ( client (), dataStreamName );
66
+ List <String > backingIndices = getDataStreamBackingIndexNames ( dataStreamName );
80
67
String firstGenerationBackingIndex = backingIndices .get (0 );
81
68
String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex ;
82
69
String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex ;
@@ -93,7 +80,7 @@ public void testDownsampling() throws Exception {
93
80
});
94
81
// before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with
95
82
// testing so DSL doesn't have to wait for the end_time to lapse)
96
- putTSDBIndexTemplate (client (), dataStreamName , null , null , lifecycle );
83
+ putTSDBIndexTemplate (dataStreamName , null , null , lifecycle );
97
84
98
85
client ().execute (RolloverAction .INSTANCE , new RolloverRequest (dataStreamName , null )).actionGet ();
99
86
@@ -109,7 +96,7 @@ public void testDownsampling() throws Exception {
109
96
}, 30 , TimeUnit .SECONDS );
110
97
111
98
assertBusy (() -> {
112
- List <String > dsBackingIndices = getBackingIndices ( client (), dataStreamName );
99
+ List <String > dsBackingIndices = getDataStreamBackingIndexNames ( dataStreamName );
113
100
114
101
assertThat (dsBackingIndices .size (), is (2 ));
115
102
String writeIndex = dsBackingIndices .get (1 );
@@ -136,8 +123,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
136
123
)
137
124
)
138
125
.build ();
139
- DataStreamLifecycleDriver .setupTSDBDataStreamAndIngestDocs (
140
- client (),
126
+ setupTSDBDataStreamAndIngestDocs (
141
127
dataStreamName ,
142
128
"1986-01-08T23:40:53.384Z" ,
143
129
"2022-01-08T23:40:53.384Z" ,
@@ -146,7 +132,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
146
132
"1990-09-09T18:00:00"
147
133
);
148
134
149
- List <String > backingIndices = getBackingIndices ( client (), dataStreamName );
135
+ List <String > backingIndices = getDataStreamBackingIndexNames ( dataStreamName );
150
136
String firstGenerationBackingIndex = backingIndices .get (0 );
151
137
String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex ;
152
138
String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex ;
@@ -163,7 +149,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
163
149
});
164
150
// before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with
165
151
// testing so DSL doesn't have to wait for the end_time to lapse)
166
- putTSDBIndexTemplate (client (), dataStreamName , null , null , lifecycle );
152
+ putTSDBIndexTemplate (dataStreamName , null , null , lifecycle );
167
153
client ().execute (RolloverAction .INSTANCE , new RolloverRequest (dataStreamName , null )).actionGet ();
168
154
169
155
assertBusy (() -> {
@@ -173,7 +159,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
173
159
}, 30 , TimeUnit .SECONDS );
174
160
175
161
assertBusy (() -> {
176
- List <String > dsBackingIndices = getBackingIndices ( client (), dataStreamName );
162
+ List <String > dsBackingIndices = getDataStreamBackingIndexNames ( dataStreamName );
177
163
178
164
assertThat (dsBackingIndices .size (), is (2 ));
179
165
String writeIndex = dsBackingIndices .get (1 );
@@ -201,8 +187,7 @@ public void testUpdateDownsampleRound() throws Exception {
201
187
)
202
188
.build ();
203
189
204
- DataStreamLifecycleDriver .setupTSDBDataStreamAndIngestDocs (
205
- client (),
190
+ setupTSDBDataStreamAndIngestDocs (
206
191
dataStreamName ,
207
192
"1986-01-08T23:40:53.384Z" ,
208
193
"2022-01-08T23:40:53.384Z" ,
@@ -211,7 +196,7 @@ public void testUpdateDownsampleRound() throws Exception {
211
196
"1990-09-09T18:00:00"
212
197
);
213
198
214
- List <String > backingIndices = getBackingIndices ( client (), dataStreamName );
199
+ List <String > backingIndices = getDataStreamBackingIndexNames ( dataStreamName );
215
200
String firstGenerationBackingIndex = backingIndices .get (0 );
216
201
String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex ;
217
202
String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex ;
@@ -228,8 +213,8 @@ public void testUpdateDownsampleRound() throws Exception {
228
213
});
229
214
// before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with
230
215
// testing so DSL doesn't have to wait for the end_time to lapse)
231
- putTSDBIndexTemplate (client (), dataStreamName , null , null , lifecycle );
232
- client ().execute (RolloverAction .INSTANCE , new RolloverRequest (dataStreamName , null )). actionGet ( );
216
+ putTSDBIndexTemplate (dataStreamName , null , null , lifecycle );
217
+ safeGet ( client ().execute (RolloverAction .INSTANCE , new RolloverRequest (dataStreamName , null )));
233
218
234
219
assertBusy (() -> {
235
220
assertThat (witnessedDownsamplingIndices .size (), is (1 ));
@@ -238,7 +223,7 @@ public void testUpdateDownsampleRound() throws Exception {
238
223
}, 30 , TimeUnit .SECONDS );
239
224
240
225
assertBusy (() -> {
241
- List <String > dsBackingIndices = getBackingIndices ( client (), dataStreamName );
226
+ List <String > dsBackingIndices = getDataStreamBackingIndexNames ( dataStreamName );
242
227
assertThat (dsBackingIndices .size (), is (2 ));
243
228
String writeIndex = dsBackingIndices .get (1 );
244
229
assertThat (writeIndex , backingIndexEqualTo (dataStreamName , 2 ));
@@ -247,22 +232,23 @@ public void testUpdateDownsampleRound() throws Exception {
247
232
248
233
// update the lifecycle so that it only has one round, for the same `after` parameter as before, but a different interval
249
234
// the different interval should yield a different downsample index name so we expect the data stream lifecycle to get the previous
250
- // `10s` interval downsample index, downsample it to `30s ` and replace it in the data stream instead of the `10s` one.
235
+ // `10s` interval downsample index, downsample it to `20m ` and replace it in the data stream instead of the `10s` one.
251
236
DataStreamLifecycle updatedLifecycle = DataStreamLifecycle .newBuilder ()
252
237
.downsampling (
253
238
new Downsampling (
254
239
List .of (new Downsampling .Round (TimeValue .timeValueMillis (10 ), new DownsampleConfig (new DateHistogramInterval ("20m" ))))
255
240
)
256
241
)
257
242
.build ();
258
-
259
- client ().execute (
260
- PutDataStreamLifecycleAction .INSTANCE ,
261
- new PutDataStreamLifecycleAction .Request (
262
- TEST_REQUEST_TIMEOUT ,
263
- TEST_REQUEST_TIMEOUT ,
264
- new String [] { dataStreamName },
265
- updatedLifecycle
243
+ assertAcked (
244
+ client ().execute (
245
+ PutDataStreamLifecycleAction .INSTANCE ,
246
+ new PutDataStreamLifecycleAction .Request (
247
+ TEST_REQUEST_TIMEOUT ,
248
+ TEST_REQUEST_TIMEOUT ,
249
+ new String [] { dataStreamName },
250
+ updatedLifecycle
251
+ )
266
252
)
267
253
);
268
254
@@ -271,7 +257,7 @@ public void testUpdateDownsampleRound() throws Exception {
271
257
assertBusy (() -> {
272
258
assertThat (indexExists (tenSecondsDownsampleIndex ), is (false ));
273
259
274
- List <String > dsBackingIndices = getBackingIndices ( client (), dataStreamName );
260
+ List <String > dsBackingIndices = getDataStreamBackingIndexNames ( dataStreamName );
275
261
assertThat (dsBackingIndices .size (), is (2 ));
276
262
String writeIndex = dsBackingIndices .get (1 );
277
263
assertThat (writeIndex , backingIndexEqualTo (dataStreamName , 2 ));
0 commit comments