Skip to content

Commit f708168

Browse files
committed
ESQL: Add GROUP BY ALL
Add output of the dimension list into the _timeseries column. Part of #136253
1 parent 2314d5d commit f708168

File tree

9 files changed

+156
-148
lines changed

9 files changed

+156
-148
lines changed

server/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,7 @@ default BlockLoaderFunctionConfig blockLoaderFunctionConfig() {
718718
* Returns a list of dimension field names from a MappingLookup.
719719
*/
720720
@Nullable
721-
default List<String> dimensionFields() {
721+
default Set<String> dimensionFields() {
722722
return null;
723723
}
724724
}

server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.index.IndexVersion;
3030
import org.elasticsearch.index.IndexVersions;
3131
import org.elasticsearch.index.engine.SearchBasedChangesSnapshot;
32+
import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig;
3233
import org.elasticsearch.index.query.QueryShardException;
3334
import org.elasticsearch.index.query.SearchExecutionContext;
3435
import org.elasticsearch.search.fetch.FetchContext;
@@ -366,6 +367,10 @@ public Query termQuery(Object value, SearchExecutionContext context) {
366367
@Override
367368
public BlockLoader blockLoader(BlockLoaderContext blContext) {
368369
if (enabled) {
370+
BlockLoaderFunctionConfig config = blContext.blockLoaderFunctionConfig();
371+
if (config != null && config.function() == BlockLoaderFunctionConfig.Function.TIME_SERIES_DIMENSIONS) {
372+
return new TimeSeriesMetadataFieldBlockLoader(blContext.dimensionFields());
373+
}
369374
return new SourceFieldBlockLoader();
370375
}
371376
return BlockLoader.CONSTANT_NULLS;

server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesDimensionsFieldMapper.java

Lines changed: 0 additions & 99 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesDimensionsFieldBlockLoader.java renamed to server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesMetadataFieldBlockLoader.java

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,21 @@
1111

1212
import org.apache.lucene.index.LeafReaderContext;
1313
import org.apache.lucene.index.SortedSetDocValues;
14-
import org.elasticsearch.common.Strings;
15-
import org.elasticsearch.common.bytes.BytesReference;
1614
import org.elasticsearch.search.fetch.StoredFieldsSpec;
17-
import org.elasticsearch.search.lookup.SourceFilter;
1815

1916
import java.io.IOException;
20-
import java.util.List;
17+
import java.util.Collections;
18+
import java.util.Set;
2119

22-
public final class TimeSeriesDimensionsFieldBlockLoader implements BlockLoader {
23-
private final List<String> fieldNames;
20+
/**
21+
* Load {@code _timeseries} into blocks.
22+
*/
23+
public final class TimeSeriesMetadataFieldBlockLoader implements BlockLoader {
24+
25+
private final Set<String> includes;
2426

25-
public TimeSeriesDimensionsFieldBlockLoader(List<String> fieldNames) {
26-
this.fieldNames = fieldNames;
27+
public TimeSeriesMetadataFieldBlockLoader(Set<String> includes) {
28+
this.includes = Collections.unmodifiableSet(includes);
2729
}
2830

2931
@Override
@@ -38,12 +40,12 @@ public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) {
3840

3941
@Override
4042
public RowStrideReader rowStrideReader(LeafReaderContext context) throws IOException {
41-
return new TimeSeriesSource(new SourceFilter(fieldNames.toArray(Strings.EMPTY_ARRAY), null));
43+
return new Source();
4244
}
4345

4446
@Override
4547
public StoredFieldsSpec rowStrideStoredFieldSpec() {
46-
return new StoredFieldsSpec(true, false, java.util.Set.of());
48+
return StoredFieldsSpec.withSourcePaths(IgnoredSourceFieldMapper.IgnoredSourceFormat.COALESCED_SINGLE_IGNORED_SOURCE, includes);
4749
}
4850

4951
@Override
@@ -56,36 +58,16 @@ public SortedSetDocValues ordinals(LeafReaderContext context) {
5658
throw new UnsupportedOperationException();
5759
}
5860

59-
private static class TimeSeriesSource extends BlockStoredFieldsReader {
60-
private final SourceFilter sourceFilter;
61-
62-
TimeSeriesSource(SourceFilter sourceFilter) {
63-
this.sourceFilter = sourceFilter;
64-
}
65-
61+
private static class Source extends BlockStoredFieldsReader {
6662
@Override
6763
public void read(int docId, StoredFields storedFields, Builder builder) throws IOException {
68-
org.elasticsearch.search.lookup.Source source = storedFields.source();
69-
if (source == null) {
70-
builder.appendNull();
71-
return;
72-
}
73-
74-
if (sourceFilter != null) {
75-
source = source.filter(sourceFilter);
76-
}
77-
78-
BytesReference sourceRef = source.internalSourceRef();
79-
if (sourceRef == null) {
80-
builder.appendNull();
81-
} else {
82-
((BytesRefBuilder) builder).appendBytesRef(sourceRef.toBytesRef());
83-
}
64+
// TODO support appending BytesReference
65+
((BytesRefBuilder) builder).appendBytesRef(storedFields.source().internalSourceRef().toBytesRef());
8466
}
8567

8668
@Override
8769
public String toString() {
88-
return "BlockStoredFieldsReader.TimeSeriesSource";
70+
return "BlockStoredFieldsReader.Source";
8971
}
9072
}
9173
}

server/src/main/java/org/elasticsearch/index/mapper/blockloader/BlockLoaderFunctionConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,6 @@ enum Function {
5353
V_HAMMING,
5454
V_L1NORM,
5555
V_L2NORM,
56+
TIME_SERIES_DIMENSIONS
5657
}
5758
}

server/src/main/java/org/elasticsearch/indices/IndicesModule.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
6464
import org.elasticsearch.index.mapper.SourceFieldMapper;
6565
import org.elasticsearch.index.mapper.TextFieldMapper;
66-
import org.elasticsearch.index.mapper.TimeSeriesDimensionsFieldMapper;
6766
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
6867
import org.elasticsearch.index.mapper.TimeSeriesRoutingHashFieldMapper;
6968
import org.elasticsearch.index.mapper.VersionFieldMapper;
@@ -290,7 +289,6 @@ private static Map<String, MetadataFieldMapper.TypeParser> initBuiltInMetadataMa
290289
builtInMetadataMappers.put(IndexFieldMapper.NAME, IndexFieldMapper.PARSER);
291290
builtInMetadataMappers.put(IndexModeFieldMapper.NAME, IndexModeFieldMapper.PARSER);
292291
builtInMetadataMappers.put(SourceFieldMapper.NAME, SourceFieldMapper.PARSER);
293-
builtInMetadataMappers.put(TimeSeriesDimensionsFieldMapper.NAME, TimeSeriesDimensionsFieldMapper.PARSER);
294292
builtInMetadataMappers.put(IgnoredSourceFieldMapper.NAME, IgnoredSourceFieldMapper.PARSER);
295293
builtInMetadataMappers.put(NestedPathFieldMapper.NAME, NestedPathFieldMapper.PARSER);
296294
builtInMetadataMappers.put(VersionFieldMapper.NAME, VersionFieldMapper.PARSER);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.junit.Before;

import java.util.List;

import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;

/**
 * Integration test for extracting dimension fields after a time-series aggregation
 * (the {@code ExtractDimensionFieldsAfterAggregation} physical-plan rule).
 * <p>
 * Indexes a small time-series data set (3 hosts x 2 clusters x 5 hourly buckets)
 * and runs a {@code TS} query that aggregates over time buckets, verifying the
 * query executes and produces grouped rows.
 */
public class ExtractDimensionFieldsAfterAggregationIT extends AbstractEsqlIntegTestCase {

    private static final String INDEX = "metrics";
    private static final String[] HOSTS = { "host-1", "host-2", "host-3" };
    private static final String[] CLUSTERS = { "prod", "qa" };
    private static final int TIME_BUCKETS = 5;
    private static final long ONE_HOUR_MILLIS = 3600_000L;

    @Before
    public void setupIndex() {
        // time_series mode with host/cluster as routing dimensions
        Settings settings = Settings.builder()
            .put("mode", "time_series")
            .putList("routing_path", List.of("host", "cluster"))
            .build();

        client().admin()
            .indices()
            .prepareCreate(INDEX)
            .setSettings(settings)
            .setMapping(
                "@timestamp",
                "type=date",
                "host",
                "type=keyword,time_series_dimension=true",
                "cluster",
                "type=keyword,time_series_dimension=true",
                "cpu",
                "type=double,time_series_metric=gauge",
                "memory",
                "type=long,time_series_metric=gauge",
                "request_count",
                "type=integer,time_series_metric=counter"
            )
            .get();

        long baseTimestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");

        // One document per (host, cluster, hour-bucket); metric values are a
        // deterministic function of the bucket index so results are predictable.
        for (String host : HOSTS) {
            for (String cluster : CLUSTERS) {
                for (int bucket = 0; bucket < TIME_BUCKETS; bucket++) {
                    long timestamp = baseTimestamp + (bucket * ONE_HOUR_MILLIS);

                    client().prepareIndex(INDEX)
                        .setSource(
                            "@timestamp",
                            timestamp,
                            "host",
                            host,
                            "cluster",
                            cluster,
                            "cpu",
                            50.0 + bucket,
                            "memory",
                            1024L * (bucket + 1),
                            "request_count",
                            bucket * 100
                        )
                        .get();
                }
            }
        }

        client().admin().indices().prepareRefresh(INDEX).get();
    }

    public void testSumOverTimeGroupedByTimeBucket() {
        String query = "TS metrics | STATS sum_over_time(cpu) BY tbucket(1h)";

        try (EsqlQueryResponse resp = run(query)) {
            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
            // The query must produce results: one row per (time series, hour bucket)
            // grouping. Previously this test only printed the rows and asserted
            // nothing, so it could never fail on wrong output.
            assertFalse("expected at least one aggregated row", rows.isEmpty());
            for (List<Object> row : rows) {
                // Each row carries the aggregated value plus the time bucket key.
                assertEquals("expected [sum_over_time(cpu), tbucket] columns", 2, row.size());
                // NOTE(review): tighten to exact expected sums once the rule's
                // output contract (rows per bucket vs. per series) is settled.
            }
        }
    }
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ExtractDimensionFieldsAfterAggregation.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,18 @@
88
package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;
99

1010
import org.elasticsearch.compute.aggregation.AggregatorMode;
11+
import org.elasticsearch.index.mapper.SourceFieldMapper;
12+
import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig;
1113
import org.elasticsearch.xpack.esql.core.expression.Alias;
1214
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1315
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
16+
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
1417
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
1518
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
1619
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
20+
import org.elasticsearch.xpack.esql.core.type.DataType;
21+
import org.elasticsearch.xpack.esql.core.type.EsField;
22+
import org.elasticsearch.xpack.esql.core.type.FunctionEsField;
1723
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
1824
import org.elasticsearch.xpack.esql.expression.function.aggregate.DimensionValues;
1925
import org.elasticsearch.xpack.esql.expression.function.aggregate.FirstDocId;
@@ -30,6 +36,7 @@
3036
import java.util.ArrayList;
3137
import java.util.HashSet;
3238
import java.util.List;
39+
import java.util.Map;
3340
import java.util.Set;
3441

3542
/**
@@ -86,8 +93,32 @@ private PhysicalPlan rule(TimeSeriesAggregateExec oldAgg, LocalPhysicalOptimizer
8693
throw new IllegalStateException("expected one intermediate attribute for [" + af + "] but got [" + size + "]");
8794
}
8895
Attribute oldAttr = oldIntermediates.get(intermediateOffset);
89-
aliases.add(new Alias(agg.source(), agg.name(), dimensionField, oldAttr.id()));
90-
dimensionFields.add(dimensionField);
96+
if (dimensionField.name().equals(MetadataAttribute.TIMESERIES)) {
97+
var sourceField = new FieldAttribute(
98+
dimensionField.source(),
99+
null,
100+
dimensionField.qualifier(),
101+
dimensionField.name(),
102+
new FunctionEsField(
103+
new EsField(
104+
SourceFieldMapper.NAME,
105+
DataType.KEYWORD,
106+
Map.of(),
107+
false,
108+
EsField.TimeSeriesFieldType.DIMENSION
109+
),
110+
DataType.KEYWORD,
111+
new BlockLoaderFunctionConfig.JustFunction(BlockLoaderFunctionConfig.Function.TIME_SERIES_DIMENSIONS)
112+
),
113+
true // Mark as synthetic so fieldName() returns _source instead of _timeseries
114+
);
115+
// _timeseries = to_string(_tsid)
116+
// var timeSeries = new ToBase64(dimensionField.source(), tsidField(oldAgg));
117+
aliases.add(new Alias(agg.source(), agg.name(), sourceField, oldAttr.id()));
118+
} else {
119+
aliases.add(new Alias(agg.source(), agg.name(), dimensionField, oldAttr.id()));
120+
dimensionFields.add(dimensionField);
121+
}
91122
} else {
92123
for (int i = 0; i < size; i++) {
93124
newIntermediates.add(oldIntermediates.get(intermediateOffset + i));
@@ -142,5 +173,4 @@ private static Attribute valuesOfDimensionField(AggregateFunction af, AttributeS
142173
private static int intermediateStateSize(AggregateFunction af) {
143174
return AggregateMapper.intermediateStateDesc(af, true).size();
144175
}
145-
146176
}

0 commit comments

Comments
 (0)