Skip to content

Commit edb97d3

Browse files
authored
Add dimension values for time-series (#136052)
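Previously, we optimized the VALUES aggregation for time-series queries rather than introducing a specialized version for dimension fields, and VALUES execution disappeared from the profiler. However, VALUES reappeared in the profiler during recent testing with multi-valued dimension fields. Therefore, this change introduces a dedicated DimensionValues aggregation (gated by the new `dimension_values` transport version) for collecting dimension fields in time-series queries; for now it still delegates to the existing VALUES implementation.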
1 parent 7c0d2eb commit edb97d3

File tree

9 files changed

+164
-41
lines changed

9 files changed

+164
-41
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9197000,9185002
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
esql_resolve_fields_response_created,9185001
1+
dimension_values,9185002
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ilm_downsample_force_merge,9196000
1+
dimension_values,9197000

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
4949
Present.ENTRY,
5050
PresentOverTime.ENTRY,
5151
Absent.ENTRY,
52-
AbsentOverTime.ENTRY
52+
AbsentOverTime.ENTRY,
53+
DimensionValues.ENTRY
5354
);
5455
}
5556
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.aggregate;
9+
10+
import org.elasticsearch.TransportVersion;
11+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
12+
import org.elasticsearch.common.io.stream.StreamInput;
13+
import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
14+
import org.elasticsearch.xpack.esql.core.expression.Expression;
15+
import org.elasticsearch.xpack.esql.core.expression.Literal;
16+
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
17+
import org.elasticsearch.xpack.esql.core.tree.Source;
18+
import org.elasticsearch.xpack.esql.core.type.DataType;
19+
import org.elasticsearch.xpack.esql.planner.ToAggregator;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
24+
import static java.util.Collections.emptyList;
25+
26+
/**
27+
* A specialization of {@link Values} for collecting dimension fields in time-series queries.
28+
*/
29+
public class DimensionValues extends AggregateFunction implements ToAggregator {
30+
/** Named-writeable registry entry that (de)serializes this expression under the name {@code "DimensionValues"}. */
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
31+
Expression.class,
32+
"DimensionValues",
33+
DimensionValues::new
34+
);
35+
36+
/** Transport version looked up by the name {@code "dimension_values"}; callers can test it before emitting this node. */
public static final TransportVersion DIMENSION_VALUES_VERSION = TransportVersion.fromName("dimension_values");
37+
38+
/**
 * Creates the aggregate over {@code field}. The filter is always {@link Literal#TRUE} and there are no extra parameters.
 *
 * @param source the source location of the expression
 * @param field the expression whose values are collected
 */
public DimensionValues(Source source, Expression field) {
39+
super(source, field, Literal.TRUE, emptyList());
40+
}
41+
42+
/** Deserialization constructor referenced by {@link #ENTRY}; reading is delegated entirely to the superclass. */
private DimensionValues(StreamInput in) throws IOException {
43+
super(in);
44+
}
45+
46+
/** Returns the name under which {@link #ENTRY} is registered. */
@Override
47+
public String getWriteableName() {
48+
return ENTRY.name;
49+
}
50+
51+
/** Describes this node by its constructor and its single {@code field} property. */
@Override
52+
protected NodeInfo<DimensionValues> info() {
53+
return NodeInfo.create(this, DimensionValues::new, field());
54+
}
55+
56+
/**
 * Rebuilds this node from replaced children. Only the first child (the field) is used; any further
 * children are ignored because the public constructor always installs {@link Literal#TRUE} as the filter.
 */
@Override
57+
public DimensionValues replaceChildren(List<Expression> newChildren) {
58+
return new DimensionValues(source(), newChildren.get(0));
59+
}
60+
61+
/**
 * Accepts only the trivial "always true" filter, in which case this instance is returned unchanged.
 *
 * @param filter the filter to apply
 * @return this instance when {@code filter} is a literal whose value is boolean {@code true}
 * @throws UnsupportedOperationException for any other filter
 */
@Override
62+
public DimensionValues withFilter(Expression filter) {
63+
// value() is an Object, so compare by value rather than by reference to the Boolean.TRUE singleton.
if (filter instanceof Literal l && Boolean.TRUE.equals(l.value())) {
64+
return this;
65+
}
66+
throw new UnsupportedOperationException("Dimension values do not support filters");
67+
}
68+
69+
/** Returns the field's data type after {@code noText()} has been applied to it. */
@Override
70+
public DataType dataType() {
71+
return field().dataType().noText();
72+
}
73+
74+
/** Reuses the type resolution of a {@link Values} aggregate built from the same source, field and filter. */
@Override
75+
protected TypeResolution resolveType() {
76+
return new Values(source(), field(), filter()).resolveType();
77+
}
78+
79+
/** Returns the aggregator supplier of an equivalent {@link Values} aggregate until a dedicated implementation is linked. */
@Override
80+
public AggregatorFunctionSupplier supplier() {
81+
// TODO: link new implementation
82+
return new Values(source(), field(), filter()).supplier();
83+
}
84+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
2121
import org.elasticsearch.xpack.esql.core.util.Holder;
2222
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
23+
import org.elasticsearch.xpack.esql.expression.function.aggregate.DimensionValues;
2324
import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime;
2425
import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction;
2526
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
2627
import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket;
2728
import org.elasticsearch.xpack.esql.expression.function.grouping.TBucket;
2829
import org.elasticsearch.xpack.esql.expression.function.scalar.internal.PackDimension;
2930
import org.elasticsearch.xpack.esql.expression.function.scalar.internal.UnpackDimension;
31+
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
3032
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
3133
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
3234
import org.elasticsearch.xpack.esql.plan.logical.Eval;
@@ -63,16 +65,16 @@
6365
* becomes
6466
*
6567
* TS k8s
66-
* | STATS rate_$1=rate(request), VALUES(host) BY _tsid
67-
* | STATS max(rate_$1) BY host=`VALUES(host)`
68+
* | STATS rate_$1=rate(request), DIMENSION_VALUES(host) BY _tsid
69+
* | STATS max(rate_$1) BY host=`DIMENSION_VALUES(host)`
6870
*
6971
* TS k8s | STATS avg(rate(request)) BY host
7072
*
7173
* becomes
7274
*
7375
* TS k8s
74-
* | STATS rate_$1=rate(request), VALUES(host) BY _tsid
75-
* | STATS sum(rate_$1), count(rate_$1) BY host=`VALUES(host)`
76+
* | STATS rate_$1=rate(request), DIMENSION_VALUES(host) BY _tsid
77+
* | STATS sum(rate_$1), count(rate_$1) BY host=`DIMENSION_VALUES(host)`
7678
* | EVAL `avg(rate(request))` = `sum(rate_$1)` / `count(rate_$1)`
7779
* | KEEP `avg(rate(request))`, host
7880
*
@@ -82,8 +84,8 @@
8284
*
8385
* TS k8s
8486
* | EVAL `bucket(@timestamp, 1minute)`=datetrunc(@timestamp, 1minute)
85-
* | STATS rate_$1=rate(request), VALUES(host) BY _tsid,`bucket(@timestamp, 1minute)`
86-
* | STATS sum=sum(rate_$1), count(rate_$1) BY host=`VALUES(host)`, `bucket(@timestamp, 1minute)`
87+
* | STATS rate_$1=rate(request), DIMENSION_VALUES(host) BY _tsid,`bucket(@timestamp, 1minute)`
88+
* | STATS sum=sum(rate_$1), count(rate_$1) BY host=`DIMENSION_VALUES(host)`, `bucket(@timestamp, 1minute)`
8789
* | EVAL `avg(rate(request))` = `sum(rate_$1)` / `count(rate_$1)`
8890
* | KEEP `avg(rate(request))`, host, `bucket(@timestamp, 1minute)`
8991
* </pre>
@@ -146,22 +148,16 @@
146148
* | STATS max(rate_$1 + rate_$2) BY host_values, time_bucket
147149
* </pre>
148150
*/
149-
public final class TranslateTimeSeriesAggregate extends OptimizerRules.OptimizerRule<Aggregate> {
151+
public final class TranslateTimeSeriesAggregate extends OptimizerRules.ParameterizedOptimizerRule<
152+
TimeSeriesAggregate,
153+
LogicalOptimizerContext> {
150154

151155
/** Creates the rule, passing {@code OptimizerRules.TransformDirection.UP} to the base rule constructor. */
public TranslateTimeSeriesAggregate() {
152156
super(OptimizerRules.TransformDirection.UP);
153157
}
154158

155159
@Override
156-
protected LogicalPlan rule(Aggregate aggregate) {
157-
if (aggregate instanceof TimeSeriesAggregate ts && ts.timeBucket() == null) {
158-
return translate(ts);
159-
} else {
160-
return aggregate;
161-
}
162-
}
163-
164-
LogicalPlan translate(TimeSeriesAggregate aggregate) {
160+
protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContext context) {
165161
Holder<Attribute> tsid = new Holder<>();
166162
Holder<Attribute> timestamp = new Holder<>();
167163
aggregate.forEachDown(EsRelation.class, r -> {
@@ -278,7 +274,7 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
278274
firstPassGroupings.add(newFinalGroup);
279275
secondPassGroupings.add(new Alias(g.source(), g.name(), newFinalGroup.toAttribute(), g.id()));
280276
} else {
281-
var valuesAgg = new Alias(g.source(), g.name(), new Values(g.source(), g));
277+
var valuesAgg = new Alias(g.source(), g.name(), valuesAggregate(context, g));
282278
firstPassAggs.add(valuesAgg);
283279
Alias pack = new Alias(
284280
g.source(),
@@ -361,6 +357,14 @@ private static List<? extends NamedExpression> mergeExpressions(
361357
return merged;
362358
}
363359

360+
private AggregateFunction valuesAggregate(LogicalOptimizerContext context, Attribute group) {
361+
if (group.isDimension() && context.minimumVersion().supports(DimensionValues.DIMENSION_VALUES_VERSION)) {
362+
return new DimensionValues(group.source(), group);
363+
} else {
364+
return new Values(group.source(), group);
365+
}
366+
}
367+
364368
private static class InternalNames {
365369
final Map<String, Integer> next = new HashMap<>();
366370

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.aggregate;
9+
10+
import org.elasticsearch.xpack.esql.expression.AbstractExpressionSerializationTests;
11+
12+
import java.io.IOException;
13+
14+
public class DimensionValuesSerializationTests extends AbstractExpressionSerializationTests<DimensionValues> {
15+
/** Builds an instance from a random source and a random child expression. */
@Override
16+
protected DimensionValues createTestInstance() {
17+
return new DimensionValues(randomSource(), randomChild());
18+
}
19+
20+
/** Returns a copy that keeps the instance's source but uses a random field different from the original one. */
@Override
21+
protected DimensionValues mutateInstance(DimensionValues instance) throws IOException {
22+
final var originalSource = instance.source();
23+
final var replacementField = randomValueOtherThan(instance.field(), AbstractExpressionSerializationTests::randomChild);
24+
25+
return new DimensionValues(originalSource, replacementField);
26+
}
27+
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/AbstractLogicalPlanOptimizerTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.optimizer;
99

10+
import org.elasticsearch.TransportVersion;
1011
import org.elasticsearch.index.IndexMode;
1112
import org.elasticsearch.test.ESTestCase;
1213
import org.elasticsearch.xpack.esql.EsqlTestUtils;
@@ -41,6 +42,8 @@ public abstract class AbstractLogicalPlanOptimizerTests extends ESTestCase {
4142
protected static LogicalOptimizerContext logicalOptimizerCtx;
4243
protected static LogicalPlanOptimizer logicalOptimizer;
4344

45+
protected static LogicalPlanOptimizer logicalOptimizerWithLatestVersion;
46+
4447
protected static Map<String, EsField> mapping;
4548
protected static Analyzer analyzer;
4649
protected static Map<String, EsField> mappingAirports;
@@ -74,6 +77,9 @@ public static void init() {
7477
parser = new EsqlParser();
7578
logicalOptimizerCtx = unboundLogicalOptimizerContext();
7679
logicalOptimizer = new LogicalPlanOptimizer(logicalOptimizerCtx);
80+
logicalOptimizerWithLatestVersion = new LogicalPlanOptimizer(
81+
new LogicalOptimizerContext(logicalOptimizerCtx.configuration(), logicalOptimizerCtx.foldCtx(), TransportVersion.current())
82+
);
7783
enrichResolution = new EnrichResolution();
7884
AnalyzerTestUtils.loadEnrichPolicyResolution(enrichResolution, "languages_idx", "id", "languages_idx", "mapping-languages.json");
7985

0 commit comments

Comments
 (0)