Skip to content

Commit d65f34d

Browse files
authored
Push down field extraction to time-series source (#127445)
This change pushes down field extractions to the time-series source operator, which provides the following advantages:

- Avoids building `DocVector` and its forward/backward maps.
- Leverages the `DocValues` cache (i.e., blocks that are already decompressed/decoded) when loading values; that cache can be lost when reading blocks with the `ValuesSourceReaderOperator`.
- Eliminates the need to rebuild blocks with backward mappings after reading values.

The following query against the TSDB track previously took 19 seconds; with this change it takes 13 seconds:

```
TS tsdb | STATS sum(rate(kubernetes.container.memory.pagefaults)) by bucket(@timestamp, 5minute)
```

Note that with this change:

```
TS tsdb | STATS sum(rate(kubernetes.container.memory.pagefaults)) by bucket(@timestamp, 5minute)
```

now performs as well as:

```
FROM tsdb | STATS sum(last_over_time(kubernetes.container.memory.pagefaults)) by bucket(@timestamp, 5minute)
```

when using shard-level data partitioning. This means the performance of the `TS` command is comparable to that of the `FROM` command, except that `TS` does not yet support segment-level or doc-level concurrency. I will try to add support for segment-level concurrency, as document-level partitioning is not useful when iterating over documents in order.
1 parent 637807c commit d65f34d

File tree

12 files changed

+637
-184
lines changed

12 files changed

+637
-184
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
package org.elasticsearch.compute.lucene;
99

1010
import org.apache.lucene.search.IndexSearcher;
11+
import org.elasticsearch.compute.data.Block;
12+
import org.elasticsearch.index.mapper.BlockLoader;
13+
import org.elasticsearch.index.mapper.MappedFieldType;
14+
import org.elasticsearch.index.mapper.SourceLoader;
1115
import org.elasticsearch.search.sort.SortAndFormats;
1216
import org.elasticsearch.search.sort.SortBuilder;
1317

@@ -39,4 +43,14 @@ public interface ShardContext {
3943
* {@code _cat/shards}.
4044
*/
4145
String shardIdentifier();
46+
47+
/**
48+
* Build something to load {@code _source}.
49+
*/
50+
SourceLoader newSourceLoader();
51+
52+
/**
53+
* Returns something to load values from this field into a {@link Block}.
54+
*/
55+
BlockLoader blockLoader(String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference);
4256
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java

Lines changed: 275 additions & 73 deletions
Large diffs are not rendered by default.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -639,16 +639,44 @@ public String toString() {
639639
}
640640
}
641641

642-
private static class ComputeBlockLoaderFactory implements BlockLoader.BlockFactory, Releasable {
643-
private final BlockFactory factory;
642+
private static class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable {
644643
private final int pageSize;
645644
private Block nullBlock;
646645

647646
private ComputeBlockLoaderFactory(BlockFactory factory, int pageSize) {
648-
this.factory = factory;
647+
super(factory);
649648
this.pageSize = pageSize;
650649
}
651650

651+
@Override
652+
public Block constantNulls() {
653+
if (nullBlock == null) {
654+
nullBlock = factory.newConstantNullBlock(pageSize);
655+
}
656+
nullBlock.incRef();
657+
return nullBlock;
658+
}
659+
660+
@Override
661+
public void close() {
662+
if (nullBlock != null) {
663+
nullBlock.close();
664+
}
665+
}
666+
667+
@Override
668+
public BytesRefBlock constantBytes(BytesRef value) {
669+
return factory.newConstantBytesRefBlockWith(value, pageSize);
670+
}
671+
}
672+
673+
public abstract static class DelegatingBlockLoaderFactory implements BlockLoader.BlockFactory {
674+
protected final BlockFactory factory;
675+
676+
protected DelegatingBlockLoaderFactory(BlockFactory factory) {
677+
this.factory = factory;
678+
}
679+
652680
@Override
653681
public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) {
654682
return factory.newBooleanBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
@@ -704,27 +732,6 @@ public BlockLoader.Builder nulls(int expectedCount) {
704732
return ElementType.NULL.newBlockBuilder(expectedCount, factory);
705733
}
706734

707-
@Override
708-
public Block constantNulls() {
709-
if (nullBlock == null) {
710-
nullBlock = factory.newConstantNullBlock(pageSize);
711-
}
712-
nullBlock.incRef();
713-
return nullBlock;
714-
}
715-
716-
@Override
717-
public void close() {
718-
if (nullBlock != null) {
719-
nullBlock.close();
720-
}
721-
}
722-
723-
@Override
724-
public BytesRefBlock constantBytes(BytesRef value) {
725-
return factory.newConstantBytesRefBlockWith(value, pageSize);
726-
}
727-
728735
@Override
729736
public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) {
730737
return new SingletonOrdinalsBuilder(factory, ordinals, count);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@
3535
import org.elasticsearch.core.IOUtils;
3636
import org.elasticsearch.core.TimeValue;
3737
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
38+
import org.elasticsearch.index.mapper.BlockLoader;
3839
import org.elasticsearch.index.mapper.MappedFieldType;
3940
import org.elasticsearch.index.mapper.NumberFieldMapper;
41+
import org.elasticsearch.index.mapper.SourceLoader;
4042
import org.elasticsearch.indices.CrankyCircuitBreakerService;
4143
import org.elasticsearch.search.internal.ContextIndexSearcher;
4244
import org.elasticsearch.search.sort.SortAndFormats;
@@ -299,6 +301,20 @@ public IndexSearcher searcher() {
299301
return searcher;
300302
}
301303

304+
// Mock implementation: always hands back the SourceLoader.FROM_STORED_SOURCE constant.
@Override
305+
public SourceLoader newSourceLoader() {
306+
return SourceLoader.FROM_STORED_SOURCE;
307+
}
308+
309+
// Mock implementation: this shard context does not provide block loaders, so any call fails
// with UnsupportedOperationException regardless of the arguments.
@Override
310+
public BlockLoader blockLoader(
311+
String name,
312+
boolean asUnsupportedSource,
313+
MappedFieldType.FieldExtractPreference fieldExtractPreference
314+
) {
315+
throw new UnsupportedOperationException();
316+
}
317+
302318
@Override
303319
public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sorts) {
304320
return Optional.empty();

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
4545
import org.elasticsearch.index.mapper.DateFieldMapper;
4646
import org.elasticsearch.index.mapper.KeywordFieldMapper;
47+
import org.elasticsearch.index.mapper.MappedFieldType;
4748
import org.elasticsearch.index.mapper.NumberFieldMapper;
4849
import org.elasticsearch.index.mapper.RoutingPathFields;
4950
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
@@ -155,9 +156,12 @@ record Doc(int host, long timestamp, long metric) {}
155156
}
156157
int maxPageSize = between(1, 1024);
157158
int limit = randomBoolean() ? between(1, 100000) : Integer.MAX_VALUE;
159+
var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG);
158160
var timeSeriesFactory = createTimeSeriesSourceOperator(
159161
directory,
160162
r -> this.reader = r,
163+
true,
164+
List.of(new ExtractField(metricField, ElementType.LONG)),
161165
limit,
162166
maxPageSize,
163167
randomBoolean(),
@@ -171,12 +175,11 @@ record Doc(int host, long timestamp, long metric) {}
171175
);
172176
DriverContext driverContext = driverContext();
173177
List<Page> results = new ArrayList<>();
174-
var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG);
175178
OperatorTestCase.runDriver(
176179
TestDriverFactory.create(
177180
driverContext,
178181
timeSeriesFactory.get(driverContext),
179-
List.of(ValuesSourceReaderOperatorTests.factory(reader, metricField, ElementType.LONG).get(driverContext)),
182+
List.of(),
180183
new TestResultPageSinkOperator(results::add)
181184
)
182185
);
@@ -240,7 +243,9 @@ public void testMatchNone() throws Exception {
240243
Integer.MAX_VALUE,
241244
randomIntBetween(1, 1024),
242245
1,
246+
randomBoolean(),
243247
List.of(ctx),
248+
List.of(),
244249
unused -> query
245250
);
246251
var driverContext = driverContext();
@@ -260,7 +265,7 @@ public void testMatchNone() throws Exception {
260265

261266
@Override
262267
protected Operator.OperatorFactory simple() {
263-
return createTimeSeriesSourceOperator(directory, r -> this.reader = r, 1, 1, false, writer -> {
268+
return createTimeSeriesSourceOperator(directory, r -> this.reader = r, randomBoolean(), List.of(), 1, 1, false, writer -> {
264269
long timestamp = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
265270
writeTS(writer, timestamp, new Object[] { "hostname", "host-01" }, new Object[] { "voltage", 2 });
266271
return 1;
@@ -279,9 +284,13 @@ protected Matcher<String> expectedToStringOfSimple() {
279284

280285
List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTimeSeries, int numSamplesPerTS, long timestampStart) {
281286
var ctx = driverContext();
287+
var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG);
288+
var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname");
282289
var timeSeriesFactory = createTimeSeriesSourceOperator(
283290
directory,
284291
indexReader -> this.reader = indexReader,
292+
true,
293+
List.of(new ExtractField(voltageField, ElementType.LONG), new ExtractField(hostnameField, ElementType.BYTES_REF)),
285294
limit,
286295
maxPageSize,
287296
forceMerge,
@@ -300,18 +309,8 @@ List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTime
300309
);
301310

302311
List<Page> results = new ArrayList<>();
303-
var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG);
304-
var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname");
305312
OperatorTestCase.runDriver(
306-
TestDriverFactory.create(
307-
ctx,
308-
timeSeriesFactory.get(ctx),
309-
List.of(
310-
ValuesSourceReaderOperatorTests.factory(reader, voltageField, ElementType.LONG).get(ctx),
311-
ValuesSourceReaderOperatorTests.factory(reader, hostnameField, ElementType.BYTES_REF).get(ctx)
312-
),
313-
new TestResultPageSinkOperator(results::add)
314-
)
313+
TestDriverFactory.create(ctx, timeSeriesFactory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add))
315314
);
316315
OperatorTestCase.assertDriverContext(ctx);
317316
for (Page result : results) {
@@ -321,9 +320,15 @@ List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTime
321320
return results;
322321
}
323322

323+
/**
 * Test helper pairing a field type with the element type to extract it as.
 * {@code createTimeSeriesSourceOperator} turns each instance into a
 * {@code ValuesSourceReaderOperator.FieldInfo} using the field's name, the element type,
 * and the field type's block loader.
 */
public record ExtractField(MappedFieldType ft, ElementType elementType) {
324+
325+
}
326+
324327
public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperator(
325328
Directory directory,
326329
Consumer<IndexReader> readerConsumer,
330+
boolean emitDocIds,
331+
List<ExtractField> extractFields,
327332
int limit,
328333
int maxPageSize,
329334
boolean forceMerge,
@@ -354,7 +359,18 @@ public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperat
354359
}
355360
var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0);
356361
Function<ShardContext, Query> queryFunction = c -> new MatchAllDocsQuery();
357-
return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, List.of(ctx), queryFunction);
362+
363+
var fieldInfos = extractFields.stream()
364+
.map(
365+
f -> new ValuesSourceReaderOperator.FieldInfo(
366+
f.ft.name(),
367+
f.elementType,
368+
n -> f.ft.blockLoader(ValuesSourceReaderOperatorTests.blContext())
369+
)
370+
)
371+
.toList();
372+
373+
return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, emitDocIds, List.of(ctx), fieldInfos, queryFunction);
358374
}
359375

360376
public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimensions, Object[] metrics) throws IOException {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ private static ValuesSourceReaderOperator.FieldInfo fieldInfo(MappedFieldType ft
495495
});
496496
}
497497

498-
private static MappedFieldType.BlockLoaderContext blContext() {
498+
public static MappedFieldType.BlockLoaderContext blContext() {
499499
return new MappedFieldType.BlockLoaderContext() {
500500
@Override
501501
public String indexName() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
1313
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
14-
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFieldExtractionToTimeSeriesSource;
14+
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushDownFieldExtractionToTimeSeriesSource;
1515
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource;
1616
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushLimitToSource;
1717
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushSampleToSource;
@@ -81,7 +81,7 @@ protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
8181
new InsertFieldExtraction(),
8282
new SpatialDocValuesExtraction(),
8383
new SpatialShapeBoundsExtraction(),
84-
new PushFieldExtractionToTimeSeriesSource()
84+
new PushDownFieldExtractionToTimeSeriesSource()
8585
);
8686
return List.of(pushdown, fieldExtraction);
8787
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;
9+
10+
import org.elasticsearch.index.IndexMode;
11+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
12+
import org.elasticsearch.xpack.esql.core.util.Holder;
13+
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
14+
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
15+
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
16+
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
17+
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
18+
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
19+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
20+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
21+
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
22+
23+
import java.util.ArrayList;
24+
import java.util.HashSet;
25+
import java.util.List;
26+
import java.util.Set;
27+
28+
/**
29+
* An optimization rule that pushes down field extractions to occur at the lowest filter, limit, or topN in the time-series source plan.
30+
* For example:
31+
* `TS index | WHERE host = 'a' AND cluster = 'b' | STATS max(rate(counter)) BY host, bucket(1minute)`
32+
* In this query, the extraction of the `host` and `cluster` fields will be pushed down to the time-series source,
33+
* while the extraction of the `counter` field will occur later. In such cases, the `doc_ids` still need to be returned
34+
* for the later extraction. However, if the filter (`host = 'a' AND cluster = 'b'`) is pushed down to Lucene, all field extractions
35+
* (e.g., `host` and `counter`) will be pushed down to the time-series source, and `doc_ids` will not be returned.
36+
*/
37+
public class PushDownFieldExtractionToTimeSeriesSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
38+
PhysicalPlan,
39+
LocalPhysicalOptimizerContext> {
40+
41+
@Override
42+
public PhysicalPlan rule(PhysicalPlan plan, LocalPhysicalOptimizerContext context) {
43+
if (plan.anyMatch(p -> p instanceof EsQueryExec q && q.indexMode() == IndexMode.TIME_SERIES) == false) {
44+
return plan;
45+
}
46+
final List<FieldExtractExec> pushDownExtracts = new ArrayList<>();
47+
final Holder<Boolean> keepDocIds = new Holder<>(Boolean.FALSE);
48+
plan.forEachDown(p -> {
49+
if (p instanceof FieldExtractExec) {
50+
pushDownExtracts.add((FieldExtractExec) p);
51+
} else if (stopPushDownExtract(p)) {
52+
if (pushDownExtracts.isEmpty() == false) {
53+
keepDocIds.set(Boolean.TRUE);
54+
pushDownExtracts.clear();
55+
}
56+
}
57+
});
58+
final Holder<Boolean> aborted = new Holder<>(Boolean.FALSE);
59+
return plan.transformUp(PhysicalPlan.class, p -> {
60+
if (aborted.get()) {
61+
return p;
62+
}
63+
if (p instanceof EsQueryExec q && q.indexMode() == IndexMode.TIME_SERIES) {
64+
return addFieldExtract(context, q, keepDocIds.get(), pushDownExtracts);
65+
}
66+
if (stopPushDownExtract(p)) {
67+
aborted.set(Boolean.TRUE);
68+
return p;
69+
}
70+
if (p instanceof FieldExtractExec e) {
71+
return e.child();
72+
}
73+
return p;
74+
});
75+
}
76+
77+
private static boolean stopPushDownExtract(PhysicalPlan p) {
78+
return p instanceof FilterExec || p instanceof TopNExec || p instanceof LimitExec;
79+
}
80+
81+
private TimeSeriesSourceExec addFieldExtract(
82+
LocalPhysicalOptimizerContext context,
83+
EsQueryExec query,
84+
boolean keepDocAttribute,
85+
List<FieldExtractExec> extracts
86+
) {
87+
Set<Attribute> docValuesAttributes = new HashSet<>();
88+
Set<Attribute> boundsAttributes = new HashSet<>();
89+
List<Attribute> attributesToExtract = new ArrayList<>();
90+
for (FieldExtractExec extract : extracts) {
91+
docValuesAttributes.addAll(extract.docValuesAttributes());
92+
boundsAttributes.addAll(extract.boundsAttributes());
93+
attributesToExtract.addAll(extract.attributesToExtract());
94+
}
95+
List<Attribute> attrs = query.attrs();
96+
if (keepDocAttribute == false) {
97+
attrs = attrs.stream().filter(a -> EsQueryExec.isSourceAttribute(a) == false).toList();
98+
}
99+
return new TimeSeriesSourceExec(
100+
query.source(),
101+
attrs,
102+
query.query(),
103+
query.limit(),
104+
context.configuration().pragmas().fieldExtractPreference(),
105+
docValuesAttributes,
106+
boundsAttributes,
107+
attributesToExtract,
108+
query.estimatedRowSize()
109+
);
110+
}
111+
}

0 commit comments

Comments
 (0)