Skip to content

Commit 5cfd4d1

Browse files
committed
Add dimension values for time-series
1 parent 40f90fb commit 5cfd4d1

File tree

10 files changed

+250
-23
lines changed

10 files changed

+250
-23
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9188000,9185001
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
initial_9.2.0,9185000
1+
dimension_values,9185001
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
resolved_index_expressions,9187000
1+
dimension_values,9188000

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
4949
Present.ENTRY,
5050
PresentOverTime.ENTRY,
5151
Absent.ENTRY,
52-
AbsentOverTime.ENTRY
52+
AbsentOverTime.ENTRY,
53+
DimensionValues.ENTRY
5354
);
5455
}
5556
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.aggregate;
9+
10+
import org.elasticsearch.TransportVersion;
11+
import org.elasticsearch.common.io.stream.NamedWriteable;
12+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
13+
import org.elasticsearch.common.io.stream.StreamInput;
14+
import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
15+
import org.elasticsearch.xpack.esql.core.expression.Expression;
16+
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
17+
import org.elasticsearch.xpack.esql.core.tree.Source;
18+
import org.elasticsearch.xpack.esql.core.type.DataType;
19+
import org.elasticsearch.xpack.esql.io.stream.VersionedExpression;
20+
import org.elasticsearch.xpack.esql.planner.ToAggregator;
21+
22+
import java.io.IOException;
23+
import java.util.List;
24+
25+
import static java.util.Collections.emptyList;
26+
27+
/**
28+
* A specialization of {@link Values} for collecting dimension fields in time-series queries.
29+
*/
30+
public class DimensionValues extends AggregateFunction implements ToAggregator, VersionedExpression {
31+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
32+
Expression.class,
33+
"DimensionValues",
34+
DimensionValues::new
35+
);
36+
37+
static final TransportVersion DIMENSION_VALUES_VERSION = TransportVersion.fromName("dimension_values");
38+
39+
public DimensionValues(Source source, Expression field, Expression filter) {
40+
super(source, field, filter, emptyList());
41+
}
42+
43+
private DimensionValues(StreamInput in) throws IOException {
44+
super(in);
45+
}
46+
47+
@Override
48+
public String getWriteableName() {
49+
return ENTRY.name;
50+
}
51+
52+
@Override
53+
public NamedWriteable getVersionedNamedWriteable(TransportVersion transportVersion) {
54+
if (transportVersion.onOrAfter(DIMENSION_VALUES_VERSION)) {
55+
return this;
56+
} else {
57+
// fallback to VALUES
58+
return new Values(source(), field(), filter());
59+
}
60+
}
61+
62+
@Override
63+
protected NodeInfo<DimensionValues> info() {
64+
return NodeInfo.create(this, DimensionValues::new, field(), filter());
65+
}
66+
67+
@Override
68+
public DimensionValues replaceChildren(List<Expression> newChildren) {
69+
return new DimensionValues(source(), newChildren.get(0), newChildren.get(1));
70+
}
71+
72+
@Override
73+
public DimensionValues withFilter(Expression filter) {
74+
return new DimensionValues(source(), field(), filter);
75+
}
76+
77+
@Override
78+
public DataType dataType() {
79+
return field().dataType().noText();
80+
}
81+
82+
@Override
83+
protected TypeResolution resolveType() {
84+
return new Values(source(), field(), filter()).resolveType();
85+
}
86+
87+
@Override
88+
public AggregatorFunctionSupplier supplier() {
89+
// TODO: link new implementation
90+
return new Values(source(), field(), filter()).supplier();
91+
}
92+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutput.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.TransportVersion;
1111
import org.elasticsearch.common.bytes.BytesReference;
1212
import org.elasticsearch.common.io.stream.BytesStreamOutput;
13+
import org.elasticsearch.common.io.stream.NamedWriteable;
1314
import org.elasticsearch.common.io.stream.StreamOutput;
1415
import org.elasticsearch.compute.data.Block;
1516
import org.elasticsearch.compute.data.BlockFactory;
@@ -296,4 +297,12 @@ static BytesReference fromConfigKey(String table, String column) throws IOExcept
296297
return key.bytes();
297298
}
298299
}
300+
301+
@Override
302+
public void writeNamedWriteable(NamedWriteable namedWriteable) throws IOException {
303+
if (namedWriteable instanceof VersionedExpression v) {
304+
namedWriteable = v.getVersionedNamedWriteable(getTransportVersion());
305+
}
306+
super.writeNamedWriteable(namedWriteable);
307+
}
299308
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.io.stream;
9+
10+
import org.elasticsearch.TransportVersion;
11+
import org.elasticsearch.common.io.stream.NamedWriteable;
12+
13+
/**
14+
* An interface indicating to {@link PlanStreamOutput} that this expression can have multiple versions and may need to fallback.
15+
* TODO: Should we move this to VersionedNamedWriteable?
16+
*/
17+
public interface VersionedExpression {
18+
/**
19+
* Returns the writeable expression for the specified transport version.
20+
* This allows introducing a new expression when nodes are ready to handle it; otherwise, it falls back to the previous expression.
21+
*/
22+
NamedWriteable getVersionedNamedWriteable(TransportVersion transportVersion);
23+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
2121
import org.elasticsearch.xpack.esql.core.util.Holder;
2222
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
23+
import org.elasticsearch.xpack.esql.expression.function.aggregate.DimensionValues;
2324
import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime;
2425
import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction;
2526
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
@@ -59,16 +60,16 @@
5960
* becomes
6061
*
6162
* TS k8s
62-
* | STATS rate_$1=rate(request), VALUES(host) BY _tsid
63-
* | STATS max(rate_$1) BY host=`VALUES(host)`
63+
* | STATS rate_$1=rate(request), DIMENSION_VALUES(host) BY _tsid
64+
* | STATS max(rate_$1) BY host=`DIMENSION_VALUES(host)`
6465
*
6566
* TS k8s | STATS avg(rate(request)) BY host
6667
*
6768
* becomes
6869
*
6970
* TS k8s
70-
* | STATS rate_$1=rate(request), VALUES(host) BY _tsid
71-
* | STATS sum(rate_$1), count(rate_$1) BY host=`VALUES(host)`
71+
* | STATS rate_$1=rate(request), DIMENSION_VALUES(host) BY _tsid
72+
* | STATS sum(rate_$1), count(rate_$1) BY host=`DIMENSION_VALUES(host)`
7273
* | EVAL `avg(rate(request))` = `sum(rate_$1)` / `count(rate_$1)`
7374
* | KEEP `avg(rate(request))`, host
7475
*
@@ -78,8 +79,8 @@
7879
*
7980
* TS k8s
8081
* | EVAL `bucket(@timestamp, 1minute)`=datetrunc(@timestamp, 1minute)
81-
* | STATS rate_$1=rate(request), VALUES(host) BY _tsid,`bucket(@timestamp, 1minute)`
82-
* | STATS sum=sum(rate_$1), count(rate_$1) BY host=`VALUES(host)`, `bucket(@timestamp, 1minute)`
82+
* | STATS rate_$1=rate(request), DIMENSION_VALUES(host) BY _tsid,`bucket(@timestamp, 1minute)`
83+
* | STATS sum=sum(rate_$1), count(rate_$1) BY host=`DIMENSION_VALUES(host)`, `bucket(@timestamp, 1minute)`
8384
* | EVAL `avg(rate(request))` = `sum(rate_$1)` / `count(rate_$1)`
8485
* | KEEP `avg(rate(request))`, host, `bucket(@timestamp, 1minute)`
8586
* </pre>
@@ -99,8 +100,8 @@
99100
* becomes
100101
*
101102
* TS k8s
102-
* | STATS rate_$1=rate(request), $p1=last_over_time(memory_used), VALUES(host) BY _tsid
103-
* | STATS max(rate_$1), $sum=sum($p1), $count=count($p1) BY host=`VALUES(host)`
103+
* | STATS rate_$1=rate(request), $p1=last_over_time(memory_used), DIMENSION_VALUES(host) BY _tsid
104+
* | STATS max(rate_$1), $sum=sum($p1), $count=count($p1) BY host=`DIMENSION_VALUES(host)`
104105
* | EVAL `avg(memory_used)` = $sum / $count
105106
* | KEEP `max(rate(request))`, `avg(memory_used)`, host
106107
*
@@ -110,8 +111,8 @@
110111
*
111112
* TS k8s
112113
* | EVAL `bucket(@timestamp, 5m)` = datetrunc(@timestamp, '5m')
113-
* | STATS rate_$1=rate(request), $p1=last_over_time(memory_used)), VALUES(pod) BY _tsid, `bucket(@timestamp, 5m)`
114-
* | STATS sum(rate_$1), `min(memory_used)` = min($p1) BY pod=`VALUES(pod)`, `bucket(@timestamp, 5m)`
114+
* | STATS rate_$1=rate(request), $p1=last_over_time(memory_used)), DIMENSION_VALUES(pod) BY _tsid, `bucket(@timestamp, 5m)`
115+
* | STATS sum(rate_$1), `min(memory_used)` = min($p1) BY pod=`DIMENSION_VALUES(pod)`, `bucket(@timestamp, 5m)`
115116
* | KEEP `min(memory_used)`, `sum(rate_$1)`, pod, `bucket(@timestamp, 5m)`
116117
*
117118
* {agg}_over_time time-series aggregation will be rewritten in the similar way
@@ -121,7 +122,7 @@
121122
* becomes
122123
*
123124
* FROM k8s
124-
* | STATS max_over_time_$1 = max(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
125+
* | STATS max_over_time_$1 = max(memory_usage), host_values=DIMENSION_VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
125126
* | STATS sum(max_over_time_$1) BY host_values, time_bucket
126127
*
127128
*
@@ -130,7 +131,7 @@
130131
* becomes
131132
*
132133
* FROM k8s
133-
* | STATS avg_over_time_$1 = avg(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
134+
* | STATS avg_over_time_$1 = avg(memory_usage), host_values=DIMENSION_VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
134135
* | STATS sum(avg_over_time_$1) BY host_values, time_bucket
135136
*
136137
* TS k8s | STATS max(rate(post_requests) + rate(get_requests)) BY host, bucket(@timestamp, 1minute)
@@ -272,7 +273,11 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
272273
newFinalGroup = timeBucket.toAttribute();
273274
firstPassGroupings.add(newFinalGroup);
274275
} else {
275-
newFinalGroup = new Alias(g.source(), g.name(), new Values(g.source(), g), g.id());
276+
if (g.isDimension()) {
277+
newFinalGroup = new Alias(g.source(), g.name(), new DimensionValues(g.source(), g, Literal.TRUE), g.id());
278+
} else {
279+
newFinalGroup = new Alias(g.source(), g.name(), new Values(g.source(), g, Literal.TRUE), g.id());
280+
}
276281
firstPassAggs.add(newFinalGroup);
277282
}
278283
secondPassGroupings.add(new Alias(g.source(), g.name(), newFinalGroup.toAttribute(), g.id()));
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.aggregate;
9+
10+
import org.elasticsearch.TransportVersion;
11+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
12+
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
13+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
14+
import org.elasticsearch.common.io.stream.StreamInput;
15+
import org.elasticsearch.test.ESTestCase;
16+
import org.elasticsearch.test.TransportVersionUtils;
17+
import org.elasticsearch.xpack.esql.core.expression.Expression;
18+
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
19+
import org.elasticsearch.xpack.esql.expression.AbstractExpressionSerializationTests;
20+
import org.elasticsearch.xpack.esql.expression.function.ReferenceAttributeTests;
21+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
22+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
23+
24+
import java.io.IOException;
25+
26+
import static org.hamcrest.Matchers.equalTo;
27+
28+
public class DimensionValuesSerializationTests extends AbstractExpressionSerializationTests<DimensionValues> {
29+
@Override
30+
protected DimensionValues createTestInstance() {
31+
return new DimensionValues(randomSource(), randomChild(), randomChild());
32+
}
33+
34+
@Override
35+
protected DimensionValues mutateInstance(DimensionValues instance) throws IOException {
36+
return new DimensionValues(
37+
instance.source(),
38+
randomValueOtherThan(instance.field(), AbstractExpressionSerializationTests::randomChild),
39+
randomValueOtherThan(instance.filter(), AbstractExpressionSerializationTests::randomChild)
40+
);
41+
}
42+
43+
public void testBWC() throws Exception {
44+
DimensionValues dimensionValues = new DimensionValues(
45+
randomSource(),
46+
randomAttributeWithoutQualifier(),
47+
randomAttributeWithoutQualifier()
48+
);
49+
TransportVersion version = TransportVersionUtils.randomVersionBetween(
50+
random(),
51+
TransportVersion.minimumCompatible(),
52+
TransportVersionUtils.getPreviousVersion(DimensionValues.DIMENSION_VALUES_VERSION)
53+
);
54+
NamedWriteableRegistry namedWriteableRegistry = getNamedWriteableRegistry();
55+
try (BytesStreamOutput output = new BytesStreamOutput()) {
56+
output.setTransportVersion(version);
57+
PlanStreamOutput planOut = new PlanStreamOutput(output, configuration());
58+
planOut.writeNamedWriteable(dimensionValues);
59+
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
60+
PlanStreamInput planIn = new PlanStreamInput(in, namedWriteableRegistry, configuration());
61+
planIn.setTransportVersion(version);
62+
Expression oldValues = (Expression) planIn.readNamedWriteable(categoryClass());
63+
Values expected = new Values(dimensionValues.source(), dimensionValues.field(), dimensionValues.filter());
64+
assertThat(oldValues, equalTo(expected));
65+
}
66+
}
67+
}
68+
69+
// can't serialize qualified attribute with older versions
70+
private Expression randomAttributeWithoutQualifier() {
71+
ReferenceAttribute e = ReferenceAttributeTests.randomReferenceAttribute(ESTestCase.randomBoolean());
72+
return new ReferenceAttribute(e.source(), null, e.name(), e.dataType(), e.nullable(), e.id(), e.synthetic());
73+
}
74+
}

0 commit comments

Comments
 (0)