Skip to content

Commit 09541c5

Browse files
authored
Avoid sorted source for time_series aggs without rates (elastic#127033)
With this change, TS index | STATS ... will be translated to FROM index METADATA _tsid | STATS ... to avoid emitting docs in _tsid and timestamp order, which is expensive. For example, this reduces the execution time of the below query with tsdb track from 50 seconds to 4.5 seconds. TS tsdb | STATS sum(max_over_time(kubernetes.container.memory.usage.bytes)) BY bucket(@timestamp, 5minute)
1 parent c2fdc06 commit 09541c5

File tree

3 files changed

+34
-13
lines changed

3 files changed

+34
-13
lines changed

server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ public IndexFieldData.Builder fielddataBuilder(FieldDataContext fieldDataContext
132132
public Query termQuery(Object value, SearchExecutionContext context) {
133133
throw new IllegalArgumentException("[" + NAME + "] is not searchable");
134134
}
135+
136+
@Override
137+
public BlockLoader blockLoader(BlockLoaderContext blContext) {
138+
return new BlockDocValuesReader.BytesRefsFromOrdsBlockLoader(name());
139+
}
135140
}
136141

137142
private final boolean useDocValuesSkipper;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.optimizer.rules.logical;
99

10+
import org.elasticsearch.index.IndexMode;
1011
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
1112
import org.elasticsearch.xpack.esql.core.expression.Alias;
1213
import org.elasticsearch.xpack.esql.core.expression.Attribute;
@@ -19,6 +20,7 @@
1920
import org.elasticsearch.xpack.esql.core.util.Holder;
2021
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
2122
import org.elasticsearch.xpack.esql.expression.function.aggregate.FromPartial;
23+
import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate;
2224
import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction;
2325
import org.elasticsearch.xpack.esql.expression.function.aggregate.ToPartial;
2426
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
@@ -120,7 +122,7 @@
120122
*
121123
* becomes
122124
*
123-
* TS k8s
125+
* FROM k8s
124126
* | STATS max_memory_usage = max(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
125127
* | STATS sum(max_memory_usage) BY host_values, time_bucket
126128
*
@@ -129,7 +131,7 @@
129131
*
130132
* becomes
131133
*
132-
* TS k8s
134+
* FROM k8s
133135
* | STATS avg_memory_usage = avg(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
134136
* | STATS sum(avg_memory_usage) BY host_values, time_bucket
135137
*
@@ -154,11 +156,15 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
154156
Map<AggregateFunction, Alias> timeSeriesAggs = new HashMap<>();
155157
List<NamedExpression> firstPassAggs = new ArrayList<>();
156158
List<NamedExpression> secondPassAggs = new ArrayList<>();
159+
Holder<Boolean> hasRateAggregates = new Holder<>(Boolean.FALSE);
157160
for (NamedExpression agg : aggregate.aggregates()) {
158161
if (agg instanceof Alias alias && alias.child() instanceof AggregateFunction af) {
159162
Holder<Boolean> changed = new Holder<>(Boolean.FALSE);
160163
Expression outerAgg = af.transformDown(TimeSeriesAggregateFunction.class, tsAgg -> {
161164
changed.set(Boolean.TRUE);
165+
if (tsAgg instanceof Rate) {
166+
hasRateAggregates.set(Boolean.TRUE);
167+
}
162168
AggregateFunction firstStageFn = tsAgg.perTimeSeriesAggregation();
163169
Alias newAgg = timeSeriesAggs.computeIfAbsent(firstStageFn, k -> {
164170
Alias firstStageAlias = new Alias(tsAgg.source(), agg.name(), firstStageFn);
@@ -231,16 +237,17 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
231237
secondPassGroupings.add(new Alias(g.source(), g.name(), newFinalGroup.toAttribute(), g.id()));
232238
}
233239
LogicalPlan newChild = aggregate.child().transformUp(EsRelation.class, r -> {
240+
IndexMode indexMode = hasRateAggregates.get() ? r.indexMode() : IndexMode.STANDARD;
234241
if (r.output().contains(tsid.get()) == false) {
235242
return new EsRelation(
236243
r.source(),
237244
r.indexPattern(),
238-
r.indexMode(),
245+
indexMode,
239246
r.indexNameWithModes(),
240247
CollectionUtils.combine(r.output(), tsid.get())
241248
);
242249
} else {
243-
return r;
250+
return new EsRelation(r.source(), r.indexPattern(), indexMode, r.indexNameWithModes(), r.output());
244251
}
245252
});
246253
final var firstPhase = new TimeSeriesAggregate(

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6847,7 +6847,8 @@ public void testTranslateMixedAggsWithMathWithoutGrouping() {
68476847
Eval addEval = as(aggsByTsid.child(), Eval.class);
68486848
assertThat(addEval.fields(), hasSize(1));
68496849
Add add = as(Alias.unwrap(addEval.fields().get(0)), Add.class);
6850-
as(addEval.child(), EsRelation.class);
6850+
EsRelation relation = as(addEval.child(), EsRelation.class);
6851+
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
68516852

68526853
assertThat(Expressions.attribute(mul.left()).id(), equalTo(finalAggs.aggregates().get(1).id()));
68536854
assertThat(mul.right().fold(FoldContext.small()), equalTo(1.1));
@@ -6877,7 +6878,8 @@ public void testTranslateMetricsGroupedByOneDimension() {
68776878
TimeSeriesAggregate aggsByTsid = as(aggsByCluster.child(), TimeSeriesAggregate.class);
68786879
assertThat(aggsByTsid.aggregates(), hasSize(2)); // _tsid is dropped
68796880
assertNull(aggsByTsid.timeBucket());
6880-
as(aggsByTsid.child(), EsRelation.class);
6881+
EsRelation relation = as(aggsByTsid.child(), EsRelation.class);
6882+
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
68816883

68826884
Sum sum = as(Alias.unwrap(aggsByCluster.aggregates().get(0)), Sum.class);
68836885
assertThat(Expressions.attribute(sum.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
@@ -6904,7 +6906,8 @@ public void testTranslateMetricsGroupedByTwoDimension() {
69046906
TimeSeriesAggregate aggsByTsid = as(finalAggs.child(), TimeSeriesAggregate.class);
69056907
assertThat(aggsByTsid.aggregates(), hasSize(3)); // _tsid is dropped
69066908
assertNull(aggsByTsid.timeBucket());
6907-
as(aggsByTsid.child(), EsRelation.class);
6909+
EsRelation relation = as(aggsByTsid.child(), EsRelation.class);
6910+
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
69086911

69096912
Div div = as(Alias.unwrap(eval.fields().get(0)), Div.class);
69106913
assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAggs.aggregates().get(0).id()));
@@ -6943,7 +6946,8 @@ public void testTranslateMetricsGroupedByTimeBucket() {
69436946
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1)));
69446947
Eval eval = as(aggsByTsid.child(), Eval.class);
69456948
assertThat(eval.fields(), hasSize(1));
6946-
as(eval.child(), EsRelation.class);
6949+
EsRelation relation = as(eval.child(), EsRelation.class);
6950+
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
69476951

69486952
Sum sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
69496953
assertThat(Expressions.attribute(sum.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
@@ -6977,7 +6981,8 @@ public void testTranslateMetricsGroupedByTimeBucketAndDimensions() {
69776981
assertNotNull(aggsByTsid.timeBucket());
69786982
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofMinutes(5)));
69796983
Eval bucket = as(aggsByTsid.child(), Eval.class);
6980-
as(bucket.child(), EsRelation.class);
6984+
EsRelation relation = as(bucket.child(), EsRelation.class);
6985+
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
69816986
assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAgg.aggregates().get(0).id()));
69826987
assertThat(Expressions.attribute(div.right()).id(), equalTo(finalAgg.aggregates().get(1).id()));
69836988

@@ -7018,7 +7023,8 @@ public void testTranslateMixedAggsGroupedByTimeBucketAndDimensions() {
70187023
assertNotNull(aggsByTsid.timeBucket());
70197024
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofMinutes(5)));
70207025
Eval bucket = as(aggsByTsid.child(), Eval.class);
7021-
as(bucket.child(), EsRelation.class);
7026+
EsRelation relation = as(bucket.child(), EsRelation.class);
7027+
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
70227028
assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAgg.aggregates().get(0).id()));
70237029
assertThat(Expressions.attribute(div.right()).id(), equalTo(finalAgg.aggregates().get(1).id()));
70247030

@@ -7082,7 +7088,8 @@ public void testAdjustMetricsRateBeforeFinalAgg() {
70827088
Eval evalBucket = as(aggsByTsid.child(), Eval.class);
70837089
assertThat(evalBucket.fields(), hasSize(1));
70847090
Bucket bucket = as(Alias.unwrap(evalBucket.fields().get(0)), Bucket.class);
7085-
as(evalBucket.child(), EsRelation.class);
7091+
EsRelation relation = as(evalBucket.child(), EsRelation.class);
7092+
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
70867093

70877094
assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAgg.aggregates().get(0).id()));
70887095
assertThat(Expressions.attribute(div.right()).id(), equalTo(finalAgg.aggregates().get(1).id()));
@@ -7120,7 +7127,8 @@ public void testTranslateMaxOverTime() {
71207127
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1)));
71217128
Eval eval = as(aggsByTsid.child(), Eval.class);
71227129
assertThat(eval.fields(), hasSize(1));
7123-
as(eval.child(), EsRelation.class);
7130+
EsRelation relation = as(eval.child(), EsRelation.class);
7131+
assertThat(relation.indexMode(), equalTo(IndexMode.STANDARD));
71247132

71257133
Sum sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
71267134
assertThat(Expressions.attribute(sum.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
@@ -7149,7 +7157,8 @@ public void testTranslateAvgOverTime() {
71497157
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1)));
71507158
Eval evalBucket = as(aggsByTsid.child(), Eval.class);
71517159
assertThat(evalBucket.fields(), hasSize(1));
7152-
as(evalBucket.child(), EsRelation.class);
7160+
EsRelation relation = as(evalBucket.child(), EsRelation.class);
7161+
assertThat(relation.indexMode(), equalTo(IndexMode.STANDARD));
71537162

71547163
Sum sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
71557164
assertThat(Expressions.attribute(sum.field()).id(), equalTo(evalAvg.fields().get(0).id()));

0 commit comments

Comments
 (0)