Skip to content

Commit 67c70b8

Browse files
committed
Add dimension values for time-series
1 parent 40f90fb commit 67c70b8

File tree

10 files changed

+225
-25
lines changed

10 files changed

+225
-25
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9188000,9185001
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
initial_9.2.0,9185000
1+
dimension_values,9185001
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
resolved_index_expressions,9187000
1+
dimension_values,9188000

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
4949
Present.ENTRY,
5050
PresentOverTime.ENTRY,
5151
Absent.ENTRY,
52-
AbsentOverTime.ENTRY
52+
AbsentOverTime.ENTRY,
53+
DimensionValues.ENTRY
5354
);
5455
}
5556
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.aggregate;
9+
10+
import org.elasticsearch.TransportVersion;
11+
import org.elasticsearch.common.io.stream.NamedWriteable;
12+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
13+
import org.elasticsearch.common.io.stream.StreamInput;
14+
import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
15+
import org.elasticsearch.xpack.esql.core.expression.Expression;
16+
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
17+
import org.elasticsearch.xpack.esql.core.tree.Source;
18+
import org.elasticsearch.xpack.esql.core.type.DataType;
19+
import org.elasticsearch.xpack.esql.io.stream.VersionedExpression;
20+
import org.elasticsearch.xpack.esql.planner.ToAggregator;
21+
22+
import java.io.IOException;
23+
import java.util.List;
24+
25+
import static java.util.Collections.emptyList;
26+
27+
/**
28+
* A specialization of {@link Values} for collecting dimension fields in time-series queries.
29+
*/
30+
public class DimensionValues extends AggregateFunction implements ToAggregator, VersionedExpression {
31+
/** Registry entry that maps the wire name {@code "DimensionValues"} (category {@link Expression}) to the stream constructor. */
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
32+
Expression.class,
33+
"DimensionValues",
34+
DimensionValues::new
35+
);
36+
37+
/**
 * Transport version resolved by the name {@code "dimension_values"}. Streams at or after this version receive
 * this expression as-is; see {@link #getVersionedNamedWriteable(TransportVersion)} for the fallback.
 */
static final TransportVersion DIMENSION_VALUES_VERSION = TransportVersion.fromName("dimension_values");
38+
39+
/**
 * Creates the aggregate over {@code field}, restricted by {@code filter}.
 * An empty list is always passed as the last argument of the super constructor.
 */
public DimensionValues(Source source, Expression field, Expression filter) {
40+
super(source, field, filter, emptyList());
41+
}
42+
43+
/** Deserialization constructor referenced by {@link #ENTRY}; reading is delegated entirely to the superclass. */
private DimensionValues(StreamInput in) throws IOException {
44+
super(in);
45+
}
46+
47+
/** Returns the name registered in {@link #ENTRY}. */
@Override
48+
public String getWriteableName() {
49+
return ENTRY.name;
50+
}
51+
52+
/**
 * Picks the representation to serialize for the given transport version.
 *
 * @param transportVersion the version to compare against {@link #DIMENSION_VALUES_VERSION}
 * @return this instance when {@code transportVersion} is on or after {@link #DIMENSION_VALUES_VERSION};
 *         otherwise a {@link Values} built from the same source, field and filter
 */
@Override
53+
public NamedWriteable getVersionedNamedWriteable(TransportVersion transportVersion) {
54+
if (transportVersion.onOrAfter(DIMENSION_VALUES_VERSION)) {
55+
return this;
56+
} else {
57+
// Fall back to VALUES for versions before DIMENSION_VALUES_VERSION.
58+
return new Values(source(), field(), filter());
59+
}
60+
}
61+
62+
/** Describes this node as the {@code (source, field, filter)} constructor applied to the current field and filter. */
@Override
63+
protected NodeInfo<DimensionValues> info() {
64+
return NodeInfo.create(this, DimensionValues::new, field(), filter());
65+
}
66+
67+
/**
 * Rebuilds the node from replacement children, keeping the current source.
 *
 * @param newChildren element 0 is used as the field and element 1 as the filter
 */
@Override
68+
public DimensionValues replaceChildren(List<Expression> newChildren) {
69+
return new DimensionValues(source(), newChildren.get(0), newChildren.get(1));
70+
}
71+
72+
/** Returns a copy with the same source and field but the given {@code filter}. */
@Override
73+
public DimensionValues withFilter(Expression filter) {
74+
return new DimensionValues(source(), field(), filter);
75+
}
76+
77+
/**
 * Returns the field's data type after {@code noText()} is applied.
 * NOTE(review): presumably this maps text types to a non-text equivalent - confirm against DataType.
 */
@Override
78+
public DataType dataType() {
79+
return field().dataType().noText();
80+
}
81+
82+
/** Delegates type resolution to a temporary {@link Values} built from the same source, field and filter. */
@Override
83+
protected TypeResolution resolveType() {
84+
return new Values(source(), field(), filter()).resolveType();
85+
}
86+
87+
/** Returns the aggregator supplier of an equivalent {@link Values}; no dedicated implementation is linked yet (see TODO). */
@Override
88+
public AggregatorFunctionSupplier supplier() {
89+
// TODO: link new implementation
90+
return new Values(source(), field(), filter()).supplier();
91+
}
92+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutput.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.TransportVersion;
1111
import org.elasticsearch.common.bytes.BytesReference;
1212
import org.elasticsearch.common.io.stream.BytesStreamOutput;
13+
import org.elasticsearch.common.io.stream.NamedWriteable;
1314
import org.elasticsearch.common.io.stream.StreamOutput;
1415
import org.elasticsearch.compute.data.Block;
1516
import org.elasticsearch.compute.data.BlockFactory;
@@ -296,4 +297,12 @@ static BytesReference fromConfigKey(String table, String column) throws IOExcept
296297
return key.bytes();
297298
}
298299
}
300+
301+
/**
 * Writes a named writeable, first giving {@link VersionedExpression} instances the chance to substitute
 * a different representation for this stream's transport version.
 *
 * @param namedWriteable the object to write; if it implements {@link VersionedExpression}, the object returned by
 *                       {@code getVersionedNamedWriteable(getTransportVersion())} is written in its place
 * @throws IOException declared by the overridden method, to which the actual write is delegated
 */
@Override
302+
public void writeNamedWriteable(NamedWriteable namedWriteable) throws IOException {
303+
if (namedWriteable instanceof VersionedExpression v) {
304+
// Swap in the version-appropriate writeable before delegating to the superclass.
namedWriteable = v.getVersionedNamedWriteable(getTransportVersion());
305+
}
306+
super.writeNamedWriteable(namedWriteable);
307+
}
299308
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.io.stream;
9+
10+
import org.elasticsearch.TransportVersion;
11+
import org.elasticsearch.common.io.stream.NamedWriteable;
12+
13+
/**
14+
* An interface indicating to {@link PlanStreamOutput} that this expression can have multiple versions and may need to fall back.
15+
* TODO: Should we move this to VersionedNamedWriteable?
16+
*/
17+
public interface VersionedExpression {
18+
/**
19+
* Returns the writeable expression for the specified transport version.
20+
* This allows introducing a new expression when nodes are ready to handle it; otherwise, it falls back to the previous expression.
*
* @param transportVersion the transport version of the stream the expression is about to be written to
* @return the writeable to serialize for that version: either this expression itself or a fallback
21+
*/
22+
NamedWriteable getVersionedNamedWriteable(TransportVersion transportVersion);
23+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
2121
import org.elasticsearch.xpack.esql.core.util.Holder;
2222
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
23+
import org.elasticsearch.xpack.esql.expression.function.aggregate.DimensionValues;
2324
import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime;
2425
import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction;
25-
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
2626
import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket;
2727
import org.elasticsearch.xpack.esql.expression.function.grouping.TBucket;
2828
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
@@ -59,16 +59,16 @@
5959
* becomes
6060
*
6161
* TS k8s
62-
* | STATS rate_$1=rate(request), VALUES(host) BY _tsid
63-
* | STATS max(rate_$1) BY host=`VALUES(host)`
62+
* | STATS rate_$1=rate(request), DIMENSION_VALUES(host) BY _tsid
63+
* | STATS max(rate_$1) BY host=`DIMENSION_VALUES(host)`
6464
*
6565
* TS k8s | STATS avg(rate(request)) BY host
6666
*
6767
* becomes
6868
*
6969
* TS k8s
70-
* | STATS rate_$1=rate(request), VALUES(host) BY _tsid
71-
* | STATS sum(rate_$1), count(rate_$1) BY host=`VALUES(host)`
70+
* | STATS rate_$1=rate(request), DIMENSION_VALUES(host) BY _tsid
71+
* | STATS sum(rate_$1), count(rate_$1) BY host=`DIMENSION_VALUES(host)`
7272
* | EVAL `avg(rate(request))` = `sum(rate_$1)` / `count(rate_$1)`
7373
* | KEEP `avg(rate(request))`, host
7474
*
@@ -78,8 +78,8 @@
7878
*
7979
* TS k8s
8080
* | EVAL `bucket(@timestamp, 1minute)`=datetrunc(@timestamp, 1minute)
81-
* | STATS rate_$1=rate(request), VALUES(host) BY _tsid,`bucket(@timestamp, 1minute)`
82-
* | STATS sum=sum(rate_$1), count(rate_$1) BY host=`VALUES(host)`, `bucket(@timestamp, 1minute)`
81+
* | STATS rate_$1=rate(request), DIMENSION_VALUES(host) BY _tsid,`bucket(@timestamp, 1minute)`
82+
* | STATS sum=sum(rate_$1), count(rate_$1) BY host=`DIMENSION_VALUES(host)`, `bucket(@timestamp, 1minute)`
8383
* | EVAL `avg(rate(request))` = `sum(rate_$1)` / `count(rate_$1)`
8484
* | KEEP `avg(rate(request))`, host, `bucket(@timestamp, 1minute)`
8585
* </pre>
@@ -99,8 +99,8 @@
9999
* becomes
100100
*
101101
* TS k8s
102-
* | STATS rate_$1=rate(request), $p1=last_over_time(memory_used), VALUES(host) BY _tsid
103-
* | STATS max(rate_$1), $sum=sum($p1), $count=count($p1) BY host=`VALUES(host)`
102+
* | STATS rate_$1=rate(request), $p1=last_over_time(memory_used), DIMENSION_VALUES(host) BY _tsid
103+
* | STATS max(rate_$1), $sum=sum($p1), $count=count($p1) BY host=`DIMENSION_VALUES(host)`
104104
* | EVAL `avg(memory_used)` = $sum / $count
105105
* | KEEP `max(rate(request))`, `avg(memory_used)`, host
106106
*
@@ -110,8 +110,8 @@
110110
*
111111
* TS k8s
112112
* | EVAL `bucket(@timestamp, 5m)` = datetrunc(@timestamp, '5m')
113-
* | STATS rate_$1=rate(request), $p1=last_over_time(memory_used)), VALUES(pod) BY _tsid, `bucket(@timestamp, 5m)`
114-
* | STATS sum(rate_$1), `min(memory_used)` = min($p1) BY pod=`VALUES(pod)`, `bucket(@timestamp, 5m)`
113+
* | STATS rate_$1=rate(request), $p1=last_over_time(memory_used), DIMENSION_VALUES(pod) BY _tsid, `bucket(@timestamp, 5m)`
114+
* | STATS sum(rate_$1), `min(memory_used)` = min($p1) BY pod=`DIMENSION_VALUES(pod)`, `bucket(@timestamp, 5m)`
115115
* | KEEP `min(memory_used)`, `sum(rate_$1)`, pod, `bucket(@timestamp, 5m)`
116116
*
117117
* {agg}_over_time time-series aggregation will be rewritten in the similar way
@@ -121,7 +121,7 @@
121121
* becomes
122122
*
123123
* FROM k8s
124-
* | STATS max_over_time_$1 = max(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
124+
* | STATS max_over_time_$1 = max(memory_usage), host_values=DIMENSION_VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
125125
* | STATS sum(max_over_time_$1) BY host_values, time_bucket
126126
*
127127
*
@@ -130,7 +130,7 @@
130130
* becomes
131131
*
132132
* FROM k8s
133-
* | STATS avg_over_time_$1 = avg(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
133+
* | STATS avg_over_time_$1 = avg(memory_usage), host_values=DIMENSION_VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
134134
* | STATS sum(avg_over_time_$1) BY host_values, time_bucket
135135
*
136136
* TS k8s | STATS max(rate(post_requests) + rate(get_requests)) BY host, bucket(@timestamp, 1minute)
@@ -272,7 +272,7 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
272272
newFinalGroup = timeBucket.toAttribute();
273273
firstPassGroupings.add(newFinalGroup);
274274
} else {
275-
newFinalGroup = new Alias(g.source(), g.name(), new Values(g.source(), g), g.id());
275+
newFinalGroup = new Alias(g.source(), g.name(), new DimensionValues(g.source(), g, Literal.TRUE), g.id());
276276
firstPassAggs.add(newFinalGroup);
277277
}
278278
secondPassGroupings.add(new Alias(g.source(), g.name(), newFinalGroup.toAttribute(), g.id()));
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.aggregate;
9+
10+
import org.elasticsearch.TransportVersion;
11+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
12+
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
13+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
14+
import org.elasticsearch.common.io.stream.StreamInput;
15+
import org.elasticsearch.test.ESTestCase;
16+
import org.elasticsearch.test.TransportVersionUtils;
17+
import org.elasticsearch.xpack.esql.core.expression.Expression;
18+
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
19+
import org.elasticsearch.xpack.esql.expression.AbstractExpressionSerializationTests;
20+
import org.elasticsearch.xpack.esql.expression.function.ReferenceAttributeTests;
21+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
22+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
23+
24+
import java.io.IOException;
25+
26+
import static org.hamcrest.Matchers.equalTo;
27+
28+
public class DimensionValuesSerializationTests extends AbstractExpressionSerializationTests<DimensionValues> {
29+
/** Builds a {@link DimensionValues} with a random source and random child expressions as field and filter. */
@Override
30+
protected DimensionValues createTestInstance() {
31+
return new DimensionValues(randomSource(), randomChild(), randomChild());
32+
}
33+
34+
/** Returns a copy that keeps the source but replaces both the field and the filter with different random children. */
@Override
35+
protected DimensionValues mutateInstance(DimensionValues instance) throws IOException {
36+
return new DimensionValues(
37+
instance.source(),
38+
randomValueOtherThan(instance.field(), AbstractExpressionSerializationTests::randomChild),
39+
randomValueOtherThan(instance.filter(), AbstractExpressionSerializationTests::randomChild)
40+
);
41+
}
42+
43+
/**
 * Checks the backwards-compatibility fallback: a {@link DimensionValues} written through {@link PlanStreamOutput}
 * at a transport version before {@code DIMENSION_VALUES_VERSION} must be read back as an equal {@link Values}.
 */
public void testBWC() throws Exception {
44+
DimensionValues dimensionValues = new DimensionValues(
45+
randomSource(),
46+
randomAttributeWithoutQualifier(),
47+
randomAttributeWithoutQualifier()
48+
);
49+
// Random version from the minimum compatible one up to the version just before DIMENSION_VALUES_VERSION.
TransportVersion version = TransportVersionUtils.randomVersionBetween(
50+
random(),
51+
TransportVersion.minimumCompatible(),
52+
TransportVersionUtils.getPreviousVersion(DimensionValues.DIMENSION_VALUES_VERSION)
53+
);
54+
NamedWriteableRegistry namedWriteableRegistry = getNamedWriteableRegistry();
55+
try (BytesStreamOutput output = new BytesStreamOutput()) {
56+
output.setTransportVersion(version);
57+
PlanStreamOutput planOut = new PlanStreamOutput(output, configuration());
58+
planOut.writeNamedWriteable(dimensionValues);
59+
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
60+
PlanStreamInput planIn = new PlanStreamInput(in, namedWriteableRegistry, configuration());
61+
// Read with the same version that was used for writing.
planIn.setTransportVersion(version);
62+
Expression oldValues = (Expression) planIn.readNamedWriteable(categoryClass());
63+
// The expected result is a Values with the same source, field and filter as the written expression.
Values expected = new Values(dimensionValues.source(), dimensionValues.field(), dimensionValues.filter());
64+
assertThat(oldValues, equalTo(expected));
65+
}
66+
}
67+
}
68+
69+
// can't serialize qualified attribute with older versions
70+
// Copies a random ReferenceAttribute, passing null as the second constructor argument
// (presumably the qualifier - confirm against ReferenceAttribute).
private Expression randomAttributeWithoutQualifier() {
71+
ReferenceAttribute e = ReferenceAttributeTests.randomReferenceAttribute(ESTestCase.randomBoolean());
72+
return new ReferenceAttribute(e.source(), null, e.name(), e.dataType(), e.nullable(), e.id(), e.synthetic());
73+
}
74+
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,14 @@
4848
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
4949
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
5050
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
51+
import org.elasticsearch.xpack.esql.expression.function.aggregate.DimensionValues;
5152
import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime;
5253
import org.elasticsearch.xpack.esql.expression.function.aggregate.Max;
5354
import org.elasticsearch.xpack.esql.expression.function.aggregate.Min;
5455
import org.elasticsearch.xpack.esql.expression.function.aggregate.Percentile;
5556
import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate;
5657
import org.elasticsearch.xpack.esql.expression.function.aggregate.Sum;
5758
import org.elasticsearch.xpack.esql.expression.function.aggregate.SummationMode;
58-
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
5959
import org.elasticsearch.xpack.esql.expression.function.fulltext.Match;
6060
import org.elasticsearch.xpack.esql.expression.function.fulltext.MultiMatch;
6161
import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket;
@@ -7465,7 +7465,7 @@ public void testTranslateMetricsGroupedByOneDimension() {
74657465

74667466
Rate rate = as(Alias.unwrap(aggsByTsid.aggregates().get(0)), Rate.class);
74677467
assertThat(Expressions.attribute(rate.field()).name(), equalTo("network.total_bytes_in"));
7468-
Values values = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), Values.class);
7468+
DimensionValues values = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), DimensionValues.class);
74697469
assertThat(Expressions.attribute(values.field()).name(), equalTo("cluster"));
74707470
}
74717471

@@ -7502,9 +7502,9 @@ public void testTranslateMetricsGroupedByTwoDimension() {
75027502
assertThat(aggsByTsid.aggregates(), hasSize(3)); // rates, values(cluster), values(pod)
75037503
Rate rate = as(Alias.unwrap(aggsByTsid.aggregates().get(0)), Rate.class);
75047504
assertThat(Expressions.attribute(rate.field()).name(), equalTo("network.total_bytes_in"));
7505-
Values values1 = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), Values.class);
7505+
DimensionValues values1 = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), DimensionValues.class);
75067506
assertThat(Expressions.attribute(values1.field()).name(), equalTo("cluster"));
7507-
Values values2 = as(Alias.unwrap(aggsByTsid.aggregates().get(2)), Values.class);
7507+
DimensionValues values2 = as(Alias.unwrap(aggsByTsid.aggregates().get(2)), DimensionValues.class);
75087508
assertThat(Expressions.attribute(values2.field()).name(), equalTo("pod"));
75097509
}
75107510

@@ -7571,9 +7571,9 @@ public void testTranslateMetricsGroupedByTimeBucketAndDimensions() {
75717571
assertThat(aggsByTsid.aggregates(), hasSize(4)); // rate, values(pod), values(cluster), bucket
75727572
Rate rate = as(Alias.unwrap(aggsByTsid.aggregates().get(0)), Rate.class);
75737573
assertThat(Expressions.attribute(rate.field()).name(), equalTo("network.total_bytes_in"));
7574-
Values podValues = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), Values.class);
7574+
DimensionValues podValues = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), DimensionValues.class);
75757575
assertThat(Expressions.attribute(podValues.field()).name(), equalTo("pod"));
7576-
Values clusterValues = as(Alias.unwrap(aggsByTsid.aggregates().get(3)), Values.class);
7576+
DimensionValues clusterValues = as(Alias.unwrap(aggsByTsid.aggregates().get(3)), DimensionValues.class);
75777577
assertThat(Expressions.attribute(clusterValues.field()).name(), equalTo("cluster"));
75787578
}
75797579

@@ -7640,7 +7640,7 @@ public void testTranslateMixedAggsGroupedByTimeBucketAndDimensions() {
76407640
assertThat(Expressions.attribute(rate.field()).name(), equalTo("network.total_bytes_in"));
76417641
LastOverTime lastSum = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), LastOverTime.class);
76427642
assertThat(Expressions.attribute(lastSum.field()).name(), equalTo("network.cost"));
7643-
Values clusterValues = as(Alias.unwrap(aggsByTsid.aggregates().get(3)), Values.class);
7643+
DimensionValues clusterValues = as(Alias.unwrap(aggsByTsid.aggregates().get(3)), DimensionValues.class);
76447644
assertThat(Expressions.attribute(clusterValues.field()).name(), equalTo("cluster"));
76457645
}
76467646

@@ -7697,7 +7697,7 @@ public void testAdjustMetricsRateBeforeFinalAgg() {
76977697
assertThat(mul.right().fold(FoldContext.small()), equalTo(1.05));
76987698
Rate rate = as(Alias.unwrap(aggsByTsid.aggregates().get(0)), Rate.class);
76997699
assertThat(Expressions.attribute(rate.field()).name(), equalTo("network.total_bytes_in"));
7700-
Values values = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), Values.class);
7700+
DimensionValues values = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), DimensionValues.class);
77017701
assertThat(Expressions.attribute(values.field()).name(), equalTo("cluster"));
77027702
}
77037703

0 commit comments

Comments
 (0)