Skip to content

Commit 1768bc7

Browse files
authored
Re-enable verifier and support post-processing (#138497)
1 parent 4fb85c6 commit 1768bc7

File tree

20 files changed

+239
-170
lines changed

20 files changed

+239
-170
lines changed

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/tree/Node.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,23 @@ public List<T> collect(Predicate<? super T> predicate) {
176176
return l.isEmpty() ? emptyList() : l;
177177
}
178178

179+
public <E extends T> List<E> collect(Class<E> typeToken) {
180+
return collect(typeToken, n -> true);
181+
}
182+
183+
public <E extends T> List<E> collect(Class<E> typeToken, Predicate<? super E> predicate) {
184+
List<E> l = new ArrayList<>();
185+
forEachDown(n -> {
186+
if (typeToken.isInstance(n)) {
187+
E e = typeToken.cast(n);
188+
if (predicate.test(e)) {
189+
l.add(e);
190+
}
191+
}
192+
});
193+
return l.isEmpty() ? emptyList() : l;
194+
}
195+
179196
public List<T> collectLeaves() {
180197
return collect(n -> n.children().isEmpty());
181198
}

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeForkRestTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ protected void shouldSkipTest(String testName) throws IOException {
6060
testCase.requiredCapabilities.contains(SUBQUERY_IN_FROM_COMMAND.capabilityName())
6161
);
6262

63-
assumeFalse("Tests using PROMQL are not supported for now", testCase.requiredCapabilities.contains(PROMQL_V0.capabilityName()));
63+
assumeFalse(
64+
"Tests using PROMQL are not supported for now",
65+
testCase.requiredCapabilities.contains(PROMQL_PRE_TECH_PREVIEW_V1.capabilityName())
66+
);
6467

6568
assumeFalse(
6669
"Tests using GROUP_BY_ALL are skipped since we add a new _timeseries field",

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-avg-over-time.csv-spec

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,25 @@ cost:double | time_bucket:datetime
1717
36.375 | 2024-05-10T00:11:00.000Z
1818
;
1919

20+
avg_over_time_of_double_no_grouping_promql
21+
required_capability: promql_pre_tech_preview_v1
22+
TS k8s
23+
| PROMQL step 1m (sum(avg_over_time(network.cost[1m])))
24+
| SORT `sum(avg_over_time(network.cost[1m]))` DESC, step DESC | LIMIT 10;
25+
26+
sum(avg_over_time(network.cost[1m])):double | step:date
27+
69.6875 | 2024-05-10T00:09:00.000Z
28+
56.5625 | 2024-05-10T00:08:00.000Z
29+
49.5 | 2024-05-10T00:17:00.000Z
30+
48.3125 | 2024-05-10T00:22:00.000Z
31+
45.8125 | 2024-05-10T00:15:00.000Z
32+
40.4375 | 2024-05-10T00:06:00.000Z
33+
39.54166666666667 | 2024-05-10T00:13:00.000Z
34+
37.9375 | 2024-05-10T00:12:00.000Z
35+
36.6875 | 2024-05-10T00:19:00.000Z
36+
36.375 | 2024-05-10T00:11:00.000Z
37+
;
38+
2039
avg_over_time_of_double_no_grouping_single_bucket
2140
required_capability: ts_command_v0
2241
required_capability: tbucket
@@ -28,11 +47,11 @@ cost:double | time_bucket:datetime
2847
;
2948

3049
avg_over_time_of_double_no_grouping_single_bucket_promql
31-
required_capability: promql_v0
50+
required_capability: promql_pre_tech_preview_v1
3251
TS k8s
3352
| PROMQL step 1h (sum(avg_over_time(network.cost[1h])));
3453

35-
sum(avg_over_time(network.cost[1h])):double | TBUCKET:date
54+
sum(avg_over_time(network.cost[1h])):double | step:date
3655
56.32254035241995 | 2024-05-10T00:00:00.000Z
3756
;
3857

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,17 @@ max(rate(network.total_bytes_in)): double | time_bucket:date
132132
23.702205882352942 | 2024-05-10T00:15:00.000Z
133133
;
134134

135+
oneRateWithPromql
136+
required_capability: promql_pre_tech_preview_v1
137+
TS k8s
138+
| PROMQL step 5m (max(rate(network.total_bytes_in[5m])))
139+
| SORT step DESC | LIMIT 2;
140+
141+
max(rate(network.total_bytes_in[5m])):double | step:date
142+
6.980660660660663 | 2024-05-10T00:20:00.000Z
143+
23.702205882352942 | 2024-05-10T00:15:00.000Z
144+
;
145+
135146
oneRateWithSingleTBucket
136147
required_capability: ts_command_v0
137148
required_capability: tbucket
@@ -142,12 +153,12 @@ max(rate(network.total_bytes_in)): double | time_bucket:date
142153
4.379885057471264 | 2024-05-10T00:00:00.000Z
143154
;
144155

145-
oneRateWithSingleTBucketPromql
146-
required_capability: promql_v0
156+
oneRateWithSingleStepPromql
157+
required_capability: promql_pre_tech_preview_v1
147158
TS k8s
148159
| PROMQL step 1h (max(rate(network.total_bytes_in[1h])));
149160

150-
max(rate(network.total_bytes_in[1h])):double | TBUCKET:date
161+
max(rate(network.total_bytes_in[1h])):double | step:date
151162
4.379885057471264 | 2024-05-10T00:00:00.000Z
152163
;
153164

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1689,9 +1689,13 @@ public enum Cap {
16891689
TIME_SERIES_WINDOW_V1,
16901690

16911691
/**
1692-
* PromQL support in ESQL
1692+
* PromQL support in ESQL, before it is released into tech preview.
1693+
* When implementing new functionality or breaking changes,
1694+
* we'll simply increment the version suffix at the end to prevent bwc tests from running.
1695+
* As soon as we move into tech preview, we'll replace this capability with a "EXPONENTIAL_HISTOGRAM_TECH_PREVIEW" one.
1696+
* At this point, we need to add new capabilities for any further changes.
16931697
*/
1694-
PROMQL_V0(Build.current().isSnapshot()),
1698+
PROMQL_PRE_TECH_PREVIEW_V1(Build.current().isSnapshot()),
16951699

16961700
/**
16971701
* KNN function adds support for k and visit_percentage options

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PromqlFeatures.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@ private PromqlFeatures() {
2121
* Exists to provide a single point of change and minimize noise when upgrading capability versions.
2222
*/
2323
public static boolean isEnabled() {
24-
return EsqlCapabilities.Cap.TS_COMMAND_V0.isEnabled() && EsqlCapabilities.Cap.PROMQL_V0.isEnabled();
24+
return EsqlCapabilities.Cap.TS_COMMAND_V0.isEnabled() && EsqlCapabilities.Cap.PROMQL_PRE_TECH_PREVIEW_V1.isEnabled();
2525
}
2626
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/promql/function/PromqlFunctionRegistry.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,8 +358,7 @@ public Boolean functionExists(String name) {
358358

359359
public FunctionDefinition functionMetadata(String name) {
360360
String normalized = normalize(name);
361-
FunctionDefinition metadata = promqlFunctions.get(normalized);
362-
return metadata;
361+
return promqlFunctions.get(normalized);
363362
}
364363

365364
public Function buildEsqlFunction(String name, Source source, List<Expression> params) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
package org.elasticsearch.xpack.esql.optimizer;
99

1010
import org.elasticsearch.xpack.esql.VerificationException;
11-
import org.elasticsearch.xpack.esql.action.PromqlFeatures;
1211
import org.elasticsearch.xpack.esql.common.Failures;
1312
import org.elasticsearch.xpack.esql.core.type.DataType;
1413
import org.elasticsearch.xpack.esql.optimizer.rules.PruneInlineJoinOnEmptyRightSide;
@@ -76,7 +75,6 @@
7675
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.PruneLeftJoinOnNullMatchingField;
7776
import org.elasticsearch.xpack.esql.optimizer.rules.logical.promql.TranslatePromqlToTimeSeriesAggregate;
7877
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
79-
import org.elasticsearch.xpack.esql.plan.logical.promql.PromqlCommand;
8078
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
8179
import org.elasticsearch.xpack.esql.rule.RuleExecutor;
8280

@@ -125,11 +123,7 @@ public LogicalPlan optimize(LogicalPlan verified) {
125123

126124
Failures failures = verifier.verify(optimized, verified.output());
127125
if (failures.hasFailures()) {
128-
// TODO re-enable verification for PromQL once we make sure the output columns don't change
129-
// Throw exception unless we have PromQL with the feature enabled
130-
if (PromqlFeatures.isEnabled() == false || verified.anyMatch(PromqlCommand.class::isInstance) == false) {
131-
throw new VerificationException(failures);
132-
}
126+
throw new VerificationException(failures);
133127
}
134128
optimized.setOptimized();
135129
return optimized;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/promql/TranslatePromqlToTimeSeriesAggregate.java

Lines changed: 58 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,15 @@
1212
import org.elasticsearch.xpack.esql.capabilities.ConfigurationAware;
1313
import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException;
1414
import org.elasticsearch.xpack.esql.core.expression.Alias;
15+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1516
import org.elasticsearch.xpack.esql.core.expression.Expression;
1617
import org.elasticsearch.xpack.esql.core.expression.Literal;
1718
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
1819
import org.elasticsearch.xpack.esql.core.expression.function.Function;
1920
import org.elasticsearch.xpack.esql.core.expression.predicate.regex.RLikePattern;
2021
import org.elasticsearch.xpack.esql.core.tree.Source;
21-
import org.elasticsearch.xpack.esql.expression.Order;
2222
import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket;
23+
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToDouble;
2324
import org.elasticsearch.xpack.esql.expression.function.scalar.string.EndsWith;
2425
import org.elasticsearch.xpack.esql.expression.function.scalar.string.StartsWith;
2526
import org.elasticsearch.xpack.esql.expression.function.scalar.string.regex.RLike;
@@ -37,7 +38,6 @@
3738
import org.elasticsearch.xpack.esql.plan.logical.Eval;
3839
import org.elasticsearch.xpack.esql.plan.logical.Filter;
3940
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
40-
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
4141
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
4242
import org.elasticsearch.xpack.esql.plan.logical.promql.AcrossSeriesAggregate;
4343
import org.elasticsearch.xpack.esql.plan.logical.promql.PlaceholderRelation;
@@ -55,8 +55,6 @@
5555
import java.util.List;
5656
import java.util.Map;
5757

58-
import static java.util.Arrays.asList;
59-
6058
/**
6159
* Translates PromQL logical plans into ESQL TimeSeriesAggregate nodes.
6260
*
@@ -132,15 +130,15 @@ private record MapResult(LogicalPlan plan, Map<String, Expression> extras) {}
132130
// - Selector -> EsRelation + Filter
133131
private static MapResult map(PromqlCommand promqlCommand, LogicalPlan p) {
134132
if (p instanceof Selector selector) {
135-
return map(promqlCommand, selector);
133+
return mapSelector(selector);
136134
}
137135
if (p instanceof PromqlFunctionCall functionCall) {
138-
return map(promqlCommand, functionCall);
136+
return mapFunction(promqlCommand, functionCall);
139137
}
140138
throw new QlIllegalArgumentException("Unsupported PromQL plan node: {}", p);
141139
}
142140

143-
private static MapResult map(PromqlCommand promqlCommand, Selector selector) {
141+
private static MapResult mapSelector(Selector selector) {
144142
// Create a placeholder relation to be replaced later
145143
var matchers = selector.labelMatchers();
146144
Expression matcherCondition = translateLabelMatchers(selector.source(), selector.labels(), matchers);
@@ -162,7 +160,7 @@ private static MapResult map(PromqlCommand promqlCommand, Selector selector) {
162160
return new MapResult(p, extras);
163161
}
164162

165-
private static MapResult map(PromqlCommand promqlCommand, PromqlFunctionCall functionCall) {
163+
private static MapResult mapFunction(PromqlCommand promqlCommand, PromqlFunctionCall functionCall) {
166164
MapResult childResult = map(promqlCommand, functionCall.child());
167165
Map<String, Expression> extras = childResult.extras;
168166

@@ -180,59 +178,14 @@ private static MapResult map(PromqlCommand promqlCommand, PromqlFunctionCall fun
180178
extras.put("field", esqlFunction);
181179
result = new MapResult(childResult.plan, extras);
182180
} else if (functionCall instanceof AcrossSeriesAggregate acrossAggregate) {
183-
// expects
184-
Function esqlFunction = PromqlFunctionRegistry.INSTANCE.buildEsqlFunction(
185-
acrossAggregate.functionName(),
186-
acrossAggregate.source(),
187-
List.of(target)
188-
);
189-
190181
List<NamedExpression> aggs = new ArrayList<>();
191-
aggs.add(new Alias(acrossAggregate.source(), acrossAggregate.sourceText(), esqlFunction));
192-
193182
List<Expression> groupings = new ArrayList<>(acrossAggregate.groupings().size());
194-
195-
// add groupings
196-
for (Expression grouping : acrossAggregate.groupings()) {
197-
NamedExpression named;
198-
if (grouping instanceof NamedExpression ne) {
199-
named = ne;
200-
} else {
201-
named = new Alias(grouping.source(), grouping.sourceText(), grouping);
202-
}
203-
aggs.add(named);
204-
groupings.add(named.toAttribute());
205-
}
206-
207-
Expression timeBucketSize;
208-
if (promqlCommand.isRangeQuery()) {
209-
timeBucketSize = promqlCommand.step();
210-
} else {
211-
// use default lookback for instant queries
212-
timeBucketSize = Literal.timeDuration(promqlCommand.source(), DEFAULT_LOOKBACK);
213-
}
214-
Bucket b = new Bucket(
215-
promqlCommand.source(),
216-
promqlCommand.timestamp(),
217-
timeBucketSize,
218-
null,
219-
null,
220-
ConfigurationAware.CONFIGURATION_MARKER
221-
);
222-
String bucketName = "TBUCKET";
223-
Alias tbucket = new Alias(b.source(), bucketName, b);
224-
aggs.add(tbucket.toAttribute());
225-
groupings.add(tbucket.toAttribute());
183+
Alias stepBucket = createStepBucketAlias(promqlCommand, acrossAggregate);
184+
initAggregatesAndGroupings(acrossAggregate, target, aggs, groupings, stepBucket.toAttribute());
226185

227186
LogicalPlan p = childResult.plan;
228-
p = new Eval(tbucket.source(), p, List.of(tbucket));
187+
p = new Eval(stepBucket.source(), p, List.of(stepBucket));
229188
p = new TimeSeriesAggregate(acrossAggregate.source(), p, groupings, aggs, null);
230-
// sort the data ascending by time bucket
231-
p = new OrderBy(
232-
acrossAggregate.source(),
233-
p,
234-
asList(new Order(acrossAggregate.source(), tbucket.toAttribute(), Order.OrderDirection.ASC, Order.NullsPosition.FIRST))
235-
);
236189
result = new MapResult(p, extras);
237190
} else {
238191
throw new QlIllegalArgumentException("Unsupported PromQL function call: {}", functionCall);
@@ -241,6 +194,55 @@ private static MapResult map(PromqlCommand promqlCommand, PromqlFunctionCall fun
241194
return result;
242195
}
243196

197+
private static void initAggregatesAndGroupings(
198+
AcrossSeriesAggregate acrossAggregate,
199+
Expression target,
200+
List<NamedExpression> aggs,
201+
List<Expression> groupings,
202+
Attribute stepBucket
203+
) {
204+
// main aggregation
205+
Function esqlFunction = PromqlFunctionRegistry.INSTANCE.buildEsqlFunction(
206+
acrossAggregate.functionName(),
207+
acrossAggregate.source(),
208+
// to double conversion of the metric to ensure a consistent output type
209+
// TODO it's probably more efficient to wrap the function in the ToDouble
210+
// but for some reason this doesn't work if you have an inner and outer aggregation
211+
List.of(new ToDouble(target.source(), target))
212+
);
213+
214+
aggs.add(new Alias(acrossAggregate.source(), acrossAggregate.sourceText(), esqlFunction, acrossAggregate.valueId()));
215+
216+
// timestamp/step
217+
aggs.add(stepBucket);
218+
groupings.add(stepBucket);
219+
220+
// additional groupings (by)
221+
for (NamedExpression grouping : acrossAggregate.groupings()) {
222+
aggs.add(grouping);
223+
groupings.add(grouping.toAttribute());
224+
}
225+
}
226+
227+
private static Alias createStepBucketAlias(PromqlCommand promqlCommand, AcrossSeriesAggregate acrossAggregate) {
228+
Expression timeBucketSize;
229+
if (promqlCommand.isRangeQuery()) {
230+
timeBucketSize = promqlCommand.step();
231+
} else {
232+
// use default lookback for instant queries
233+
timeBucketSize = Literal.timeDuration(promqlCommand.source(), DEFAULT_LOOKBACK);
234+
}
235+
Bucket b = new Bucket(
236+
promqlCommand.source(),
237+
promqlCommand.timestamp(),
238+
timeBucketSize,
239+
null,
240+
null,
241+
ConfigurationAware.CONFIGURATION_MARKER
242+
);
243+
return new Alias(b.source(), "step", b, acrossAggregate.stepId());
244+
}
245+
244246
/**
245247
* Translates PromQL label matchers into ESQL filter expressions.
246248
*

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/promql/PromqlLogicalPlanBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.xpack.esql.core.expression.Expression;
1414
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
1515
import org.elasticsearch.xpack.esql.core.expression.Literal;
16+
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
1617
import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
1718
import org.elasticsearch.xpack.esql.core.expression.predicate.operator.arithmetic.Arithmetics;
1819
import org.elasticsearch.xpack.esql.core.tree.Node;
@@ -478,8 +479,7 @@ public LogicalPlan visitFunction(PromqlBaseParser.FunctionContext ctx) {
478479

479480
PromqlBaseParser.LabelListContext labelListCtx = groupingContext.labelList();
480481
List<String> groupingKeys = visitLabelList(labelListCtx);
481-
// TODO: this
482-
List<Expression> groupings = new ArrayList<>(groupingKeys.size());
482+
List<NamedExpression> groupings = new ArrayList<>(groupingKeys.size());
483483
for (int i = 0; i < groupingKeys.size(); i++) {
484484
groupings.add(new UnresolvedAttribute(source(labelListCtx.labelName(i)), groupingKeys.get(i)));
485485
}

0 commit comments

Comments
 (0)