diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java index 4469a2c01623d..2796bb5c6de1f 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java @@ -25,6 +25,7 @@ import org.apache.lucene.util.NumericUtils; import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BytesRefBlock; @@ -50,6 +51,7 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.search.lookup.SearchLookup; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -335,7 +337,7 @@ public void benchmark() { fields(name), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> { throw new UnsupportedOperationException("can't load _source here"); - })), + }, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))), 0 ); long sum = 0; diff --git a/docs/changelog/127348.yaml b/docs/changelog/127348.yaml new file mode 100644 index 0000000000000..933869b2a9d55 --- /dev/null +++ b/docs/changelog/127348.yaml @@ -0,0 +1,5 @@ +pr: 127348 +summary: Speed loading stored fields +area: ES|QL +type: enhancement +issues: [] diff --git a/docs/reference/elasticsearch/index-settings/index-modules.md b/docs/reference/elasticsearch/index-settings/index-modules.md index 3b04f4e5fd9cf..682a6fa2a39d4 100644 --- a/docs/reference/elasticsearch/index-settings/index-modules.md +++ b/docs/reference/elasticsearch/index-settings/index-modules.md @@ -49,7 +49,7 @@ $$$index-codec$$$ `index.codec` $$$index-mode-setting$$$ `index.mode` : The `index.mode` setting is used to control settings applied in specific domains like ingestion of time series data or logs. Different mutually exclusive modes exist, which are used to apply settings or default values controlling indexing of documents, sorting and other parameters whose value affects indexing or query performance. - + **Example** ```console @@ -248,3 +248,8 @@ $$$index-final-pipeline$$$ $$$index-hidden$$$ `index.hidden` : Indicates whether the index should be hidden by default. Hidden indices are not returned by default when using a wildcard expression. This behavior is controlled per request through the use of the `expand_wildcards` parameter. Possible values are `true` and `false` (default). + +$$$index-esql-stored-fields-sequential-proportion$$$ + +`index.esql.stored_fields_sequential_proportion` +: Tuning parameter for deciding when {{esql}} will load [Stored fields](/reference/elasticsearch/rest-apis/retrieve-selected-fields.md#stored-fields) using a strategy tuned for loading dense sequences of documents. Accepts values between 0.0 and 1.0 and defaults to 0.2. Indices with documents smaller than 10kb may see speed improvements when loading `text` fields by setting this lower.
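A note on what that tuning knob actually decides: the setting feeds the sequential-versus-row-by-row choice made in `ValuesSourceReaderOperator` (see the change below). Here is a minimal standalone sketch of the rule, assuming sorted doc IDs; `SEQUENTIAL_BOUNDARY` is an illustrative stand-in, since the operator's real minimum-page-size constant is private and its value is not shown in this diff:

```java
// Sketch only: mirrors useSequentialStoredFieldsReader below, outside Elasticsearch.
final class SequentialHeuristicSketch {
    private static final int SEQUENTIAL_BOUNDARY = 10; // illustrative stand-in

    static boolean useSequentialReader(int[] sortedDocIds, double proportion) {
        int count = sortedDocIds.length;
        if (count < SEQUENTIAL_BOUNDARY) {
            return false; // tiny pages never pay for a sequential sweep
        }
        // How much of the doc id range the page covers: dense pages have count close to range.
        int range = sortedDocIds[count - 1] - sortedDocIds[0];
        return range * proportion <= count;
    }
}
```

Lowering the proportion makes the sequential sweep kick in for sparser pages; that is why the docs call out indices with small documents, where reading and discarding the extra stored documents in between is cheap.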
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java index cae85ebd7ba6e..57819d6d7e041 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java @@ -107,7 +107,7 @@ public String describe() { */ public record FieldInfo(String name, ElementType type, IntFunction<BlockLoader> blockLoader) {} - public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader) {} + public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader, double storedFieldsSequentialProportion) {} private final FieldWork[] fields; private final List<ShardContext> shardContexts; @@ -247,8 +247,9 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa } SourceLoader sourceLoader = null; + ShardContext shardContext = shardContexts.get(shard); if (storedFieldsSpec.requiresSource()) { - sourceLoader = shardContexts.get(shard).newSourceLoader.get(); + sourceLoader = shardContext.newSourceLoader.get(); storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields())); } @@ -261,7 +262,7 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa ); } StoredFieldLoader storedFieldLoader; - if (useSequentialStoredFieldsReader(docs)) { + if (useSequentialStoredFieldsReader(docs, shardContext.storedFieldsSequentialProportion())) { storedFieldLoader = StoredFieldLoader.fromSpecSequential(storedFieldsSpec); trackStoredFields(storedFieldsSpec, true); } else { @@ -438,9 +439,13 @@ public void close() { * Is it more efficient to use a sequential stored field reader * when reading stored fields for the documents contained in {@code docIds}?
*/ - private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs) { + private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs, double storedFieldsSequentialProportion) { int count = docs.count(); - return count >= SEQUENTIAL_BOUNDARY && docs.get(count - 1) - docs.get(0) == count - 1; + if (count < SEQUENTIAL_BOUNDARY) { + return false; + } + int range = docs.get(count - 1) - docs.get(0); + return range * storedFieldsSequentialProportion <= count; } private void trackStoredFields(StoredFieldsSpec spec, boolean sequential) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index 8ec54895025ea..8060d42458056 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -198,7 +198,7 @@ public String toString() { operators.add( new OrdinalsGroupingOperator( shardIdx -> new KeywordFieldMapper.KeywordFieldType("g").blockLoader(mockBlContext()), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), + List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE, 0.2)), ElementType.BYTES_REF, 0, gField, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java index 146fdb2a6cdd3..18db35be608d7 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java @@ -208,7 +208,7 @@ private List<Page> runQuery(Set<String> values, Query query, boolean shuffleDocs ), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> { throw new UnsupportedOperationException(); - })), + }, 0.2)), 0 ) ); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java index f52b8282abe45..3778b6965b4f3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java @@ -200,7 +200,7 @@ private MapperService mapperService(String indexKey) { private List<ValuesSourceReaderOperator.ShardContext> initShardContexts() { return INDICES.keySet() .stream() - .map(index -> new ValuesSourceReaderOperator.ShardContext(reader(index), () -> SourceLoader.FROM_STORED_SOURCE)) + .map(index -> new ValuesSourceReaderOperator.ShardContext(reader(index), () -> SourceLoader.FROM_STORED_SOURCE, 0.2)) .toList(); } @@ -1297,7 +1297,7 @@ public void testWithNulls() throws IOException { LuceneOperator.NO_LIMIT, false // no scoring ); - var vsShardContext = new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE); + var vsShardContext = new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE, 0.2); try ( Driver driver = TestDriverFactory.create( driverContext, @@ -1415,7 +1415,7 @@ public void testDescriptionOfMany() throws IOException {
ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory( cases.stream().map(c -> c.info).toList(), - List.of(new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE)), + List.of(new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE, 0.2)), 0 ); assertThat(factory.describe(), equalTo("ValuesSourceReaderOperator[fields = [" + cases.size() + " fields]]")); @@ -1443,7 +1443,9 @@ public void testManyShards() throws IOException { List<ValuesSourceReaderOperator.ShardContext> readerShardContexts = new ArrayList<>(); for (int s = 0; s < shardCount; s++) { contexts.add(new LuceneSourceOperatorTests.MockShardContext(readers[s], s)); - readerShardContexts.add(new ValuesSourceReaderOperator.ShardContext(readers[s], () -> SourceLoader.FROM_STORED_SOURCE)); + readerShardContexts.add( + new ValuesSourceReaderOperator.ShardContext(readers[s], () -> SourceLoader.FROM_STORED_SOURCE, 0.2) + ); } var luceneFactory = new LuceneSourceOperator.Factory( contexts, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java index 2b97f4fefe5ea..60d160b67154a 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java @@ -114,6 +114,8 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase { { false, true, true }, { false, false, true, true } }; + static final double STORED_FIELDS_SEQUENTIAL_PROPORTIONS = 0.2; + private Directory directory = newDirectory(); private MapperService mapperService; private IndexReader reader; @@ -147,7 +149,16 @@ static Operator.OperatorFactory factory(IndexReader reader, String name, Element fail("unexpected shardIdx [" + shardIdx + "]"); } return loader; - })), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), 0); + })), + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), + 0 + ); } @Override @@ -443,7 +454,13 @@ public void testManySingleDocPages() { operators.add( new ValuesSourceReaderOperator.Factory( List.of(testCase.info, fieldInfo(mapperService.fieldType("key"), ElementType.INT)), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), 0 ).get(driverContext) ); @@ -549,7 +566,13 @@ private void loadSimpleAndAssert( operators.add( new ValuesSourceReaderOperator.Factory( List.of(fieldInfo(mapperService.fieldType("key"), ElementType.INT)), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), 0 ).get(driverContext) ); @@ -561,7 +584,13 @@ private void loadSimpleAndAssert( operators.add( new ValuesSourceReaderOperator.Factory( b.stream().map(i -> i.info).toList(), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), + List.of( + new 
ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), 0 ).get(driverContext) ); @@ -651,7 +680,13 @@ private void testLoadAllStatus(boolean allInOnePage) { .map( i -> new ValuesSourceReaderOperator.Factory( List.of(i.info), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), 0 ).get(driverContext) ) @@ -1417,7 +1452,13 @@ public void testNullsShared() { new ValuesSourceReaderOperator.FieldInfo("null1", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS), new ValuesSourceReaderOperator.FieldInfo("null2", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS) ), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), 0 ).get(driverContext) ), @@ -1462,7 +1503,13 @@ private void testSequentialStoredFields(boolean sequential, int docCount) throws fieldInfo(mapperService.fieldType("key"), ElementType.INT), fieldInfo(storedTextField("stored_text"), ElementType.BYTES_REF) ), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), 0 ).get(driverContext); List<Page> results = drive(op, source.iterator(), driverContext); @@ -1490,7 +1537,13 @@ public void testDescriptionOfMany() throws IOException { ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory( cases.stream().map(c -> c.info).toList(), - List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), + List.of( + new ValuesSourceReaderOperator.ShardContext( + reader, + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), 0 ); assertThat(factory.describe(), equalTo("ValuesSourceReaderOperator[fields = [" + cases.size() + " fields]]")); @@ -1517,7 +1570,13 @@ public void testManyShards() throws IOException { List<ValuesSourceReaderOperator.ShardContext> readerShardContexts = new ArrayList<>(); for (int s = 0; s < shardCount; s++) { contexts.add(new LuceneSourceOperatorTests.MockShardContext(readers[s], s)); - readerShardContexts.add(new ValuesSourceReaderOperator.ShardContext(readers[s], () -> SourceLoader.FROM_STORED_SOURCE)); + readerShardContexts.add( + new ValuesSourceReaderOperator.ShardContext( + readers[s], + () -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ); } var luceneFactory = new LuceneSourceOperator.Factory( contexts, diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java new file mode 100644 index 0000000000000..5577c8393e065 --- /dev/null +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java @@ -0,0 +1,219 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.qa.single_node; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.test.MapMatcher; +import org.elasticsearch.test.TestClustersThreadFilter; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.esql.AssertWarnings; +import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; +import org.junit.Before; +import org.junit.ClassRule; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.elasticsearch.test.MapMatcher.assertMap; +import static org.elasticsearch.test.MapMatcher.matchesMap; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.entityToMap; +import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.requestObjectBuilder; +import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.runEsql; +import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.commonProfile; +import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.fixTypesOnProfile; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.instanceOf; + +/** + * Tests for {@code index.esql.stored_fields_sequential_proportion} which controls + * an optimization we use when loading from {@code _source}. + */ +@ThreadLeakFilters(filters = TestClustersThreadFilter.class) +public class StoredFieldsSequentialIT extends ESRestTestCase { + private static final Logger LOG = LogManager.getLogger(StoredFieldsSequentialIT.class); + + @ClassRule + public static ElasticsearchCluster cluster = Clusters.testCluster(); + + public void testFetchTen() throws IOException { + testQuery(null, """ + FROM test + | LIMIT 10 + """, 10, true); + } + + public void testAggAll() throws IOException { + testQuery(null, """ + FROM test + | STATS SUM(LENGTH(test)) + """, 1000, true); + } + + public void testAggTwentyPercent() throws IOException { + testQuery(null, """ + FROM test + | WHERE STARTS_WITH(test.keyword, "test1") OR STARTS_WITH(test.keyword, "test2") + | STATS SUM(LENGTH(test)) + """, 200, true); + } + + public void testAggTenPercentDefault() throws IOException { + testAggTenPercent(null, false); + } + + public void testAggTenPercentConfiguredToTenPct() throws IOException { + testAggTenPercent(0.10, true); + } + + public void testAggTenPercentConfiguredToOnePct() throws IOException { + testAggTenPercent(0.01, true); + } + + /** + * It's important for the test that the queries we use detect "scattered" docs. + * If they were "compact" in the index we'd still load them using the sequential + * reader.
+ */ + private void testAggTenPercent(Double percent, boolean sequential) throws IOException { + String filter = IntStream.range(0, 10) + .mapToObj(i -> String.format(Locale.ROOT, "STARTS_WITH(test.keyword, \"test%s%s\")", i, i)) + .collect(Collectors.joining(" OR ")); + testQuery(percent, String.format(Locale.ROOT, """ + FROM test + | WHERE %s + | STATS SUM(LENGTH(test)) + """, filter), 100, sequential); + } + + private void testQuery(Double percent, String query, int documentsFound, boolean sequential) throws IOException { + setPercent(percent); + RestEsqlTestCase.RequestObjectBuilder builder = requestObjectBuilder().query(query); + builder.profile(true); + Map<String, Object> result = runEsql(builder, new AssertWarnings.NoWarnings(), RestEsqlTestCase.Mode.SYNC); + assertMap( + result, + matchesMap().entry("documents_found", documentsFound) + .entry( + "profile", + matchesMap().entry("drivers", instanceOf(List.class)) + .entry("planning", matchesMap().extraOk()) + .entry("query", matchesMap().extraOk()) + ) + .extraOk() + ); + + @SuppressWarnings("unchecked") + List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers"); + for (Map<String, Object> p : profiles) { + fixTypesOnProfile(p); + assertThat(p, commonProfile()); + @SuppressWarnings("unchecked") + List<Map<String, Object>> operators = (List<Map<String, Object>>) p.get("operators"); + for (Map<String, Object> o : operators) { + LOG.info("profile {}", o.get("operator")); + } + for (Map<String, Object> o : operators) { + checkOperatorProfile(o, sequential); + } + } + } + + private void setPercent(Double percent) throws IOException { + Request set = new Request("PUT", "test/_settings"); + set.setJsonEntity(String.format(Locale.ROOT, """ + { + "index": { + "esql": { + "stored_fields_sequential_proportion": %s + } + } + } + """, percent)); + assertMap(entityToMap(client().performRequest(set).getEntity(), XContentType.JSON), matchesMap().entry("acknowledged", true)); + } + + @Before + public void buildIndex() throws IOException { + Request exists = new Request("GET", "test"); + try { + client().performRequest(exists); + return; + } catch (ResponseException e) { + if (e.getResponse().getStatusLine().getStatusCode() != 404) { + throw e; + } + } + Request createIndex = new Request("PUT", "test"); + createIndex.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 1, + "sort.field": "i" + } + }, + "mappings": { + "properties": { + "i": {"type": "long"} + } + } + }"""); + Response createResponse = client().performRequest(createIndex); + assertThat( + entityToMap(createResponse.getEntity(), XContentType.JSON), + matchesMap().entry("shards_acknowledged", true).entry("index", "test").entry("acknowledged", true) + ); + + Request bulk = new Request("POST", "/_bulk"); + bulk.addParameter("refresh", ""); + StringBuilder b = new StringBuilder(); + for (int i = 0; i < 1000; i++) { + b.append(String.format(""" + {"create":{"_index":"test"}} + {"test":"test%03d", "i": %d} + """, i, i)); + } + bulk.setJsonEntity(b.toString()); + Response bulkResponse = client().performRequest(bulk); + assertThat(entityToMap(bulkResponse.getEntity(), XContentType.JSON), matchesMap().entry("errors", false).extraOk()); + } + + @Override + protected boolean preserveIndicesUponCompletion() { + return true; + } + + private static void checkOperatorProfile(Map<String, Object> o, boolean sequential) { + String name = (String) o.get("operator"); + if (name.startsWith("ValuesSourceReaderOperator")) { + MapMatcher readersBuilt = matchesMap().entry( + "stored_fields[requires_source:true, fields:0, sequential: " + sequential + "]", + greaterThanOrEqualTo(1) + ).extraOk();
+ MapMatcher expectedOp = matchesMap().entry("operator", name) + .entry("status", matchesMap().entry("readers_built", readersBuilt).extraOk()); + assertMap(o, expectedOp); + } + } + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index 9e65d640ca943..c5cbeaac71072 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -204,7 +204,7 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws ), List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.getSearchExecutionContext().getIndexReader(), () -> { throw new IllegalStateException("can't load source here"); - })), + }, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))), 0 ); CancellableTask parentTask = new EsqlQueryTask( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index 34df5c0a6368e..42d03f4e1b161 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.compute.data.Block; @@ -408,7 +409,13 @@ private static Operator extractFieldsOperator( return new ValuesSourceReaderOperator( driverContext.blockFactory(), fields, - List.of(new ValuesSourceReaderOperator.ShardContext(shardContext.searcher().getIndexReader(), shardContext::newSourceLoader)), + List.of( + new ValuesSourceReaderOperator.ShardContext( + shardContext.searcher().getIndexReader(), + shardContext::newSourceLoader, + EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY) + ) + ), 0 ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 1948e48a58824..70d9cef347c89 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -69,6 +69,7 @@ import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.DriverParallelism; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import java.io.IOException; import java.util.ArrayList; @@ -95,6 +96,16 @@ public interface ShardContext extends org.elasticsearch.compute.lucene.ShardCont * Convert a {@link QueryBuilder} 
into a real {@link Query lucene query}. */ Query toQuery(QueryBuilder queryBuilder); + + /** + * Tuning parameter for deciding when to use the "merge" stored field loader. + * Think of it as "how similar to a sequential block of documents do I have to + * be before I'll use the merge reader?" So a value of {@code 1} means I have to + * be exactly a sequential block, like {@code 0, 1, 2, 3, .. 1299, 1300}. + * A value of {@code .2} means we'll use the sequential reader even if we only + * need one in five documents. + */ + double storedFieldsSequentialProportion(); } private final List<ShardContext> shardContexts; @@ -116,7 +127,13 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi Layout.Builder layout = source.layout.builder(); var sourceAttr = fieldExtractExec.sourceAttribute(); List<ValuesSourceReaderOperator.ShardContext> readers = shardContexts.stream() - .map(s -> new ValuesSourceReaderOperator.ShardContext(s.searcher().getIndexReader(), s::newSourceLoader)) + .map( + s -> new ValuesSourceReaderOperator.ShardContext( + s.searcher().getIndexReader(), + s::newSourceLoader, + s.storedFieldsSequentialProportion() + ) + ) .toList(); int docChannel = source.layout.get(sourceAttr.id()).channel(); for (Attribute attr : fieldExtractExec.attributesToExtract()) { @@ -288,7 +305,13 @@ public final Operator.OperatorFactory ordinalGroupingOperatorFactory( var sourceAttribute = FieldExtractExec.extractSourceAttributesFrom(aggregateExec.child()); int docChannel = source.layout.get(sourceAttribute.id()).channel(); List<ValuesSourceReaderOperator.ShardContext> vsShardContexts = shardContexts.stream() - .map(s -> new ValuesSourceReaderOperator.ShardContext(s.searcher().getIndexReader(), s::newSourceLoader)) + .map( + s -> new ValuesSourceReaderOperator.ShardContext( + s.searcher().getIndexReader(), + s::newSourceLoader, + s.storedFieldsSequentialProportion() + ) + ) .toList(); // The grouping-by values are ready, let's group on them directly. // Costin: why are they ready and not already exposed in the layout?
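Sanity-checking that Javadoc against the formula above: `useSequentialStoredFieldsReader` picks the sequential reader when `range * proportion <= count`, i.e. when the page's density `count / range` is at least the configured proportion, so 0.2 corresponds to needing roughly one matching document in five. Plugging in round numbers shaped like the `StoredFieldsSequentialIT` queries above (about 100 matches scattered across 1,000 sorted docs; the exact figures are illustrative):

```java
int count = 100; // documents the page actually needs
int range = 990; // last doc id minus first: about one match in ten

assert range * 0.2 > count;   // default 0.2: density too low -> row-by-row (sequential=false)
assert range * 0.1 <= count;  // 0.10: right at the boundary -> sequential=true
assert range * 0.01 <= count; // 0.01: comfortably sequential=true
```

Those three outcomes are exactly what `testAggTenPercentDefault`, `testAggTenPercentConfiguredToTenPct`, and `testAggTenPercentConfiguredToOnePct` assert.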
@@ -439,6 +462,11 @@ public FieldNamesFieldMapper.FieldNamesFieldType fieldNames() { protected @Nullable MappedFieldType fieldType(String name) { return ctx.getFieldType(name); } + + @Override + public double storedFieldsSequentialProportion() { + return EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.get(ctx.getIndexSettings().getSettings()); + } } private static class TypeConvertingBlockLoader implements BlockLoader { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index dc2b90d39928e..396410432c16c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -536,6 +536,12 @@ public SourceProvider createSourceProvider() { new EsPhysicalOperationProviders.DefaultShardContext(i, searchExecutionContext, searchContext.request().getAliasFilter()) ); } + EsPhysicalOperationProviders physicalOperationProviders = new EsPhysicalOperationProviders( + context.foldCtx(), + contexts, + searchService.getIndicesService().getAnalysis(), + defaultDataPartitioning + ); final List<Driver> drivers; try { LocalExecutionPlanner planner = new LocalExecutionPlanner( @@ -551,12 +557,7 @@ public SourceProvider createSourceProvider() { enrichLookupService, lookupFromIndexService, inferenceRunner, - new EsPhysicalOperationProviders( - context.foldCtx(), - contexts, - searchService.getIndicesService().getAnalysis(), - defaultDataPartitioning - ), + physicalOperationProviders, contexts ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index d4544eb0e1efe..713e2e08e7508 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -163,6 +163,29 @@ public class EsqlPlugin extends Plugin implements ActionPlugin { Setting.Property.Dynamic ); + /** + * Tuning parameter for deciding when to use the "merge" stored field loader. + * Think of it as "how similar to a sequential block of documents do I have to + * be before I'll use the merge reader?" So a value of {@code 1} means I have to + * be exactly a sequential block, like {@code 0, 1, 2, 3, .. 1299, 1300}. + * A value of {@code .2} means we'll use the sequential reader even if we only + * need one in five documents. + * + * The default value of this was experimentally derived using a script, + * and a little paranoia. A lower default value was looking good locally, but + * I'm concerned about the implications of effectively using this all the time. + */ + public static final Setting<Double> STORED_FIELDS_SEQUENTIAL_PROPORTION = Setting.doubleSetting( + "index.esql.stored_fields_sequential_proportion", + 0.20, + 0, + 1, + Setting.Property.IndexScope, + Setting.Property.Dynamic + ); + @Override public Collection<?> createComponents(PluginServices services) { CircuitBreaker circuitBreaker = services.indicesService().getBigArrays().breakerService().getBreaker("request"); @@ -226,7 +249,8 @@ public List<Setting<?>> getSettings() { ESQL_QUERYLOG_THRESHOLD_INFO_SETTING, ESQL_QUERYLOG_THRESHOLD_WARN_SETTING, ESQL_QUERYLOG_INCLUDE_USER_SETTING, - DEFAULT_DATA_PARTITIONING + DEFAULT_DATA_PARTITIONING, + STORED_FIELDS_SEQUENTIAL_PROPORTION ); }
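Because the setting is registered with `Setting.Property.IndexScope` and `Setting.Property.Dynamic`, every index resolves its own value and it can be updated on a live index (as `StoredFieldsSequentialIT.setPercent` does over REST). A small sketch of how the registered setting resolves values through the standard `Settings` API; the 0.05 value is just an example:

```java
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;

class StoredFieldsProportionExample {
    static void demo() {
        Settings configured = Settings.builder()
            .put("index.esql.stored_fields_sequential_proportion", 0.05)
            .build();

        // An explicit per-index value wins; absent that, the 0.20 default applies.
        double explicit = EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.get(configured);      // 0.05
        double fallback = EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.get(Settings.EMPTY);  // 0.20

        // doubleSetting(key, 0.20, 0, 1, ...) enforces the bounds: reading a value
        // outside [0, 1] fails with an IllegalArgumentException.
    }
}
```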