Skip to content

Commit 3b77702

Browse files
committed
Merge remote-tracking branch 'origin/main' into repositories-service-project-aware
2 parents cbf06b7 + d3049e0 commit 3b77702

File tree

4 files changed

+78
-22
lines changed

4 files changed

+78
-22
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-mappings.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030
"type": "long",
3131
"time_series_metric": "counter"
3232
},
33+
"total_bytes_out": {
34+
"type": "long",
35+
"time_series_metric": "counter"
36+
},
3337
"cost": {
3438
"type": "double"
3539
},

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,3 +336,17 @@ distincts:long | distincts_imprecise:long | cluster:keyword | time_bucket:dateti
336336
2 |2 | staging | 2024-05-10T00:18:00.000Z
337337

338338
;
339+
340+
341+
two_rates
342+
required_capability: metrics_command
343+
344+
TS k8s | STATS cost_per_mb=max(rate(network.total_bytes_in) / 1024 * 1024 * rate(network.total_cost)) BY cluster, time_bucket = bucket(@timestamp,5minute) | SORT cost_per_mb DESC, cluster, time_bucket DESC | LIMIT 5;
345+
346+
cost_per_mb:double | cluster:keyword | time_bucket:datetime
347+
5.119502189662629 | qa | 2024-05-10T00:15:00.000Z
348+
4.1135056380088795 | qa | 2024-05-10T00:05:00.000Z
349+
2.0974277092655393 | qa | 2024-05-10T00:10:00.000Z
350+
2.071474095190272 | prod | 2024-05-10T00:15:00.000Z
351+
1.59556462585034 | staging | 2024-05-10T00:10:00.000Z
352+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -51,25 +51,25 @@
5151
* becomes
5252
*
5353
* TS k8s
54-
* | STATS rate(request) BY _tsid
55-
* | STATS max(`rate(request)`)
54+
* | STATS rate_$1 = rate(request) BY _tsid
55+
* | STATS max(rate_$1)
5656
*
5757
* TS k8s | STATS max(rate(request)) BY host
5858
*
5959
* becomes
6060
*
6161
* TS k8s
62-
* | STATS rate(request), VALUES(host) BY _tsid
63-
* | STATS max(`rate(request)`) BY host=`VALUES(host)`
62+
* | STATS rate_$1=rate(request), VALUES(host) BY _tsid
63+
* | STATS max(rate_$1) BY host=`VALUES(host)`
6464
*
6565
* TS k8s | STATS avg(rate(request)) BY host
6666
*
6767
* becomes
6868
*
6969
* TS k8s
70-
* | STATS rate(request), VALUES(host) BY _tsid
71-
* | STATS sum=sum(`rate(request)`), count(`rate(request)`) BY host=`VALUES(host)`
72-
* | EVAL `avg(rate(request))` = `sum(rate(request))` / `count(rate(request))`
70+
* | STATS rate_$1=rate(request), VALUES(host) BY _tsid
71+
* | STATS sum(rate_$1), count(rate_$1) BY host=`VALUES(host)`
72+
* | EVAL `avg(rate(request))` = `sum(rate_$1)` / `count(rate_$1)`
7373
* | KEEP `avg(rate(request))`, host
7474
*
7575
* TS k8s | STATS avg(rate(request)) BY host, bucket(@timestamp, 1minute)
@@ -78,9 +78,9 @@
7878
*
7979
* TS k8s
8080
* | EVAL `bucket(@timestamp, 1minute)`=datetrunc(@timestamp, 1minute)
81-
* | STATS rate(request), VALUES(host) BY _tsid,`bucket(@timestamp, 1minute)`
82-
* | STATS sum=sum(`rate(request)`), count(`rate(request)`) BY host=`VALUES(host)`, `bucket(@timestamp, 1minute)`
83-
* | EVAL `avg(rate(request))` = `sum(rate(request))` / `count(rate(request))`
81+
* | STATS rate_$1=rate(request), VALUES(host) BY _tsid,`bucket(@timestamp, 1minute)`
82+
* | STATS sum=sum(rate_$1), count(rate_$1) BY host=`VALUES(host)`, `bucket(@timestamp, 1minute)`
83+
* | EVAL `avg(rate(request))` = `sum(rate_$1)` / `count(rate_$1)`
8484
* | KEEP `avg(rate(request))`, host, `bucket(@timestamp, 1minute)`
8585
* </pre>
8686
*
@@ -93,16 +93,16 @@
9393
* TS k8s | STATS max(rate(request)), max(memory_used) becomes:
9494
*
9595
* TS k8s
96-
* | STATS rate(request), $p1=to_partial(max(memory_used)) BY _tsid
97-
* | STATS max(`rate(request)`), `max(memory_used)` = from_partial($p1, max($_))
96+
* | STATS rate_$1=rate(request), $p1=to_partial(max(memory_used)) BY _tsid
97+
* | STATS max(rate_$1), `max(memory_used)` = from_partial($p1, max($_))
9898
*
9999
* TS k8s | STATS max(rate(request)) avg(memory_used) BY host
100100
*
101101
* becomes
102102
*
103103
* TS k8s
104-
* | STATS rate(request), $p1=to_partial(sum(memory_used)), $p2=to_partial(count(memory_used)), VALUES(host) BY _tsid
105-
* | STATS max(`rate(request)`), $sum=from_partial($p1, sum($_)), $count=from_partial($p2, count($_)) BY host=`VALUES(host)`
104+
* | STATS rate_$1=rate(request), $p1=to_partial(sum(memory_used)), $p2=to_partial(count(memory_used)), VALUES(host) BY _tsid
105+
* | STATS max(rate_$1), $sum=from_partial($p1, sum($_)), $count=from_partial($p2, count($_)) BY host=`VALUES(host)`
106106
* | EVAL `avg(memory_used)` = $sum / $count
107107
* | KEEP `max(rate(request))`, `avg(memory_used)`, host
108108
*
@@ -112,9 +112,9 @@
112112
*
113113
* TS k8s
114114
* | EVAL `bucket(@timestamp, 5m)` = datetrunc(@timestamp, '5m')
115-
* | STATS rate(request), $p1=to_partial(min(memory_used)), VALUES(pod) BY _tsid, `bucket(@timestamp, 5m)`
116-
* | STATS sum(`rate(request)`), `min(memory_used)` = from_partial($p1, min($)) BY pod=`VALUES(pod)`, `bucket(@timestamp, 5m)`
117-
* | KEEP `min(memory_used)`, `sum(rate(request))`, pod, `bucket(@timestamp, 5m)`
115+
* | STATS rate_$1=rate(request), $p1=to_partial(min(memory_used)), VALUES(pod) BY _tsid, `bucket(@timestamp, 5m)`
116+
* | STATS sum(rate_$1), `min(memory_used)` = from_partial($p1, min($)) BY pod=`VALUES(pod)`, `bucket(@timestamp, 5m)`
117+
* | KEEP `min(memory_used)`, `sum(rate_$1)`, pod, `bucket(@timestamp, 5m)`
118118
*
119119
* {agg}_over_time time-series aggregation will be rewritten in the similar way
120120
*
@@ -123,18 +123,25 @@
123123
* becomes
124124
*
125125
* FROM k8s
126-
* | STATS max_memory_usage = max(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
127-
* | STATS sum(max_memory_usage) BY host_values, time_bucket
126+
* | STATS max_over_time_$1 = max(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
127+
* | STATS sum(max_over_time_$1) BY host_values, time_bucket
128128
*
129129
*
130130
* TS k8s | STATS sum(avg_over_time(memory_usage)) BY host, bucket(@timestamp, 1minute)
131131
*
132132
* becomes
133133
*
134134
* FROM k8s
135-
* | STATS avg_memory_usage = avg(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
136-
* | STATS sum(avg_memory_usage) BY host_values, time_bucket
135+
* | STATS avg_over_time_$1 = avg(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
136+
* | STATS sum(avg_over_time_$1) BY host_values, time_bucket
137137
*
138+
* TS k8s | STATS max(rate(post_requests) + rate(get_requests)) BY host, bucket(@timestamp, 1minute)
139+
*
140+
* becomes
141+
*
142+
* FROM k8s
143+
* | STATS rate_$1=rate(post_requests), rate_$2=rate(post_requests) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
144+
* | STATS max(rate_$1 + rate_$2) BY host_values, time_bucket
138145
* </pre>
139146
*/
140147
public final class TranslateTimeSeriesAggregate extends OptimizerRules.OptimizerRule<Aggregate> {
@@ -157,6 +164,7 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
157164
List<NamedExpression> firstPassAggs = new ArrayList<>();
158165
List<NamedExpression> secondPassAggs = new ArrayList<>();
159166
Holder<Boolean> hasRateAggregates = new Holder<>(Boolean.FALSE);
167+
var internalNames = new InternalNames();
160168
for (NamedExpression agg : aggregate.aggregates()) {
161169
if (agg instanceof Alias alias && alias.child() instanceof AggregateFunction af) {
162170
Holder<Boolean> changed = new Holder<>(Boolean.FALSE);
@@ -167,7 +175,7 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
167175
}
168176
AggregateFunction firstStageFn = tsAgg.perTimeSeriesAggregation();
169177
Alias newAgg = timeSeriesAggs.computeIfAbsent(firstStageFn, k -> {
170-
Alias firstStageAlias = new Alias(tsAgg.source(), agg.name(), firstStageFn);
178+
Alias firstStageAlias = new Alias(tsAgg.source(), internalNames.next(tsAgg.functionName()), firstStageFn);
171179
firstPassAggs.add(firstStageAlias);
172180
return firstStageAlias;
173181
});
@@ -269,4 +277,13 @@ private static List<? extends NamedExpression> mergeExpressions(
269277
groupings.forEach(g -> merged.add(Expressions.attribute(g)));
270278
return merged;
271279
}
280+
281+
private static class InternalNames {
282+
final Map<String, Integer> next = new HashMap<>();
283+
284+
String next(String prefix) {
285+
int id = next.merge(prefix, 1, Integer::sum);
286+
return prefix + "_$" + id;
287+
}
288+
}
272289
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6900,6 +6900,27 @@ public void testTranslateMetricsGroupedByTimeBucketAndDimensions() {
69006900
assertThat(Expressions.attribute(clusterValues.field()).name(), equalTo("cluster"));
69016901
}
69026902

6903+
public void testTranslateSumOfTwoRates() {
6904+
assumeTrue("requires snapshot builds", Build.current().isSnapshot());
6905+
var query = """
6906+
TS k8s
6907+
| STATS max(rate(network.total_bytes_in) + rate(network.total_bytes_out)) BY pod, bucket(@timestamp, 5 minute), cluster
6908+
| SORT cluster
6909+
| LIMIT 10
6910+
""";
6911+
var plan = logicalOptimizer.optimize(metricsAnalyzer.analyze(parser.createStatement(query)));
6912+
TopN topN = as(plan, TopN.class);
6913+
Aggregate finalAgg = as(topN.child(), Aggregate.class);
6914+
Eval eval = as(finalAgg.child(), Eval.class);
6915+
assertThat(eval.fields(), hasSize(1));
6916+
Add sum = as(Alias.unwrap(eval.fields().get(0)), Add.class);
6917+
assertThat(Expressions.name(sum.left()), equalTo("RATE_$1"));
6918+
assertThat(Expressions.name(sum.right()), equalTo("RATE_$2"));
6919+
TimeSeriesAggregate aggsByTsid = as(eval.child(), TimeSeriesAggregate.class);
6920+
assertThat(Expressions.name(aggsByTsid.aggregates().get(0)), equalTo("RATE_$1"));
6921+
assertThat(Expressions.name(aggsByTsid.aggregates().get(1)), equalTo("RATE_$2"));
6922+
}
6923+
69036924
public void testTranslateMixedAggsGroupedByTimeBucketAndDimensions() {
69046925
assumeTrue("requires snapshot builds", Build.current().isSnapshot());
69056926
var query = """

0 commit comments

Comments
 (0)