Skip to content

Commit fb65eb0

Browse files
committed
Index setting
1 parent dd52c87 commit fb65eb0

File tree

16 files changed

+395
-156
lines changed

16 files changed

+395
-156
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
import org.elasticsearch.index.mapper.MappedFieldType;
5252
import org.elasticsearch.index.mapper.NumberFieldMapper;
5353
import org.elasticsearch.search.lookup.SearchLookup;
54-
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
54+
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
5555
import org.openjdk.jmh.annotations.Benchmark;
5656
import org.openjdk.jmh.annotations.BenchmarkMode;
5757
import org.openjdk.jmh.annotations.Fork;
@@ -337,9 +337,8 @@ public void benchmark() {
337337
fields(name),
338338
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
339339
throw new UnsupportedOperationException("can't load _source here");
340-
})),
341-
0,
342-
QueryPragmas.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY)
340+
}, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))),
341+
0
343342
);
344343
long sum = 0;
345344
for (Page page : pages) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -72,18 +72,10 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
7272
* @param shardContexts per-shard loading information
7373
* @param docChannel the channel containing the shard, leaf/segment and doc id
7474
*/
75-
public record Factory(List<FieldInfo> fields, List<ShardContext> shardContexts, int docChannel, double storedFieldsSequentialProportion)
76-
implements
77-
OperatorFactory {
75+
public record Factory(List<FieldInfo> fields, List<ShardContext> shardContexts, int docChannel) implements OperatorFactory {
7876
@Override
7977
public Operator get(DriverContext driverContext) {
80-
return new ValuesSourceReaderOperator(
81-
driverContext.blockFactory(),
82-
fields,
83-
shardContexts,
84-
docChannel,
85-
storedFieldsSequentialProportion
86-
);
78+
return new ValuesSourceReaderOperator(driverContext.blockFactory(), fields, shardContexts, docChannel);
8779
}
8880

8981
@Override
@@ -115,13 +107,12 @@ public String describe() {
115107
*/
116108
public record FieldInfo(String name, ElementType type, IntFunction<BlockLoader> blockLoader) {}
117109

118-
public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader) {}
110+
public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader, double storedFieldsSequentialProportion) {}
119111

120112
private final FieldWork[] fields;
121113
private final List<ShardContext> shardContexts;
122114
private final int docChannel;
123115
private final BlockFactory blockFactory;
124-
private final double storedFieldsSequentialProportion;
125116

126117
private final Map<String, Integer> readersBuilt = new TreeMap<>();
127118
private long valuesLoaded;
@@ -134,18 +125,11 @@ public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceL
134125
* @param fields fields to load
135126
* @param docChannel the channel containing the shard, leaf/segment and doc id
136127
*/
137-
public ValuesSourceReaderOperator(
138-
BlockFactory blockFactory,
139-
List<FieldInfo> fields,
140-
List<ShardContext> shardContexts,
141-
int docChannel,
142-
double storedFieldsSequentialProportion
143-
) {
128+
public ValuesSourceReaderOperator(BlockFactory blockFactory, List<FieldInfo> fields, List<ShardContext> shardContexts, int docChannel) {
144129
this.fields = fields.stream().map(f -> new FieldWork(f)).toArray(FieldWork[]::new);
145130
this.shardContexts = shardContexts;
146131
this.docChannel = docChannel;
147132
this.blockFactory = blockFactory;
148-
this.storedFieldsSequentialProportion = storedFieldsSequentialProportion;
149133
}
150134

151135
@Override
@@ -263,8 +247,9 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa
263247
}
264248

265249
SourceLoader sourceLoader = null;
250+
ShardContext shardContext = shardContexts.get(shard);
266251
if (storedFieldsSpec.requiresSource()) {
267-
sourceLoader = shardContexts.get(shard).newSourceLoader.get();
252+
sourceLoader = shardContext.newSourceLoader.get();
268253
storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields()));
269254
}
270255

@@ -277,7 +262,7 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa
277262
);
278263
}
279264
StoredFieldLoader storedFieldLoader;
280-
if (useSequentialStoredFieldsReader(docs)) {
265+
if (useSequentialStoredFieldsReader(docs, shardContext.storedFieldsSequentialProportion())) {
281266
storedFieldLoader = StoredFieldLoader.fromSpecSequential(storedFieldsSpec);
282267
trackStoredFields(storedFieldsSpec, true);
283268
} else {
@@ -454,13 +439,13 @@ public void close() {
454439
* Is it more efficient to use a sequential stored field reader
455440
* when reading stored fields for the documents contained in {@code docIds}?
456441
*/
457-
private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs) {
442+
private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs, double storedFieldsSequentialProportion) {
458443
int count = docs.count();
459444
if (count < SEQUENTIAL_BOUNDARY) {
460445
return false;
461446
}
462447
int range = docs.get(count - 1) - docs.get(0);
463-
return range * storedFieldsSequentialProportion < count - 1;
448+
return range * storedFieldsSequentialProportion <= count;
464449
}
465450

466451
private void trackStoredFields(StoredFieldsSpec spec, boolean sequential) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ public record OrdinalsGroupingOperatorFactory(
6363
int docChannel,
6464
String groupingField,
6565
List<Factory> aggregators,
66-
int maxPageSize,
67-
double storedFieldsSequentialProportion
66+
int maxPageSize
6867
) implements OperatorFactory {
6968

7069
@Override
@@ -77,7 +76,6 @@ public Operator get(DriverContext driverContext) {
7776
groupingField,
7877
aggregators,
7978
maxPageSize,
80-
storedFieldsSequentialProportion,
8179
driverContext
8280
);
8381
}
@@ -96,7 +94,6 @@ public String describe() {
9694
private final List<Factory> aggregatorFactories;
9795
private final ElementType groupingElementType;
9896
private final Map<SegmentID, OrdinalSegmentAggregator> ordinalAggregators;
99-
private final double storedFieldsSequentialProportion;
10097

10198
private final DriverContext driverContext;
10299

@@ -114,7 +111,6 @@ public OrdinalsGroupingOperator(
114111
String groupingField,
115112
List<GroupingAggregator.Factory> aggregatorFactories,
116113
int maxPageSize,
117-
double storedFieldsSequentialProportion,
118114
DriverContext driverContext
119115
) {
120116
Objects.requireNonNull(aggregatorFactories);
@@ -126,7 +122,6 @@ public OrdinalsGroupingOperator(
126122
this.aggregatorFactories = aggregatorFactories;
127123
this.ordinalAggregators = new HashMap<>();
128124
this.maxPageSize = maxPageSize;
129-
this.storedFieldsSequentialProportion = storedFieldsSequentialProportion;
130125
this.driverContext = driverContext;
131126
}
132127

@@ -176,7 +171,6 @@ public void addInput(Page page) {
176171
channelIndex,
177172
aggregatorFactories,
178173
maxPageSize,
179-
storedFieldsSequentialProportion,
180174
driverContext
181175
);
182176
}
@@ -491,7 +485,6 @@ boolean next() throws IOException {
491485
private static class ValuesAggregator implements Releasable {
492486
private final ValuesSourceReaderOperator extractor;
493487
private final HashAggregationOperator aggregator;
494-
private final double storedFieldsSequentialProportion;
495488

496489
ValuesAggregator(
497490
IntFunction<BlockLoader> blockLoaders,
@@ -502,15 +495,13 @@ private static class ValuesAggregator implements Releasable {
502495
int channelIndex,
503496
List<GroupingAggregator.Factory> aggregatorFactories,
504497
int maxPageSize,
505-
double storedFieldsSequentialProportion,
506498
DriverContext driverContext
507499
) {
508500
this.extractor = new ValuesSourceReaderOperator(
509501
driverContext.blockFactory(),
510502
List.of(new ValuesSourceReaderOperator.FieldInfo(groupingField, groupingElementType, blockLoaders)),
511503
shardContexts,
512-
docChannel,
513-
storedFieldsSequentialProportion
504+
docChannel
514505
);
515506
this.aggregator = new HashAggregationOperator(
516507
aggregatorFactories,
@@ -522,7 +513,6 @@ private static class ValuesAggregator implements Releasable {
522513
),
523514
driverContext
524515
);
525-
this.storedFieldsSequentialProportion = storedFieldsSequentialProportion;
526516
}
527517

528518
void addInput(Page page) {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,13 +198,12 @@ public String toString() {
198198
operators.add(
199199
new OrdinalsGroupingOperator(
200200
shardIdx -> new KeywordFieldMapper.KeywordFieldType("g").blockLoader(mockBlContext()),
201-
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
201+
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE, 0.2)),
202202
ElementType.BYTES_REF,
203203
0,
204204
gField,
205205
List.of(CountAggregatorFunction.supplier().groupingAggregatorFactory(INITIAL, List.of(1))),
206206
randomPageSize(),
207-
0.1,
208207
driverContext
209208
)
210209
);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,8 @@ private List<Page> runQuery(Set<String> values, Query query, boolean shuffleDocs
208208
),
209209
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
210210
throw new UnsupportedOperationException();
211-
})),
212-
0,
213-
0.1
211+
}, 0.2)),
212+
0
214213
)
215214
);
216215
LuceneQueryEvaluator.ShardConfig[] shards = new LuceneQueryEvaluator.ShardConfig[] {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@
108108
import java.util.stream.IntStream;
109109
import java.util.stream.LongStream;
110110

111-
import static org.elasticsearch.compute.lucene.ValuesSourceReaderOperatorTests.STORED_FIELDS_SEQUENTIAL_PROPORTIONS;
112111
import static org.elasticsearch.test.MapMatcher.assertMap;
113112
import static org.elasticsearch.test.MapMatcher.matchesMap;
114113
import static org.elasticsearch.xpack.esql.core.type.DataType.IP;
@@ -201,7 +200,7 @@ private MapperService mapperService(String indexKey) {
201200
private List<ValuesSourceReaderOperator.ShardContext> initShardContexts() {
202201
return INDICES.keySet()
203202
.stream()
204-
.map(index -> new ValuesSourceReaderOperator.ShardContext(reader(index), () -> SourceLoader.FROM_STORED_SOURCE))
203+
.map(index -> new ValuesSourceReaderOperator.ShardContext(reader(index), () -> SourceLoader.FROM_STORED_SOURCE, 0.2))
205204
.toList();
206205
}
207206

@@ -240,7 +239,7 @@ private static Operator.OperatorFactory factory(
240239
fail("unexpected shardIdx [" + shardIdx + "]");
241240
}
242241
return loader;
243-
})), shardContexts, 0, STORED_FIELDS_SEQUENTIAL_PROPORTIONS);
242+
})), shardContexts, 0);
244243
}
245244

246245
protected SourceOperator simpleInput(DriverContext context, int size) {
@@ -489,8 +488,7 @@ public void testManySingleDocPages() {
489488
new ValuesSourceReaderOperator.Factory(
490489
List.of(testCase.info, fieldInfo(mapperService(indexKey).fieldType("key"), ElementType.INT)),
491490
shardContexts,
492-
0,
493-
STORED_FIELDS_SEQUENTIAL_PROPORTIONS
491+
0
494492
).get(driverContext)
495493
);
496494
List<Page> results = drive(operators, input.iterator(), driverContext);
@@ -600,8 +598,7 @@ private void loadSimpleAndAssert(
600598
fieldInfo(mapperService("index1").fieldType("indexKey"), ElementType.BYTES_REF)
601599
),
602600
shardContexts,
603-
0,
604-
STORED_FIELDS_SEQUENTIAL_PROPORTIONS
601+
0
605602
).get(driverContext)
606603
);
607604
List<FieldCase> tests = new ArrayList<>();
@@ -610,12 +607,7 @@ private void loadSimpleAndAssert(
610607
cases.removeAll(b);
611608
tests.addAll(b);
612609
operators.add(
613-
new ValuesSourceReaderOperator.Factory(
614-
b.stream().map(i -> i.info).toList(),
615-
shardContexts,
616-
0,
617-
STORED_FIELDS_SEQUENTIAL_PROPORTIONS
618-
).get(driverContext)
610+
new ValuesSourceReaderOperator.Factory(b.stream().map(i -> i.info).toList(), shardContexts, 0).get(driverContext)
619611
);
620612
}
621613
List<Page> results = drive(operators, input.iterator(), driverContext);
@@ -717,11 +709,7 @@ private void testLoadAllStatus(boolean allInOnePage) {
717709
Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING
718710
);
719711
List<Operator> operators = cases.stream()
720-
.map(
721-
i -> new ValuesSourceReaderOperator.Factory(List.of(i.info), shardContexts, 0, STORED_FIELDS_SEQUENTIAL_PROPORTIONS).get(
722-
driverContext
723-
)
724-
)
712+
.map(i -> new ValuesSourceReaderOperator.Factory(List.of(i.info), shardContexts, 0).get(driverContext))
725713
.toList();
726714
if (allInOnePage) {
727715
input = List.of(CannedSourceOperator.mergePages(input));
@@ -1309,7 +1297,7 @@ public void testWithNulls() throws IOException {
13091297
LuceneOperator.NO_LIMIT,
13101298
false // no scoring
13111299
);
1312-
var vsShardContext = new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE);
1300+
var vsShardContext = new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE, 0.2);
13131301
try (
13141302
Driver driver = TestDriverFactory.create(
13151303
driverContext,
@@ -1397,8 +1385,7 @@ public void testNullsShared() {
13971385
new ValuesSourceReaderOperator.FieldInfo("null2", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS)
13981386
),
13991387
shardContexts,
1400-
0,
1401-
STORED_FIELDS_SEQUENTIAL_PROPORTIONS
1388+
0
14021389
).get(driverContext)
14031390
),
14041391
new PageConsumerOperator(page -> {
@@ -1428,9 +1415,8 @@ public void testDescriptionOfMany() throws IOException {
14281415

14291416
ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory(
14301417
cases.stream().map(c -> c.info).toList(),
1431-
List.of(new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE)),
1432-
0,
1433-
STORED_FIELDS_SEQUENTIAL_PROPORTIONS
1418+
List.of(new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE, 0.2)),
1419+
0
14341420
);
14351421
assertThat(factory.describe(), equalTo("ValuesSourceReaderOperator[fields = [" + cases.size() + " fields]]"));
14361422
try (Operator op = factory.get(driverContext())) {
@@ -1457,7 +1443,9 @@ public void testManyShards() throws IOException {
14571443
List<ValuesSourceReaderOperator.ShardContext> readerShardContexts = new ArrayList<>();
14581444
for (int s = 0; s < shardCount; s++) {
14591445
contexts.add(new LuceneSourceOperatorTests.MockShardContext(readers[s], s));
1460-
readerShardContexts.add(new ValuesSourceReaderOperator.ShardContext(readers[s], () -> SourceLoader.FROM_STORED_SOURCE));
1446+
readerShardContexts.add(
1447+
new ValuesSourceReaderOperator.ShardContext(readers[s], () -> SourceLoader.FROM_STORED_SOURCE, 0.2)
1448+
);
14611449
}
14621450
var luceneFactory = new LuceneSourceOperator.Factory(
14631451
contexts,
@@ -1476,8 +1464,7 @@ public void testManyShards() throws IOException {
14761464
return ft.blockLoader(blContext());
14771465
})),
14781466
readerShardContexts,
1479-
0,
1480-
STORED_FIELDS_SEQUENTIAL_PROPORTIONS
1467+
0
14811468
);
14821469
DriverContext driverContext = driverContext();
14831470
List<Page> results = drive(

0 commit comments

Comments
 (0)