Skip to content

Commit 741961e

Browse files
authored
Merge branch 'main' into optimize/createIndexWithAlias
2 parents 9169982 + d65f34d commit 741961e

File tree

12 files changed

+637
-184
lines changed

12 files changed

+637
-184
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
package org.elasticsearch.compute.lucene;
99

1010
import org.apache.lucene.search.IndexSearcher;
11+
import org.elasticsearch.compute.data.Block;
12+
import org.elasticsearch.index.mapper.BlockLoader;
13+
import org.elasticsearch.index.mapper.MappedFieldType;
14+
import org.elasticsearch.index.mapper.SourceLoader;
1115
import org.elasticsearch.search.sort.SortAndFormats;
1216
import org.elasticsearch.search.sort.SortBuilder;
1317

@@ -39,4 +43,14 @@ public interface ShardContext {
3943
* {@code _cat/shards}.
4044
*/
4145
String shardIdentifier();
46+
47+
/**
48+
* Build something to load {@code _source}.
49+
*/
50+
SourceLoader newSourceLoader();
51+
52+
/**
53+
* Returns something to load values from this field into a {@link Block}.
54+
*/
55+
BlockLoader blockLoader(String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference);
4256
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java

Lines changed: 275 additions & 73 deletions
Large diffs are not rendered by default.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -639,16 +639,44 @@ public String toString() {
639639
}
640640
}
641641

642-
private static class ComputeBlockLoaderFactory implements BlockLoader.BlockFactory, Releasable {
643-
private final BlockFactory factory;
642+
private static class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable {
644643
private final int pageSize;
645644
private Block nullBlock;
646645

647646
private ComputeBlockLoaderFactory(BlockFactory factory, int pageSize) {
648-
this.factory = factory;
647+
super(factory);
649648
this.pageSize = pageSize;
650649
}
651650

651+
@Override
652+
public Block constantNulls() {
653+
if (nullBlock == null) {
654+
nullBlock = factory.newConstantNullBlock(pageSize);
655+
}
656+
nullBlock.incRef();
657+
return nullBlock;
658+
}
659+
660+
@Override
661+
public void close() {
662+
if (nullBlock != null) {
663+
nullBlock.close();
664+
}
665+
}
666+
667+
@Override
668+
public BytesRefBlock constantBytes(BytesRef value) {
669+
return factory.newConstantBytesRefBlockWith(value, pageSize);
670+
}
671+
}
672+
673+
public abstract static class DelegatingBlockLoaderFactory implements BlockLoader.BlockFactory {
674+
protected final BlockFactory factory;
675+
676+
protected DelegatingBlockLoaderFactory(BlockFactory factory) {
677+
this.factory = factory;
678+
}
679+
652680
@Override
653681
public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) {
654682
return factory.newBooleanBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
@@ -704,27 +732,6 @@ public BlockLoader.Builder nulls(int expectedCount) {
704732
return ElementType.NULL.newBlockBuilder(expectedCount, factory);
705733
}
706734

707-
@Override
708-
public Block constantNulls() {
709-
if (nullBlock == null) {
710-
nullBlock = factory.newConstantNullBlock(pageSize);
711-
}
712-
nullBlock.incRef();
713-
return nullBlock;
714-
}
715-
716-
@Override
717-
public void close() {
718-
if (nullBlock != null) {
719-
nullBlock.close();
720-
}
721-
}
722-
723-
@Override
724-
public BytesRefBlock constantBytes(BytesRef value) {
725-
return factory.newConstantBytesRefBlockWith(value, pageSize);
726-
}
727-
728735
@Override
729736
public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) {
730737
return new SingletonOrdinalsBuilder(factory, ordinals, count);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@
3535
import org.elasticsearch.core.IOUtils;
3636
import org.elasticsearch.core.TimeValue;
3737
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
38+
import org.elasticsearch.index.mapper.BlockLoader;
3839
import org.elasticsearch.index.mapper.MappedFieldType;
3940
import org.elasticsearch.index.mapper.NumberFieldMapper;
41+
import org.elasticsearch.index.mapper.SourceLoader;
4042
import org.elasticsearch.indices.CrankyCircuitBreakerService;
4143
import org.elasticsearch.search.internal.ContextIndexSearcher;
4244
import org.elasticsearch.search.sort.SortAndFormats;
@@ -299,6 +301,20 @@ public IndexSearcher searcher() {
299301
return searcher;
300302
}
301303

304+
@Override
305+
public SourceLoader newSourceLoader() {
306+
return SourceLoader.FROM_STORED_SOURCE;
307+
}
308+
309+
@Override
310+
public BlockLoader blockLoader(
311+
String name,
312+
boolean asUnsupportedSource,
313+
MappedFieldType.FieldExtractPreference fieldExtractPreference
314+
) {
315+
throw new UnsupportedOperationException();
316+
}
317+
302318
@Override
303319
public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sorts) {
304320
return Optional.empty();

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
4545
import org.elasticsearch.index.mapper.DateFieldMapper;
4646
import org.elasticsearch.index.mapper.KeywordFieldMapper;
47+
import org.elasticsearch.index.mapper.MappedFieldType;
4748
import org.elasticsearch.index.mapper.NumberFieldMapper;
4849
import org.elasticsearch.index.mapper.RoutingPathFields;
4950
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
@@ -155,9 +156,12 @@ record Doc(int host, long timestamp, long metric) {}
155156
}
156157
int maxPageSize = between(1, 1024);
157158
int limit = randomBoolean() ? between(1, 100000) : Integer.MAX_VALUE;
159+
var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG);
158160
var timeSeriesFactory = createTimeSeriesSourceOperator(
159161
directory,
160162
r -> this.reader = r,
163+
true,
164+
List.of(new ExtractField(metricField, ElementType.LONG)),
161165
limit,
162166
maxPageSize,
163167
randomBoolean(),
@@ -171,12 +175,11 @@ record Doc(int host, long timestamp, long metric) {}
171175
);
172176
DriverContext driverContext = driverContext();
173177
List<Page> results = new ArrayList<>();
174-
var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG);
175178
OperatorTestCase.runDriver(
176179
TestDriverFactory.create(
177180
driverContext,
178181
timeSeriesFactory.get(driverContext),
179-
List.of(ValuesSourceReaderOperatorTests.factory(reader, metricField, ElementType.LONG).get(driverContext)),
182+
List.of(),
180183
new TestResultPageSinkOperator(results::add)
181184
)
182185
);
@@ -240,7 +243,9 @@ public void testMatchNone() throws Exception {
240243
Integer.MAX_VALUE,
241244
randomIntBetween(1, 1024),
242245
1,
246+
randomBoolean(),
243247
List.of(ctx),
248+
List.of(),
244249
unused -> query
245250
);
246251
var driverContext = driverContext();
@@ -260,7 +265,7 @@ public void testMatchNone() throws Exception {
260265

261266
@Override
262267
protected Operator.OperatorFactory simple() {
263-
return createTimeSeriesSourceOperator(directory, r -> this.reader = r, 1, 1, false, writer -> {
268+
return createTimeSeriesSourceOperator(directory, r -> this.reader = r, randomBoolean(), List.of(), 1, 1, false, writer -> {
264269
long timestamp = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
265270
writeTS(writer, timestamp, new Object[] { "hostname", "host-01" }, new Object[] { "voltage", 2 });
266271
return 1;
@@ -279,9 +284,13 @@ protected Matcher<String> expectedToStringOfSimple() {
279284

280285
List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTimeSeries, int numSamplesPerTS, long timestampStart) {
281286
var ctx = driverContext();
287+
var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG);
288+
var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname");
282289
var timeSeriesFactory = createTimeSeriesSourceOperator(
283290
directory,
284291
indexReader -> this.reader = indexReader,
292+
true,
293+
List.of(new ExtractField(voltageField, ElementType.LONG), new ExtractField(hostnameField, ElementType.BYTES_REF)),
285294
limit,
286295
maxPageSize,
287296
forceMerge,
@@ -300,18 +309,8 @@ List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTime
300309
);
301310

302311
List<Page> results = new ArrayList<>();
303-
var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG);
304-
var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname");
305312
OperatorTestCase.runDriver(
306-
TestDriverFactory.create(
307-
ctx,
308-
timeSeriesFactory.get(ctx),
309-
List.of(
310-
ValuesSourceReaderOperatorTests.factory(reader, voltageField, ElementType.LONG).get(ctx),
311-
ValuesSourceReaderOperatorTests.factory(reader, hostnameField, ElementType.BYTES_REF).get(ctx)
312-
),
313-
new TestResultPageSinkOperator(results::add)
314-
)
313+
TestDriverFactory.create(ctx, timeSeriesFactory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add))
315314
);
316315
OperatorTestCase.assertDriverContext(ctx);
317316
for (Page result : results) {
@@ -321,9 +320,15 @@ List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTime
321320
return results;
322321
}
323322

323+
public record ExtractField(MappedFieldType ft, ElementType elementType) {
324+
325+
}
326+
324327
public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperator(
325328
Directory directory,
326329
Consumer<IndexReader> readerConsumer,
330+
boolean emitDocIds,
331+
List<ExtractField> extractFields,
327332
int limit,
328333
int maxPageSize,
329334
boolean forceMerge,
@@ -354,7 +359,18 @@ public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperat
354359
}
355360
var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0);
356361
Function<ShardContext, Query> queryFunction = c -> new MatchAllDocsQuery();
357-
return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, List.of(ctx), queryFunction);
362+
363+
var fieldInfos = extractFields.stream()
364+
.map(
365+
f -> new ValuesSourceReaderOperator.FieldInfo(
366+
f.ft.name(),
367+
f.elementType,
368+
n -> f.ft.blockLoader(ValuesSourceReaderOperatorTests.blContext())
369+
)
370+
)
371+
.toList();
372+
373+
return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, emitDocIds, List.of(ctx), fieldInfos, queryFunction);
358374
}
359375

360376
public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimensions, Object[] metrics) throws IOException {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ private static ValuesSourceReaderOperator.FieldInfo fieldInfo(MappedFieldType ft
495495
});
496496
}
497497

498-
private static MappedFieldType.BlockLoaderContext blContext() {
498+
public static MappedFieldType.BlockLoaderContext blContext() {
499499
return new MappedFieldType.BlockLoaderContext() {
500500
@Override
501501
public String indexName() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
1313
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
14-
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFieldExtractionToTimeSeriesSource;
14+
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushDownFieldExtractionToTimeSeriesSource;
1515
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource;
1616
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushLimitToSource;
1717
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushSampleToSource;
@@ -81,7 +81,7 @@ protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
8181
new InsertFieldExtraction(),
8282
new SpatialDocValuesExtraction(),
8383
new SpatialShapeBoundsExtraction(),
84-
new PushFieldExtractionToTimeSeriesSource()
84+
new PushDownFieldExtractionToTimeSeriesSource()
8585
);
8686
return List.of(pushdown, fieldExtraction);
8787
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;
9+
10+
import org.elasticsearch.index.IndexMode;
11+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
12+
import org.elasticsearch.xpack.esql.core.util.Holder;
13+
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
14+
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
15+
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
16+
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
17+
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
18+
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
19+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
20+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
21+
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
22+
23+
import java.util.ArrayList;
24+
import java.util.HashSet;
25+
import java.util.List;
26+
import java.util.Set;
27+
28+
/**
29+
* An optimization rule that pushes the field extractions below the lowest filter, limit, or topN down into the time-series source.
30+
* For example:
31+
* `TS index | WHERE host = 'a' AND cluster = 'b' | STATS max(rate(counter)) BY host, bucket(1minute)`
32+
* In this query, the extraction of the `host` and `cluster` fields will be pushed down to the time-series source,
33+
* while the extraction of the `counter` field will occur later. In such cases, the `doc_ids` still need to be returned
34+
* for the later extraction. However, if the filter (`host = 'a' AND cluster = 'b'`) is pushed down to Lucene, all field extractions
35+
* (e.g., `host` and `counter`) will be pushed down to the time-series source, and `doc_ids` will not be returned.
36+
*/
37+
public class PushDownFieldExtractionToTimeSeriesSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
38+
PhysicalPlan,
39+
LocalPhysicalOptimizerContext> {
40+
41+
@Override
42+
public PhysicalPlan rule(PhysicalPlan plan, LocalPhysicalOptimizerContext context) {
43+
if (plan.anyMatch(p -> p instanceof EsQueryExec q && q.indexMode() == IndexMode.TIME_SERIES) == false) {
44+
return plan;
45+
}
46+
final List<FieldExtractExec> pushDownExtracts = new ArrayList<>();
47+
final Holder<Boolean> keepDocIds = new Holder<>(Boolean.FALSE);
48+
plan.forEachDown(p -> {
49+
if (p instanceof FieldExtractExec) {
50+
pushDownExtracts.add((FieldExtractExec) p);
51+
} else if (stopPushDownExtract(p)) {
52+
if (pushDownExtracts.isEmpty() == false) {
53+
keepDocIds.set(Boolean.TRUE);
54+
pushDownExtracts.clear();
55+
}
56+
}
57+
});
58+
final Holder<Boolean> aborted = new Holder<>(Boolean.FALSE);
59+
return plan.transformUp(PhysicalPlan.class, p -> {
60+
if (aborted.get()) {
61+
return p;
62+
}
63+
if (p instanceof EsQueryExec q && q.indexMode() == IndexMode.TIME_SERIES) {
64+
return addFieldExtract(context, q, keepDocIds.get(), pushDownExtracts);
65+
}
66+
if (stopPushDownExtract(p)) {
67+
aborted.set(Boolean.TRUE);
68+
return p;
69+
}
70+
if (p instanceof FieldExtractExec e) {
71+
return e.child();
72+
}
73+
return p;
74+
});
75+
}
76+
77+
private static boolean stopPushDownExtract(PhysicalPlan p) {
78+
return p instanceof FilterExec || p instanceof TopNExec || p instanceof LimitExec;
79+
}
80+
81+
private TimeSeriesSourceExec addFieldExtract(
82+
LocalPhysicalOptimizerContext context,
83+
EsQueryExec query,
84+
boolean keepDocAttribute,
85+
List<FieldExtractExec> extracts
86+
) {
87+
Set<Attribute> docValuesAttributes = new HashSet<>();
88+
Set<Attribute> boundsAttributes = new HashSet<>();
89+
List<Attribute> attributesToExtract = new ArrayList<>();
90+
for (FieldExtractExec extract : extracts) {
91+
docValuesAttributes.addAll(extract.docValuesAttributes());
92+
boundsAttributes.addAll(extract.boundsAttributes());
93+
attributesToExtract.addAll(extract.attributesToExtract());
94+
}
95+
List<Attribute> attrs = query.attrs();
96+
if (keepDocAttribute == false) {
97+
attrs = attrs.stream().filter(a -> EsQueryExec.isSourceAttribute(a) == false).toList();
98+
}
99+
return new TimeSeriesSourceExec(
100+
query.source(),
101+
attrs,
102+
query.query(),
103+
query.limit(),
104+
context.configuration().pragmas().fieldExtractPreference(),
105+
docValuesAttributes,
106+
boundsAttributes,
107+
attributesToExtract,
108+
query.estimatedRowSize()
109+
);
110+
}
111+
}

0 commit comments

Comments
 (0)