 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -124,7 +125,7 @@ static List<Integer> valuesInWindow(List<Map<String, Object>> pointsInGroup, Str
         return values;
     }
 
-    static Map<String, List<Tuple<String, Tuple<Instant, Integer>>>> groupByTimeseries(
+    static Map<String, List<Tuple<String, Tuple<Instant, Double>>>> groupByTimeseries(
         List<Map<String, Object>> pointsInGroup,
         String metricName
     ) {
@@ -136,19 +137,30 @@ static Map<String, List<Tuple<String, Tuple<Instant, Integer>>>> groupByTimeseri
                 .map(entry -> entry.getKey() + ":" + entry.getValue())
                 .collect(Collectors.joining(","));
             var docTs = Instant.parse((String) doc.get("@timestamp"));
-            var docValue = (Integer) ((Map<String, Object>) doc.get("metrics")).get(metricName);
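+            // Metric values can arrive as Integer, Long, Float, or Double, so normalize them all to Double.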
+            var docValue = switch (((Map<String, Object>) doc.get("metrics")).get(metricName)) {
+                case Integer i -> i.doubleValue();
+                case Long l -> l.doubleValue();
+                case Float f -> f.doubleValue();
+                case Double d -> d;
+                default -> throw new IllegalStateException(
+                    "Unexpected value type: "
+                        + ((Map<String, Object>) doc.get("metrics")).get(metricName)
+                        + " of class "
+                        + ((Map<String, Object>) doc.get("metrics")).get(metricName).getClass()
+                );
+            };
             return new Tuple<>(docKey, new Tuple<>(docTs, docValue));
         })
             .collect(Collectors.groupingBy(Tuple::v1));
     }
 
     static Object aggregatePerTimeseries(
-        Map<String, List<Tuple<String, Tuple<Instant, Integer>>>> timeseries,
+        Map<String, List<Tuple<String, Tuple<Instant, Double>>>> timeseries,
         Agg crossAgg,
         Agg timeseriesAgg
     ) {
         var res = timeseries.values().stream().map(timeseriesList -> {
-            List<Integer> values = timeseriesList.stream().map(t -> t.v2().v2()).collect(Collectors.toList());
+            List<Double> values = timeseriesList.stream().map(t -> t.v2().v2()).collect(Collectors.toList());
             return aggregateValuesInWindow(values, timeseriesAgg);
         }).filter(Objects::nonNull).toList();
 
@@ -157,27 +169,20 @@ static Object aggregatePerTimeseries(
         }
 
         return switch (crossAgg) {
-            case MAX -> res.isEmpty()
-                ? null
-                : Double.valueOf(res.stream().mapToDouble(Double::doubleValue).max().orElseThrow()).longValue();
-            case MIN -> res.isEmpty()
-                ? null
-                : Double.valueOf(res.stream().mapToDouble(Double::doubleValue).min().orElseThrow()).longValue();
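+            // Keep double precision for MAX/MIN/SUM instead of truncating the results to long.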
+            case MAX -> res.isEmpty() ? null : res.stream().mapToDouble(Double::doubleValue).max().orElseThrow();
+            case MIN -> res.isEmpty() ? null : res.stream().mapToDouble(Double::doubleValue).min().orElseThrow();
             case AVG -> res.isEmpty() ? null : res.stream().mapToDouble(Double::doubleValue).average().orElseThrow();
-            case SUM -> res.isEmpty() ? null : Double.valueOf(res.stream().mapToDouble(Double::doubleValue).sum()).longValue();
+            case SUM -> res.isEmpty() ? null : res.stream().mapToDouble(Double::doubleValue).sum();
             case COUNT -> Integer.toUnsignedLong(res.size());
         };
     }
 
-    static Double aggregateValuesInWindow(List<Integer> values, Agg agg) {
-        // if (values.isEmpty()) {
-        //     throw new IllegalArgumentException("No values to aggregate for " + agg + " operation");
-        // }
+    static Double aggregateValuesInWindow(List<Double> values, Agg agg) {
         return switch (agg) {
-            case MAX -> Double.valueOf(values.stream().max(Integer::compareTo).orElseThrow());
-            case MIN -> Double.valueOf(values.stream().min(Integer::compareTo).orElseThrow());
-            case AVG -> values.stream().mapToDouble(Integer::doubleValue).average().orElseThrow();
-            case SUM -> values.isEmpty() ? null : values.stream().mapToDouble(Integer::doubleValue).sum();
+            case MAX -> values.stream().max(Double::compareTo).orElseThrow();
+            case MIN -> values.stream().min(Double::compareTo).orElseThrow();
+            case AVG -> values.stream().mapToDouble(Double::doubleValue).average().orElseThrow();
+            case SUM -> values.isEmpty() ? null : values.stream().mapToDouble(Double::doubleValue).sum();
             case COUNT -> (double) values.size();
         };
     }
@@ -230,7 +235,7 @@ public int compareToFindingMax(RateRange o) {
     record RateStats(Long count, RateRange max, RateRange avg, RateRange min, RateRange sum) {}
 
     static RateStats calculateRateAggregation(
-        Collection<List<Tuple<String, Tuple<Instant, Integer>>>> allTimeseries,
+        Collection<List<Tuple<String, Tuple<Instant, Double>>>> allTimeseries,
         Integer secondsInWindow
     ) {
         List<RateRange> allRates = allTimeseries.stream().map(timeseries -> {
@@ -241,9 +246,9 @@ static RateStats calculateRateAggregation(
             timeseries.sort((t1, t2) -> t1.v2().v1().compareTo(t2.v2().v1()));
             var firstTs = timeseries.getFirst().v2().v1();
             var lastTs = timeseries.getLast().v2().v1();
-            Integer lastValue = null;
+            Double lastValue = null;
             Double counterGrowth = 0.0;
-            for (Tuple<String, Tuple<Instant, Integer>> point : timeseries) {
+            for (Tuple<String, Tuple<Instant, Double>> point : timeseries) {
                 var currentValue = point.v2().v2();
                 if (currentValue == null) {
                     throw new IllegalArgumentException("Null value in counter timeseries");
@@ -352,10 +357,10 @@ public void testRateGroupBySubset() {
         var dimensionsStr = dimensions.stream().map(d -> "attributes." + d).collect(Collectors.joining(", "));
         try (var resp = run(String.format(Locale.ROOT, """
             TS %s
-            | STATS count(rate(metrics.counter_hdd.bytes.read)),
-                max(rate(metrics.counter_hdd.bytes.read)),
-                avg(rate(metrics.counter_hdd.bytes.read)),
-                min(rate(metrics.counter_hdd.bytes.read))
+            | STATS count(rate(metrics.counterl_hdd.bytes.read)),
+                max(rate(metrics.counterl_hdd.bytes.read)),
+                avg(rate(metrics.counterl_hdd.bytes.read)),
+                min(rate(metrics.counterl_hdd.bytes.read))
             BY tbucket=bucket(@timestamp, 1 minute), %s
             | SORT tbucket
             | LIMIT 1000
@@ -366,7 +371,7 @@ public void testRateGroupBySubset() {
             for (List<Object> row : rows) {
                 var rowKey = getRowKey(row, dimensions, 4);
                 var windowDataPoints = groups.get(rowKey);
-                var docsPerTimeseries = groupByTimeseries(windowDataPoints, "counter_hdd.bytes.read");
+                var docsPerTimeseries = groupByTimeseries(windowDataPoints, "counterl_hdd.bytes.read");
                 var rateAgg = calculateRateAggregation(docsPerTimeseries.values(), SECONDS_IN_WINDOW);
                 try {
                     assertThat(row.getFirst(), equalTo(rateAgg.count));
@@ -391,10 +396,10 @@ public void testRateGroupByNothing() {
         var groups = groupedRows(documents, List.of(), 60);
         try (var resp = run(String.format(Locale.ROOT, """
             TS %s
-            | STATS count(rate(metrics.counter_hdd.bytes.read)),
-                max(rate(metrics.counter_hdd.bytes.read)),
-                avg(rate(metrics.counter_hdd.bytes.read)),
-                min(rate(metrics.counter_hdd.bytes.read))
+            | STATS count(rate(metrics.counterl_hdd.bytes.read)),
+                max(rate(metrics.counterl_hdd.bytes.read)),
+                avg(rate(metrics.counterl_hdd.bytes.read)),
+                min(rate(metrics.counterl_hdd.bytes.read))
             BY tbucket=bucket(@timestamp, 1 minute)
             | SORT tbucket
             | LIMIT 1000
@@ -404,7 +409,7 @@ public void testRateGroupByNothing() {
             for (List<Object> row : rows) {
                 var windowStart = windowStart(row.get(4), SECONDS_IN_WINDOW);
                 var windowDataPoints = groups.get(List.of(Long.toString(windowStart)));
-                var docsPerTimeseries = groupByTimeseries(windowDataPoints, "counter_hdd.bytes.read");
+                var docsPerTimeseries = groupByTimeseries(windowDataPoints, "counterl_hdd.bytes.read");
                 var rateAgg = calculateRateAggregation(docsPerTimeseries.values(), SECONDS_IN_WINDOW);
                 try {
                     assertThat(row.getFirst(), equalTo(rateAgg.count));
@@ -419,6 +424,70 @@ public void testRateGroupByNothing() {
         }
     }
 
+    public void testGaugeGroupByRandomAndRandomAgg() {
+        // TODO: randomize window size as well!
+        var dimensions = ESTestCase.randomSubsetOf(dataGenerationHelper.attributesForMetrics);
+        var dimensionsStr = dimensions.isEmpty()
+            ? ""
+            : ", " + dimensions.stream().map(d -> "attributes." + d).collect(Collectors.joining(", "));
+        var aggs = Agg.values();
+        var metricName = ESTestCase.randomFrom(List.of("gaugel_hdd.bytes.used", "gauged_cpu.percent"));
+        var selectedAggs = ESTestCase.randomSubsetOf(2, aggs);
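+        // selectedAggs.get(0) is the outer (cross-timeseries) aggregation and selectedAggs.get(1)
+        // the inner *_over_time aggregation in the generated query.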
+        var aggExpression = String.format(
+            "%s(%s_over_time(metrics.%s))",
+            selectedAggs.get(0),
+            selectedAggs.get(1).toString().toLowerCase(),
+            metricName
+        );
+        var query = String.format(Locale.ROOT, """
+            TS %s
+            | STATS
+                %s
+            BY tbucket=bucket(@timestamp, 1 minute) %s
+            | SORT tbucket
+            | LIMIT 1000""", DATASTREAM_NAME, aggExpression, dimensionsStr);
+        try (EsqlQueryResponse resp = run(query)) {
+            var groups = groupedRows(documents, dimensions, 60);
+            List<List<Object>> rows = consumeRows(resp);
+            for (List<Object> row : rows) {
+                var rowKey = getRowKey(row, dimensions, 1);
+                var tsGroups = groupByTimeseries(groups.get(rowKey), metricName);
+                Object expectedVal = aggregatePerTimeseries(tsGroups, selectedAggs.get(0), selectedAggs.get(1));
+                Double actualVal = switch (row.get(0)) {
+                    case Long l -> l.doubleValue();
+                    case Double d -> d;
+                    case null -> null;
+                    default -> throw new IllegalStateException(
+                        "Unexpected value type: " + row.get(0) + " of class " + row.get(0).getClass()
+                    );
+                };
+                try {
+                    switch (expectedVal) {
+                        case Double dVal -> assertThat(actualVal, closeTo(dVal, dVal * 0.01));
+                        case Long lVal -> assertThat(actualVal, closeTo(lVal.doubleValue(), lVal * 0.01));
+                        case null -> assertThat(actualVal, equalTo(null));
+                        default -> throw new IllegalStateException(
+                            "Unexpected value type: " + expectedVal + " of class " + expectedVal.getClass()
+                        );
+                    }
+                } catch (AssertionError e) {
+                    throw new AssertionError(
+                        "Failed for aggregations:\n"
+                            + selectedAggs
+                            + " with total dimensions for grouping: "
+                            + dimensions.size()
+                            + " on metric "
+                            + metricName
+                            + "\nWanted val: "
+                            + expectedVal
+                            + "\nGot val: "
+                            + actualVal
+                            + "\nException: "
+                            + e.getMessage(),
+                        e
+                    );
+                }
+            }
+        }
+    }
+
     /**
      * This test validates Gauge metrics aggregation with grouping by time bucket and a subset of dimensions.
      * The subset of dimensions is a random subset of the dimensions present in the data.
@@ -431,22 +500,22 @@ public void testGroupBySubset() {
         try (EsqlQueryResponse resp = run(String.format(Locale.ROOT, """
             TS %s
             | STATS
-                values(metrics.gauge_hdd.bytes.used),
-                max(max_over_time(metrics.gauge_hdd.bytes.used)),
-                min(min_over_time(metrics.gauge_hdd.bytes.used)),
-                sum(count_over_time(metrics.gauge_hdd.bytes.used)),
-                sum(sum_over_time(metrics.gauge_hdd.bytes.used)),
-                avg(avg_over_time(metrics.gauge_hdd.bytes.used)),
-                count(count_over_time(metrics.gauge_hdd.bytes.used))
+                values(metrics.gaugel_hdd.bytes.used),
+                max(max_over_time(metrics.gaugel_hdd.bytes.used)),
+                min(min_over_time(metrics.gaugel_hdd.bytes.used)),
+                sum(count_over_time(metrics.gaugel_hdd.bytes.used)),
+                sum(sum_over_time(metrics.gaugel_hdd.bytes.used)),
+                avg(avg_over_time(metrics.gaugel_hdd.bytes.used)),
+                count(count_over_time(metrics.gaugel_hdd.bytes.used))
             BY tbucket=bucket(@timestamp, 1 minute), %s
             | SORT tbucket
             | LIMIT 1000""", DATASTREAM_NAME, dimensionsStr))) {
             var groups = groupedRows(documents, dimensions, 60);
             List<List<Object>> rows = consumeRows(resp);
             for (List<Object> row : rows) {
                 var rowKey = getRowKey(row, dimensions, 7);
-                var tsGroups = groupByTimeseries(groups.get(rowKey), "gauge_hdd.bytes.used");
-                var docValues = valuesInWindow(groups.get(rowKey), "gauge_hdd.bytes.used");
+                var tsGroups = groupByTimeseries(groups.get(rowKey), "gaugel_hdd.bytes.used");
+                var docValues = valuesInWindow(groups.get(rowKey), "gaugel_hdd.bytes.used");
                 if (row.get(0) instanceof List) {
                     assertThat(
                         (Collection<Long>) row.getFirst(),
@@ -476,22 +545,22 @@ public void testGroupByNothing() {
         try (EsqlQueryResponse resp = run(String.format(Locale.ROOT, """
             TS %s
             | STATS
-                values(metrics.gauge_hdd.bytes.used),
-                max(max_over_time(metrics.gauge_hdd.bytes.used)),
-                min(min_over_time(metrics.gauge_hdd.bytes.used)),
-                sum(count_over_time(metrics.gauge_hdd.bytes.used)),
-                sum(sum_over_time(metrics.gauge_hdd.bytes.used)),
-                avg(avg_over_time(metrics.gauge_hdd.bytes.used)),
-                count(count_over_time(metrics.gauge_hdd.bytes.used))
+                values(metrics.gaugel_hdd.bytes.used),
+                max(max_over_time(metrics.gaugel_hdd.bytes.used)),
+                min(min_over_time(metrics.gaugel_hdd.bytes.used)),
+                sum(count_over_time(metrics.gaugel_hdd.bytes.used)),
+                sum(sum_over_time(metrics.gaugel_hdd.bytes.used)),
+                avg(avg_over_time(metrics.gaugel_hdd.bytes.used)),
+                count(count_over_time(metrics.gaugel_hdd.bytes.used))
             BY tbucket=bucket(@timestamp, 1 minute)
             | SORT tbucket
             | LIMIT 1000""", DATASTREAM_NAME))) {
             List<List<Object>> rows = consumeRows(resp);
             var groups = groupedRows(documents, List.of(), 60);
             for (List<Object> row : rows) {
                 var windowStart = windowStart(row.get(7), 60);
-                List<Integer> docValues = valuesInWindow(groups.get(List.of(Long.toString(windowStart))), "gauge_hdd.bytes.used");
-                var tsGroups = groupByTimeseries(groups.get(List.of(Long.toString(windowStart))), "gauge_hdd.bytes.used");
+                List<Integer> docValues = valuesInWindow(groups.get(List.of(Long.toString(windowStart))), "gaugel_hdd.bytes.used");
+                var tsGroups = groupByTimeseries(groups.get(List.of(Long.toString(windowStart))), "gaugel_hdd.bytes.used");
                 if (row.get(0) instanceof List) {
                     assertThat(
                         (Collection<Long>) row.get(0),
@@ -500,10 +569,16 @@ public void testGroupByNothing() {
                 } else {
                     assertThat(row.getFirst(), equalTo(docValues.isEmpty() ? null : docValues.getFirst().longValue()));
                 }
-                assertThat(row.get(1), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX)));
-                assertThat(row.get(2), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN)));
-                assertThat(row.get(3), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT)));
-                assertThat(row.get(4), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM)));
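+                // Result columns may come back as Long or Double; coerce each cell to Double
+                // before comparing it with the expected aggregate.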
+                Function<Object, Double> toDouble = cell -> switch (cell) {
+                    case Long l -> l.doubleValue();
+                    case Double d -> d;
+                    case null -> null;
+                    default -> throw new IllegalStateException("Unexpected value type: " + cell + " of class " + cell.getClass());
+                };
+                assertThat(toDouble.apply(row.get(1)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX)));
+                assertThat(toDouble.apply(row.get(2)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN)));
+                assertThat(toDouble.apply(row.get(3)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT)));
+                assertThat(toDouble.apply(row.get(4)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM)));
                 var avg = (Double) aggregatePerTimeseries(tsGroups, Agg.AVG, Agg.AVG);
                 assertThat((Double) row.get(5), row.get(5) == null ? equalTo(null) : closeTo(avg, avg * 0.01));
                 // assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue()));