Skip to content

Commit 75fe33d

Browse files
authored
ESQL: Refactor value reading so it can split Pages (elastic#130573)
This refactors our `ValuesSourceReaderOperator` so it can split pages when it reads large values. It does not *actually* split the pages as that's a bit tricky. But it sets the stage for the next PR that will do so. * Move `ValuesSourceReaderOperator` to it's own package * Move many inner classes into their own top level classes * Extend from `AbstractPageMappingToIteratorOperator` instead of `AbstractPageMappingToOperator` * This allows returning more than one `Page` per input `Page` * In this PR we still always return one `Page` per input `Page` * Make new `ReleasableIterator` subclasses to satisfy `AbstractPageMappingToIteratorOperator` * Change `status` of loading fields from `pages_processed` to `pages_received` and `pages_emitted` * Fix a bug in `AbstractPageMappingToOperator` which can leak circuit breaker allocation if we fail to during `receive`. This isn't possible in the existing implementations but is possible in `ValuesSourceReaderOperator`. * Add a test with large text fields. Right now it still comes back in one page because we don't cut the pages. Closes elastic#130727
1 parent 2b82742 commit 75fe33d

File tree

35 files changed

+1307
-878
lines changed

35 files changed

+1307
-878
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
import org.elasticsearch.compute.data.Page;
4242
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
4343
import org.elasticsearch.compute.lucene.ShardRefCounted;
44-
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
44+
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
45+
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
4546
import org.elasticsearch.compute.operator.topn.TopNOperator;
4647
import org.elasticsearch.core.IOUtils;
4748
import org.elasticsearch.index.IndexSettings;
@@ -343,7 +344,7 @@ public void benchmark() {
343344
);
344345
long sum = 0;
345346
for (Page page : pages) {
346-
op.addInput(page);
347+
op.addInput(page.shallowCopy());
347348
switch (name) {
348349
case "long" -> {
349350
LongVector values = op.getOutput().<LongBlock>getBlock(1).asVector();
@@ -411,7 +412,7 @@ public void benchmark() {
411412
throw new AssertionError("[" + layout + "][" + name + "] expected [" + expected + "] but was [" + sum + "]");
412413
}
413414
boolean foundStoredFieldLoader = false;
414-
ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) op.status();
415+
ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status();
415416
for (Map.Entry<String, Integer> e : status.readersBuilt().entrySet()) {
416417
if (e.getKey().indexOf("stored_fields") >= 0) {
417418
foundStoredFieldLoader = true;

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ static TransportVersion def(int id) {
210210
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION_8_19 = def(8_841_0_60);
211211
public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19 = def(8_841_0_61);
212212
public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN_8_19 = def(8_841_0_62);
213+
public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_8_19 = def(8_841_0_63);
213214
public static final TransportVersion V_9_0_0 = def(9_000_0_09);
214215
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_1 = def(9_000_0_10);
215216
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_2 = def(9_000_0_11);
@@ -326,9 +327,13 @@ static TransportVersion def(int id) {
326327
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION = def(9_110_0_00);
327328
public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00);
328329
public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);
330+
public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_9_1 = def(9_112_0_01);
331+
// Below is the first version in 9.2 and NOT in 9.1.
329332
public static final TransportVersion PROJECT_STATE_REGISTRY_RECORDS_DELETIONS = def(9_113_0_00);
330333
public static final TransportVersion ESQL_SERIALIZE_TIMESERIES_FIELD_TYPE = def(9_114_0_00);
331334
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_COMPLETION_ADDED = def(9_115_0_00);
335+
public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES = def(9_116_0_00);
336+
332337
/*
333338
* STOP! READ THIS FIRST! No, really,
334339
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

x-pack/plugin/esql/compute/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,5 @@
3636
exports org.elasticsearch.compute.aggregation.table;
3737
exports org.elasticsearch.compute.data.sort;
3838
exports org.elasticsearch.compute.querydsl.query;
39+
exports org.elasticsearch.compute.lucene.read;
3940
}

0 commit comments

Comments
 (0)