Skip to content

Commit 4dae263

Browse files
authored
ESQL: Refactor value reading so it can split Pages (elastic#130573) (elastic#130784)
This refactors our `ValuesSourceReaderOperator` so it can split pages when it reads large values. It does not *actually* split the pages as that's a bit tricky. But it sets the stage for the next PR that will do so. * Move `ValuesSourceReaderOperator` to its own package * Move many inner classes into their own top level classes * Extend from `AbstractPageMappingToIteratorOperator` instead of `AbstractPageMappingToOperator` * This allows returning more than one `Page` per input `Page` * In this PR we still always return one `Page` per input `Page` * Make new `ReleasableIterator` subclasses to satisfy `AbstractPageMappingToIteratorOperator` * Change `status` of loading fields from `pages_processed` to `pages_received` and `pages_emitted` * Fix a bug in `AbstractPageMappingToOperator` which can leak a circuit breaker allocation if we fail during `receive`. This isn't possible in the existing implementations but is possible in `ValuesSourceReaderOperator`. * Add a test with large text fields. Right now it still comes back in one page because we don't cut the pages. Closes elastic#130727
1 parent 258fdcc commit 4dae263

File tree

36 files changed

+1308
-872
lines changed

36 files changed

+1308
-872
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@
4040
import org.elasticsearch.compute.data.LongVector;
4141
import org.elasticsearch.compute.data.Page;
4242
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
43-
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
43+
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
44+
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
4445
import org.elasticsearch.compute.operator.topn.TopNOperator;
4546
import org.elasticsearch.core.IOUtils;
4647
import org.elasticsearch.index.IndexSettings;
@@ -371,7 +372,7 @@ public void benchmark() {
371372
throw new AssertionError("[" + layout + "][" + name + "] expected [" + expected + "] but was [" + sum + "]");
372373
}
373374
boolean foundStoredFieldLoader = false;
374-
ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) op.status();
375+
ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status();
375376
for (Map.Entry<String, Integer> e : status.readersBuilt().entrySet()) {
376377
if (e.getKey().indexOf("stored_fields") >= 0) {
377378
foundStoredFieldLoader = true;

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ static TransportVersion def(int id) {
255255
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION_8_19 = def(8_841_0_60);
256256
public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19 = def(8_841_0_61);
257257
public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN_8_19 = def(8_841_0_62);
258+
public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_8_19 = def(8_841_0_63);
258259

259260
/*
260261
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,5 @@
3636
exports org.elasticsearch.compute.aggregation.table;
3737
exports org.elasticsearch.compute.data.sort;
3838
exports org.elasticsearch.compute.querydsl.query;
39+
exports org.elasticsearch.compute.lucene.read;
3940
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java

Lines changed: 0 additions & 778 deletions
This file was deleted.
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.lucene.read;
9+
10+
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.compute.data.Block;
12+
import org.elasticsearch.compute.data.BlockFactory;
13+
import org.elasticsearch.compute.data.BytesRefBlock;
14+
import org.elasticsearch.core.Releasable;
15+
16+
class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable {
17+
private final int pageSize;
18+
private Block nullBlock;
19+
20+
ComputeBlockLoaderFactory(BlockFactory factory, int pageSize) {
21+
super(factory);
22+
this.pageSize = pageSize;
23+
}
24+
25+
@Override
26+
public Block constantNulls() {
27+
if (nullBlock == null) {
28+
nullBlock = factory.newConstantNullBlock(pageSize);
29+
}
30+
nullBlock.incRef();
31+
return nullBlock;
32+
}
33+
34+
@Override
35+
public void close() {
36+
if (nullBlock != null) {
37+
nullBlock.close();
38+
}
39+
}
40+
41+
@Override
42+
public BytesRefBlock constantBytes(BytesRef value) {
43+
return factory.newConstantBytesRefBlockWith(value, pageSize);
44+
}
45+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.lucene.read;
9+
10+
import org.apache.lucene.index.SortedDocValues;
11+
import org.elasticsearch.compute.data.Block;
12+
import org.elasticsearch.compute.data.BlockFactory;
13+
import org.elasticsearch.compute.data.ElementType;
14+
import org.elasticsearch.compute.data.SingletonOrdinalsBuilder;
15+
import org.elasticsearch.index.mapper.BlockLoader;
16+
17+
public abstract class DelegatingBlockLoaderFactory implements BlockLoader.BlockFactory {
18+
protected final BlockFactory factory;
19+
20+
protected DelegatingBlockLoaderFactory(BlockFactory factory) {
21+
this.factory = factory;
22+
}
23+
24+
@Override
25+
public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) {
26+
return factory.newBooleanBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
27+
}
28+
29+
@Override
30+
public BlockLoader.BooleanBuilder booleans(int expectedCount) {
31+
return factory.newBooleanBlockBuilder(expectedCount);
32+
}
33+
34+
@Override
35+
public BlockLoader.BytesRefBuilder bytesRefsFromDocValues(int expectedCount) {
36+
return factory.newBytesRefBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING);
37+
}
38+
39+
@Override
40+
public BlockLoader.BytesRefBuilder bytesRefs(int expectedCount) {
41+
return factory.newBytesRefBlockBuilder(expectedCount);
42+
}
43+
44+
@Override
45+
public BlockLoader.DoubleBuilder doublesFromDocValues(int expectedCount) {
46+
return factory.newDoubleBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
47+
}
48+
49+
@Override
50+
public BlockLoader.DoubleBuilder doubles(int expectedCount) {
51+
return factory.newDoubleBlockBuilder(expectedCount);
52+
}
53+
54+
@Override
55+
public BlockLoader.FloatBuilder denseVectors(int expectedVectorsCount, int dimensions) {
56+
return factory.newFloatBlockBuilder(expectedVectorsCount * dimensions);
57+
}
58+
59+
@Override
60+
public BlockLoader.IntBuilder intsFromDocValues(int expectedCount) {
61+
return factory.newIntBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
62+
}
63+
64+
@Override
65+
public BlockLoader.IntBuilder ints(int expectedCount) {
66+
return factory.newIntBlockBuilder(expectedCount);
67+
}
68+
69+
@Override
70+
public BlockLoader.LongBuilder longsFromDocValues(int expectedCount) {
71+
return factory.newLongBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
72+
}
73+
74+
@Override
75+
public BlockLoader.LongBuilder longs(int expectedCount) {
76+
return factory.newLongBlockBuilder(expectedCount);
77+
}
78+
79+
@Override
80+
public BlockLoader.Builder nulls(int expectedCount) {
81+
return ElementType.NULL.newBlockBuilder(expectedCount, factory);
82+
}
83+
84+
@Override
85+
public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) {
86+
return new SingletonOrdinalsBuilder(factory, ordinals, count);
87+
}
88+
89+
@Override
90+
public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count) {
91+
return factory.newAggregateMetricDoubleBlockBuilder(count);
92+
}
93+
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.compute.lucene.read;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.DocVector;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
import org.elasticsearch.index.mapper.SourceLoader;
import org.elasticsearch.search.fetch.StoredFieldsSpec;

import java.io.IOException;

/**
 * Loads values from many leaves. Much less efficient than {@link ValuesFromSingleReader}.
 */
class ValuesFromManyReader extends ValuesReader {
    // Permutation mapping the incoming page's row order to shard/segment/doc order
    // (forwards) and back again (backwards). Reading in forwards order lets us
    // advance through each segment's doc values sequentially.
    private final int[] forwards;
    private final int[] backwards;
    // Row-stride reader per field, repositioned whenever we move to a new leaf.
    private final BlockLoader.RowStrideReader[] rowStride;

    // Stored-field/_source loader for the leaf currently being read; replaced by
    // fieldsMoved() each time we cross into a new segment.
    private BlockLoaderStoredFieldsFromLeafLoader storedFields;

    ValuesFromManyReader(ValuesSourceReaderOperator operator, DocVector docs) {
        super(operator, docs);
        forwards = docs.shardSegmentDocMapForwards();
        backwards = docs.shardSegmentDocMapBackwards();
        rowStride = new BlockLoader.RowStrideReader[operator.fields.length];
    }

    /**
     * Load all fields for {@code docs} into {@code target}, one block per field.
     * All per-invocation state lives in {@link Run} so it is released even on failure.
     */
    @Override
    protected void load(Block[] target, int offset) throws IOException {
        try (Run run = new Run(target)) {
            run.run(offset);
        }
    }

    /**
     * Releasable holder for the builders used by a single {@link #load} call.
     */
    class Run implements Releasable {
        private final Block[] target;
        // builders[field][shard]: per-shard builder in the field's mapped type,
        // created lazily the first time a shard is seen (see verifyBuilders).
        private final Block.Builder[][] builders;
        // converters[field][shard]: converts a mapped-type block to the desired type.
        private final BlockLoader[][] converters;
        // One builder per field in the field's *desired* type; receives the
        // converted output of every shard's builder.
        private final Block.Builder[] fieldTypeBuilders;

        Run(Block[] target) {
            this.target = target;
            fieldTypeBuilders = new Block.Builder[target.length];
            builders = new Block.Builder[target.length][operator.shardContexts.size()];
            converters = new BlockLoader[target.length][operator.shardContexts.size()];
        }

        void run(int offset) throws IOException {
            assert offset == 0; // TODO allow non-0 offset to support splitting pages
            for (int f = 0; f < operator.fields.length; f++) {
                /*
                 * Important note: each field has a desired type, which might not match the mapped type (in the case of union-types).
                 * We create the final block builders using the desired type, one for each field, but then also use inner builders
                 * (one for each field and shard), and converters (again one for each field and shard) to actually perform the field
                 * loading in a way that is correct for the mapped field type, and then convert between that type and the desired type.
                 */
                fieldTypeBuilders[f] = operator.fields[f].info.type().newBlockBuilder(docs.getPositionCount(), operator.blockFactory);
                builders[f] = new Block.Builder[operator.shardContexts.size()];
                converters[f] = new BlockLoader[operator.shardContexts.size()];
            }
            try (
                ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory, docs.getPositionCount())
            ) {
                // Read the first doc eagerly so the readers are positioned, then
                // walk the remaining docs in shard/segment/doc order.
                int p = forwards[offset];
                int shard = docs.shards().getInt(p);
                int segment = docs.segments().getInt(p);
                int firstDoc = docs.docs().getInt(p);
                operator.positionFieldWork(shard, segment, firstDoc);
                LeafReaderContext ctx = operator.ctx(shard, segment);
                fieldsMoved(ctx, shard);
                verifyBuilders(loaderBlockFactory, shard);
                read(firstDoc, shard);

                int i = offset + 1;
                while (i < forwards.length) {
                    p = forwards[i];
                    shard = docs.shards().getInt(p);
                    segment = docs.segments().getInt(p);
                    // Docs within a segment arrive in ascending order, so only a
                    // segment change requires repositioning the readers.
                    boolean changedSegment = operator.positionFieldWorkDocGuaranteedAscending(shard, segment);
                    if (changedSegment) {
                        ctx = operator.ctx(shard, segment);
                        fieldsMoved(ctx, shard);
                    }
                    verifyBuilders(loaderBlockFactory, shard);
                    read(docs.docs().getInt(p), shard);
                    i++;
                }
                buildBlocks();
            }
        }

        // Convert each shard's builder to the desired type, concatenate into the
        // per-field builder, then un-sort back to the page's original row order.
        private void buildBlocks() {
            for (int f = 0; f < target.length; f++) {
                for (int s = 0; s < operator.shardContexts.size(); s++) {
                    if (builders[f][s] != null) {
                        try (Block orig = (Block) converters[f][s].convert(builders[f][s].build())) {
                            fieldTypeBuilders[f].copyFrom(orig, 0, orig.getPositionCount());
                        }
                    }
                }
                try (Block targetBlock = fieldTypeBuilders[f].build()) {
                    // filter(backwards) restores the caller's row order.
                    target[f] = targetBlock.filter(backwards);
                }
                operator.sanityCheckBlock(rowStride[f], backwards.length, target[f], f);
            }
        }

        // Lazily create the builder and converter for each field on the first
        // row seen for this shard.
        private void verifyBuilders(ComputeBlockLoaderFactory loaderBlockFactory, int shard) {
            for (int f = 0; f < operator.fields.length; f++) {
                if (builders[f][shard] == null) {
                    // Note that this relies on field.newShard() to set the loader and converter correctly for the current shard
                    builders[f][shard] = (Block.Builder) operator.fields[f].loader.builder(loaderBlockFactory, docs.getPositionCount());
                    converters[f][shard] = operator.fields[f].loader;
                }
            }
        }

        // Read one doc's value for every field into that shard's builders.
        private void read(int doc, int shard) throws IOException {
            storedFields.advanceTo(doc);
            for (int f = 0; f < builders.length; f++) {
                rowStride[f].read(doc, storedFields, builders[f][shard]);
            }
        }

        @Override
        public void close() {
            // Builders not yet consumed by buildBlocks() are released here; blocks
            // already published into target are the caller's responsibility.
            Releasables.closeExpectNoException(fieldTypeBuilders);
            for (int f = 0; f < operator.fields.length; f++) {
                Releasables.closeExpectNoException(builders[f]);
            }
        }
    }

    // Reposition the row-stride readers and the stored-field loader on a new leaf,
    // computing the union of stored-field requirements across all fields.
    private void fieldsMoved(LeafReaderContext ctx, int shard) throws IOException {
        StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS;
        for (int f = 0; f < operator.fields.length; f++) {
            ValuesSourceReaderOperator.FieldWork field = operator.fields[f];
            rowStride[f] = field.rowStride(ctx);
            storedFieldsSpec = storedFieldsSpec.merge(field.loader.rowStrideStoredFieldSpec());
        }
        SourceLoader sourceLoader = null;
        if (storedFieldsSpec.requiresSource()) {
            sourceLoader = operator.shardContexts.get(shard).newSourceLoader().get();
            storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields()));
        }
        storedFields = new BlockLoaderStoredFieldsFromLeafLoader(
            StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(ctx, null),
            sourceLoader != null ? sourceLoader.leaf(ctx.reader(), null) : null
        );
        if (false == storedFieldsSpec.equals(StoredFieldsSpec.NO_REQUIREMENTS)) {
            operator.trackStoredFields(storedFieldsSpec, false);
        }
    }
}

0 commit comments

Comments
 (0)