2222import org .elasticsearch .index .IndexMode ;
2323import org .elasticsearch .index .IndexSettings ;
2424import org .elasticsearch .plugins .Plugin ;
25+ import org .elasticsearch .test .ESIntegTestCase ;
2526import org .elasticsearch .test .ESTestCase ;
2627import org .elasticsearch .xcontent .XContentBuilder ;
2728import org .elasticsearch .xcontent .XContentFactory ;
4344import java .util .stream .Collectors ;
4445
4546import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
47+ import static org .hamcrest .Matchers .allOf ;
4648import static org .hamcrest .Matchers .closeTo ;
4749import static org .hamcrest .Matchers .containsInAnyOrder ;
4850import static org .hamcrest .Matchers .equalTo ;
51+ import static org .hamcrest .Matchers .lessThan ;
52+ import static org .hamcrest .Matchers .lessThanOrEqualTo ;
53+ import static org .hamcrest .Matchers .not ;
4954
5055@ SuppressWarnings ("unchecked" )
56+ @ ESIntegTestCase .ClusterScope (maxNumDataNodes = 1 )
5157public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
5258
53- private static final Long NUM_DOCS = 4000L ;
59+ private static final Long NUM_DOCS = 2000L ;
60+ private static final Long TIME_RANGE_SECONDS = 3600L ;
5461 private static final String DATASTREAM_NAME = "tsit_ds" ;
5562 private List <XContentBuilder > documents = null ;
5663 private TSDataGenerationHelper dataGenerationHelper ;
@@ -193,21 +200,31 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
193200 return List .of (DataStreamsPlugin .class , LocalStateCompositeXPackPlugin .class , AggregateMetricMapperPlugin .class , EsqlPlugin .class );
194201 }
195202
196- static Double calculateRateAggregation (Collection <List <Tuple <String , Tuple <Instant , Integer >>>> allTimeseries , Agg agg ) {
197- List <Long > allRates = allTimeseries .stream ().map (timeseries -> {
203+ record RateRange (Double lower , Double upper ) implements Comparable <RateRange > {
204+ @ Override
205+ public int compareTo (RateRange o ) {
206+ return this .lower .compareTo (o .lower );
207+ }
208+ }
209+
210+ // A record that holds min, max, avg, count and sum of rates calculated from a timeseries.
211+ record RateStats (Long count , RateRange max , RateRange avg , RateRange min , RateRange sum ) {}
212+
213+ static RateStats calculateRateAggregation (Collection <List <Tuple <String , Tuple <Instant , Integer >>>> allTimeseries ) {
214+ List <RateRange > allRates = allTimeseries .stream ().map (timeseries -> {
198215 if (timeseries .size () < 2 ) {
199- return null ; // Not enough data points to calculate rate
216+ return null ;
200217 }
201218 // Sort the timeseries by timestamp
202219 timeseries .sort ((t1 , t2 ) -> t1 .v2 ().v1 ().compareTo (t2 .v2 ().v1 ()));
203220 var firstTs = timeseries .getFirst ().v2 ().v1 ();
204221 var lastTs = timeseries .getLast ().v2 ().v1 ();
205222 Integer lastValue = null ;
206- Long counterGrowth = 0L ;
223+ Double counterGrowth = 0.0 ;
207224 for (Tuple <String , Tuple <Instant , Integer >> point : timeseries ) {
208225 var currentValue = point .v2 ().v2 ();
209226 if (currentValue == null ) {
210- return null ; // Skip if the value is null
227+ throw new IllegalArgumentException ( "Null value in counter timeseries" );
211228 }
212229 if (lastValue == null ) {
213230 lastValue = point .v2 ().v2 (); // Initialize with the first value
@@ -221,19 +238,24 @@ static Double calculateRateAggregation(Collection<List<Tuple<String, Tuple<Insta
221238 }
222239 lastValue = currentValue ; // Update last value for next iteration
223240 }
224- return counterGrowth / (lastTs .toEpochMilli () - firstTs .toEpochMilli ()) * 1000 ; // Rate per second
241+ return new RateRange (
242+ counterGrowth / 60.0 , // TODO: do not hardcode time difference
243+ counterGrowth / (lastTs .toEpochMilli () / 1000 - firstTs .toEpochMilli () / 1000 )
244+ );
225245 }).filter (Objects ::nonNull ).toList ();
226- if (allRates .isEmpty () && agg != Agg . COUNT && agg != Agg . SUM ) {
227- return null ; // No rates to aggregate
246+ if (allRates .isEmpty ()) {
247+ return new RateStats ( 0L , null , null , null , new RateRange ( 0.0 , 0.0 ));
228248 }
229- return switch (agg ) {
230- // TODO: fix the orElse in the stream operations
231- case MAX -> allRates .stream ().mapToDouble (Long ::doubleValue ).max ().orElseThrow ();
232- case MIN -> allRates .stream ().mapToDouble (Long ::doubleValue ).min ().orElseThrow ();
233- case AVG -> allRates .stream ().mapToDouble (Long ::doubleValue ).average ().orElseThrow ();
234- case SUM -> allRates .stream ().mapToDouble (Long ::doubleValue ).sum ();
235- case COUNT -> (double ) allRates .size ();
236- };
249+ return new RateStats (
250+ (long ) allRates .size (),
251+ allRates .stream ().max (RateRange ::compareTo ).orElseThrow (),
252+ new RateRange (
253+ allRates .stream ().mapToDouble (r -> r .lower ).average ().orElseThrow (),
254+ allRates .stream ().mapToDouble (r -> r .upper ).average ().orElseThrow ()
255+ ),
256+ allRates .stream ().min (RateRange ::compareTo ).orElseThrow (),
257+ new RateRange (allRates .stream ().mapToDouble (r -> r .lower ).sum (), allRates .stream ().mapToDouble (r -> r .upper ).sum ())
258+ );
237259 }
238260
239261 void putTSDBIndexTemplate (List <String > patterns , @ Nullable String mappingString ) throws IOException {
@@ -257,7 +279,7 @@ void putTSDBIndexTemplate(List<String> patterns, @Nullable String mappingString)
257279
258280 @ Before
259281 public void populateIndex () throws IOException {
260- dataGenerationHelper = new TSDataGenerationHelper (NUM_DOCS );
282+ dataGenerationHelper = new TSDataGenerationHelper (NUM_DOCS , TIME_RANGE_SECONDS );
261283 final XContentBuilder builder = XContentFactory .jsonBuilder ();
262284 builder .map (dataGenerationHelper .mapping .raw ());
263285 final String jsonMappings = Strings .toString (builder );
@@ -276,10 +298,68 @@ public void populateIndex() throws IOException {
276298 }
277299 }
278300
279- public void testCounterRateGroupByNothing () {
301+ void checkWithin (Double actual , RateRange expected ) {
302+ if (expected == null ) {
303+ assertThat (actual , equalTo (null ));
304+ return ;
305+ }
306+ assertThat (actual , allOf (lessThanOrEqualTo (expected .upper ), not (lessThan (expected .lower ))));
307+ }
308+
309+ public void testRateGroupBySubset () {
310+ var dimensions = ESTestCase .randomNonEmptySubsetOf (dataGenerationHelper .attributesForMetrics );
311+ var dimensionsStr = dimensions .stream ().map (d -> "attributes." + d ).collect (Collectors .joining (", " ));
280312 try (var resp = run (String .format (Locale .ROOT , """
281313 TS %s
282- | STATS count(rate(metrics.counter_hdd.bytes.read))
314+ | STATS count(rate(metrics.counter_hdd.bytes.read)),
315+ min(rate(metrics.counter_hdd.bytes.read)),
316+ max(rate(metrics.counter_hdd.bytes.read)),
317+ avg(rate(metrics.counter_hdd.bytes.read))
318+ BY tbucket=bucket(@timestamp, 1 minute), %s
319+ | SORT tbucket
320+ | LIMIT 1000
321+ """ , DATASTREAM_NAME , dimensionsStr ))) {
322+ List <List <Object >> rows = new ArrayList <>();
323+ resp .rows ().forEach (rowIter -> {
324+ List <Object > row = new ArrayList <>();
325+ rowIter .forEach (row ::add );
326+ rows .add (row );
327+ });
328+ List <String > failedWindows = new ArrayList <>();
329+ var groups = groupedRows (documents , dimensions , 60 );
330+ for (List <Object > row : rows ) {
331+ var rowKey = getRowKey (row , dimensions , 4 );
332+ var windowDataPoints = groups .get (rowKey );
333+ var docsPerTimeseries = groupByTimeseries (windowDataPoints , "counter_hdd.bytes.read" );
334+ var rateAgg = calculateRateAggregation (docsPerTimeseries .values ());
335+ try {
336+ assertThat (row .getFirst (), equalTo (rateAgg .count ));
337+ checkWithin ((Double ) row .get (1 ), rateAgg .max );
338+ checkWithin ((Double ) row .get (2 ), rateAgg .avg );
339+ checkWithin ((Double ) row .get (3 ), rateAgg .min );
340+ } catch (AssertionError e ) {
341+ failedWindows .add ("Failed for row:\n " + row + "\n Wanted: " + rateAgg + "\n Exception: " + e .getMessage ());
342+ }
343+ }
344+ if (failedWindows .isEmpty () == false ) {
345+ var pctFailures = (double ) failedWindows .size () / rows .size () * 100 ;
346+ var failureDetails = String .join ("\n " , failedWindows );
347+ if (failureDetails .length () > 2000 ) {
348+ failureDetails = failureDetails .substring (0 , 2000 ) + "\n ... (truncated)" ;
349+ }
350+ throw new AssertionError ("Failed " + failedWindows .size () + " windows(" + pctFailures + "%):\n " + failureDetails );
351+ }
352+ }
353+ }
354+
355+ public void testRateGroupByNothing () {
356+ var groups = groupedRows (documents , List .of (), 60 );
357+ try (var resp = run (String .format (Locale .ROOT , """
358+ TS %s
359+ | STATS count(rate(metrics.counter_hdd.bytes.read)),
360+ max(rate(metrics.counter_hdd.bytes.read)),
361+ avg(rate(metrics.counter_hdd.bytes.read)),
362+ min(rate(metrics.counter_hdd.bytes.read))
283363 BY tbucket=bucket(@timestamp, 1 minute)
284364 | SORT tbucket
285365 | LIMIT 1000
@@ -290,20 +370,28 @@ public void testCounterRateGroupByNothing() {
290370 rowIter .forEach (row ::add );
291371 rows .add (row );
292372 });
293- var groups = groupedRows ( documents , List . of (), 60 );
373+ List < String > failedWindows = new ArrayList <>( );
294374 for (List <Object > row : rows ) {
295- var windowStart = windowStart (row .get (1 ), 60 );
375+ var windowStart = windowStart (row .get (4 ), 60 );
296376 var windowDataPoints = groups .get (List .of (Long .toString (windowStart )));
297377 var docsPerTimeseries = groupByTimeseries (windowDataPoints , "counter_hdd.bytes.read" );
298- var rate = calculateRateAggregation (docsPerTimeseries .values (), Agg .COUNT );
299- if (rate == null ) {
300- assertThat (row .getFirst (), equalTo (null ));
301- continue ;
302- } else if (rate == 0 ) {
303- assertThat (row .getFirst (), equalTo (0L ));
304- continue ; // No data points in the window
378+ var rateAgg = calculateRateAggregation (docsPerTimeseries .values ());
379+ try {
380+ assertThat (row .getFirst (), equalTo (rateAgg .count ));
381+ checkWithin ((Double ) row .get (1 ), rateAgg .max );
382+ checkWithin ((Double ) row .get (2 ), rateAgg .avg );
383+ checkWithin ((Double ) row .get (3 ), rateAgg .min );
384+ } catch (AssertionError e ) {
385+ failedWindows .add ("Failed for row:\n " + row + "\n Wanted: " + rateAgg + "\n Exception: " + e .getMessage ());
386+ }
387+ }
388+ if (failedWindows .isEmpty () == false ) {
389+ var pctFailures = (double ) failedWindows .size () / rows .size () * 100 ;
390+ var failureDetails = String .join ("\n " , failedWindows );
391+ if (failureDetails .length () > 2000 ) {
392+ failureDetails = failureDetails .substring (0 , 2000 ) + "\n ... (truncated)" ;
305393 }
306- assertThat ( row . getFirst (), equalTo ( rate . longValue ()) );
394+ throw new AssertionError ( "Failed " + failedWindows . size () + " windows(" + pctFailures + "%): \n " + failureDetails );
307395 }
308396 }
309397 }
@@ -325,14 +413,15 @@ public void testGroupBySubset() {
325413 min(min_over_time(metrics.gauge_hdd.bytes.used)),
326414 sum(count_over_time(metrics.gauge_hdd.bytes.used)),
327415 sum(sum_over_time(metrics.gauge_hdd.bytes.used)),
328- avg(avg_over_time(metrics.gauge_hdd.bytes.used))
416+ avg(avg_over_time(metrics.gauge_hdd.bytes.used)),
417+ count(count_over_time(metrics.gauge_hdd.bytes.used))
329418 BY tbucket=bucket(@timestamp, 1 minute), %s
330419 | SORT tbucket
331420 | LIMIT 1000""" , DATASTREAM_NAME , dimensionsStr ))) {
332421 var groups = groupedRows (documents , dimensions , 60 );
333422 List <List <Object >> rows = consumeRows (resp );
334423 for (List <Object > row : rows ) {
335- var rowKey = getRowKey (row , dimensions , 6 );
424+ var rowKey = getRowKey (row , dimensions , 7 );
336425 var tsGroups = groupByTimeseries (groups .get (rowKey ), "gauge_hdd.bytes.used" );
337426 var docValues = valuesInWindow (groups .get (rowKey ), "gauge_hdd.bytes.used" );
338427 if (row .get (0 ) instanceof List ) {
@@ -349,6 +438,7 @@ public void testGroupBySubset() {
349438 assertThat (row .get (4 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .SUM , Agg .SUM ).longValue ()));
350439 var avg = aggregatePerTimeseries (tsGroups , Agg .AVG , Agg .AVG );
351440 assertThat ((Double ) row .get (5 ), closeTo (avg , avg * 0.01 ));
441+ // assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue()));
352442 }
353443 }
354444 }
@@ -368,14 +458,15 @@ public void testGroupByNothing() {
368458 min(min_over_time(metrics.gauge_hdd.bytes.used)),
369459 sum(count_over_time(metrics.gauge_hdd.bytes.used)),
370460 sum(sum_over_time(metrics.gauge_hdd.bytes.used)),
371- avg(avg_over_time(metrics.gauge_hdd.bytes.used))
461+ avg(avg_over_time(metrics.gauge_hdd.bytes.used)),
462+ count(count_over_time(metrics.gauge_hdd.bytes.used))
372463 BY tbucket=bucket(@timestamp, 1 minute)
373464 | SORT tbucket
374465 | LIMIT 1000""" , DATASTREAM_NAME ))) {
375466 List <List <Object >> rows = consumeRows (resp );
376467 var groups = groupedRows (documents , List .of (), 60 );
377468 for (List <Object > row : rows ) {
378- var windowStart = windowStart (row .get (6 ), 60 );
469+ var windowStart = windowStart (row .get (7 ), 60 );
379470 List <Integer > docValues = valuesInWindow (groups .get (List .of (Long .toString (windowStart ))), "gauge_hdd.bytes.used" );
380471 var tsGroups = groupByTimeseries (groups .get (List .of (Long .toString (windowStart ))), "gauge_hdd.bytes.used" );
381472 if (row .get (0 ) instanceof List ) {
@@ -392,6 +483,7 @@ public void testGroupByNothing() {
392483 assertThat (row .get (4 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .SUM , Agg .SUM ).longValue ()));
393484 var avg = aggregatePerTimeseries (tsGroups , Agg .AVG , Agg .AVG );
394485 assertThat ((Double ) row .get (5 ), closeTo (avg , avg * 0.01 ));
486+ // assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue()));
395487 }
396488 }
397489 }
0 commit comments