Skip to content

Commit 19fc7ff

Browse files
committed
add some basic tests for the TS translation rule
1 parent 493ffad commit 19fc7ff

File tree

1 file changed

+145
-0
lines changed

1 file changed

+145
-0
lines changed
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer.rules.logical;
9+
10+
import org.elasticsearch.index.IndexMode;
11+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
12+
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
13+
import org.elasticsearch.xpack.esql.analysis.Analyzer;
14+
import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
15+
import org.elasticsearch.xpack.esql.core.type.EsField;
16+
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
17+
import org.elasticsearch.xpack.esql.index.EsIndex;
18+
import org.elasticsearch.xpack.esql.index.IndexResolution;
19+
import org.elasticsearch.xpack.esql.optimizer.AbstractLogicalPlanOptimizerTests;
20+
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
21+
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
22+
import org.elasticsearch.xpack.esql.plan.logical.Eval;
23+
import org.elasticsearch.xpack.esql.plan.logical.Limit;
24+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
25+
import org.elasticsearch.xpack.esql.plan.logical.Project;
26+
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
27+
import org.elasticsearch.xpack.esql.plan.logical.TopN;
28+
import org.junit.BeforeClass;
29+
30+
import java.util.Map;
31+
32+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
33+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
34+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution;
35+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
36+
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution;
37+
38+
public class TranslateTimeSeriesAggregateTests extends AbstractLogicalPlanOptimizerTests {
39+
40+
private static Map<String, EsField> mappingK8s;
41+
private static Analyzer k8sAnalyzer;
42+
43+
@BeforeClass
44+
public static void initK8s() {
45+
// Load Time Series mappings for these tests
46+
mappingK8s = loadMapping("k8s-mappings.json");
47+
EsIndex k8sIndex = new EsIndex("k8s", mappingK8s, Map.of("k8s", IndexMode.TIME_SERIES));
48+
IndexResolution getIndexResult = IndexResolution.valid(k8sIndex);
49+
k8sAnalyzer = new Analyzer(
50+
new AnalyzerContext(
51+
EsqlTestUtils.TEST_CFG,
52+
new EsqlFunctionRegistry(),
53+
getIndexResult,
54+
defaultLookupResolution(),
55+
enrichResolution,
56+
emptyInferenceResolution()
57+
),
58+
TEST_VERIFIER
59+
);
60+
}
61+
62+
protected LogicalPlan planK8s(String query) {
63+
LogicalPlan analyzed = k8sAnalyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG));
64+
LogicalPlan optimized = logicalOptimizer.optimize(analyzed);
65+
return optimized;
66+
}
67+
68+
/**
69+
* Test that {@link TranslateTimeSeriesAggregate} correctly splits up a two stage aggregation with a time bucket.
70+
*
71+
* Expected plan:
72+
* <pre>{@code
73+
*Limit[10[INTEGER],false]
74+
* \_Aggregate[[time_bucket{r}#4],[COUNT(MAXOVERTIME_$1{r}#33,true[BOOLEAN]) AS count(max_over_time(network.cost))#6, time_bucket{r}#4]]
75+
* \_TimeSeriesAggregate[[_tsid{m}#34, time_bucket{r}#4],[MAX(network.cost{f}#24,true[BOOLEAN]) AS MAXOVERTIME_$1#33,
76+
* time_bucket{r}#4 AS time_bucket#4],BUCKET(@timestamp{f}#8,PT1M[TIME_DURATION])]
77+
* \_Eval[[BUCKET(@timestamp{f}#8,PT1M[TIME_DURATION]) AS time_bucket#4]]
78+
* \_EsRelation[k8s][@timestamp{f}#8, client.ip{f}#12, cluster{f}#9, eve..]
79+
* }</pre>
80+
*/
81+
public void testMaxOverTime() {
82+
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
83+
LogicalPlan plan = planK8s("""
84+
TS k8s
85+
| STATS count(max_over_time(network.cost)) BY time_bucket = BUCKET(@timestamp, 1 minute)
86+
| LIMIT 10
87+
""");
88+
Limit limit = as(plan, Limit.class);
89+
Aggregate innerStats = as(limit.child(), Aggregate.class);
90+
TimeSeriesAggregate outerStats = as(innerStats.child(), TimeSeriesAggregate.class);
91+
// TODO: Add asserts about the specific aggregation details here
92+
Eval eval = as(outerStats.child(), Eval.class);
93+
EsRelation relation = as(eval.child(), EsRelation.class);
94+
}
95+
96+
public void testMaxOfRate() {
97+
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
98+
LogicalPlan plan = planK8s("""
99+
TS k8s
100+
| STATS max(rate(network.total_bytes_in)) BY time_bucket = BUCKET(@timestamp, 1 minute)
101+
| LIMIT 10
102+
""");
103+
Limit limit = as(plan, Limit.class);
104+
Aggregate innerStats = as(limit.child(), Aggregate.class);
105+
TimeSeriesAggregate outerStats = as(innerStats.child(), TimeSeriesAggregate.class);
106+
// TODO: Add asserts about the specific aggregation details here
107+
Eval eval = as(outerStats.child(), Eval.class);
108+
EsRelation relation = as(eval.child(), EsRelation.class);
109+
}
110+
111+
/**
112+
* <pre>{@code
113+
* Project[[avg_cost{r}#8, cluster{f}#14, time_bucket{r}#5]]
114+
* \_TopN[[Order[avg_cost{r}#8,DESC,FIRST], Order[time_bucket{r}#5,DESC,FIRST], Order[cluster{f}#14,ASC,LAST]],10[INTEGER]]
115+
* \_Eval[[$$SUM$avg_cost$0{r$}#38 / $$COUNT$avg_cost$1{r$}#39 AS avg_cost#8]]
116+
* \_Aggregate[[cluster{r}#14, time_bucket{r}#5],[SUM(AVGOVERTIME_$1{r}#41,true[BOOLEAN],compensated[KEYWORD])
117+
* AS $$SUM$avg_cost$0#38, COUNT(AVGOVERTIME_$1{r}#41,true[BOOLEAN]) AS $$COUNT$avg_cost$1#39,
118+
* cluster{r}#14 AS cluster#14, time_bucket{r}#5 AS time_bucket#5]]
119+
* \_Eval[[$$SUM$AVGOVERTIME_$1$0{r$}#42 / $$COUNT$AVGOVERTIME_$1$1{r$}#43 AS AVGOVERTIME_$1#41]]
120+
* \_TimeSeriesAggregate[[_tsid{m}#40, time_bucket{r}#5],[SUM(network.cost{f}#29,true[BOOLEAN],lossy[KEYWORD])
121+
* AS $$SUM$AVGOVERTIME_$1$0#42, COUNT(network.cost{f}#29,true[BOOLEAN]) AS $$COUNT$AVGOVERTIME_$1$1#43,
122+
* VALUES(cluster{f}#14,true[BOOLEAN]) AS cluster#14, time_bucket{r}#5],
123+
* BUCKET(@timestamp{f}#13,PT1M[TIME_DURATION])]
124+
* \_Eval[[BUCKET(@timestamp{f}#13,PT1M[TIME_DURATION]) AS time_bucket#5]]
125+
* \_EsRelation[k8s][@timestamp{f}#13, client.ip{f}#17, cluster{f}#14, e..]
126+
* }</pre>
127+
*/
128+
public void testAvgOfAvgOverTime() {
129+
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
130+
LogicalPlan plan = planK8s("""
131+
TS k8s
132+
| STATS avg_cost=avg(avg_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute)
133+
| SORT avg_cost DESC, time_bucket DESC, cluster
134+
| LIMIT 10
135+
""");
136+
Project project = as(plan, Project.class);
137+
TopN topN = as(project.child(), TopN.class);
138+
Eval outerEval = as(topN.child(), Eval.class); // This is the surrogate average calculation for the outer average
139+
Aggregate outerStats = as(outerEval.child(), Aggregate.class);
140+
Eval innerEval = as(outerStats.child(), Eval.class); // Surrogate for the inner average
141+
TimeSeriesAggregate innerStats = as(innerEval.child(), TimeSeriesAggregate.class);
142+
Eval bucketEval = as(innerStats.child(), Eval.class); // compute the tbucket
143+
EsRelation relation = as(bucketEval.child(), EsRelation.class);
144+
}
145+
}

0 commit comments

Comments
 (0)