2222import org .elasticsearch .index .IndexMode ;
2323import org .elasticsearch .index .IndexSettings ;
2424import org .elasticsearch .plugins .Plugin ;
25+ import org .elasticsearch .test .ESIntegTestCase ;
2526import org .elasticsearch .test .ESTestCase ;
2627import org .elasticsearch .xcontent .XContentBuilder ;
2728import org .elasticsearch .xcontent .XContentFactory ;
4344import java .util .stream .Collectors ;
4445
4546import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
47+ import static org .hamcrest .Matchers .allOf ;
4648import static org .hamcrest .Matchers .closeTo ;
4749import static org .hamcrest .Matchers .containsInAnyOrder ;
4850import static org .hamcrest .Matchers .equalTo ;
51+ import static org .hamcrest .Matchers .lessThan ;
52+ import static org .hamcrest .Matchers .lessThanOrEqualTo ;
53+ import static org .hamcrest .Matchers .not ;
4954
5055@ SuppressWarnings ("unchecked" )
56+ @ ESIntegTestCase .ClusterScope (maxNumDataNodes = 1 )
5157public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
5258
53- private static final Long NUM_DOCS = 4000L ;
59+ private static final Long NUM_DOCS = 2000L ;
60+ private static final Long TIME_RANGE_SECONDS = 3600L ;
5461 private static final String DATASTREAM_NAME = "tsit_ds" ;
5562 private List <XContentBuilder > documents = null ;
5663 private TSDataGenerationHelper dataGenerationHelper ;
@@ -193,21 +200,31 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
193200 return List .of (DataStreamsPlugin .class , LocalStateCompositeXPackPlugin .class , AggregateMetricMapperPlugin .class , EsqlPlugin .class );
194201 }
195202
196- static Double calculateRateAggregation (Collection <List <Tuple <String , Tuple <Instant , Integer >>>> allTimeseries , Agg agg ) {
197- List <Long > allRates = allTimeseries .stream ().map (timeseries -> {
203+ record RateRange (Double lower , Double upper ) implements Comparable <RateRange > {
204+ @ Override
205+ public int compareTo (RateRange o ) {
206+ return this .lower .compareTo (o .lower );
207+ }
208+ }
209+
210+ // A record that holds min, max, avg, count and sum of rates calculated from a timeseries.
211+ record RateStats (Long count , RateRange max , RateRange avg , RateRange min , RateRange sum ) {}
212+
// Recomputes, from the raw generated documents, the expected count/max/avg/min/sum of
// per-timeseries counter rates inside one bucket, each as a RateRange of plausible values.
// NOTE(review): this is a diff view; a hunk gap below (see the @@ marker) hides part of the
// accumulation loop (new-file lines 231-237), presumably the counter-reset handling — confirm
// against the full file before relying on this documentation.
213+ static RateStats calculateRateAggregation (Collection <List <Tuple <String , Tuple <Instant , Integer >>>> allTimeseries ) {
214+ List <RateRange > allRates = allTimeseries .stream ().map (timeseries -> {
if (timeseries .size () < 2 ) {
// A single sample cannot define a rate; filtered out below via Objects::nonNull.
199- return null ; // Not enough data points to calculate rate
216+ return null ;
200217 }
201218 // Sort the timeseries by timestamp
202219 timeseries .sort ((t1 , t2 ) -> t1 .v2 ().v1 ().compareTo (t2 .v2 ().v1 ()));
203220 var firstTs = timeseries .getFirst ().v2 ().v1 ();
204221 var lastTs = timeseries .getLast ().v2 ().v1 ();
205222 Integer lastValue = null ;
206- Long counterGrowth = 0L ;
223+ Double counterGrowth = 0.0 ;
207224 for (Tuple <String , Tuple <Instant , Integer >> point : timeseries ) {
208225 var currentValue = point .v2 ().v2 ();
209226 if (currentValue == null ) {
// Counters must be dense; a null sample indicates broken test data, so fail loudly
// instead of silently skipping the series (old behavior).
210- return null ; // Skip if the value is null
227+ throw new IllegalArgumentException ( "Null value in counter timeseries" );
211228 }
212229 if (lastValue == null ) {
213230 lastValue = point .v2 ().v2 (); // Initialize with the first value
// --- hidden hunk gap: the growth-accumulation / counter-reset logic is not visible here ---
@@ -221,19 +238,24 @@ static Double calculateRateAggregation(Collection<List<Tuple<String, Tuple<Insta
221238 }
222239 lastValue = currentValue ; // Update last value for next iteration
223240 }
224- return counterGrowth / (lastTs .toEpochMilli () - firstTs .toEpochMilli ()) * 1000 ; // Rate per second
// Lower bound assumes the full 60s bucket elapsed; upper bound uses only the observed span.
241+ return new RateRange (
242+ counterGrowth / 60.0 , // TODO: do not hardcode time difference
// NOTE(review): integer division of epoch seconds — divides by zero when first and last
// samples land in the same epoch second; verify the generator guarantees >1s spacing.
243+ counterGrowth / (lastTs .toEpochMilli () / 1000 - firstTs .toEpochMilli () / 1000 )
244+ );
225245 }).filter (Objects ::nonNull ).toList ();
226- if (allRates .isEmpty () && agg != Agg . COUNT && agg != Agg . SUM ) {
227- return null ; // No rates to aggregate
// Empty bucket: count 0 and sum [0,0] are well-defined; max/avg/min stay null
// (checkWithin treats null as "column must be null").
246+ if (allRates .isEmpty ()) {
247+ return new RateStats ( 0L , null , null , null , new RateRange ( 0.0 , 0.0 ));
228248 }
229- return switch (agg ) {
230- // TODO: fix the orElse in the stream operations
231- case MAX -> allRates .stream ().mapToDouble (Long ::doubleValue ).max ().orElseThrow ();
232- case MIN -> allRates .stream ().mapToDouble (Long ::doubleValue ).min ().orElseThrow ();
233- case AVG -> allRates .stream ().mapToDouble (Long ::doubleValue ).average ().orElseThrow ();
234- case SUM -> allRates .stream ().mapToDouble (Long ::doubleValue ).sum ();
235- case COUNT -> (double ) allRates .size ();
236- };
// All five aggregates are computed in one pass over allRates; max/min use the
// lower-bound ordering defined by RateRange.compareTo.
249+ return new RateStats (
250+ (long ) allRates .size (),
251+ allRates .stream ().max (RateRange ::compareTo ).orElseThrow (),
252+ new RateRange (
253+ allRates .stream ().mapToDouble (r -> r .lower ).average ().orElseThrow (),
254+ allRates .stream ().mapToDouble (r -> r .upper ).average ().orElseThrow ()
255+ ),
256+ allRates .stream ().min (RateRange ::compareTo ).orElseThrow (),
257+ new RateRange (allRates .stream ().mapToDouble (r -> r .lower ).sum (), allRates .stream ().mapToDouble (r -> r .upper ).sum ())
258+ );
237259 }
238260
239261 void putTSDBIndexTemplate (List <String > patterns , @ Nullable String mappingString ) throws IOException {
@@ -257,7 +279,7 @@ void putTSDBIndexTemplate(List<String> patterns, @Nullable String mappingString)
257279
// Indexes NUM_DOCS randomized TSDB documents into the data stream before each test and
// mirrors each successfully indexed document into `documents` for test-side recomputation.
// NOTE(review): a hunk gap below (see the @@ marker, new-file lines 286-290) hides the
// template/data-stream setup and the start of the indexing loop.
258280 @ Before
259281 public void populateIndex () throws IOException {
260- dataGenerationHelper = new TSDataGenerationHelper (NUM_DOCS );
// Time range is now bounded so all docs fall within TIME_RANGE_SECONDS.
282+ dataGenerationHelper = new TSDataGenerationHelper (NUM_DOCS , TIME_RANGE_SECONDS );
261283 final XContentBuilder builder = XContentFactory .jsonBuilder ();
262284 builder .map (dataGenerationHelper .mapping .raw ());
263285 final String jsonMappings = Strings .toString (builder );
// --- hidden hunk gap: template creation / loop header not visible here ---
@@ -269,17 +291,83 @@ public void populateIndex() throws IOException {
269291 if (documents == null ) {
270292 documents = new ArrayList <>();
271293 }
272- documents .add (document );
273- var indexRequest = client ().prepareIndex (DATASTREAM_NAME ).setOpType (DocWriteRequest .OpType .CREATE ).setSource (document );
274- indexRequest .setRefreshPolicy (org .elasticsearch .action .support .WriteRequest .RefreshPolicy .IMMEDIATE );
275- indexRequest .get ();
// The document is now added to `documents` only AFTER a successful index call, so the
// test-side copy stays consistent with what the cluster actually holds.
294+ try {
295+ var indexRequest = client ().prepareIndex (DATASTREAM_NAME ).setOpType (DocWriteRequest .OpType .CREATE ).setSource (document );
296+ indexRequest .setRefreshPolicy (org .elasticsearch .action .support .WriteRequest .RefreshPolicy .IMMEDIATE );
297+ indexRequest .get ();
298+ documents .add (document );
299+ } catch (Exception e ) {
// NOTE(review): message-substring matching is brittle; consider catching
// VersionConflictEngineException directly if it is available here.
300+ if (e .getMessage () != null && e .getMessage ().contains ("version conflict" )) {
301+ // Ignore version conflicts, they can happen with randomized data
302+ continue ;
303+ }
304+ throw e ;
305+ }
306+ }
307+ }
308+
309+ void checkWithin (Double actual , RateRange expected ) {
310+ if (expected == null ) {
311+ assertThat (actual , equalTo (null ));
312+ return ;
313+ }
314+ assertThat (actual , allOf (lessThanOrEqualTo (expected .upper ), not (lessThan (expected .lower ))));
315+ }
316+
317+ public void testRateGroupBySubset () {
318+ var dimensions = ESTestCase .randomNonEmptySubsetOf (dataGenerationHelper .attributesForMetrics );
319+ var dimensionsStr = dimensions .stream ().map (d -> "attributes." + d ).collect (Collectors .joining (", " ));
320+ try (var resp = run (String .format (Locale .ROOT , """
321+ TS %s
322+ | STATS count(rate(metrics.counter_hdd.bytes.read)),
323+ min(rate(metrics.counter_hdd.bytes.read)),
324+ max(rate(metrics.counter_hdd.bytes.read)),
325+ avg(rate(metrics.counter_hdd.bytes.read))
326+ BY tbucket=bucket(@timestamp, 1 minute), %s
327+ | SORT tbucket
328+ | LIMIT 1000
329+ """ , DATASTREAM_NAME , dimensionsStr ))) {
330+ List <List <Object >> rows = new ArrayList <>();
331+ resp .rows ().forEach (rowIter -> {
332+ List <Object > row = new ArrayList <>();
333+ rowIter .forEach (row ::add );
334+ rows .add (row );
335+ });
336+ List <String > failedWindows = new ArrayList <>();
337+ var groups = groupedRows (documents , dimensions , 60 );
338+ for (List <Object > row : rows ) {
339+ var rowKey = getRowKey (row , dimensions , 4 );
340+ var windowDataPoints = groups .get (rowKey );
341+ var docsPerTimeseries = groupByTimeseries (windowDataPoints , "counter_hdd.bytes.read" );
342+ var rateAgg = calculateRateAggregation (docsPerTimeseries .values ());
343+ try {
344+ assertThat (row .getFirst (), equalTo (rateAgg .count ));
345+ checkWithin ((Double ) row .get (1 ), rateAgg .max );
346+ checkWithin ((Double ) row .get (2 ), rateAgg .avg );
347+ checkWithin ((Double ) row .get (3 ), rateAgg .min );
348+ } catch (AssertionError e ) {
349+ failedWindows .add ("Failed for row:\n " + row + "\n Wanted: " + rateAgg + "\n Exception: " + e .getMessage ());
350+ }
351+ }
352+ if (failedWindows .isEmpty () == false ) {
353+ var pctFailures = (double ) failedWindows .size () / rows .size () * 100 ;
354+ var failureDetails = String .join ("\n " , failedWindows );
355+ if (failureDetails .length () > 2000 ) {
356+ failureDetails = failureDetails .substring (0 , 2000 ) + "\n ... (truncated)" ;
357+ }
358+ throw new AssertionError ("Failed " + failedWindows .size () + " windows(" + pctFailures + "%):\n " + failureDetails );
359+ }
276360 }
277361 }
278362
// Same verification as testRateGroupBySubset but with no grouping dimensions: one group
// per 1-minute bucket. Query columns are count, max, avg, min (indices 0-3), tbucket at
// index 4 — the assertions below match that order.
// NOTE(review): a hunk gap below (see the @@ marker, new-file lines 374-377) hides the
// end of the query/try header and the rows-list setup.
279- public void testCounterRateGroupByNothing () {
363+ public void testRateGroupByNothing () {
364+ var groups = groupedRows (documents , List .of (), 60 );
280365 try (var resp = run (String .format (Locale .ROOT , """
281366 TS %s
282- | STATS count(rate(metrics.counter_hdd.bytes.read))
367+ | STATS count(rate(metrics.counter_hdd.bytes.read)),
368+ max(rate(metrics.counter_hdd.bytes.read)),
369+ avg(rate(metrics.counter_hdd.bytes.read)),
370+ min(rate(metrics.counter_hdd.bytes.read))
283371 BY tbucket=bucket(@timestamp, 1 minute)
284372 | SORT tbucket
285373 | LIMIT 1000
// --- hidden hunk gap: format-string close and rows collection not visible here ---
@@ -290,20 +378,28 @@ public void testCounterRateGroupByNothing() {
290378 rowIter .forEach (row ::add );
291379 rows .add (row );
292380 });
293- var groups = groupedRows ( documents , List . of (), 60 );
381+ List < String > failedWindows = new ArrayList <>( );
294382 for (List <Object > row : rows ) {
// tbucket moved from index 1 to index 4 now that three more aggregates are projected.
295- var windowStart = windowStart (row .get (1 ), 60 );
383+ var windowStart = windowStart (row .get (4 ), 60 );
296384 var windowDataPoints = groups .get (List .of (Long .toString (windowStart )));
297385 var docsPerTimeseries = groupByTimeseries (windowDataPoints , "counter_hdd.bytes.read" );
298- var rate = calculateRateAggregation (docsPerTimeseries .values (), Agg .COUNT );
299- if (rate == null ) {
300- assertThat (row .getFirst (), equalTo (null ));
301- continue ;
302- } else if (rate == 0 ) {
303- assertThat (row .getFirst (), equalTo (0L ));
304- continue ; // No data points in the window
386+ var rateAgg = calculateRateAggregation (docsPerTimeseries .values ());
// Column order here (count, max, avg, min) matches the STATS clause above.
387+ try {
388+ assertThat (row .getFirst (), equalTo (rateAgg .count ));
389+ checkWithin ((Double ) row .get (1 ), rateAgg .max );
390+ checkWithin ((Double ) row .get (2 ), rateAgg .avg );
391+ checkWithin ((Double ) row .get (3 ), rateAgg .min );
392+ } catch (AssertionError e ) {
393+ failedWindows .add ("Failed for row:\n " + row + "\n Wanted: " + rateAgg + "\n Exception: " + e .getMessage ());
394+ }
395+ }
// Aggregate all window failures into a single, truncated assertion message.
396+ if (failedWindows .isEmpty () == false ) {
397+ var pctFailures = (double ) failedWindows .size () / rows .size () * 100 ;
398+ var failureDetails = String .join ("\n " , failedWindows );
399+ if (failureDetails .length () > 2000 ) {
400+ failureDetails = failureDetails .substring (0 , 2000 ) + "\n ... (truncated)" ;
305401 }
306- assertThat ( row . getFirst (), equalTo ( rate . longValue ()) );
402+ throw new AssertionError ( "Failed " + failedWindows . size () + " windows(" + pctFailures + "%): \n " + failureDetails );
307403 }
308404 }
309405 }
@@ -325,14 +421,15 @@ public void testGroupBySubset() {
325421 min(min_over_time(metrics.gauge_hdd.bytes.used)),
326422 sum(count_over_time(metrics.gauge_hdd.bytes.used)),
327423 sum(sum_over_time(metrics.gauge_hdd.bytes.used)),
328- avg(avg_over_time(metrics.gauge_hdd.bytes.used))
424+ avg(avg_over_time(metrics.gauge_hdd.bytes.used)),
425+ count(count_over_time(metrics.gauge_hdd.bytes.used))
329426 BY tbucket=bucket(@timestamp, 1 minute), %s
330427 | SORT tbucket
331428 | LIMIT 1000""" , DATASTREAM_NAME , dimensionsStr ))) {
332429 var groups = groupedRows (documents , dimensions , 60 );
333430 List <List <Object >> rows = consumeRows (resp );
334431 for (List <Object > row : rows ) {
335- var rowKey = getRowKey (row , dimensions , 6 );
432+ var rowKey = getRowKey (row , dimensions , 7 );
336433 var tsGroups = groupByTimeseries (groups .get (rowKey ), "gauge_hdd.bytes.used" );
337434 var docValues = valuesInWindow (groups .get (rowKey ), "gauge_hdd.bytes.used" );
338435 if (row .get (0 ) instanceof List ) {
@@ -349,6 +446,7 @@ public void testGroupBySubset() {
349446 assertThat (row .get (4 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .SUM , Agg .SUM ).longValue ()));
350447 var avg = aggregatePerTimeseries (tsGroups , Agg .AVG , Agg .AVG );
351448 assertThat ((Double ) row .get (5 ), closeTo (avg , avg * 0.01 ));
449+ // assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue()));
352450 }
353451 }
354452 }
@@ -368,14 +466,15 @@ public void testGroupByNothing() {
368466 min(min_over_time(metrics.gauge_hdd.bytes.used)),
369467 sum(count_over_time(metrics.gauge_hdd.bytes.used)),
370468 sum(sum_over_time(metrics.gauge_hdd.bytes.used)),
371- avg(avg_over_time(metrics.gauge_hdd.bytes.used))
469+ avg(avg_over_time(metrics.gauge_hdd.bytes.used)),
470+ count(count_over_time(metrics.gauge_hdd.bytes.used))
372471 BY tbucket=bucket(@timestamp, 1 minute)
373472 | SORT tbucket
374473 | LIMIT 1000""" , DATASTREAM_NAME ))) {
375474 List <List <Object >> rows = consumeRows (resp );
376475 var groups = groupedRows (documents , List .of (), 60 );
377476 for (List <Object > row : rows ) {
378- var windowStart = windowStart (row .get (6 ), 60 );
477+ var windowStart = windowStart (row .get (7 ), 60 );
379478 List <Integer > docValues = valuesInWindow (groups .get (List .of (Long .toString (windowStart ))), "gauge_hdd.bytes.used" );
380479 var tsGroups = groupByTimeseries (groups .get (List .of (Long .toString (windowStart ))), "gauge_hdd.bytes.used" );
381480 if (row .get (0 ) instanceof List ) {
@@ -392,6 +491,7 @@ public void testGroupByNothing() {
392491 assertThat (row .get (4 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .SUM , Agg .SUM ).longValue ()));
393492 var avg = aggregatePerTimeseries (tsGroups , Agg .AVG , Agg .AVG );
394493 assertThat ((Double ) row .get (5 ), closeTo (avg , avg * 0.01 ));
494+ // assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue()));
395495 }
396496 }
397497 }
0 commit comments