diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 9ac259cfa46e5..2d65375fee26b 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -208,6 +208,7 @@ static TransportVersion def(int id) { public static final TransportVersion SPARSE_VECTOR_FIELD_PRUNING_OPTIONS_8_19 = def(8_841_0_58); public static final TransportVersion ML_INFERENCE_ELASTIC_DENSE_TEXT_EMBEDDINGS_ADDED_8_19 = def(8_841_0_59); public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION_8_19 = def(8_841_0_60); + public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19 = def(8_841_0_61); public static final TransportVersion V_9_0_0 = def(9_000_0_09); public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_1 = def(9_000_0_10); public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_2 = def(9_000_0_11); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java index d9f56340b458b..01d34b8e1f7c9 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java @@ -11,6 +11,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; @@ -47,6 +48,7 @@ import java.util.function.Supplier; import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED; +import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19; /** * Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator} @@ -617,18 +619,23 @@ public static class Status extends AbstractPageMappingOperator.Status { Status(StreamInput in) throws IOException { super(in); readersBuilt = in.readOrderedMap(StreamInput::readString, StreamInput::readVInt); - valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) ? in.readVLong() : 0; + valuesLoaded = supportsValuesLoaded(in.getTransportVersion()) ? in.readVLong() : 0; } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeMap(readersBuilt, StreamOutput::writeVInt); - if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) { + if (supportsValuesLoaded(out.getTransportVersion())) { out.writeVLong(valuesLoaded); } } + private static boolean supportsValuesLoaded(TransportVersion version) { + return version.onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) + || version.isPatchFrom(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19); + } + @Override public String getWriteableName() { return ENTRY.name; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index ac4960ce9a134..93c30470c316c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Iterators; @@ -37,6 +38,7 @@ import java.util.Optional; import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED; +import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19; public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.EsqlQueryResponse implements @@ -120,8 +122,8 @@ static EsqlQueryResponse deserialize(BlockStreamInput in) throws IOException { } List columns = in.readCollectionAsList(ColumnInfoImpl::new); List pages = in.readCollectionAsList(Page::new); - long documentsFound = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) ? in.readVLong() : 0; - long valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) ? in.readVLong() : 0; + long documentsFound = supportsValuesLoaded(in.getTransportVersion()) ? in.readVLong() : 0; + long valuesLoaded = supportsValuesLoaded(in.getTransportVersion()) ? in.readVLong() : 0; if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { profile = in.readOptionalWriteable(Profile::readFrom); } @@ -153,7 +155,7 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeCollection(columns); out.writeCollection(pages); - if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) { + if (supportsValuesLoaded(out.getTransportVersion())) { out.writeVLong(documentsFound); out.writeVLong(valuesLoaded); } @@ -166,6 +168,11 @@ public void writeTo(StreamOutput out) throws IOException { } } + private static boolean supportsValuesLoaded(TransportVersion version) { + return version.onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) + || version.isPatchFrom(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19); + } + public List columns() { return columns; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java index 4f55a2a6e8cec..ac49213d6151c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.plugin; +import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.io.stream.StreamInput; @@ -20,6 +21,7 @@ import java.util.List; import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED; +import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19; /** * The compute result of {@link DataNodeRequest} or {@link ClusterComputeRequest} @@ -58,7 +60,7 @@ final class ComputeResponse extends TransportResponse { } ComputeResponse(StreamInput in) throws IOException { - if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) { + if (supportsCompletionInfo(in.getTransportVersion())) { completionInfo = DriverCompletionInfo.readFrom(in); } else if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { if (in.readBoolean()) { @@ -92,7 +94,7 @@ final class ComputeResponse extends TransportResponse { @Override public void writeTo(StreamOutput out) throws IOException { - if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) { + if (supportsCompletionInfo(out.getTransportVersion())) { completionInfo.writeTo(out); } else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { out.writeBoolean(true); @@ -111,6 +113,11 @@ public void writeTo(StreamOutput out) throws IOException { } } + private static boolean supportsCompletionInfo(TransportVersion version) { + return version.onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) + || version.isPatchFrom(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19); + } + public DriverCompletionInfo getCompletionInfo() { return completionInfo; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java index cdf418df1526b..e21f6d7e44b56 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.plugin; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.compute.operator.DriverCompletionInfo; @@ -19,6 +20,7 @@ import java.util.Map; import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED; +import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19; /** * The compute result of {@link DataNodeRequest} @@ -33,7 +35,7 @@ final class DataNodeComputeResponse extends TransportResponse { } DataNodeComputeResponse(StreamInput in) throws IOException { - if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) { + if (supportsCompletionInfo(in.getTransportVersion())) { this.completionInfo = DriverCompletionInfo.readFrom(in); this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException); return; @@ -49,7 +51,7 @@ final class DataNodeComputeResponse extends TransportResponse { @Override public void writeTo(StreamOutput out) throws IOException { - if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) { + if (supportsCompletionInfo(out.getTransportVersion())) { completionInfo.writeTo(out); out.writeMap(shardLevelFailures, (o, v) -> v.writeTo(o), StreamOutput::writeException); return; @@ -65,6 +67,11 @@ public void writeTo(StreamOutput out) throws IOException { new ComputeResponse(completionInfo).writeTo(out); } + private static boolean supportsCompletionInfo(TransportVersion version) { + return version.onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) + || version.isPatchFrom(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19); + } + public DriverCompletionInfo completionInfo() { return completionInfo; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 4cc928fe07cb3..6629b0b09d086 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -349,7 +349,7 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config columns, result.pages(), result.completionInfo().documentsFound(), - result.completionInfo().documentsFound(), + result.completionInfo().valuesLoaded(), profile, request.columnar(), asyncExecutionId, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index cb669c2d14937..981384ca70a10 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -312,7 +312,6 @@ protected EsqlQueryResponse mutateInstance(EsqlQueryResponse instance) { } default -> throw new IllegalArgumentException(); } - ; return new EsqlQueryResponse(columns, pages, documentsFound, valuesLoaded, profile, columnar, isAsync, executionInfo); }