Skip to content

Commit 4427964

Browse files
committed
ESQL: Speed loading stored fields
This speeds up loading from stored fields by opting more blocks into the "sequential" strategy. This really kicks in when loading stored fields like `text`. And when you need less than 100% of documents, but more than, say, 10%. This is most useful when you need 99.9% of field documents. That sort of thing. Here's the perf numbers: ``` %100.0 {"took": 403 -> 401,"documents_found":1000000} %099.9 {"took":3990 -> 436,"documents_found": 999000} %099.0 {"took":4069 -> 440,"documents_found": 990000} %090.0 {"took":3468 -> 421,"documents_found": 900000} %030.0 {"took":1213 -> 152,"documents_found": 300000} %020.0 {"took": 766 -> 104,"documents_found": 200000} %010.0 {"took": 397 -> 55,"documents_found": 100000} %009.0 {"took": 352 -> 375,"documents_found": 90000} %008.0 {"took": 304 -> 317,"documents_found": 80000} %007.0 {"took": 273 -> 287,"documents_found": 70000} %005.0 {"took": 199 -> 204,"documents_found": 50000} %001.0 {"took": 46 -> 46,"documents_found": 10000} ``` Let's explain this with an example. First, jump to `main` and load a million documents: ``` rm -f /tmp/bulk for a in {1..1000}; do echo '{"index":{}}' >> /tmp/bulk echo '{"text":"text '$(printf %04d $a)'"}' >> /tmp/bulk done curl -s -uelastic:password -HContent-Type:application/json -XDELETE localhost:9200/test for a in {1..1000}; do echo -n $a: curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_bulk?pretty --data-binary @/tmp/bulk | grep errors done curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_forcemerge?max_num_segments=1 curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_refresh echo ``` Now query them all. Run this a few times until it's stable: ``` echo -n "%100.0 " curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{ "query": "FROM test | STATS SUM(LENGTH(text))", "pragma": { "data_partitioning": "shard" } }' | jq -c '{took, documents_found}' ``` Now fetch 99.9% of documents: ``` echo -n "%099.9 " curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{ "query": "FROM test | WHERE NOT text.keyword IN (\"text 0998\") | STATS SUM(LENGTH(text))", "pragma": { "data_partitioning": "shard" } }' | jq -c '{took, documents_found}' ``` This should spit out something like: ``` %100.0 { "took":403,"documents_found":1000000} %099.9 {"took":4098, "documents_found":999000} ``` We're loading *fewer* documents but it's slower! What in the world?! If you dig into the profile you'll see that it's value loading: ``` $ curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{ "query": "FROM test | STATS SUM(LENGTH(text))", "pragma": { "data_partitioning": "shard" }, "profile": true }' | jq '.profile.drivers[].operators[] | select(.operator | contains("ValuesSourceReaderOperator"))' { "operator": "ValuesSourceReaderOperator[fields = [text]]", "status": { "readers_built": { "stored_fields[requires_source:true, fields:0, sequential: true]": 222, "text:column_at_a_time:null": 222, "text:row_stride:BlockSourceReader.Bytes": 1 }, "values_loaded": 1000000, "process_nanos": 370687157, "pages_processed": 222, "rows_received": 1000000, "rows_emitted": 1000000 } } $ curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{ "query": "FROM test | WHERE NOT text.keyword IN (\"text 0998\") | STATS SUM(LENGTH(text))", "pragma": { "data_partitioning": "shard" }, "profile": true }' | jq '.profile.drivers[].operators[] | select(.operator | contains("ValuesSourceReaderOperator"))' { "operator": "ValuesSourceReaderOperator[fields = [text]]", "status": { "readers_built": { "stored_fields[requires_source:true, fields:0, sequential: false]": 222, "text:column_at_a_time:null": 222, "text:row_stride:BlockSourceReader.Bytes": 1 }, "values_loaded": 999000, "process_nanos": 3965803793, "pages_processed": 222, "rows_received": 999000, "rows_emitted": 999000 } } ``` It jumps from 370ms to almost four seconds! Loading fewer values! The second big difference is in the `stored_fields` marker. In the second on it's `sequential: false` and in the first `sequential: true`. `sequential: true` uses Lucene's "merge" stored fields reader instead of the default one. It's much more optimized at decoding sequences of documents. Previously we only enabled this reader when loading compact sequences of documents - when the entire block looks like ``` 1, 2, 3, 4, 5, ... 1230, 1231 ``` If there are any gaps we wouldn't enable it. That was a very conservative thing we did long ago without doing any experiments. We knew it was faster without any gaps, but not otherwise. It turns out it's a lot faster in a lot more cases. I've measured it as faster for 99% gaps, at least on simple documents. I'm a bit worried that this is too aggressive, so I've set made it configurable and made the default being to use the "merge" loader with 10% gaps. So we'd use the merge loader with a block like: ``` 1, 11, 21, 31, ..., 1231, 1241 ```
1 parent 49a9137 commit 4427964

File tree

10 files changed

+96
-19
lines changed

10 files changed

+96
-19
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.lucene.util.NumericUtils;
2626
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
2727
import org.elasticsearch.common.lucene.Lucene;
28+
import org.elasticsearch.common.settings.Settings;
2829
import org.elasticsearch.common.util.BigArrays;
2930
import org.elasticsearch.compute.data.BlockFactory;
3031
import org.elasticsearch.compute.data.BytesRefBlock;
@@ -50,6 +51,7 @@
5051
import org.elasticsearch.index.mapper.MappedFieldType;
5152
import org.elasticsearch.index.mapper.NumberFieldMapper;
5253
import org.elasticsearch.search.lookup.SearchLookup;
54+
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
5355
import org.openjdk.jmh.annotations.Benchmark;
5456
import org.openjdk.jmh.annotations.BenchmarkMode;
5557
import org.openjdk.jmh.annotations.Fork;
@@ -336,7 +338,8 @@ public void benchmark() {
336338
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
337339
throw new UnsupportedOperationException("can't load _source here");
338340
})),
339-
0
341+
0,
342+
QueryPragmas.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY)
340343
);
341344
long sum = 0;
342345
for (Page page : pages) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,18 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
7272
* @param shardContexts per-shard loading information
7373
* @param docChannel the channel containing the shard, leaf/segment and doc id
7474
*/
75-
public record Factory(List<FieldInfo> fields, List<ShardContext> shardContexts, int docChannel) implements OperatorFactory {
75+
public record Factory(List<FieldInfo> fields, List<ShardContext> shardContexts, int docChannel, double storedFieldsSequentialProportion)
76+
implements
77+
OperatorFactory {
7678
@Override
7779
public Operator get(DriverContext driverContext) {
78-
return new ValuesSourceReaderOperator(driverContext.blockFactory(), fields, shardContexts, docChannel);
80+
return new ValuesSourceReaderOperator(
81+
driverContext.blockFactory(),
82+
fields,
83+
shardContexts,
84+
docChannel,
85+
storedFieldsSequentialProportion
86+
);
7987
}
8088

8189
@Override
@@ -113,6 +121,7 @@ public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceL
113121
private final List<ShardContext> shardContexts;
114122
private final int docChannel;
115123
private final BlockFactory blockFactory;
124+
private final double storedFieldsSequentialProportion;
116125

117126
private final Map<String, Integer> readersBuilt = new TreeMap<>();
118127
private long valuesLoaded;
@@ -125,11 +134,18 @@ public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceL
125134
* @param fields fields to load
126135
* @param docChannel the channel containing the shard, leaf/segment and doc id
127136
*/
128-
public ValuesSourceReaderOperator(BlockFactory blockFactory, List<FieldInfo> fields, List<ShardContext> shardContexts, int docChannel) {
137+
public ValuesSourceReaderOperator(
138+
BlockFactory blockFactory,
139+
List<FieldInfo> fields,
140+
List<ShardContext> shardContexts,
141+
int docChannel,
142+
double storedFieldsSequentialProportion
143+
) {
129144
this.fields = fields.stream().map(f -> new FieldWork(f)).toArray(FieldWork[]::new);
130145
this.shardContexts = shardContexts;
131146
this.docChannel = docChannel;
132147
this.blockFactory = blockFactory;
148+
this.storedFieldsSequentialProportion = storedFieldsSequentialProportion;
133149
}
134150

135151
@Override
@@ -440,7 +456,11 @@ public void close() {
440456
*/
441457
private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs) {
442458
int count = docs.count();
443-
return count >= SEQUENTIAL_BOUNDARY && docs.get(count - 1) - docs.get(0) == count - 1;
459+
if (count < SEQUENTIAL_BOUNDARY) {
460+
return false;
461+
}
462+
int range = docs.get(count - 1) - docs.get(0);
463+
return range * storedFieldsSequentialProportion < count - 1;
444464
}
445465

446466
private void trackStoredFields(StoredFieldsSpec spec, boolean sequential) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ public record OrdinalsGroupingOperatorFactory(
6363
int docChannel,
6464
String groupingField,
6565
List<Factory> aggregators,
66-
int maxPageSize
66+
int maxPageSize,
67+
double storedFieldsSequentialProportion
6768
) implements OperatorFactory {
6869

6970
@Override
@@ -76,6 +77,7 @@ public Operator get(DriverContext driverContext) {
7677
groupingField,
7778
aggregators,
7879
maxPageSize,
80+
storedFieldsSequentialProportion,
7981
driverContext
8082
);
8183
}
@@ -94,6 +96,7 @@ public String describe() {
9496
private final List<Factory> aggregatorFactories;
9597
private final ElementType groupingElementType;
9698
private final Map<SegmentID, OrdinalSegmentAggregator> ordinalAggregators;
99+
private final double storedFieldsSequentialProportion;
97100

98101
private final DriverContext driverContext;
99102

@@ -111,6 +114,7 @@ public OrdinalsGroupingOperator(
111114
String groupingField,
112115
List<GroupingAggregator.Factory> aggregatorFactories,
113116
int maxPageSize,
117+
double storedFieldsSequentialProportion,
114118
DriverContext driverContext
115119
) {
116120
Objects.requireNonNull(aggregatorFactories);
@@ -122,6 +126,7 @@ public OrdinalsGroupingOperator(
122126
this.aggregatorFactories = aggregatorFactories;
123127
this.ordinalAggregators = new HashMap<>();
124128
this.maxPageSize = maxPageSize;
129+
this.storedFieldsSequentialProportion = storedFieldsSequentialProportion;
125130
this.driverContext = driverContext;
126131
}
127132

@@ -171,6 +176,7 @@ public void addInput(Page page) {
171176
channelIndex,
172177
aggregatorFactories,
173178
maxPageSize,
179+
storedFieldsSequentialProportion,
174180
driverContext
175181
);
176182
}
@@ -485,6 +491,7 @@ boolean next() throws IOException {
485491
private static class ValuesAggregator implements Releasable {
486492
private final ValuesSourceReaderOperator extractor;
487493
private final HashAggregationOperator aggregator;
494+
private final double storedFieldsSequentialProportion;
488495

489496
ValuesAggregator(
490497
IntFunction<BlockLoader> blockLoaders,
@@ -495,13 +502,15 @@ private static class ValuesAggregator implements Releasable {
495502
int channelIndex,
496503
List<GroupingAggregator.Factory> aggregatorFactories,
497504
int maxPageSize,
505+
double storedFieldsSequentialProportion,
498506
DriverContext driverContext
499507
) {
500508
this.extractor = new ValuesSourceReaderOperator(
501509
driverContext.blockFactory(),
502510
List.of(new ValuesSourceReaderOperator.FieldInfo(groupingField, groupingElementType, blockLoaders)),
503511
shardContexts,
504-
docChannel
512+
docChannel,
513+
storedFieldsSequentialProportion
505514
);
506515
this.aggregator = new HashAggregationOperator(
507516
aggregatorFactories,
@@ -513,6 +522,7 @@ private static class ValuesAggregator implements Releasable {
513522
),
514523
driverContext
515524
);
525+
this.storedFieldsSequentialProportion = storedFieldsSequentialProportion;
516526
}
517527

518528
void addInput(Page page) {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ public String toString() {
204204
gField,
205205
List.of(CountAggregatorFunction.supplier().groupingAggregatorFactory(INITIAL, List.of(1))),
206206
randomPageSize(),
207+
0.1,
207208
driverContext
208209
)
209210
);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,8 @@ private static Operator extractFieldsOperator(
409409
driverContext.blockFactory(),
410410
fields,
411411
List.of(new ValuesSourceReaderOperator.ShardContext(shardContext.searcher().getIndexReader(), shardContext::newSourceLoader)),
412-
0
412+
0,
413+
0.1
413414
);
414415
}
415416

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,19 @@ public interface ShardContext extends org.elasticsearch.compute.lucene.ShardCont
103103

104104
private final List<ShardContext> shardContexts;
105105
private final DataPartitioning defaultDataPartitioning;
106+
private final double storedFieldsSequentialProportion;
106107

107108
public EsPhysicalOperationProviders(
108109
FoldContext foldContext,
109110
List<ShardContext> shardContexts,
110111
AnalysisRegistry analysisRegistry,
111-
DataPartitioning defaultDataPartitioning
112+
DataPartitioning defaultDataPartitioning,
113+
double storedFieldsSequentialProportion
112114
) {
113115
super(foldContext, analysisRegistry);
114116
this.shardContexts = shardContexts;
115117
this.defaultDataPartitioning = defaultDataPartitioning;
118+
this.storedFieldsSequentialProportion = storedFieldsSequentialProportion;
116119
}
117120

118121
@Override
@@ -132,7 +135,10 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi
132135
IntFunction<BlockLoader> loader = s -> getBlockLoaderFor(s, attr, fieldExtractPreference);
133136
fields.add(new ValuesSourceReaderOperator.FieldInfo(getFieldName(attr), elementType, loader));
134137
}
135-
return source.with(new ValuesSourceReaderOperator.Factory(fields, readers, docChannel), layout.build());
138+
return source.with(
139+
new ValuesSourceReaderOperator.Factory(fields, readers, docChannel, storedFieldsSequentialProportion),
140+
layout.build()
141+
);
136142
}
137143

138144
private static String getFieldName(Attribute attr) {
@@ -278,7 +284,8 @@ public final Operator.OperatorFactory ordinalGroupingOperatorFactory(
278284
docChannel,
279285
attrSource.name(),
280286
aggregatorFactories,
281-
context.pageSize(aggregateExec.estimatedRowSize())
287+
context.pageSize(aggregateExec.estimatedRowSize()),
288+
storedFieldsSequentialProportion
282289
);
283290
}
284291

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,13 @@ public SourceProvider createSourceProvider() {
535535
new EsPhysicalOperationProviders.DefaultShardContext(i, searchExecutionContext, searchContext.request().getAliasFilter())
536536
);
537537
}
538+
EsPhysicalOperationProviders physicalOperationProviders = new EsPhysicalOperationProviders(
539+
context.foldCtx(),
540+
contexts,
541+
searchService.getIndicesService().getAnalysis(),
542+
defaultDataPartitioning,
543+
context.configuration().pragmas().storedFieldsSequentialProportion()
544+
);
538545
final List<Driver> drivers;
539546
try {
540547
LocalExecutionPlanner planner = new LocalExecutionPlanner(
@@ -550,12 +557,7 @@ public SourceProvider createSourceProvider() {
550557
enrichLookupService,
551558
lookupFromIndexService,
552559
inferenceRunner,
553-
new EsPhysicalOperationProviders(
554-
context.foldCtx(),
555-
contexts,
556-
searchService.getIndicesService().getAnalysis(),
557-
defaultDataPartitioning
558-
),
560+
physicalOperationProviders,
559561
contexts
560562
);
561563

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,27 @@ public final class QueryPragmas implements Writeable {
7373

7474
public static final Setting<ByteSizeValue> FOLD_LIMIT = Setting.memorySizeSetting("fold_limit", "5%");
7575

76+
/**
77+
* Tuning parameter for deciding when to use the "merge" stored field loader.
78+
* Think of it as "how similar to a sequential block of documents do I have to
79+
* be before I'll use the merge reader?" So a value of {@code 1} means I have to
80+
* be <strong>exactly</strong> a sequential block, like {@code 0, 1, 2, 3, .. 1299, 1300}.
81+
* A value of {@code .1} means we'll use the sequential reader even if we only
82+
* need one in ten documents.
83+
* <p>
84+
* The default value of this was experimentally derived using a
85+
* <a href="https://gist.github.com/nik9000/4e84ff5a76b86890e540d4a381606a55">script</a>.
86+
* And a little paranoia. A lower default value was looking good locally, but
87+
* I'm concerned about the implications of effectively using this all the time.
88+
* </p>
89+
*/
90+
public static final Setting<Double> STORED_FIELDS_SEQUENTIAL_PROPORTION = Setting.doubleSetting(
91+
"stored_fields_sequential_proportion",
92+
0.10,
93+
0,
94+
1
95+
);
96+
7697
public static final Setting<MappedFieldType.FieldExtractPreference> FIELD_EXTRACT_PREFERENCE = Setting.enumSetting(
7798
MappedFieldType.FieldExtractPreference.class,
7899
"field_extract_preference",
@@ -120,6 +141,18 @@ public int taskConcurrency() {
120141
return TASK_CONCURRENCY.get(settings);
121142
}
122143

144+
/**
145+
* Tuning parameter for deciding when to use the "merge" stored field loader.
146+
* Think of it as "how similar to a sequential block of documents do I have to
147+
* be before I'll use the merge reader?" So a value of {@code 1} means I have to
148+
* be <strong>exactly</strong> a sequential block, like {@code 0, 1, 2, 3, .. 1299, 1300}.
149+
* A value of {@code .1} means significant gaps are allowed and we'll still use the
150+
* sequential reader.
151+
*/
152+
public double storedFieldsSequentialProportion() {
153+
return STORED_FIELDS_SEQUENTIAL_PROPORTION.get(settings);
154+
}
155+
123156
/**
124157
* Size of a page in entries with {@code 0} being a special value asking
125158
* to adaptively size based on the number of columns in the page.

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7681,7 +7681,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP
76817681
null,
76827682
null,
76837683
null,
7684-
new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null, DataPartitioning.AUTO),
7684+
new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null, DataPartitioning.AUTO, 0.1),
76857685
List.of()
76867686
);
76877687

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ private Configuration config() {
257257
}
258258

259259
private EsPhysicalOperationProviders esPhysicalOperationProviders(List<EsPhysicalOperationProviders.ShardContext> shardContexts) {
260-
return new EsPhysicalOperationProviders(FoldContext.small(), shardContexts, null, DataPartitioning.AUTO);
260+
return new EsPhysicalOperationProviders(FoldContext.small(), shardContexts, null, DataPartitioning.AUTO, 0.1);
261261
}
262262

263263
private List<EsPhysicalOperationProviders.ShardContext> createShardContexts() throws IOException {

0 commit comments

Comments
 (0)