Skip to content

Commit 2f0844c

Browse files
authored
ESQL: Refactor value reading so it can split Pages (#130573) (#130781)
This refactors our `ValuesSourceReaderOperator` so it can split pages when it reads large values. It does not *actually* split the pages as that's a bit tricky. But it sets the stage for the next PR that will do so. * Move `ValuesSourceReaderOperator` to it's own package * Move many inner classes into their own top level classes * Extend from `AbstractPageMappingToIteratorOperator` instead of `AbstractPageMappingToOperator` * This allows returning more than one `Page` per input `Page` * In this PR we still always return one `Page` per input `Page` * Make new `ReleasableIterator` subclasses to satisfy `AbstractPageMappingToIteratorOperator` * Change `status` of loading fields from `pages_processed` to `pages_received` and `pages_emitted` * Fix a bug in `AbstractPageMappingToOperator` which can leak circuit breaker allocation if we fail to during `receive`. This isn't possible in the existing implementations but is possible in `ValuesSourceReaderOperator`. * Add a test with large text fields. Right now it still comes back in one page because we don't cut the pages. Closes #130727 Also includes "Claim backported profile versions (#130187)"
1 parent 2677424 commit 2f0844c

File tree

37 files changed

+1312
-884
lines changed

37 files changed

+1312
-884
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
import org.elasticsearch.compute.data.Page;
4242
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
4343
import org.elasticsearch.compute.lucene.ShardRefCounted;
44-
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
44+
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
45+
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
4546
import org.elasticsearch.compute.operator.topn.TopNOperator;
4647
import org.elasticsearch.core.IOUtils;
4748
import org.elasticsearch.index.IndexSettings;
@@ -343,7 +344,7 @@ public void benchmark() {
343344
);
344345
long sum = 0;
345346
for (Page page : pages) {
346-
op.addInput(page);
347+
op.addInput(page.shallowCopy());
347348
switch (name) {
348349
case "long" -> {
349350
LongVector values = op.getOutput().<LongBlock>getBlock(1).asVector();
@@ -411,7 +412,7 @@ public void benchmark() {
411412
throw new AssertionError("[" + layout + "][" + name + "] expected [" + expected + "] but was [" + sum + "]");
412413
}
413414
boolean foundStoredFieldLoader = false;
414-
ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) op.status();
415+
ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status();
415416
for (Map.Entry<String, Integer> e : status.readersBuilt().entrySet()) {
416417
if (e.getKey().indexOf("stored_fields") >= 0) {
417418
foundStoredFieldLoader = true;

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,8 @@ static TransportVersion def(int id) {
209209
public static final TransportVersion ML_INFERENCE_ELASTIC_DENSE_TEXT_EMBEDDINGS_ADDED_8_19 = def(8_841_0_59);
210210
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION_8_19 = def(8_841_0_60);
211211
public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19 = def(8_841_0_61);
212+
public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN_8_19 = def(8_841_0_62);
213+
public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_8_19 = def(8_841_0_63);
212214
public static final TransportVersion V_9_0_0 = def(9_000_0_09);
213215
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_1 = def(9_000_0_10);
214216
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_2 = def(9_000_0_11);
@@ -325,6 +327,7 @@ static TransportVersion def(int id) {
325327
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION = def(9_110_0_00);
326328
public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00);
327329
public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);
330+
public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_9_1 = def(9_112_0_01);
328331

329332
/*
330333
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,5 @@
3636
exports org.elasticsearch.compute.aggregation.table;
3737
exports org.elasticsearch.compute.data.sort;
3838
exports org.elasticsearch.compute.querydsl.query;
39+
exports org.elasticsearch.compute.lucene.read;
3940
}

0 commit comments

Comments
 (0)