Skip to content

Commit 8021aba

Browse files
committed
Implicitly use last_over_time for time-series aggregations
1 parent 66d1a8e commit 8021aba

File tree

12 files changed

+311
-556
lines changed

12 files changed

+311
-556
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-last-over-time.csv-spec

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,24 @@ clients:double | cluster:keyword | time_bucket:datetime
1717
357.0 | staging | 2024-05-10T00:03:00.000Z
1818
;
1919

20+
implicit_last_over_time_of_integer
21+
required_capability: metrics_command
22+
required_capability: implicit_last_over_time
23+
TS k8s | STATS clients = avg(network.eth0.currently_connected_clients) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT time_bucket, cluster | LIMIT 10;
24+
25+
clients:double | cluster:keyword | time_bucket:datetime
26+
429.0 | prod | 2024-05-10T00:00:00.000Z
27+
615.5 | staging | 2024-05-10T00:00:00.000Z
28+
396.5 | prod | 2024-05-10T00:01:00.000Z
29+
440.0 | qa | 2024-05-10T00:01:00.000Z
30+
632.5 | prod | 2024-05-10T00:02:00.000Z
31+
565.0 | qa | 2024-05-10T00:02:00.000Z
32+
205.0 | staging | 2024-05-10T00:02:00.000Z
33+
742.0 | prod | 2024-05-10T00:03:00.000Z
34+
454.0 | qa | 2024-05-10T00:03:00.000Z
35+
357.0 | staging | 2024-05-10T00:03:00.000Z
36+
;
37+
2038
last_over_time_of_long
2139
required_capability: metrics_command
2240
required_capability: last_over_time
@@ -35,6 +53,24 @@ bytes:double | cluster:keyword | time_bucket:datetime
3553
612.5 | staging | 2024-05-10T00:03:00.000Z
3654
;
3755

56+
implicit_last_over_time_of_long
57+
required_capability: metrics_command
58+
required_capability: implicit_last_over_time
59+
TS k8s | STATS bytes = avg(network.bytes_in) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT time_bucket, cluster | LIMIT 10;
60+
61+
bytes:double | cluster:keyword | time_bucket:datetime
62+
677.0 | prod | 2024-05-10T00:00:00.000Z
63+
586.0 | staging | 2024-05-10T00:00:00.000Z
64+
628.5 | prod | 2024-05-10T00:01:00.000Z
65+
538.5 | qa | 2024-05-10T00:01:00.000Z
66+
612.0 | prod | 2024-05-10T00:02:00.000Z
67+
749.0 | qa | 2024-05-10T00:02:00.000Z
68+
382.5 | staging | 2024-05-10T00:02:00.000Z
69+
970.0 | prod | 2024-05-10T00:03:00.000Z
70+
373.0 | qa | 2024-05-10T00:03:00.000Z
71+
612.5 | staging | 2024-05-10T00:03:00.000Z
72+
;
73+
3874
last_over_time_with_filtering
3975
required_capability: metrics_command
4076
required_capability: last_over_time
@@ -52,6 +88,25 @@ tx:long | cluster:keyword | time_bucket:datetime
5288
238 | staging | 2024-05-10T00:20:00.000Z
5389
;
5490

91+
92+
implicit_last_over_time_with_filtering
93+
required_capability: metrics_command
94+
required_capability: implicit_last_over_time
95+
TS k8s | WHERE pod == "one" | STATS tx = sum(network.bytes_in) BY cluster, time_bucket = bucket(@timestamp, 10minute) | SORT time_bucket, cluster | LIMIT 10;
96+
97+
tx:long | cluster:keyword | time_bucket:datetime
98+
3 | prod | 2024-05-10T00:00:00.000Z
99+
830 | qa | 2024-05-10T00:00:00.000Z
100+
753 | staging | 2024-05-10T00:00:00.000Z
101+
542 | prod | 2024-05-10T00:10:00.000Z
102+
187 | qa | 2024-05-10T00:10:00.000Z
103+
4 | staging | 2024-05-10T00:10:00.000Z
104+
931 | prod | 2024-05-10T00:20:00.000Z
105+
206 | qa | 2024-05-10T00:20:00.000Z
106+
238 | staging | 2024-05-10T00:20:00.000Z
107+
;
108+
109+
55110
last_over_time_older_than_10d
56111
required_capability: metrics_command
57112
required_capability: last_over_time
@@ -65,6 +120,19 @@ cost:double | pod:keyword | time_bucket:datetime
65120
1038.0 | three | 2024-05-10T00:10:00.000Z
66121
;
67122

123+
implicit_last_over_time_older_than_10d
124+
required_capability: metrics_command
125+
required_capability: implicit_last_over_time
126+
TS k8s | WHERE cluster == "qa" AND @timestamp < now() - 10 day | STATS cost = avg(network.eth0.rx) BY pod, time_bucket = bucket(@timestamp, 10minute) | SORT time_bucket, pod | LIMIT 5;
127+
128+
cost:double | pod:keyword | time_bucket:datetime
129+
818.0 | one | 2024-05-10T00:00:00.000Z
130+
529.0 | three | 2024-05-10T00:00:00.000Z
131+
620.0 | two | 2024-05-10T00:00:00.000Z
132+
1262.0 | one | 2024-05-10T00:10:00.000Z
133+
1038.0 | three | 2024-05-10T00:10:00.000Z
134+
;
135+
68136
eval_on_last_over_time
69137
required_capability: metrics_command
70138
required_capability: last_over_time
@@ -82,6 +150,23 @@ max_bytes:double | cluster:keyword | time_bucket:datetime | kb_minus_offset
82150
81.33333333333333 | staging | 2024-05-10T00:20:00.000Z | -0.01866666666666667
83151
;
84152

153+
implicit_eval_on_last_over_time
154+
required_capability: metrics_command
155+
required_capability: implicit_last_over_time
156+
TS k8s | STATS max_bytes = avg(network.bytes_in) BY cluster, time_bucket = bucket(@timestamp, 10minute) | EVAL kb_minus_offset = (max_bytes - 100) / 1000.0 | LIMIT 10 | SORT time_bucket, cluster ;
157+
158+
max_bytes:double | cluster:keyword | time_bucket:datetime | kb_minus_offset:double
159+
225.0 | prod | 2024-05-10T00:00:00.000Z | 0.125
160+
485.6666666666667 | qa | 2024-05-10T00:00:00.000Z | 0.3856666666666667
161+
572.6666666666666 | staging | 2024-05-10T00:00:00.000Z | 0.4726666666666666
162+
517.6666666666666 | prod | 2024-05-10T00:10:00.000Z | 0.41766666666666663
163+
426.6666666666667 | qa | 2024-05-10T00:10:00.000Z | 0.32666666666666666
164+
482.3333333333333 | staging | 2024-05-10T00:10:00.000Z | 0.3823333333333333
165+
839.0 | prod | 2024-05-10T00:20:00.000Z | 0.739
166+
697.0 | qa | 2024-05-10T00:20:00.000Z | 0.597
167+
81.33333333333333 | staging | 2024-05-10T00:20:00.000Z | -0.01866666666666667
168+
;
169+
85170
last_over_time_multi_values
86171
required_capability: metrics_command
87172
required_capability: last_over_time
@@ -101,6 +186,26 @@ events:long | pod:keyword | time_bucket:datetime
101186
9 | three | 2024-05-10T00:02:00.000Z
102187
;
103188

189+
190+
implicit_last_over_time_multi_values
191+
required_capability: metrics_command
192+
required_capability: implicit_last_over_time
193+
TS k8s | WHERE @timestamp < "2024-05-10T00:10:00.000Z" | STATS events = sum(events_received) by pod, time_bucket = bucket(@timestamp, 1minute) | SORT events desc, pod, time_bucket | LIMIT 10;
194+
195+
events:long | pod:keyword | time_bucket:datetime
196+
18 | one | 2024-05-10T00:01:00.000Z
197+
16 | one | 2024-05-10T00:08:00.000Z
198+
12 | one | 2024-05-10T00:03:00.000Z
199+
12 | three | 2024-05-10T00:00:00.000Z
200+
12 | two | 2024-05-10T00:09:00.000Z
201+
10 | three | 2024-05-10T00:06:00.000Z
202+
10 | two | 2024-05-10T00:02:00.000Z
203+
10 | two | 2024-05-10T00:04:00.000Z
204+
9 | one | 2024-05-10T00:09:00.000Z
205+
9 | three | 2024-05-10T00:02:00.000Z
206+
;
207+
208+
104209
last_over_time_null_values
105210
required_capability: metrics_command
106211
required_capability: last_over_time
@@ -120,6 +225,24 @@ null | two | 2024-05-10T00:13:00.000Z
120225
7 | three | 2024-05-10T00:12:00.000Z
121226
;
122227

228+
implicit_last_over_time_null_values
229+
required_capability: metrics_command
230+
required_capability: implicit_last_over_time
231+
TS k8s | WHERE @timestamp > "2024-05-10T00:10:00.000Z" and @timestamp < "2024-05-10T00:15:00.000Z" | STATS events = sum(events_received) by pod, time_bucket = bucket(@timestamp, 1minute) | SORT events desc, pod, time_bucket | LIMIT 10;
232+
233+
events:long | pod:keyword | time_bucket:datetime
234+
null | one | 2024-05-10T00:12:00.000Z
235+
null | two | 2024-05-10T00:13:00.000Z
236+
20 | two | 2024-05-10T00:14:00.000Z
237+
18 | two | 2024-05-10T00:12:00.000Z
238+
16 | one | 2024-05-10T00:13:00.000Z
239+
16 | one | 2024-05-10T00:14:00.000Z
240+
11 | one | 2024-05-10T00:10:00.000Z
241+
9 | one | 2024-05-10T00:11:00.000Z
242+
9 | three | 2024-05-10T00:13:00.000Z
243+
7 | three | 2024-05-10T00:12:00.000Z
244+
;
245+
123246
last_over_time_all_value_types
124247
required_capability: metrics_command
125248
required_capability: last_over_time
@@ -138,3 +261,21 @@ events:long | pod:keyword | time_bucket:datetime
138261
5 | two | 2024-05-10T00:20:00.000Z
139262
;
140263

264+
265+
implicit_last_over_time_all_value_types
266+
required_capability: metrics_command
267+
required_capability: implicit_last_over_time
268+
TS k8s | STATS events = sum(events_received) by pod, time_bucket = bucket(@timestamp, 10minute) | SORT events desc, pod, time_bucket | LIMIT 10 ;
269+
270+
events:long | pod:keyword | time_bucket:datetime
271+
21 | three | 2024-05-10T00:10:00.000Z
272+
20 | one | 2024-05-10T00:10:00.000Z
273+
15 | one | 2024-05-10T00:20:00.000Z
274+
15 | three | 2024-05-10T00:20:00.000Z
275+
13 | two | 2024-05-10T00:10:00.000Z
276+
12 | two | 2024-05-10T00:00:00.000Z
277+
9 | one | 2024-05-10T00:00:00.000Z
278+
9 | three | 2024-05-10T00:00:00.000Z
279+
5 | two | 2024-05-10T00:20:00.000Z
280+
;
281+

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,26 +49,51 @@ max_cost: double
4949

5050
maxRateAndBytes
5151
required_capability: metrics_command
52+
required_capability: implicit_last_over_time
5253
TS k8s | STATS max(60 * rate(network.total_bytes_in)), max(network.bytes_in);
5354

5455
max(60 * rate(network.total_bytes_in)): double | max(network.bytes_in): long
55-
790.4235090751944 | 1021
56+
790.4235090751944 | 972
57+
;
58+
59+
maxRateAndBytesExplicit
60+
required_capability: metrics_command
61+
TS k8s | STATS max(60 * rate(network.total_bytes_in)), max(last_over_time(network.bytes_in));
62+
63+
max(60 * rate(network.total_bytes_in)): double | max(last_over_time(network.bytes_in)): long
64+
790.4235090751944 | 972
5665
;
5766

5867
maxRateAndMarkupBytes
5968
required_capability: metrics_command
6069
TS k8s | STATS max(rate(network.total_bytes_in)), max(network.bytes_in * 1.05);
6170

6271
max(rate(network.total_bytes_in)): double | max(network.bytes_in * 1.05): double
63-
13.17372515125324 | 1072.05
72+
13.17372515125324 | 1020.6
73+
;
74+
75+
maxRateAndMarkupBytesExplicit
76+
required_capability: metrics_command
77+
TS k8s | STATS max(rate(network.total_bytes_in)), max_bytes_in = max(last_over_time(network.bytes_in) * 1.05);
78+
79+
max(rate(network.total_bytes_in)): double | max_bytes_in: double
80+
13.17372515125324 | 1020.6
81+
;
82+
83+
maxRateAndLastBytesIn
84+
required_capability: metrics_command
85+
TS k8s | STATS max(rate(network.total_bytes_in)), max_bytes_in = max(last_over_time(network.bytes_in * 1.05));
86+
87+
max(rate(network.total_bytes_in)): double | max_bytes_in: double
88+
13.17372515125324 | 1020.6
6489
;
6590

6691
maxRateAndBytesAndCost
6792
required_capability: metrics_command
68-
TS k8s | STATS max(rate(network.total_bytes_in)), max(network.bytes_in), max(rate(network.total_cost));
93+
TS k8s | STATS max(rate(network.total_bytes_in)), max(max_over_time(network.bytes_in)), max(rate(network.total_cost));
6994

70-
max(rate(network.total_bytes_in)): double| max(network.bytes_in): long| max(rate(network.total_cost)): double
71-
13.17372515125324 | 1021 | 0.16151685393258428
95+
max(rate(network.total_bytes_in)): double| max(max_over_time(network.bytes_in)): long| max(rate(network.total_cost)): double
96+
13.17372515125324 | 1021 | 0.16151685393258428
7297
;
7398

7499
sumRate
@@ -144,15 +169,15 @@ max(rate(network.total_bytes_in)):double | time_bucket:datetime | cluster:ke
144169

145170
BytesAndCostByBucketAndCluster
146171
required_capability: metrics_command
147-
TS k8s | STATS max(rate(network.total_bytes_in)), max(network.cost) BY time_bucket = bucket(@timestamp,5minute), cluster | SORT time_bucket DESC, cluster | LIMIT 6;
148-
149-
max(rate(network.total_bytes_in)): double | max(network.cost): double | time_bucket:date | cluster: keyword
150-
6.980660660660663 | 10.75 | 2024-05-10T00:20:00.000Z | prod
151-
4.05 | 11.875 | 2024-05-10T00:20:00.000Z | qa
152-
3.19 | 9.5 | 2024-05-10T00:20:00.000Z | staging
153-
11.860805860805861 | 12.375 | 2024-05-10T00:15:00.000Z | prod
154-
23.702205882352942 | 12.125 | 2024-05-10T00:15:00.000Z | qa
155-
7.784911616161616 | 11.5 | 2024-05-10T00:15:00.000Z | staging
172+
TS k8s | STATS max(rate(network.total_bytes_in)), max(max_over_time(network.cost)) BY time_bucket = bucket(@timestamp,5minute), cluster | SORT time_bucket DESC, cluster | LIMIT 6;
173+
174+
max(rate(network.total_bytes_in)): double | max(max_over_time(network.cost)): double | time_bucket:date | cluster: keyword
175+
6.980660660660663 | 10.75 | 2024-05-10T00:20:00.000Z | prod
176+
4.05 | 11.875 | 2024-05-10T00:20:00.000Z | qa
177+
3.19 | 9.5 | 2024-05-10T00:20:00.000Z | staging
178+
11.860805860805861 | 12.375 | 2024-05-10T00:15:00.000Z | prod
179+
23.702205882352942 | 12.125 | 2024-05-10T00:15:00.000Z | qa
180+
7.784911616161616 | 11.5 | 2024-05-10T00:15:00.000Z | staging
156181
;
157182

158183
oneRateWithBucketAndClusterThenFilter

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
4848
import static org.hamcrest.Matchers.allOf;
4949
import static org.hamcrest.Matchers.closeTo;
50-
import static org.hamcrest.Matchers.containsInAnyOrder;
5150
import static org.hamcrest.Matchers.equalTo;
5251
import static org.hamcrest.Matchers.lessThan;
5352
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -526,7 +525,6 @@ public void testGroupBySubset() {
526525
try (EsqlQueryResponse resp = run(String.format(Locale.ROOT, """
527526
TS %s
528527
| STATS
529-
values(metrics.gaugel_hdd.bytes.used),
530528
max(max_over_time(metrics.gaugel_hdd.bytes.used)),
531529
min(min_over_time(metrics.gaugel_hdd.bytes.used)),
532530
sum(count_over_time(metrics.gaugel_hdd.bytes.used)),
@@ -539,29 +537,20 @@ public void testGroupBySubset() {
539537
var groups = groupedRows(documents, dimensions, 60);
540538
List<List<Object>> rows = consumeRows(resp);
541539
for (List<Object> row : rows) {
542-
var rowKey = getRowKey(row, dimensions, 7);
540+
var rowKey = getRowKey(row, dimensions, 6);
543541
var tsGroups = groupByTimeseries(groups.get(rowKey), "gaugel_hdd.bytes.used");
544-
var docValues = valuesInWindow(groups.get(rowKey), "gaugel_hdd.bytes.used");
545-
if (row.get(0) instanceof List) {
546-
assertThat(
547-
(Collection<Long>) row.getFirst(),
548-
containsInAnyOrder(docValues.stream().mapToLong(Integer::longValue).boxed().toArray(Long[]::new))
549-
);
550-
} else {
551-
assertThat(row.getFirst(), equalTo(docValues.isEmpty() ? null : docValues.getFirst().longValue()));
552-
}
553542
Function<Object, Double> toDouble = cell -> switch (cell) {
554543
case Long l -> l.doubleValue();
555544
case Double d -> d;
556545
case null -> null;
557546
default -> throw new IllegalStateException("Unexpected value type: " + cell + " of class " + cell.getClass());
558547
};
559-
assertThat(toDouble.apply(row.get(1)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX)));
560-
assertThat(toDouble.apply(row.get(2)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN)));
561-
assertThat(toDouble.apply(row.get(3)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT)));
562-
assertThat(toDouble.apply(row.get(4)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM)));
548+
assertThat(toDouble.apply(row.get(0)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX)));
549+
assertThat(toDouble.apply(row.get(1)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN)));
550+
assertThat(toDouble.apply(row.get(2)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT)));
551+
assertThat(toDouble.apply(row.get(3)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM)));
563552
var avg = (Double) aggregatePerTimeseries(tsGroups, Agg.AVG, Agg.AVG);
564-
assertThat((Double) row.get(5), row.get(5) == null ? equalTo(null) : closeTo(avg, avg * 0.01));
553+
assertThat((Double) row.get(4), row.get(4) == null ? equalTo(null) : closeTo(avg, avg * 0.01));
565554
// assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue()));
566555
}
567556
}
@@ -577,7 +566,6 @@ public void testGroupByNothing() {
577566
try (EsqlQueryResponse resp = run(String.format(Locale.ROOT, """
578567
TS %s
579568
| STATS
580-
values(metrics.gaugel_hdd.bytes.used),
581569
max(max_over_time(metrics.gaugel_hdd.bytes.used)),
582570
min(min_over_time(metrics.gaugel_hdd.bytes.used)),
583571
sum(count_over_time(metrics.gaugel_hdd.bytes.used)),
@@ -590,29 +578,20 @@ public void testGroupByNothing() {
590578
List<List<Object>> rows = consumeRows(resp);
591579
var groups = groupedRows(documents, List.of(), 60);
592580
for (List<Object> row : rows) {
593-
var windowStart = windowStart(row.get(7), 60);
594-
List<Integer> docValues = valuesInWindow(groups.get(List.of(Long.toString(windowStart))), "gaugel_hdd.bytes.used");
581+
var windowStart = windowStart(row.get(6), 60);
595582
var tsGroups = groupByTimeseries(groups.get(List.of(Long.toString(windowStart))), "gaugel_hdd.bytes.used");
596-
if (row.get(0) instanceof List) {
597-
assertThat(
598-
(Collection<Long>) row.get(0),
599-
containsInAnyOrder(docValues.stream().mapToLong(Integer::longValue).boxed().toArray(Long[]::new))
600-
);
601-
} else {
602-
assertThat(row.getFirst(), equalTo(docValues.isEmpty() ? null : docValues.getFirst().longValue()));
603-
}
604583
Function<Object, Double> toDouble = cell -> switch (cell) {
605584
case Long l -> l.doubleValue();
606585
case Double d -> d;
607586
case null -> null;
608587
default -> throw new IllegalStateException("Unexpected value type: " + cell + " of class " + cell.getClass());
609588
};
610-
assertThat(toDouble.apply(row.get(1)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX)));
611-
assertThat(toDouble.apply(row.get(2)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN)));
612-
assertThat(toDouble.apply(row.get(3)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT)));
613-
assertThat(toDouble.apply(row.get(4)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM)));
589+
assertThat(toDouble.apply(row.get(0)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX)));
590+
assertThat(toDouble.apply(row.get(1)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN)));
591+
assertThat(toDouble.apply(row.get(2)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT)));
592+
assertThat(toDouble.apply(row.get(3)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM)));
614593
var avg = (Double) aggregatePerTimeseries(tsGroups, Agg.AVG, Agg.AVG);
615-
assertThat((Double) row.get(5), row.get(5) == null ? equalTo(null) : closeTo(avg, avg * 0.01));
594+
assertThat((Double) row.get(4), row.get(4) == null ? equalTo(null) : closeTo(avg, avg * 0.01));
616595
// assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue()));
617596
}
618597
}

0 commit comments

Comments
 (0)