From 442796442af23aacf0898ca4d1a2399743c822a5 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 24 Apr 2025 11:21:20 -0400 Subject: [PATCH 1/5] ESQL: Speed loading stored fields This speeds up loading from stored fields by opting more blocks into the "sequential" strategy. This really kicks in when loading stored fields like `text`. And when you need less than 100% of documents, but more than, say, 10%. This is most useful when you need 99.9% of field documents. That sort of thing. Here's the perf numbers: ``` %100.0 {"took": 403 -> 401,"documents_found":1000000} %099.9 {"took":3990 -> 436,"documents_found": 999000} %099.0 {"took":4069 -> 440,"documents_found": 990000} %090.0 {"took":3468 -> 421,"documents_found": 900000} %030.0 {"took":1213 -> 152,"documents_found": 300000} %020.0 {"took": 766 -> 104,"documents_found": 200000} %010.0 {"took": 397 -> 55,"documents_found": 100000} %009.0 {"took": 352 -> 375,"documents_found": 90000} %008.0 {"took": 304 -> 317,"documents_found": 80000} %007.0 {"took": 273 -> 287,"documents_found": 70000} %005.0 {"took": 199 -> 204,"documents_found": 50000} %001.0 {"took": 46 -> 46,"documents_found": 10000} ``` Let's explain this with an example. First, jump to `main` and load a million documents: ``` rm -f /tmp/bulk for a in {1..1000}; do echo '{"index":{}}' >> /tmp/bulk echo '{"text":"text '$(printf %04d $a)'"}' >> /tmp/bulk done curl -s -uelastic:password -HContent-Type:application/json -XDELETE localhost:9200/test for a in {1..1000}; do echo -n $a: curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_bulk?pretty --data-binary @/tmp/bulk | grep errors done curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_forcemerge?max_num_segments=1 curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_refresh echo ``` Now query them all. Run this a few times until it's stable: ``` echo -n "%100.0 " curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{ "query": "FROM test | STATS SUM(LENGTH(text))", "pragma": { "data_partitioning": "shard" } }' | jq -c '{took, documents_found}' ``` Now fetch 99.9% of documents: ``` echo -n "%099.9 " curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{ "query": "FROM test | WHERE NOT text.keyword IN (\"text 0998\") | STATS SUM(LENGTH(text))", "pragma": { "data_partitioning": "shard" } }' | jq -c '{took, documents_found}' ``` This should spit out something like: ``` %100.0 { "took":403,"documents_found":1000000} %099.9 {"took":4098, "documents_found":999000} ``` We're loading *fewer* documents but it's slower! What in the world?! 
If you dig into the profile you'll see that it's value loading: ``` $ curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{ "query": "FROM test | STATS SUM(LENGTH(text))", "pragma": { "data_partitioning": "shard" }, "profile": true }' | jq '.profile.drivers[].operators[] | select(.operator | contains("ValuesSourceReaderOperator"))' { "operator": "ValuesSourceReaderOperator[fields = [text]]", "status": { "readers_built": { "stored_fields[requires_source:true, fields:0, sequential: true]": 222, "text:column_at_a_time:null": 222, "text:row_stride:BlockSourceReader.Bytes": 1 }, "values_loaded": 1000000, "process_nanos": 370687157, "pages_processed": 222, "rows_received": 1000000, "rows_emitted": 1000000 } } $ curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{ "query": "FROM test | WHERE NOT text.keyword IN (\"text 0998\") | STATS SUM(LENGTH(text))", "pragma": { "data_partitioning": "shard" }, "profile": true }' | jq '.profile.drivers[].operators[] | select(.operator | contains("ValuesSourceReaderOperator"))' { "operator": "ValuesSourceReaderOperator[fields = [text]]", "status": { "readers_built": { "stored_fields[requires_source:true, fields:0, sequential: false]": 222, "text:column_at_a_time:null": 222, "text:row_stride:BlockSourceReader.Bytes": 1 }, "values_loaded": 999000, "process_nanos": 3965803793, "pages_processed": 222, "rows_received": 999000, "rows_emitted": 999000 } } ``` It jumps from 370ms to almost four seconds! Loading fewer values! The second big difference is in the `stored_fields` marker. In the second one it's `sequential: false` and in the first `sequential: true`. `sequential: true` uses Lucene's "merge" stored fields reader instead of the default one. It's much more optimized at decoding sequences of documents. Previously we only enabled this reader when loading compact sequences of documents - when the entire block looks like ``` 1, 2, 3, 4, 5, ... 1230, 1231 ``` If there are any gaps we wouldn't enable it. That was a very conservative thing we did long ago without doing any experiments. We knew it was faster without any gaps, but not otherwise. It turns out it's a lot faster in a lot more cases. I've measured it as faster for 99% gaps, at least on simple documents. I'm a bit worried that this is too aggressive, so I've made it configurable, with the default being to use the "merge" loader whenever we need at least one in ten documents.
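To make the cutoff concrete, here's a minimal, standalone Java sketch of the decision rule. It mirrors the shape of `useSequentialStoredFieldsReader` in the patch below, but the class name, method name, and the `SEQUENTIAL_BOUNDARY` value are illustrative stand-ins, and the exact comparison evolves over the course of this series:
```
import java.util.stream.IntStream;

class SequentialReaderSketch {
    // Below this many documents the choice barely matters; the real constant
    // in ValuesSourceReaderOperator may differ - this value is illustrative.
    static final int SEQUENTIAL_BOUNDARY = 512;

    /**
     * Use the "merge" reader when the documents we need make up at least
     * {@code proportion} of the doc id range they span. A proportion of 1.0
     * demands a dense run; 0.1 is satisfied by needing one in ten documents.
     */
    static boolean useSequentialReader(int[] sortedDocIds, double proportion) {
        int count = sortedDocIds.length;
        if (count < SEQUENTIAL_BOUNDARY) {
            return false;
        }
        // A perfectly dense block spans exactly count - 1 doc ids; gaps make
        // the range larger without adding documents.
        int range = sortedDocIds[count - 1] - sortedDocIds[0];
        return range * proportion <= count;
    }

    public static void main(String[] args) {
        int[] dense = IntStream.range(0, 1000).toArray();                  // 0, 1, 2, ...
        int[] tenth = IntStream.range(0, 1000).map(i -> i * 10).toArray(); // 0, 10, 20, ...
        System.out.println(useSequentialReader(dense, 1.0)); // true: exactly sequential
        System.out.println(useSequentialReader(tenth, 0.1)); // true: one in ten is enough
        System.out.println(useSequentialReader(tenth, 1.0)); // false: too many gaps
    }
}
```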
So we'd use the merge loader with a block like: ``` 1, 11, 21, 31, ..., 1231, 1241 ``` --- .../operator/ValuesSourceReaderBenchmark.java | 5 ++- .../lucene/ValuesSourceReaderOperator.java | 28 +++++++++++++--- .../operator/OrdinalsGroupingOperator.java | 14 ++++++-- .../elasticsearch/compute/OperatorTests.java | 1 + .../esql/enrich/AbstractLookupService.java | 3 +- .../planner/EsPhysicalOperationProviders.java | 13 ++++++-- .../xpack/esql/plugin/ComputeService.java | 14 ++++---- .../xpack/esql/plugin/QueryPragmas.java | 33 +++++++++++++++++++ .../optimizer/PhysicalPlanOptimizerTests.java | 2 +- .../planner/LocalExecutionPlannerTests.java | 2 +- 10 files changed, 96 insertions(+), 19 deletions(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java index 4469a2c01623d..daa27110bcc11 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java @@ -25,6 +25,7 @@ import org.apache.lucene.util.NumericUtils; import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BytesRefBlock; @@ -50,6 +51,7 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.search.lookup.SearchLookup; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -336,7 +338,8 @@ public void benchmark() { List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> { throw new UnsupportedOperationException("can't load _source here"); })), - 0 + 0, + QueryPragmas.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY) ); long sum = 0; for (Page page : pages) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java index 8d2465d4664f8..f376d663a70f0 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java @@ -72,10 +72,18 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator { * @param shardContexts per-shard loading information * @param docChannel the channel containing the shard, leaf/segment and doc id */ - public record Factory(List fields, List shardContexts, int docChannel) implements OperatorFactory { + public record Factory(List fields, List shardContexts, int docChannel, double storedFieldsSequentialProportion) + implements + OperatorFactory { @Override public Operator get(DriverContext driverContext) { - return new ValuesSourceReaderOperator(driverContext.blockFactory(), fields, shardContexts, docChannel); + return new ValuesSourceReaderOperator( + driverContext.blockFactory(), + fields, + shardContexts, + docChannel, + storedFieldsSequentialProportion + ); } 
@Override @@ -113,6 +121,7 @@ public record ShardContext(IndexReader reader, Supplier newSourceL private final List shardContexts; private final int docChannel; private final BlockFactory blockFactory; + private final double storedFieldsSequentialProportion; private final Map readersBuilt = new TreeMap<>(); private long valuesLoaded; @@ -125,11 +134,18 @@ public record ShardContext(IndexReader reader, Supplier newSourceL * @param fields fields to load * @param docChannel the channel containing the shard, leaf/segment and doc id */ - public ValuesSourceReaderOperator(BlockFactory blockFactory, List fields, List shardContexts, int docChannel) { + public ValuesSourceReaderOperator( + BlockFactory blockFactory, + List fields, + List shardContexts, + int docChannel, + double storedFieldsSequentialProportion + ) { this.fields = fields.stream().map(f -> new FieldWork(f)).toArray(FieldWork[]::new); this.shardContexts = shardContexts; this.docChannel = docChannel; this.blockFactory = blockFactory; + this.storedFieldsSequentialProportion = storedFieldsSequentialProportion; } @Override @@ -440,7 +456,11 @@ public void close() { */ private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs) { int count = docs.count(); - return count >= SEQUENTIAL_BOUNDARY && docs.get(count - 1) - docs.get(0) == count - 1; + if (count < SEQUENTIAL_BOUNDARY) { + return false; + } + int range = docs.get(count - 1) - docs.get(0); + return range * storedFieldsSequentialProportion < count - 1; } private void trackStoredFields(StoredFieldsSpec spec, boolean sequential) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java index c030b329dd2d8..35202d49654ea 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java @@ -63,7 +63,8 @@ public record OrdinalsGroupingOperatorFactory( int docChannel, String groupingField, List aggregators, - int maxPageSize + int maxPageSize, + double storedFieldsSequentialProportion ) implements OperatorFactory { @Override @@ -76,6 +77,7 @@ public Operator get(DriverContext driverContext) { groupingField, aggregators, maxPageSize, + storedFieldsSequentialProportion, driverContext ); } @@ -94,6 +96,7 @@ public String describe() { private final List aggregatorFactories; private final ElementType groupingElementType; private final Map ordinalAggregators; + private final double storedFieldsSequentialProportion; private final DriverContext driverContext; @@ -111,6 +114,7 @@ public OrdinalsGroupingOperator( String groupingField, List aggregatorFactories, int maxPageSize, + double storedFieldsSequentialProportion, DriverContext driverContext ) { Objects.requireNonNull(aggregatorFactories); @@ -122,6 +126,7 @@ public OrdinalsGroupingOperator( this.aggregatorFactories = aggregatorFactories; this.ordinalAggregators = new HashMap<>(); this.maxPageSize = maxPageSize; + this.storedFieldsSequentialProportion = storedFieldsSequentialProportion; this.driverContext = driverContext; } @@ -171,6 +176,7 @@ public void addInput(Page page) { channelIndex, aggregatorFactories, maxPageSize, + storedFieldsSequentialProportion, driverContext ); } @@ -485,6 +491,7 @@ boolean next() throws IOException { private static class ValuesAggregator implements Releasable { private 
final ValuesSourceReaderOperator extractor; private final HashAggregationOperator aggregator; + private final double storedFieldsSequentialProportion; ValuesAggregator( IntFunction blockLoaders, @@ -495,13 +502,15 @@ private static class ValuesAggregator implements Releasable { int channelIndex, List aggregatorFactories, int maxPageSize, + double storedFieldsSequentialProportion, DriverContext driverContext ) { this.extractor = new ValuesSourceReaderOperator( driverContext.blockFactory(), List.of(new ValuesSourceReaderOperator.FieldInfo(groupingField, groupingElementType, blockLoaders)), shardContexts, - docChannel + docChannel, + storedFieldsSequentialProportion ); this.aggregator = new HashAggregationOperator( aggregatorFactories, @@ -513,6 +522,7 @@ private static class ValuesAggregator implements Releasable { ), driverContext ); + this.storedFieldsSequentialProportion = storedFieldsSequentialProportion; } void addInput(Page page) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index 8ec54895025ea..7187c45c7adab 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -204,6 +204,7 @@ public String toString() { gField, List.of(CountAggregatorFunction.supplier().groupingAggregatorFactory(INITIAL, List.of(1))), randomPageSize(), + 0.1, driverContext ) ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index 34df5c0a6368e..90d9e095e1e1f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -409,7 +409,8 @@ private static Operator extractFieldsOperator( driverContext.blockFactory(), fields, List.of(new ValuesSourceReaderOperator.ShardContext(shardContext.searcher().getIndexReader(), shardContext::newSourceLoader)), - 0 + 0, + 0.1 ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index a93e2f16c075b..b8a89245f14cd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -103,16 +103,19 @@ public interface ShardContext extends org.elasticsearch.compute.lucene.ShardCont private final List shardContexts; private final DataPartitioning defaultDataPartitioning; + private final double storedFieldsSequentialProportion; public EsPhysicalOperationProviders( FoldContext foldContext, List shardContexts, AnalysisRegistry analysisRegistry, - DataPartitioning defaultDataPartitioning + DataPartitioning defaultDataPartitioning, + double storedFieldsSequentialProportion ) { super(foldContext, analysisRegistry); this.shardContexts = shardContexts; this.defaultDataPartitioning = defaultDataPartitioning; + this.storedFieldsSequentialProportion = storedFieldsSequentialProportion; } @Override @@ -132,7 +135,10 @@ public final PhysicalOperation 
fieldExtractPhysicalOperation(FieldExtractExec fi IntFunction loader = s -> getBlockLoaderFor(s, attr, fieldExtractPreference); fields.add(new ValuesSourceReaderOperator.FieldInfo(getFieldName(attr), elementType, loader)); } - return source.with(new ValuesSourceReaderOperator.Factory(fields, readers, docChannel), layout.build()); + return source.with( + new ValuesSourceReaderOperator.Factory(fields, readers, docChannel, storedFieldsSequentialProportion), + layout.build() + ); } private static String getFieldName(Attribute attr) { @@ -278,7 +284,8 @@ public final Operator.OperatorFactory ordinalGroupingOperatorFactory( docChannel, attrSource.name(), aggregatorFactories, - context.pageSize(aggregateExec.estimatedRowSize()) + context.pageSize(aggregateExec.estimatedRowSize()), + storedFieldsSequentialProportion ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index e8bab076e1d01..4a0df7f15fce8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -535,6 +535,13 @@ public SourceProvider createSourceProvider() { new EsPhysicalOperationProviders.DefaultShardContext(i, searchExecutionContext, searchContext.request().getAliasFilter()) ); } + EsPhysicalOperationProviders physicalOperationProviders = new EsPhysicalOperationProviders( + context.foldCtx(), + contexts, + searchService.getIndicesService().getAnalysis(), + defaultDataPartitioning, + context.configuration().pragmas().storedFieldsSequentialProportion() + ); final List drivers; try { LocalExecutionPlanner planner = new LocalExecutionPlanner( @@ -550,12 +557,7 @@ public SourceProvider createSourceProvider() { enrichLookupService, lookupFromIndexService, inferenceRunner, - new EsPhysicalOperationProviders( - context.foldCtx(), - contexts, - searchService.getIndicesService().getAnalysis(), - defaultDataPartitioning - ), + physicalOperationProviders, contexts ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java index b0a99abf14288..56d2a9e6ef219 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java @@ -73,6 +73,27 @@ public final class QueryPragmas implements Writeable { public static final Setting FOLD_LIMIT = Setting.memorySizeSetting("fold_limit", "5%"); + /** + * Tuning parameter for deciding when to use the "merge" stored field loader. + * Think of it as "how similar to a sequential block of documents do I have to + * be before I'll use the merge reader?" So a value of {@code 1} means I have to + * be exactly a sequential block, like {@code 0, 1, 2, 3, .. 1299, 1300}. + * A value of {@code .1} means we'll use the sequential reader even if we only + * need one in ten documents. + *
<p>
+ * The default value of this was experimentally derived using a + * script. + * And a little paranoia. A lower default value was looking good locally, but + * I'm concerned about the implications of effectively using this all the time. + *
</p>
+ */ + public static final Setting STORED_FIELDS_SEQUENTIAL_PROPORTION = Setting.doubleSetting( + "stored_fields_sequential_proportion", + 0.10, + 0, + 1 + ); + public static final Setting FIELD_EXTRACT_PREFERENCE = Setting.enumSetting( MappedFieldType.FieldExtractPreference.class, "field_extract_preference", @@ -120,6 +141,18 @@ public int taskConcurrency() { return TASK_CONCURRENCY.get(settings); } + /** + * Tuning parameter for deciding when to use the "merge" stored field loader. + * Think of it as "how similar to a sequential block of documents do I have to + * be before I'll use the merge reader?" So a value of {@code 1} means I have to + * be exactly a sequential block, like {@code 0, 1, 2, 3, .. 1299, 1300}. + * A value of {@code .1} means significant gaps are allowed and we'll still use the + * sequential reader. + */ + public double storedFieldsSequentialProportion() { + return STORED_FIELDS_SEQUENTIAL_PROPORTION.get(settings); + } + /** * Size of a page in entries with {@code 0} being a special value asking * to adaptively size based on the number of columns in the page. diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index d8da50585bdfa..fc6332f57be65 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -7681,7 +7681,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP null, null, null, - new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null, DataPartitioning.AUTO), + new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null, DataPartitioning.AUTO, 0.1), List.of() ); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index d556ee68d4033..366dac80db512 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -257,7 +257,7 @@ private Configuration config() { } private EsPhysicalOperationProviders esPhysicalOperationProviders(List shardContexts) { - return new EsPhysicalOperationProviders(FoldContext.small(), shardContexts, null, DataPartitioning.AUTO); + return new EsPhysicalOperationProviders(FoldContext.small(), shardContexts, null, DataPartitioning.AUTO, 0.1); } private List createShardContexts() throws IOException { From 81bb22d83fbcd6badd552d312d10809c5face7d8 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 24 Apr 2025 14:06:33 -0400 Subject: [PATCH 2/5] Update docs/changelog/127348.yaml --- docs/changelog/127348.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/127348.yaml diff --git a/docs/changelog/127348.yaml b/docs/changelog/127348.yaml new file mode 100644 index 0000000000000..933869b2a9d55 --- /dev/null +++ b/docs/changelog/127348.yaml @@ -0,0 +1,5 @@ +pr: 127348 +summary: Speed loading stored fields +area: ES|QL +type: enhancement +issues: [] From e5469a78cb11d04fe539835e35c4cabb3d519cf8 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 24 Apr 2025 14:55:55 -0400 Subject: [PATCH 3/5] Compile 
--- .../lucene/LuceneQueryEvaluatorTests.java | 3 +- .../ValueSourceReaderTypeConversionTests.java | 31 +++++++++++++----- .../ValuesSourceReaderOperatorTests.java | 32 +++++++++++++------ .../xpack/esql/action/LookupFromIndexIT.java | 3 +- 4 files changed, 50 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java index 146fdb2a6cdd3..4ab21ceb015a5 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java @@ -209,7 +209,8 @@ private List runQuery(Set values, Query query, boolean shuffleDocs List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> { throw new UnsupportedOperationException(); })), - 0 + 0, + 0.1 ) ); LuceneQueryEvaluator.ShardConfig[] shards = new LuceneQueryEvaluator.ShardConfig[] { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java index f52b8282abe45..c7f67a8a723dc 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java @@ -108,6 +108,7 @@ import java.util.stream.IntStream; import java.util.stream.LongStream; +import static org.elasticsearch.compute.lucene.ValuesSourceReaderOperatorTests.STORED_FIELDS_SEQUENTIAL_PROPORTIONS; import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.elasticsearch.xpack.esql.core.type.DataType.IP; @@ -239,7 +240,7 @@ private static Operator.OperatorFactory factory( fail("unexpected shardIdx [" + shardIdx + "]"); } return loader; - })), shardContexts, 0); + })), shardContexts, 0, STORED_FIELDS_SEQUENTIAL_PROPORTIONS); } protected SourceOperator simpleInput(DriverContext context, int size) { @@ -488,7 +489,8 @@ public void testManySingleDocPages() { new ValuesSourceReaderOperator.Factory( List.of(testCase.info, fieldInfo(mapperService(indexKey).fieldType("key"), ElementType.INT)), shardContexts, - 0 + 0, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS ).get(driverContext) ); List results = drive(operators, input.iterator(), driverContext); @@ -598,7 +600,8 @@ private void loadSimpleAndAssert( fieldInfo(mapperService("index1").fieldType("indexKey"), ElementType.BYTES_REF) ), shardContexts, - 0 + 0, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS ).get(driverContext) ); List tests = new ArrayList<>(); @@ -607,7 +610,12 @@ private void loadSimpleAndAssert( cases.removeAll(b); tests.addAll(b); operators.add( - new ValuesSourceReaderOperator.Factory(b.stream().map(i -> i.info).toList(), shardContexts, 0).get(driverContext) + new ValuesSourceReaderOperator.Factory( + b.stream().map(i -> i.info).toList(), + shardContexts, + 0, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ).get(driverContext) ); } List results = drive(operators, input.iterator(), driverContext); @@ -709,7 +717,11 @@ private void testLoadAllStatus(boolean allInOnePage) { Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING ); List operators = cases.stream() - .map(i -> new 
ValuesSourceReaderOperator.Factory(List.of(i.info), shardContexts, 0).get(driverContext)) + .map( + i -> new ValuesSourceReaderOperator.Factory(List.of(i.info), shardContexts, 0, STORED_FIELDS_SEQUENTIAL_PROPORTIONS).get( + driverContext + ) + ) .toList(); if (allInOnePage) { input = List.of(CannedSourceOperator.mergePages(input)); @@ -1385,7 +1397,8 @@ public void testNullsShared() { new ValuesSourceReaderOperator.FieldInfo("null2", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS) ), shardContexts, - 0 + 0, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS ).get(driverContext) ), new PageConsumerOperator(page -> { @@ -1416,7 +1429,8 @@ public void testDescriptionOfMany() throws IOException { ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory( cases.stream().map(c -> c.info).toList(), List.of(new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE)), - 0 + 0, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS ); assertThat(factory.describe(), equalTo("ValuesSourceReaderOperator[fields = [" + cases.size() + " fields]]")); try (Operator op = factory.get(driverContext())) { @@ -1462,7 +1476,8 @@ public void testManyShards() throws IOException { return ft.blockLoader(blContext()); })), readerShardContexts, - 0 + 0, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS ); DriverContext driverContext = driverContext(); List results = drive( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java index 564d7ccc64854..20ed9d0ee72c9 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java @@ -114,6 +114,8 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase { { false, true, true }, { false, false, true, true } }; + static final double STORED_FIELDS_SEQUENTIAL_PROPORTIONS = 0.1; + private Directory directory = newDirectory(); private MapperService mapperService; private IndexReader reader; @@ -147,7 +149,11 @@ static Operator.OperatorFactory factory(IndexReader reader, String name, Element fail("unexpected shardIdx [" + shardIdx + "]"); } return loader; - })), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), 0); + })), + List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), + 0, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ); } @Override @@ -444,7 +450,8 @@ public void testManySingleDocPages() { new ValuesSourceReaderOperator.Factory( List.of(testCase.info, fieldInfo(mapperService.fieldType("key"), ElementType.INT)), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), - 0 + 0, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS ).get(driverContext) ); List results = drive(operators, input.iterator(), driverContext); @@ -550,7 +557,8 @@ private void loadSimpleAndAssert( new ValuesSourceReaderOperator.Factory( List.of(fieldInfo(mapperService.fieldType("key"), ElementType.INT)), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), - 0 + 0, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS ).get(driverContext) ); List tests = new ArrayList<>(); @@ -562,7 +570,8 @@ private void loadSimpleAndAssert( new 
ValuesSourceReaderOperator.Factory( b.stream().map(i -> i.info).toList(), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), - 0 + 0, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS ).get(driverContext) ); } @@ -652,7 +661,8 @@ private void testLoadAllStatus(boolean allInOnePage) { i -> new ValuesSourceReaderOperator.Factory( List.of(i.info), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), - 0 + 0, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS ).get(driverContext) ) .toList(); @@ -1418,7 +1428,8 @@ public void testNullsShared() { new ValuesSourceReaderOperator.FieldInfo("null2", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS) ), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), - 0 + 0, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS ).get(driverContext) ), new PageConsumerOperator(page -> { @@ -1463,7 +1474,8 @@ private void testSequentialStoredFields(boolean sequential, int docCount) throws fieldInfo(storedTextField("stored_text"), ElementType.BYTES_REF) ), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), - 0 + 0, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS ).get(driverContext); List results = drive(op, source.iterator(), driverContext); Checks checks = new Checks(Block.MvOrdering.UNORDERED, Block.MvOrdering.UNORDERED); @@ -1491,7 +1503,8 @@ public void testDescriptionOfMany() throws IOException { ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory( cases.stream().map(c -> c.info).toList(), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), - 0 + 0, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS ); assertThat(factory.describe(), equalTo("ValuesSourceReaderOperator[fields = [" + cases.size() + " fields]]")); try (Operator op = factory.get(driverContext())) { @@ -1535,7 +1548,8 @@ public void testManyShards() throws IOException { return ft.blockLoader(blContext()); })), readerShardContexts, - 0 + 0, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS ); DriverContext driverContext = driverContext(); List results = drive( diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index 9e65d640ca943..7654fe41538e0 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -205,7 +205,8 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.getSearchExecutionContext().getIndexReader(), () -> { throw new IllegalStateException("can't load source here"); })), - 0 + 0, + 0.1 ); CancellableTask parentTask = new EsqlQueryTask( 1, From fb65eb08ff75d7ef748d0b577278ea053bfa6c6a Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 28 Apr 2025 14:56:57 -0400 Subject: [PATCH 4/5] Index setting --- .../operator/ValuesSourceReaderBenchmark.java | 7 +- .../lucene/ValuesSourceReaderOperator.java | 33 +-- .../operator/OrdinalsGroupingOperator.java | 14 +- .../elasticsearch/compute/OperatorTests.java | 3 +- .../lucene/LuceneQueryEvaluatorTests.java | 5 +- .../ValueSourceReaderTypeConversionTests.java | 41 ++-- 
.../ValuesSourceReaderOperatorTests.java | 101 +++++--- .../single_node/StoredFieldsSequentialIT.java | 219 ++++++++++++++++++ .../xpack/esql/action/LookupFromIndexIT.java | 5 +- .../esql/enrich/AbstractLookupService.java | 12 +- .../planner/EsPhysicalOperationProviders.java | 45 +++- .../xpack/esql/plugin/ComputeService.java | 3 +- .../xpack/esql/plugin/EsqlPlugin.java | 26 ++- .../xpack/esql/plugin/QueryPragmas.java | 33 --- .../optimizer/PhysicalPlanOptimizerTests.java | 2 +- .../planner/LocalExecutionPlannerTests.java | 2 +- 16 files changed, 395 insertions(+), 156 deletions(-) create mode 100644 x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java index daa27110bcc11..2796bb5c6de1f 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java @@ -51,7 +51,7 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.search.lookup.SearchLookup; -import org.elasticsearch.xpack.esql.plugin.QueryPragmas; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -337,9 +337,8 @@ public void benchmark() { fields(name), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> { throw new UnsupportedOperationException("can't load _source here"); - })), - 0, - QueryPragmas.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY) + }, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))), + 0 ); long sum = 0; for (Page page : pages) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java index f376d663a70f0..5b0ebefa38456 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java @@ -72,18 +72,10 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator { * @param shardContexts per-shard loading information * @param docChannel the channel containing the shard, leaf/segment and doc id */ - public record Factory(List fields, List shardContexts, int docChannel, double storedFieldsSequentialProportion) - implements - OperatorFactory { + public record Factory(List fields, List shardContexts, int docChannel) implements OperatorFactory { @Override public Operator get(DriverContext driverContext) { - return new ValuesSourceReaderOperator( - driverContext.blockFactory(), - fields, - shardContexts, - docChannel, - storedFieldsSequentialProportion - ); + return new ValuesSourceReaderOperator(driverContext.blockFactory(), fields, shardContexts, docChannel); } @Override @@ -115,13 +107,12 @@ public String describe() { */ public record FieldInfo(String name, ElementType type, IntFunction blockLoader) {} - public record 
ShardContext(IndexReader reader, Supplier newSourceLoader) {} + public record ShardContext(IndexReader reader, Supplier newSourceLoader, double storedFieldsSequentialProportion) {} private final FieldWork[] fields; private final List shardContexts; private final int docChannel; private final BlockFactory blockFactory; - private final double storedFieldsSequentialProportion; private final Map readersBuilt = new TreeMap<>(); private long valuesLoaded; @@ -134,18 +125,11 @@ public record ShardContext(IndexReader reader, Supplier newSourceL * @param fields fields to load * @param docChannel the channel containing the shard, leaf/segment and doc id */ - public ValuesSourceReaderOperator( - BlockFactory blockFactory, - List fields, - List shardContexts, - int docChannel, - double storedFieldsSequentialProportion - ) { + public ValuesSourceReaderOperator(BlockFactory blockFactory, List fields, List shardContexts, int docChannel) { this.fields = fields.stream().map(f -> new FieldWork(f)).toArray(FieldWork[]::new); this.shardContexts = shardContexts; this.docChannel = docChannel; this.blockFactory = blockFactory; - this.storedFieldsSequentialProportion = storedFieldsSequentialProportion; } @Override @@ -263,8 +247,9 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa } SourceLoader sourceLoader = null; + ShardContext shardContext = shardContexts.get(shard); if (storedFieldsSpec.requiresSource()) { - sourceLoader = shardContexts.get(shard).newSourceLoader.get(); + sourceLoader = shardContext.newSourceLoader.get(); storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields())); } @@ -277,7 +262,7 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa ); } StoredFieldLoader storedFieldLoader; - if (useSequentialStoredFieldsReader(docs)) { + if (useSequentialStoredFieldsReader(docs, shardContext.storedFieldsSequentialProportion())) { storedFieldLoader = StoredFieldLoader.fromSpecSequential(storedFieldsSpec); trackStoredFields(storedFieldsSpec, true); } else { @@ -454,13 +439,13 @@ public void close() { * Is it more efficient to use a sequential stored field reader * when reading stored fields for the documents contained in {@code docIds}? 
*/ - private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs) { + private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs, double storedFieldsSequentialProportion) { int count = docs.count(); if (count < SEQUENTIAL_BOUNDARY) { return false; } int range = docs.get(count - 1) - docs.get(0); - return range * storedFieldsSequentialProportion < count - 1; + return range * storedFieldsSequentialProportion <= count; } private void trackStoredFields(StoredFieldsSpec spec, boolean sequential) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java index 35202d49654ea..c030b329dd2d8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java @@ -63,8 +63,7 @@ public record OrdinalsGroupingOperatorFactory( int docChannel, String groupingField, List aggregators, - int maxPageSize, - double storedFieldsSequentialProportion + int maxPageSize ) implements OperatorFactory { @Override @@ -77,7 +76,6 @@ public Operator get(DriverContext driverContext) { groupingField, aggregators, maxPageSize, - storedFieldsSequentialProportion, driverContext ); } @@ -96,7 +94,6 @@ public String describe() { private final List aggregatorFactories; private final ElementType groupingElementType; private final Map ordinalAggregators; - private final double storedFieldsSequentialProportion; private final DriverContext driverContext; @@ -114,7 +111,6 @@ public OrdinalsGroupingOperator( String groupingField, List aggregatorFactories, int maxPageSize, - double storedFieldsSequentialProportion, DriverContext driverContext ) { Objects.requireNonNull(aggregatorFactories); @@ -126,7 +122,6 @@ public OrdinalsGroupingOperator( this.aggregatorFactories = aggregatorFactories; this.ordinalAggregators = new HashMap<>(); this.maxPageSize = maxPageSize; - this.storedFieldsSequentialProportion = storedFieldsSequentialProportion; this.driverContext = driverContext; } @@ -176,7 +171,6 @@ public void addInput(Page page) { channelIndex, aggregatorFactories, maxPageSize, - storedFieldsSequentialProportion, driverContext ); } @@ -491,7 +485,6 @@ boolean next() throws IOException { private static class ValuesAggregator implements Releasable { private final ValuesSourceReaderOperator extractor; private final HashAggregationOperator aggregator; - private final double storedFieldsSequentialProportion; ValuesAggregator( IntFunction blockLoaders, @@ -502,15 +495,13 @@ private static class ValuesAggregator implements Releasable { int channelIndex, List aggregatorFactories, int maxPageSize, - double storedFieldsSequentialProportion, DriverContext driverContext ) { this.extractor = new ValuesSourceReaderOperator( driverContext.blockFactory(), List.of(new ValuesSourceReaderOperator.FieldInfo(groupingField, groupingElementType, blockLoaders)), shardContexts, - docChannel, - storedFieldsSequentialProportion + docChannel ); this.aggregator = new HashAggregationOperator( aggregatorFactories, @@ -522,7 +513,6 @@ private static class ValuesAggregator implements Releasable { ), driverContext ); - this.storedFieldsSequentialProportion = storedFieldsSequentialProportion; } void addInput(Page page) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java 
b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index 7187c45c7adab..8060d42458056 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -198,13 +198,12 @@ public String toString() { operators.add( new OrdinalsGroupingOperator( shardIdx -> new KeywordFieldMapper.KeywordFieldType("g").blockLoader(mockBlContext()), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), + List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE, 0.2)), ElementType.BYTES_REF, 0, gField, List.of(CountAggregatorFunction.supplier().groupingAggregatorFactory(INITIAL, List.of(1))), randomPageSize(), - 0.1, driverContext ) ); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java index 4ab21ceb015a5..18db35be608d7 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java @@ -208,9 +208,8 @@ private List runQuery(Set values, Query query, boolean shuffleDocs ), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> { throw new UnsupportedOperationException(); - })), - 0, - 0.1 + }, 0.2)), + 0 ) ); LuceneQueryEvaluator.ShardConfig[] shards = new LuceneQueryEvaluator.ShardConfig[] { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java index c7f67a8a723dc..3778b6965b4f3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java @@ -108,7 +108,6 @@ import java.util.stream.IntStream; import java.util.stream.LongStream; -import static org.elasticsearch.compute.lucene.ValuesSourceReaderOperatorTests.STORED_FIELDS_SEQUENTIAL_PROPORTIONS; import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.elasticsearch.xpack.esql.core.type.DataType.IP; @@ -201,7 +200,7 @@ private MapperService mapperService(String indexKey) { private List initShardContexts() { return INDICES.keySet() .stream() - .map(index -> new ValuesSourceReaderOperator.ShardContext(reader(index), () -> SourceLoader.FROM_STORED_SOURCE)) + .map(index -> new ValuesSourceReaderOperator.ShardContext(reader(index), () -> SourceLoader.FROM_STORED_SOURCE, 0.2)) .toList(); } @@ -240,7 +239,7 @@ private static Operator.OperatorFactory factory( fail("unexpected shardIdx [" + shardIdx + "]"); } return loader; - })), shardContexts, 0, STORED_FIELDS_SEQUENTIAL_PROPORTIONS); + })), shardContexts, 0); } protected SourceOperator simpleInput(DriverContext context, int size) { @@ -489,8 +488,7 @@ public void testManySingleDocPages() { new ValuesSourceReaderOperator.Factory( List.of(testCase.info, fieldInfo(mapperService(indexKey).fieldType("key"), ElementType.INT)), shardContexts, - 0, - 
STORED_FIELDS_SEQUENTIAL_PROPORTIONS + 0 ).get(driverContext) ); List results = drive(operators, input.iterator(), driverContext); @@ -600,8 +598,7 @@ private void loadSimpleAndAssert( fieldInfo(mapperService("index1").fieldType("indexKey"), ElementType.BYTES_REF) ), shardContexts, - 0, - STORED_FIELDS_SEQUENTIAL_PROPORTIONS + 0 ).get(driverContext) ); List tests = new ArrayList<>(); @@ -610,12 +607,7 @@ private void loadSimpleAndAssert( cases.removeAll(b); tests.addAll(b); operators.add( - new ValuesSourceReaderOperator.Factory( - b.stream().map(i -> i.info).toList(), - shardContexts, - 0, - STORED_FIELDS_SEQUENTIAL_PROPORTIONS - ).get(driverContext) + new ValuesSourceReaderOperator.Factory(b.stream().map(i -> i.info).toList(), shardContexts, 0).get(driverContext) ); } List results = drive(operators, input.iterator(), driverContext); @@ -717,11 +709,7 @@ private void testLoadAllStatus(boolean allInOnePage) { Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING ); List operators = cases.stream() - .map( - i -> new ValuesSourceReaderOperator.Factory(List.of(i.info), shardContexts, 0, STORED_FIELDS_SEQUENTIAL_PROPORTIONS).get( - driverContext - ) - ) + .map(i -> new ValuesSourceReaderOperator.Factory(List.of(i.info), shardContexts, 0).get(driverContext)) .toList(); if (allInOnePage) { input = List.of(CannedSourceOperator.mergePages(input)); @@ -1309,7 +1297,7 @@ public void testWithNulls() throws IOException { LuceneOperator.NO_LIMIT, false // no scoring ); - var vsShardContext = new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE); + var vsShardContext = new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE, 0.2); try ( Driver driver = TestDriverFactory.create( driverContext, @@ -1397,8 +1385,7 @@ public void testNullsShared() { new ValuesSourceReaderOperator.FieldInfo("null2", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS) ), shardContexts, - 0, - STORED_FIELDS_SEQUENTIAL_PROPORTIONS + 0 ).get(driverContext) ), new PageConsumerOperator(page -> { @@ -1428,9 +1415,8 @@ public void testDescriptionOfMany() throws IOException { ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory( cases.stream().map(c -> c.info).toList(), - List.of(new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE)), - 0, - STORED_FIELDS_SEQUENTIAL_PROPORTIONS + List.of(new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE, 0.2)), + 0 ); assertThat(factory.describe(), equalTo("ValuesSourceReaderOperator[fields = [" + cases.size() + " fields]]")); try (Operator op = factory.get(driverContext())) { @@ -1457,7 +1443,9 @@ public void testManyShards() throws IOException { List readerShardContexts = new ArrayList<>(); for (int s = 0; s < shardCount; s++) { contexts.add(new LuceneSourceOperatorTests.MockShardContext(readers[s], s)); - readerShardContexts.add(new ValuesSourceReaderOperator.ShardContext(readers[s], () -> SourceLoader.FROM_STORED_SOURCE)); + readerShardContexts.add( + new ValuesSourceReaderOperator.ShardContext(readers[s], () -> SourceLoader.FROM_STORED_SOURCE, 0.2) + ); } var luceneFactory = new LuceneSourceOperator.Factory( contexts, @@ -1476,8 +1464,7 @@ public void testManyShards() throws IOException { return ft.blockLoader(blContext()); })), readerShardContexts, - 0, - STORED_FIELDS_SEQUENTIAL_PROPORTIONS + 0 ); DriverContext driverContext = driverContext(); List results = drive( diff --git 
a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java index 20ed9d0ee72c9..1fbe42aa4119d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java @@ -114,7 +114,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase { { false, true, true }, { false, false, true, true } }; - static final double STORED_FIELDS_SEQUENTIAL_PROPORTIONS = 0.1; + static final double STORED_FIELDS_SEQUENTIAL_PROPORTIONS = 0.2; private Directory directory = newDirectory(); private MapperService mapperService; @@ -150,9 +150,14 @@ static Operator.OperatorFactory factory(IndexReader reader, String name, Element } return loader; })), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), - 0, - STORED_FIELDS_SEQUENTIAL_PROPORTIONS + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), + 0 ); } @@ -449,9 +454,14 @@ public void testManySingleDocPages() { operators.add( new ValuesSourceReaderOperator.Factory( List.of(testCase.info, fieldInfo(mapperService.fieldType("key"), ElementType.INT)), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), - 0, - STORED_FIELDS_SEQUENTIAL_PROPORTIONS + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), + 0 ).get(driverContext) ); List results = drive(operators, input.iterator(), driverContext); @@ -556,9 +566,14 @@ private void loadSimpleAndAssert( operators.add( new ValuesSourceReaderOperator.Factory( List.of(fieldInfo(mapperService.fieldType("key"), ElementType.INT)), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), - 0, - STORED_FIELDS_SEQUENTIAL_PROPORTIONS + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), + 0 ).get(driverContext) ); List tests = new ArrayList<>(); @@ -569,9 +584,14 @@ private void loadSimpleAndAssert( operators.add( new ValuesSourceReaderOperator.Factory( b.stream().map(i -> i.info).toList(), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), - 0, - STORED_FIELDS_SEQUENTIAL_PROPORTIONS + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), + 0 ).get(driverContext) ); } @@ -660,9 +680,14 @@ private void testLoadAllStatus(boolean allInOnePage) { .map( i -> new ValuesSourceReaderOperator.Factory( List.of(i.info), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), - 0, - STORED_FIELDS_SEQUENTIAL_PROPORTIONS + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), + 0 ).get(driverContext) ) .toList(); @@ -1427,9 +1452,14 @@ public void testNullsShared() { new ValuesSourceReaderOperator.FieldInfo("null1", ElementType.NULL, shardIdx -> 
BlockLoader.CONSTANT_NULLS), new ValuesSourceReaderOperator.FieldInfo("null2", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS) ), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), - 0, - STORED_FIELDS_SEQUENTIAL_PROPORTIONS + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), + 0 ).get(driverContext) ), new PageConsumerOperator(page -> { @@ -1473,9 +1503,14 @@ private void testSequentialStoredFields(boolean sequential, int docCount) throws fieldInfo(mapperService.fieldType("key"), ElementType.INT), fieldInfo(storedTextField("stored_text"), ElementType.BYTES_REF) ), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), - 0, - STORED_FIELDS_SEQUENTIAL_PROPORTIONS + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), + 0 ).get(driverContext); List results = drive(op, source.iterator(), driverContext); Checks checks = new Checks(Block.MvOrdering.UNORDERED, Block.MvOrdering.UNORDERED); @@ -1502,9 +1537,14 @@ public void testDescriptionOfMany() throws IOException { ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory( cases.stream().map(c -> c.info).toList(), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), - 0, - STORED_FIELDS_SEQUENTIAL_PROPORTIONS + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), + 0 ); assertThat(factory.describe(), equalTo("ValuesSourceReaderOperator[fields = [" + cases.size() + " fields]]")); try (Operator op = factory.get(driverContext())) { @@ -1530,7 +1570,13 @@ public void testManyShards() throws IOException { List readerShardContexts = new ArrayList<>(); for (int s = 0; s < shardCount; s++) { contexts.add(new LuceneSourceOperatorTests.MockShardContext(readers[s], s)); - readerShardContexts.add(new ValuesSourceReaderOperator.ShardContext(readers[s], () -> SourceLoader.FROM_STORED_SOURCE)); + readerShardContexts.add( + new ValuesSourceReaderOperator.ShardContext( + readers[s], + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ); } var luceneFactory = new LuceneSourceOperator.Factory( contexts, @@ -1548,8 +1594,7 @@ public void testManyShards() throws IOException { return ft.blockLoader(blContext()); })), readerShardContexts, - 0, - STORED_FIELDS_SEQUENTIAL_PROPORTIONS + 0 ); DriverContext driverContext = driverContext(); List results = drive( diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java new file mode 100644 index 0000000000000..5577c8393e065 --- /dev/null +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java @@ -0,0 +1,219 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.qa.single_node; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.test.MapMatcher; +import org.elasticsearch.test.TestClustersThreadFilter; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.esql.AssertWarnings; +import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; +import org.junit.Before; +import org.junit.ClassRule; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.elasticsearch.test.MapMatcher.assertMap; +import static org.elasticsearch.test.MapMatcher.matchesMap; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.entityToMap; +import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.requestObjectBuilder; +import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.runEsql; +import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.commonProfile; +import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.fixTypesOnProfile; +import static org.hamcrest.Matchers.*; + +/** + * Tests for {@code index.esql.stored_fields_sequential_proportion} which controls + * an optimization we use when loading from {@code _source}. + */ +@ThreadLeakFilters(filters = TestClustersThreadFilter.class) +public class StoredFieldsSequentialIT extends ESRestTestCase { + private static final Logger LOG = LogManager.getLogger(StoredFieldsSequentialIT.class); + + @ClassRule + public static ElasticsearchCluster cluster = Clusters.testCluster(); + + public void testFetchTen() throws IOException { + testQuery(null, """ + FROM test + | LIMIT 10 + """, 10, true); + } + + public void testAggAll() throws IOException { + testQuery(null, """ + FROM test + | STATS SUM(LENGTH(test)) + """, 1000, true); + } + + public void testAggTwentyPercent() throws IOException { + testQuery(null, """ + FROM test + | WHERE STARTS_WITH(test.keyword, "test1") OR STARTS_WITH(test.keyword, "test2") + | STATS SUM(LENGTH(test)) + """, 200, true); + } + + public void testAggTenPercentDefault() throws IOException { + testAggTenPercent(null, false); + } + + public void testAggTenPercentConfiguredToTenPct() throws IOException { + testAggTenPercent(0.10, true); + } + + public void testAggTenPercentConfiguredToOnePct() throws IOException { + testAggTenPercent(0.01, true); + } + + /** + * It's important for the test that the queries we use detect "scattered" docs. + * If they were "compact" in the index we'd still load them using the sequential + * reader. 
+ */ + private void testAggTenPercent(Double percent, boolean sequential) throws IOException { + String filter = IntStream.range(0, 10) + .mapToObj(i -> String.format(Locale.ROOT, "STARTS_WITH(test.keyword, \"test%s%s\")", i, i)) + .collect(Collectors.joining(" OR ")); + testQuery(percent, String.format(Locale.ROOT, """ + FROM test + | WHERE %s + | STATS SUM(LENGTH(test)) + """, filter), 100, sequential); + } + + private void testQuery(Double percent, String query, int documentsFound, boolean sequential) throws IOException { + setPercent(percent); + RestEsqlTestCase.RequestObjectBuilder builder = requestObjectBuilder().query(query); + builder.profile(true); + Map<String, Object> result = runEsql(builder, new AssertWarnings.NoWarnings(), RestEsqlTestCase.Mode.SYNC); + assertMap( + result, + matchesMap().entry("documents_found", documentsFound) + .entry( + "profile", + matchesMap().entry("drivers", instanceOf(List.class)) + .entry("planning", matchesMap().extraOk()) + .entry("query", matchesMap().extraOk()) + ) + .extraOk() + ); + + @SuppressWarnings("unchecked") + List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers"); + for (Map<String, Object> p : profiles) { + fixTypesOnProfile(p); + assertThat(p, commonProfile()); + @SuppressWarnings("unchecked") + List<Map<String, Object>> operators = (List<Map<String, Object>>) p.get("operators"); + for (Map<String, Object> o : operators) { + LOG.info("profile {}", o.get("operator")); + } + for (Map<String, Object> o : operators) { + checkOperatorProfile(o, sequential); + } + } + } + + private void setPercent(Double percent) throws IOException { + Request set = new Request("PUT", "test/_settings"); + set.setJsonEntity(String.format(Locale.ROOT, """ + { + "index": { + "esql": { + "stored_fields_sequential_proportion": %s + } + } + } + """, percent)); + assertMap(entityToMap(client().performRequest(set).getEntity(), XContentType.JSON), matchesMap().entry("acknowledged", true)); + } + + @Before + public void buildIndex() throws IOException { + Request exists = new Request("GET", "test"); + try { + client().performRequest(exists); + return; + } catch (ResponseException e) { + if (e.getResponse().getStatusLine().getStatusCode() != 404) { + throw e; + } + } + Request createIndex = new Request("PUT", "test"); + createIndex.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 1, + "sort.field": "i" + } + }, + "mappings": { + "properties": { + "i": {"type": "long"} + } + } + }"""); + Response createResponse = client().performRequest(createIndex); + assertThat( + entityToMap(createResponse.getEntity(), XContentType.JSON), + matchesMap().entry("shards_acknowledged", true).entry("index", "test").entry("acknowledged", true) + ); + + Request bulk = new Request("POST", "/_bulk"); + bulk.addParameter("refresh", ""); + StringBuilder b = new StringBuilder(); + for (int i = 0; i < 1000; i++) { + b.append(String.format(""" + {"create":{"_index":"test"}} + {"test":"test%03d", "i": %d} + """, i, i)); + } + bulk.setJsonEntity(b.toString()); + Response bulkResponse = client().performRequest(bulk); + assertThat(entityToMap(bulkResponse.getEntity(), XContentType.JSON), matchesMap().entry("errors", false).extraOk()); + } + + @Override + protected boolean preserveIndicesUponCompletion() { + return true; + } + + private static void checkOperatorProfile(Map<String, Object> o, boolean sequential) { + String name = (String) o.get("operator"); + if (name.startsWith("ValuesSourceReaderOperator")) { + MapMatcher readersBuilt = matchesMap().entry( + "stored_fields[requires_source:true, fields:0, sequential: " + sequential + "]", + greaterThanOrEqualTo(1) + ).extraOk();
+ MapMatcher expectedOp = matchesMap().entry("operator", name) + .entry("status", matchesMap().entry("readers_built", readersBuilt).extraOk()); + assertMap(o, expectedOp); + } + } + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index 7654fe41538e0..c5cbeaac71072 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -204,9 +204,8 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws ), List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.getSearchExecutionContext().getIndexReader(), () -> { throw new IllegalStateException("can't load source here"); - })), - 0, - 0.1 + }, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))), + 0 ); CancellableTask parentTask = new EsqlQueryTask( 1, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index 90d9e095e1e1f..42d03f4e1b161 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.compute.data.Block; @@ -408,9 +409,14 @@ private static Operator extractFieldsOperator( return new ValuesSourceReaderOperator( driverContext.blockFactory(), fields, - List.of(new ValuesSourceReaderOperator.ShardContext(shardContext.searcher().getIndexReader(), shardContext::newSourceLoader)), - 0, - 0.1 + List.of( + new ValuesSourceReaderOperator.ShardContext( + shardContext.searcher().getIndexReader(), + shardContext::newSourceLoader, + EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY) + ) + ), + 0 ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index b8a89245f14cd..e72ae8ad347b1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -64,6 +64,7 @@ import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.DriverParallelism; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import java.io.IOException; import java.util.ArrayList; @@ -99,23 +100,30 @@ public interface ShardContext extends org.elasticsearch.compute.lucene.ShardCont * 
Returns something to load values from this field into a {@link Block}. */ BlockLoader blockLoader(String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference); + + /** + * Tuning parameter for deciding when to use the "merge" stored field loader. + * Think of it as "how similar to a sequential block of documents do I have to + * be before I'll use the merge reader?" So a value of {@code 1} means I have to + * be exactly a sequential block, like {@code 0, 1, 2, 3, .. 1299, 1300}. + * A value of {@code .2} means we'll use the sequential reader even if we only + * need one in five documents. + */ + double storedFieldsSequentialProportion(); } private final List<ShardContext> shardContexts; private final DataPartitioning defaultDataPartitioning; - private final double storedFieldsSequentialProportion; public EsPhysicalOperationProviders( FoldContext foldContext, List<ShardContext> shardContexts, AnalysisRegistry analysisRegistry, - DataPartitioning defaultDataPartitioning, - double storedFieldsSequentialProportion + DataPartitioning defaultDataPartitioning ) { super(foldContext, analysisRegistry); this.shardContexts = shardContexts; this.defaultDataPartitioning = defaultDataPartitioning; - this.storedFieldsSequentialProportion = storedFieldsSequentialProportion; } @Override @@ -123,7 +131,13 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi Layout.Builder layout = source.layout.builder(); var sourceAttr = fieldExtractExec.sourceAttribute(); List<ValuesSourceReaderOperator.ShardContext> readers = shardContexts.stream() - .map(s -> new ValuesSourceReaderOperator.ShardContext(s.searcher().getIndexReader(), s::newSourceLoader)) + .map( + s -> new ValuesSourceReaderOperator.ShardContext( + s.searcher().getIndexReader(), + s::newSourceLoader, + s.storedFieldsSequentialProportion() + ) + ) .toList(); List<ValuesSourceReaderOperator.FieldInfo> fields = new ArrayList<>(); int docChannel = source.layout.get(sourceAttr.id()).channel(); @@ -135,10 +149,7 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi IntFunction<BlockLoader> loader = s -> getBlockLoaderFor(s, attr, fieldExtractPreference); fields.add(new ValuesSourceReaderOperator.FieldInfo(getFieldName(attr), elementType, loader)); } - return source.with( - new ValuesSourceReaderOperator.Factory(fields, readers, docChannel, storedFieldsSequentialProportion), - layout.build() - ); + return source.with(new ValuesSourceReaderOperator.Factory(fields, readers, docChannel), layout.build()); } private static String getFieldName(Attribute attr) { @@ -273,7 +284,13 @@ public final Operator.OperatorFactory ordinalGroupingOperatorFactory( var sourceAttribute = FieldExtractExec.extractSourceAttributesFrom(aggregateExec.child()); int docChannel = source.layout.get(sourceAttribute.id()).channel(); List<ValuesSourceReaderOperator.ShardContext> vsShardContexts = shardContexts.stream() - .map(s -> new ValuesSourceReaderOperator.ShardContext(s.searcher().getIndexReader(), s::newSourceLoader)) + .map( + s -> new ValuesSourceReaderOperator.ShardContext( + s.searcher().getIndexReader(), + s::newSourceLoader, + s.storedFieldsSequentialProportion() + ) + ) .toList(); // The grouping-by values are ready, let's group on them directly. // Costin: why are they ready and not already exposed in the layout?
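(Editor's aside: to make the Javadoc above concrete, here is a minimal sketch of the density check the proportion implies. The class and method names are hypothetical and this is not the engine's actual code path, just the arithmetic the parameter controls.)

```java
/** Illustrative sketch only; assumes docIds is sorted ascending and distinct. */
class SequentialReaderHeuristic {
    static boolean shouldUseSequentialReader(int[] docIds, double proportion) {
        if (docIds.length == 0) {
            return false;
        }
        // Density of the docs we need within the doc-id range they span.
        int range = docIds[docIds.length - 1] - docIds[0] + 1;
        // proportion == 1 demands a perfectly compact run like {0, 1, 2, 3};
        // proportion == 0.2 accepts as few as one needed doc in five.
        return (double) docIds.length / range >= proportion;
    }
}
```

Under this reading, with the 0.2 default a block that needs docs {0, 5, 10, ..., 995} (one in five across the range) still gets the merge reader, while the old all-or-nothing behavior corresponds to a proportion of 1.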
@@ -284,8 +301,7 @@ public final Operator.OperatorFactory ordinalGroupingOperatorFactory( docChannel, attrSource.name(), aggregatorFactories, - context.pageSize(aggregateExec.estimatedRowSize()), - storedFieldsSequentialProportion + context.pageSize(aggregateExec.estimatedRowSize()) ); } @@ -407,6 +423,11 @@ public FieldNamesFieldMapper.FieldNamesFieldType fieldNames() { protected @Nullable MappedFieldType fieldType(String name) { return ctx.getFieldType(name); } + + @Override + public double storedFieldsSequentialProportion() { + return EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.get(ctx.getIndexSettings().getSettings()); + } } private static class TypeConvertingBlockLoader implements BlockLoader { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 4a0df7f15fce8..417b1bf3b78ed 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -539,8 +539,7 @@ public SourceProvider createSourceProvider() { context.foldCtx(), contexts, searchService.getIndicesService().getAnalysis(), - defaultDataPartitioning, - context.configuration().pragmas().storedFieldsSequentialProportion() + defaultDataPartitioning ); final List<Driver> drivers; try { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 2657cda192308..12dd811e4733c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -163,6 +163,29 @@ public class EsqlPlugin extends Plugin implements ActionPlugin { Setting.Property.Dynamic ); + /** + * Tuning parameter for deciding when to use the "merge" stored field loader. + * Think of it as "how similar to a sequential block of documents do I have to + * be before I'll use the merge reader?" So a value of {@code 1} means I have to + * be exactly a sequential block, like {@code 0, 1, 2, 3, .. 1299, 1300}. + * A value of {@code .2} means we'll use the sequential reader even if we only + * need one in five documents. + * <p> + * The default value of this was experimentally derived using a + * script. + * And a little paranoia. A lower default value was looking good locally, but + * I'm concerned about the implications of effectively using this all the time. + * </p>
+ */ + public static final Setting<Double> STORED_FIELDS_SEQUENTIAL_PROPORTION = Setting.doubleSetting( + "index.esql.stored_fields_sequential_proportion", + 0.20, + 0, + 1, + Setting.Property.IndexScope, + Setting.Property.Dynamic + ); + @Override public Collection<?> createComponents(PluginServices services) { CircuitBreaker circuitBreaker = services.indicesService().getBigArrays().breakerService().getBreaker("request"); @@ -226,7 +249,8 @@ public List<Setting<?>> getSettings() { ESQL_QUERYLOG_THRESHOLD_INFO_SETTING, ESQL_QUERYLOG_THRESHOLD_WARN_SETTING, ESQL_QUERYLOG_INCLUDE_USER_SETTING, - DEFAULT_DATA_PARTITIONING + DEFAULT_DATA_PARTITIONING, + STORED_FIELDS_SEQUENTIAL_PROPORTION ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java index 56d2a9e6ef219..b0a99abf14288 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java @@ -73,27 +73,6 @@ public final class QueryPragmas implements Writeable { public static final Setting<ByteSizeValue> FOLD_LIMIT = Setting.memorySizeSetting("fold_limit", "5%"); - /** - * Tuning parameter for deciding when to use the "merge" stored field loader. - * Think of it as "how similar to a sequential block of documents do I have to - * be before I'll use the merge reader?" So a value of {@code 1} means I have to - * be exactly a sequential block, like {@code 0, 1, 2, 3, .. 1299, 1300}. - * A value of {@code .1} means we'll use the sequential reader even if we only - * need one in ten documents. - * <p> - * The default value of this was experimentally derived using a - * script. - * And a little paranoia. A lower default value was looking good locally, but - * I'm concerned about the implications of effectively using this all the time. - * </p> - */ - public static final Setting<Double> STORED_FIELDS_SEQUENTIAL_PROPORTION = Setting.doubleSetting( - "stored_fields_sequential_proportion", - 0.10, - 0, - 1 - ); - public static final Setting<MappedFieldType.FieldExtractPreference> FIELD_EXTRACT_PREFERENCE = Setting.enumSetting( MappedFieldType.FieldExtractPreference.class, "field_extract_preference", @@ -141,18 +120,6 @@ public int taskConcurrency() { return TASK_CONCURRENCY.get(settings); } - /** - * Tuning parameter for deciding when to use the "merge" stored field loader. - * Think of it as "how similar to a sequential block of documents do I have to - * be before I'll use the merge reader?" So a value of {@code 1} means I have to - * be exactly a sequential block, like {@code 0, 1, 2, 3, .. 1299, 1300}. - * A value of {@code .1} means significant gaps are allowed and we'll still use the - * sequential reader. - */ - public double storedFieldsSequentialProportion() { - return STORED_FIELDS_SEQUENTIAL_PROPORTION.get(settings); - } - /** * Size of a page in entries with {@code 0} being a special value asking * to adaptively size based on the number of columns in the page. diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index fc6332f57be65..d8da50585bdfa 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -7681,7 +7681,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP null, null, null, - new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null, DataPartitioning.AUTO, 0.1), + new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null, DataPartitioning.AUTO), List.of() ); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index 366dac80db512..d556ee68d4033 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -257,7 +257,7 @@ private Configuration config() { } private EsPhysicalOperationProviders esPhysicalOperationProviders(List<EsPhysicalOperationProviders.ShardContext> shardContexts) { - return new EsPhysicalOperationProviders(FoldContext.small(), shardContexts, null, DataPartitioning.AUTO, 0.1); + return new EsPhysicalOperationProviders(FoldContext.small(), shardContexts, null, DataPartitioning.AUTO); } private List<EsPhysicalOperationProviders.ShardContext> createShardContexts() throws IOException { From 8ba45417540231de45d0de7058342ba6893f073e Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 29 Apr 2025 07:22:51 -0400 Subject: [PATCH 5/5] Doc --- .../elasticsearch/index-settings/index-modules.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/docs/reference/elasticsearch/index-settings/index-modules.md b/docs/reference/elasticsearch/index-settings/index-modules.md index 36ff715aa5cb4..3aea9c0a41ac5 100644 --- a/docs/reference/elasticsearch/index-settings/index-modules.md +++ b/docs/reference/elasticsearch/index-settings/index-modules.md @@ -49,7 +49,7 @@ $$$index-codec$$$ `index.codec` $$$index-mode-setting$$$ `index.mode` : The `index.mode` setting is used to control settings applied in
specific domains like ingestion of time series data or logs. Different mutually exclusive modes exist, which are used to apply settings or default values controlling indexing of documents, sorting and other parameters whose value affects indexing or query performance. - + **Example** ```console @@ -246,3 +246,8 @@ $$$index-final-pipeline$$$ $$$index-hidden$$$ `index.hidden` : Indicates whether the index should be hidden by default. Hidden indices are not returned by default when using a wildcard expression. This behavior is controlled per request through the use of the `expand_wildcards` parameter. Possible values are `true` and `false` (default). + +$$$index-esql-stored-fields-sequential-proportion$$$ + +`index.esql.stored_fields_sequential_proportion` +: Tuning parameter for deciding when {{esql}} will load [Stored fields](/reference/elasticsearch/rest-apis/retrieve-selected-fields.md#stored-fields) using a strategy tuned for loading dense sequences of documents. Allows values between 0.0 and 1.0 and defaults to 0.2. Indices with documents smaller than 10kb may see speed improvements loading `text` fields by setting this lower.
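(Editor's aside: the update request below mirrors the one used by the integration test above; the index name and value are illustrative, not part of the patch.) For example, to lean harder on the sequential "merge" reader for an index full of small documents, you could lower the setting:

```console
PUT /my-index/_settings
{
  "index": {
    "esql": {
      "stored_fields_sequential_proportion": 0.05
    }
  }
}
```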