4343import java .util .stream .Collectors ;
4444
4545import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
46+ import static org .hamcrest .Matchers .closeTo ;
4647import static org .hamcrest .Matchers .containsInAnyOrder ;
4748import static org .hamcrest .Matchers .equalTo ;
4849
4950@ SuppressWarnings ("unchecked" )
5051public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
5152
52- private static final Long NUM_DOCS = 2000L ;
53+ private static final Long NUM_DOCS = 4000L ;
5354 private static final String DATASTREAM_NAME = "tsit_ds" ;
5455 private List <XContentBuilder > documents = null ;
5556 private TSDataGenerationHelper dataGenerationHelper ;
@@ -103,7 +104,8 @@ enum Agg {
103104 MAX ,
104105 MIN ,
105106 AVG ,
106- SUM
107+ SUM ,
108+ COUNT
107109 }
108110
109111 static List <Integer > valuesInWindow (List <Map <String , Object >> pointsInGroup , String metricName ) {
@@ -115,6 +117,46 @@ static List<Integer> valuesInWindow(List<Map<String, Object>> pointsInGroup, Str
115117 return values ;
116118 }
117119
120+ static Map <String , List <Tuple <String , Tuple <Instant , Integer >>>> groupByTimeseries (
121+ List <Map <String , Object >> pointsInGroup ,
122+ String metricName
123+ ) {
124+ return pointsInGroup .stream ()
125+ .filter (doc -> doc .containsKey ("metrics" ) && ((Map <String , Object >) doc .get ("metrics" )).containsKey (metricName ))
126+ .map (doc -> {
127+ String docKey = ((Map <String , Object >) doc .get ("attributes" )).entrySet ()
128+ .stream ()
129+ .map (entry -> entry .getKey () + ":" + entry .getValue ())
130+ .collect (Collectors .joining ("," ));
131+ var docTs = Instant .parse ((String ) doc .get ("@timestamp" ));
132+ var docValue = (Integer ) ((Map <String , Object >) doc .get ("metrics" )).get (metricName );
133+ return new Tuple <>(docKey , new Tuple <>(docTs , docValue ));
134+ })
135+ .collect (Collectors .groupingBy (Tuple ::v1 ));
136+ }
137+
138+ static Double aggregatePerTimeseries (
139+ Map <String , List <Tuple <String , Tuple <Instant , Integer >>>> timeseries ,
140+ Agg crossAgg ,
141+ Agg timeseriesAgg
142+ ) {
143+ var res = timeseries .values ().stream ().map (timeseriesList -> {
144+ List <Integer > values = timeseriesList .stream ().map (t -> t .v2 ().v2 ()).collect (Collectors .toList ());
145+ return aggregateValuesInWindow (values , timeseriesAgg );
146+ }).filter (Objects ::nonNull ).toList ();
147+
148+ if (res .isEmpty ()) {
149+ return null ; // No values to aggregate
150+ }
151+ return switch (crossAgg ) {
152+ case MAX -> res .stream ().mapToDouble (Double ::doubleValue ).max ().orElseThrow ();
153+ case MIN -> res .stream ().mapToDouble (Double ::doubleValue ).min ().orElseThrow ();
154+ case AVG -> res .stream ().mapToDouble (Double ::doubleValue ).average ().orElseThrow ();
155+ case SUM -> res .stream ().mapToDouble (Double ::doubleValue ).sum ();
156+ case COUNT -> (double ) res .size ();
157+ };
158+ }
159+
118160 static Double aggregateValuesInWindow (List <Integer > values , Agg agg ) {
119161 if (values .isEmpty ()) {
120162 throw new IllegalArgumentException ("No values to aggregate for " + agg + " operation" );
@@ -124,6 +166,7 @@ static Double aggregateValuesInWindow(List<Integer> values, Agg agg) {
124166 case MIN -> Double .valueOf (values .stream ().min (Integer ::compareTo ).orElseThrow ());
125167 case AVG -> values .stream ().mapToDouble (Integer ::doubleValue ).average ().orElseThrow ();
126168 case SUM -> values .stream ().mapToDouble (Integer ::doubleValue ).sum ();
169+ case COUNT -> (double ) values .size ();
127170 };
128171 }
129172
@@ -150,6 +193,49 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
150193 return List .of (DataStreamsPlugin .class , LocalStateCompositeXPackPlugin .class , AggregateMetricMapperPlugin .class , EsqlPlugin .class );
151194 }
152195
196+ static Double calculateRateAggregation (Collection <List <Tuple <String , Tuple <Instant , Integer >>>> allTimeseries , Agg agg ) {
197+ List <Long > allRates = allTimeseries .stream ().map (timeseries -> {
198+ if (timeseries .size () < 2 ) {
199+ return null ; // Not enough data points to calculate rate
200+ }
201+ // Sort the timeseries by timestamp
202+ timeseries .sort ((t1 , t2 ) -> t1 .v2 ().v1 ().compareTo (t2 .v2 ().v1 ()));
203+ var firstTs = timeseries .getFirst ().v2 ().v1 ();
204+ var lastTs = timeseries .getLast ().v2 ().v1 ();
205+ Integer lastValue = null ;
206+ Long counterGrowth = 0L ;
207+ for (Tuple <String , Tuple <Instant , Integer >> point : timeseries ) {
208+ var currentValue = point .v2 ().v2 ();
209+ if (currentValue == null ) {
210+ return null ; // Skip if the value is null
211+ }
212+ if (lastValue == null ) {
213+ lastValue = point .v2 ().v2 (); // Initialize with the first value
214+ continue ;
215+ }
216+ if (currentValue > lastValue ) {
217+ counterGrowth += currentValue - lastValue ; // Incremental growth
218+ } else if (currentValue < lastValue ) {
219+ // If the value decreased, we assume a reset and start counting from the current value
220+ counterGrowth += currentValue ;
221+ }
222+ lastValue = currentValue ; // Update last value for next iteration
223+ }
224+ return counterGrowth / (lastTs .toEpochMilli () - firstTs .toEpochMilli ()) * 1000 ; // Rate per second
225+ }).filter (Objects ::nonNull ).toList ();
226+ if (allRates .isEmpty () && agg != Agg .COUNT && agg != Agg .SUM ) {
227+ return null ; // No rates to aggregate
228+ }
229+ return switch (agg ) {
230+ // TODO: fix the orElse in the stream operations
231+ case MAX -> allRates .stream ().mapToDouble (Long ::doubleValue ).max ().orElseThrow ();
232+ case MIN -> allRates .stream ().mapToDouble (Long ::doubleValue ).min ().orElseThrow ();
233+ case AVG -> allRates .stream ().mapToDouble (Long ::doubleValue ).average ().orElseThrow ();
234+ case SUM -> allRates .stream ().mapToDouble (Long ::doubleValue ).sum ();
235+ case COUNT -> (double ) allRates .size ();
236+ };
237+ }
238+
153239 void putTSDBIndexTemplate (List <String > patterns , @ Nullable String mappingString ) throws IOException {
154240 Settings .Builder settingsBuilder = Settings .builder ();
155241 // Ensure it will be a TSDB data stream
@@ -190,6 +276,38 @@ public void populateIndex() throws IOException {
190276 }
191277 }
192278
279+ public void testCounterRateGroupByNothing () {
280+ try (var resp = run (String .format (Locale .ROOT , """
281+ TS %s
282+ | STATS count(rate(metrics.counter_hdd.bytes.read))
283+ BY tbucket=bucket(@timestamp, 1 minute)
284+ | SORT tbucket
285+ | LIMIT 1000
286+ """ , DATASTREAM_NAME ))) {
287+ List <List <Object >> rows = new ArrayList <>();
288+ resp .rows ().forEach (rowIter -> {
289+ List <Object > row = new ArrayList <>();
290+ rowIter .forEach (row ::add );
291+ rows .add (row );
292+ });
293+ var groups = groupedRows (documents , List .of (), 60 );
294+ for (List <Object > row : rows ) {
295+ var windowStart = windowStart (row .get (1 ), 60 );
296+ var windowDataPoints = groups .get (List .of (Long .toString (windowStart )));
297+ var docsPerTimeseries = groupByTimeseries (windowDataPoints , "counter_hdd.bytes.read" );
298+ var rate = calculateRateAggregation (docsPerTimeseries .values (), Agg .COUNT );
299+ if (rate == null ) {
300+ assertThat (row .getFirst (), equalTo (null ));
301+ continue ;
302+ } else if (rate == 0 ) {
303+ assertThat (row .getFirst (), equalTo (0L ));
304+ continue ; // No data points in the window
305+ }
306+ assertThat (row .getFirst (), equalTo (rate .longValue ()));
307+ }
308+ }
309+ }
310+
193311 /**
194312 * This test validates Gauge metrics aggregation with grouping by time bucket and a subset of dimensions.
195313 * The subset of dimensions is a random subset of the dimensions present in the data.
@@ -215,21 +333,22 @@ public void testGroupBySubset() {
215333 List <List <Object >> rows = consumeRows (resp );
216334 for (List <Object > row : rows ) {
217335 var rowKey = getRowKey (row , dimensions , 6 );
336+ var tsGroups = groupByTimeseries (groups .get (rowKey ), "gauge_hdd.bytes.used" );
218337 var docValues = valuesInWindow (groups .get (rowKey ), "gauge_hdd.bytes.used" );
219338 if (row .get (0 ) instanceof List ) {
220339 assertThat (
221- (Collection <Long >) row .get ( 0 ),
340+ (Collection <Long >) row .getFirst ( ),
222341 containsInAnyOrder (docValues .stream ().mapToLong (Integer ::longValue ).boxed ().toArray (Long []::new ))
223342 );
224343 } else {
225- assertThat (row .get ( 0 ), equalTo (docValues .getFirst ().longValue ()));
344+ assertThat (row .getFirst ( ), equalTo (docValues .getFirst ().longValue ()));
226345 }
227- assertThat (row .get (1 ), equalTo (Math . round ( aggregateValuesInWindow ( docValues , Agg .MAX ) )));
228- assertThat (row .get (2 ), equalTo (Math . round ( aggregateValuesInWindow ( docValues , Agg .MIN ) )));
229- assertThat (row .get (3 ), equalTo (( long ) docValues . size ()));
230- assertThat (row .get (4 ), equalTo (aggregateValuesInWindow ( docValues , Agg .SUM ).longValue ()));
231- // TODO: fix then enable
232- // assertThat(row.get(5), equalTo(aggregateValuesInWindow(docValues, Agg.SUM) / (double) docValues.size() ));
346+ assertThat (row .get (1 ), equalTo (aggregatePerTimeseries ( tsGroups , Agg .MAX , Agg . MAX ). longValue ( )));
347+ assertThat (row .get (2 ), equalTo (aggregatePerTimeseries ( tsGroups , Agg .MIN , Agg . MIN ). longValue ( )));
348+ assertThat (row .get (3 ), equalTo (aggregatePerTimeseries ( tsGroups , Agg . SUM , Agg . COUNT ). longValue ()));
349+ assertThat (row .get (4 ), equalTo (aggregatePerTimeseries ( tsGroups , Agg . SUM , Agg .SUM ).longValue ()));
350+ var avg = aggregatePerTimeseries ( tsGroups , Agg . AVG , Agg . AVG );
351+ assertThat (( Double ) row .get (5 ), closeTo ( avg , avg * 0.01 ));
233352 }
234353 }
235354 }
@@ -258,6 +377,7 @@ public void testGroupByNothing() {
258377 for (List <Object > row : rows ) {
259378 var windowStart = windowStart (row .get (6 ), 60 );
260379 List <Integer > docValues = valuesInWindow (groups .get (List .of (Long .toString (windowStart ))), "gauge_hdd.bytes.used" );
380+ var tsGroups = groupByTimeseries (groups .get (List .of (Long .toString (windowStart ))), "gauge_hdd.bytes.used" );
261381 if (row .get (0 ) instanceof List ) {
262382 assertThat (
263383 (Collection <Long >) row .get (0 ),
@@ -266,12 +386,12 @@ public void testGroupByNothing() {
266386 } else {
267387 assertThat (row .get (0 ), equalTo (docValues .getFirst ().longValue ()));
268388 }
269- assertThat (row .get (1 ), equalTo (Math . round ( aggregateValuesInWindow ( docValues , Agg .MAX ) )));
270- assertThat (row .get (2 ), equalTo (Math . round ( aggregateValuesInWindow ( docValues , Agg .MIN ) )));
271- assertThat (row .get (3 ), equalTo (( long ) docValues . size ()));
272- assertThat (row .get (4 ), equalTo (aggregateValuesInWindow ( docValues , Agg .SUM ).longValue ()));
273- // TODO: fix then enable
274- // assertThat(row.get(5), equalTo(aggregateValuesInWindow(docValues, Agg.SUM) / (double) docValues.size() ));
389+ assertThat (row .get (1 ), equalTo (aggregatePerTimeseries ( tsGroups , Agg .MAX , Agg . MAX ). longValue ( )));
390+ assertThat (row .get (2 ), equalTo (aggregatePerTimeseries ( tsGroups , Agg .MIN , Agg . MIN ). longValue ( )));
391+ assertThat (row .get (3 ), equalTo (aggregatePerTimeseries ( tsGroups , Agg . SUM , Agg . COUNT ). longValue ()));
392+ assertThat (row .get (4 ), equalTo (aggregatePerTimeseries ( tsGroups , Agg . SUM , Agg .SUM ).longValue ()));
393+ var avg = aggregatePerTimeseries ( tsGroups , Agg . AVG , Agg . AVG );
394+ assertThat (( Double ) row .get (5 ), closeTo ( avg , avg * 0.01 ));
275395 }
276396 }
277397 }
0 commit comments