Skip to content

Commit 6b2acad

Browse files
committed
Test all aggregations for rate, and randomize window sizes
1 parent 6c31b5e commit 6b2acad

File tree

1 file changed

+36
-15
lines changed

1 file changed

+36
-15
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,22 @@
5656
@SuppressWarnings("unchecked")
5757
@ESIntegTestCase.ClusterScope(maxNumDataNodes = 1)
5858
public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
59-
private static final Long NUM_DOCS = 2000L;
60-
private static final Long TIME_RANGE_SECONDS = 3600L;
59+
private static final Long NUM_DOCS = 500L;
60+
private static final Long TIME_RANGE_SECONDS = 900L;
6161
private static final String DATASTREAM_NAME = "tsit_ds";
6262
private static final Integer SECONDS_IN_WINDOW = 60;
63+
private static final List<Tuple<String, Integer>> WINDOW_OPTIONS = List.of(
64+
Tuple.tuple("10 seconds", 10),
65+
Tuple.tuple("30 seconds", 30),
66+
Tuple.tuple("1 minute", 60),
67+
Tuple.tuple("2 minutes", 120),
68+
Tuple.tuple("3 minutes", 180),
69+
Tuple.tuple("5 minutes", 300),
70+
Tuple.tuple("10 minutes", 600),
71+
Tuple.tuple("30 minutes", 1800),
72+
Tuple.tuple("1 hour", 3600)
73+
);
74+
6375
private List<XContentBuilder> documents;
6476
private TSDataGenerationHelper dataGenerationHelper;
6577

@@ -271,7 +283,7 @@ static RateStats calculateRateAggregation(
271283
);
272284
}).filter(Objects::nonNull).toList();
273285
if (allRates.isEmpty()) {
274-
return new RateStats(0L, null, null, null, new RateRange(0.0, 0.0));
286+
return new RateStats(0L, null, null, null, null);
275287
}
276288
return new RateStats(
277289
(long) allRates.size(),
@@ -353,31 +365,38 @@ void assertNoFailedWindows(List<String> failedWindows, List<List<Object>> rows)
353365
* the same values from the documents in the group.
354366
*/
355367
public void testRateGroupBySubset() {
356-
var dimensions = ESTestCase.randomNonEmptySubsetOf(dataGenerationHelper.attributesForMetrics);
357-
var dimensionsStr = dimensions.stream().map(d -> "attributes." + d).collect(Collectors.joining(", "));
368+
var window = ESTestCase.randomFrom(WINDOW_OPTIONS);
369+
var windowSize = window.v2();
370+
var windowStr = window.v1();
371+
var dimensions = ESTestCase.randomSubsetOf(dataGenerationHelper.attributesForMetrics);
372+
var dimensionsStr = dimensions.isEmpty()
373+
? ""
374+
: ", " + dimensions.stream().map(d -> "attributes." + d).collect(Collectors.joining(", "));
358375
try (var resp = run(String.format(Locale.ROOT, """
359376
TS %s
360377
| STATS count(rate(metrics.counterl_hdd.bytes.read)),
361378
max(rate(metrics.counterl_hdd.bytes.read)),
362379
avg(rate(metrics.counterl_hdd.bytes.read)),
363-
min(rate(metrics.counterl_hdd.bytes.read))
364-
BY tbucket=bucket(@timestamp, 1 minute), %s
380+
min(rate(metrics.counterl_hdd.bytes.read)),
381+
sum(rate(metrics.counterl_hdd.bytes.read))
382+
BY tbucket=bucket(@timestamp, %s) %s
365383
| SORT tbucket
366384
| LIMIT 1000
367-
""", DATASTREAM_NAME, dimensionsStr))) {
385+
""", DATASTREAM_NAME, windowStr, dimensionsStr))) {
368386
List<List<Object>> rows = consumeRows(resp);
369387
List<String> failedWindows = new ArrayList<>();
370-
var groups = groupedRows(documents, dimensions, SECONDS_IN_WINDOW);
388+
var groups = groupedRows(documents, dimensions, windowSize);
371389
for (List<Object> row : rows) {
372-
var rowKey = getRowKey(row, dimensions, 4);
390+
var rowKey = getRowKey(row, dimensions, 5);
373391
var windowDataPoints = groups.get(rowKey);
374392
var docsPerTimeseries = groupByTimeseries(windowDataPoints, "counterl_hdd.bytes.read");
375-
var rateAgg = calculateRateAggregation(docsPerTimeseries.values(), SECONDS_IN_WINDOW);
393+
var rateAgg = calculateRateAggregation(docsPerTimeseries.values(), windowSize);
376394
try {
377395
assertThat(row.getFirst(), equalTo(rateAgg.count));
378396
checkWithin((Double) row.get(1), rateAgg.max);
379397
checkWithin((Double) row.get(2), rateAgg.avg);
380398
checkWithin((Double) row.get(3), rateAgg.min);
399+
checkWithin((Double) row.get(4), rateAgg.sum);
381400
} catch (AssertionError e) {
382401
failedWindows.add("Failed for row:\n" + row + "\nWanted: " + rateAgg + "\nException: " + e.getMessage());
383402
}
@@ -425,7 +444,9 @@ public void testRateGroupByNothing() {
425444
}
426445

427446
public void testGaugeGroupByRandomAndRandomAgg() {
428-
// TODO: randomize window size as well!
447+
var randomWindow = ESTestCase.randomFrom(WINDOW_OPTIONS);
448+
var windowSize = randomWindow.v2();
449+
var windowStr = randomWindow.v1();
429450
var dimensions = ESTestCase.randomSubsetOf(dataGenerationHelper.attributesForMetrics);
430451
var dimensionsStr = dimensions.isEmpty()
431452
? ""
@@ -445,11 +466,11 @@ public void testGaugeGroupByRandomAndRandomAgg() {
445466
| WHERE %s IS NOT NULL
446467
| STATS
447468
%s
448-
BY tbucket=bucket(@timestamp, 1 minute) %s
469+
BY tbucket=bucket(@timestamp, %s) %s
449470
| SORT tbucket
450-
| LIMIT 1000""", DATASTREAM_NAME, metricName, aggExpression, dimensionsStr);
471+
| LIMIT 1000""", DATASTREAM_NAME, metricName, aggExpression, windowStr, dimensionsStr);
451472
try (EsqlQueryResponse resp = run(query)) {
452-
var groups = groupedRows(documents, dimensions, 60);
473+
var groups = groupedRows(documents, dimensions, windowSize);
453474
List<List<Object>> rows = consumeRows(resp);
454475
for (List<Object> row : rows) {
455476
var rowKey = getRowKey(row, dimensions, 1);

0 commit comments

Comments (0)