From a4e72ad15c4ec591df60ddf272cf11ef0f6400e5 Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Tue, 2 Dec 2025 14:35:53 +0100 Subject: [PATCH 1/5] ESQL: Fix enrich and lookup join resolution based on min transport version (#137431) When deciding which types are supported, we did not use the correct minimum transport version during the enrich resolution in case of CCS and ROW queries. What's more, the EnrichPolicyResolver did not account for the fact that the node requesting resolution might be on a version that doesn't support the types in the resolved mapping, which led to serialization bugs surfacing when trying to enable the DATE_RANGE type. - Initialize the minimum transport version with the minimum version from the cluster state before any resolution steps. That makes ROW queries correct. - Send the determined minimum transport version along the enrich resolution request so that remote clusters don't send un-deserializable data types back. - Add the determined minimum transport version to the profile. - Add a bunch of tests. (cherry picked from commit 4a14f83b1659a4315105e6a77a3c098b709863a4) # Conflicts: # server/src/main/resources/transport/upper_bounds/9.3.csv # x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushExpressionToLoadIT.java # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java --- docs/changelog/137431.yaml | 5 + ..._minimum_version_for_enrich_resolution.csv | 1 + .../resources/transport/upper_bounds/9.2.csv | 2 +- .../resources/transport/upper_bounds/9.3.csv | 2 +- .../test/rest/ESRestTestCase.java | 3 +- .../xpack/esql/core/type/DataType.java | 9 +- .../xpack/esql/ccq/AllSupportedFieldsIT.java | 24 +- .../esql/qa/single_node/PushQueriesIT.java | 1 + .../single_node/StoredFieldsSequentialIT.java | 1 + .../qa/rest/AllSupportedFieldsTestCase.java | 581 ++++++++++++++---- .../xpack/esql/action/EsqlExecutionInfo.java | 2 +- .../xpack/esql/action/EsqlQueryResponse.java | 33 +- .../action/EsqlResolveFieldsResponse.java | 2 +- .../esql/enrich/EnrichPolicyResolver.java | 43 +- .../xpack/esql/execution/PlanExecutor.java | 17 +- .../esql/plugin/TransportEsqlQueryAction.java | 31 +- .../xpack/esql/querylog/EsqlQueryLog.java | 9 +- .../xpack/esql/session/EsqlCCSUtils.java | 12 +- .../xpack/esql/session/EsqlSession.java | 58 +- .../xpack/esql/session/IndexResolver.java | 100 +-- .../elasticsearch/xpack/esql/CsvTests.java | 1 + .../action/EsqlQueryResponseProfileTests.java | 29 +- .../esql/action/EsqlQueryResponseTests.java | 14 +- .../enrich/EnrichPolicyResolverTests.java | 2 +- .../esql/querylog/EsqlQueryLogTests.java | 7 +- .../telemetry/PlanExecutorMetricsTests.java | 9 +- 26 files changed, 785 insertions(+), 213 deletions(-) create mode 100644 docs/changelog/137431.yaml create mode 100644 server/src/main/resources/transport/definitions/referable/esql_use_minimum_version_for_enrich_resolution.csv diff --git a/docs/changelog/137431.yaml b/docs/changelog/137431.yaml new file mode 100644 index 0000000000000..451fc5eabc1a7 --- /dev/null +++ b/docs/changelog/137431.yaml @@ -0,0 +1,5 @@ +pr: 137431 +summary: Fix enrich and lookup join resolution based on min transport version +area: ES|QL +type: bug +issues: [] diff --git a/server/src/main/resources/transport/definitions/referable/esql_use_minimum_version_for_enrich_resolution.csv b/server/src/main/resources/transport/definitions/referable/esql_use_minimum_version_for_enrich_resolution.csv new file mode 100644 index 0000000000000..58fae920e0a63 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/esql_use_minimum_version_for_enrich_resolution.csv @@ -0,0 +1 @@ +9231000,9185011 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index 5b4cb63943755..6f76f928f4bb0 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -aggregate_metric_double_typed_block,9185010 +esql_use_minimum_version_for_enrich_resolution,9185011 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index 482517b9a61d6..106690c0c4247 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -aggregate_metric_double_typed_block,9227000 +esql_use_minimum_version_for_enrich_resolution,9231000 diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 5ddd925e4e803..8d8a714451d79 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -2733,7 +2733,8 @@ protected static MapMatcher getProfileMatcher() { .entry("query", instanceOf(Map.class)) .entry("planning", instanceOf(Map.class)) .entry("drivers", instanceOf(List.class)) - .entry("plans", instanceOf(List.class)); + .entry("plans", instanceOf(List.class)) + .entry("minimumTransportVersion", instanceOf(Integer.class)); } protected static MapMatcher getResultMatcher(boolean includePartial, boolean includeDocumentsFound) { diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java index bff8ab8c7037f..7a8c020d413eb 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java @@ -15,6 +15,7 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; +import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.util.PlanStreamInput; import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput; @@ -751,13 +752,9 @@ public DataType counter() { public void writeTo(StreamOutput out) throws IOException { if (supportedVersion.supportedOn(out.getTransportVersion(), Build.current().isSnapshot()) == false) { /* - * TODO when we implement version aware planning flip this to an IllegalStateException - * so we throw a 500 error. It'll be our bug then. Right now it's a sign that the user - * tried to do something like `KNN(dense_vector_field, [1, 2])` against an old node. - * Like, during the rolling upgrade that enables KNN or to a remote cluster that has - * not yet been upgraded. + * Throw a 500 error - this is a bug, we failed to account for an old node during planning. */ - throw new IllegalArgumentException( + throw new QlIllegalArgumentException( "remote node at version [" + out.getTransportVersion() + "] doesn't understand data type [" + this + "]" ); } diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/AllSupportedFieldsIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/AllSupportedFieldsIT.java index fc0763f7d0777..43ba02c166304 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/AllSupportedFieldsIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/AllSupportedFieldsIT.java @@ -50,10 +50,18 @@ public AllSupportedFieldsIT(MappedFieldType.FieldExtractPreference extractPrefer public void createRemoteIndices() throws IOException { if (supportsNodeAssignment()) { for (Map.Entry e : remoteNodeToInfo().entrySet()) { - createIndexForNode(remoteClient(), e.getKey(), e.getValue().id()); + createIndexForNode(remoteClient(), e.getKey(), e.getValue().id(), indexMode()); } } else { - createIndexForNode(remoteClient(), null, null); + createIndexForNode(remoteClient(), null, null, indexMode()); + } + + // We need a single lookup index that has the same name across all clusters, as well as a single enrich policy per cluster. + // We create both only when we're testing LOOKUP mode. + if (indexExists(remoteClient(), LOOKUP_INDEX_NAME) == false && indexMode() == IndexMode.LOOKUP) { + createAllTypesIndex(remoteClient(), LOOKUP_INDEX_NAME, null, indexMode()); + createAllTypesDoc(remoteClient(), LOOKUP_INDEX_NAME); + createEnrichPolicy(remoteClient(), LOOKUP_INDEX_NAME, ENRICH_POLICY_NAME); } } @@ -101,4 +109,16 @@ && clusterHasCapability(remoteClient(), "GET", "/_query", List.of(), List.of("DE false ); } + + @Override + protected boolean fetchAllIsCrossCluster() { + return true; + } + + public final void testFetchAllOnlyFromRemotes() throws IOException { + doTestFetchAll(fromAllQuery("*:%mode%*", """ + , _id, _ignored, _index_mode, _score, _source, _version + | LIMIT 1000 + """), remoteNodeToInfo(), allNodeToInfo()); + } } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java index a82ecd16334ac..d11d1cbf55043 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java @@ -365,6 +365,7 @@ private void testPushQuery( .entry("plans", instanceOf(List.class)) .entry("planning", matchesMap().extraOk()) .entry("query", matchesMap().extraOk()) + .entry("minimumTransportVersion", instanceOf(Integer.class)) ), matchesList().item(matchesMap().entry("name", "test").entry("type", anyOf(equalTo("text"), equalTo("keyword")))), equalTo(found ? List.of(List.of(value)) : List.of()) diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java index 73cc14c195269..5b1adfdbd6e8c 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java @@ -122,6 +122,7 @@ private void testQuery(Double percent, String query, int documentsFound, boolean .entry("plans", instanceOf(List.class)) .entry("planning", matchesMap().extraOk()) .entry("query", matchesMap().extraOk()) + .entry("minimumTransportVersion", instanceOf(Integer.class)) ) .extraOk() ); diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java index c7ae359993bb4..3c142dbd09424 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java @@ -9,17 +9,19 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.http.HttpHost; import org.apache.http.util.EntityUtils; import org.elasticsearch.Build; import org.elasticsearch.TransportVersion; import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.logging.LoggerMessageFormat; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.MappedFieldType; -import org.elasticsearch.logging.LogManager; -import org.elasticsearch.logging.Logger; import org.elasticsearch.test.MapMatcher; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xcontent.XContentBuilder; @@ -44,10 +46,11 @@ import static org.elasticsearch.test.ListMatcher.matchesList; import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; +import static org.elasticsearch.xpack.esql.action.EsqlResolveFieldsResponse.RESOLVE_FIELDS_RESPONSE_CREATED_TV; import static org.elasticsearch.xpack.esql.action.EsqlResolveFieldsResponse.RESOLVE_FIELDS_RESPONSE_USED_TV; import static org.elasticsearch.xpack.esql.core.type.DataType.DataTypesTransportVersions.ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION; import static org.elasticsearch.xpack.esql.core.type.DataType.DataTypesTransportVersions.ESQL_DENSE_VECTOR_CREATED_VERSION; -import static org.elasticsearch.xpack.esql.core.type.DataType.DataTypesTransportVersions.INDEX_SOURCE; +import static org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver.ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION; import static org.hamcrest.Matchers.any; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; @@ -72,7 +75,6 @@ * load constant field values and have simple mappings. */ public class AllSupportedFieldsTestCase extends ESRestTestCase { - private static final Logger logger = LogManager.getLogger(FieldExtractorTestCase.class); @Rule(order = Integer.MIN_VALUE) public ProfileLogger profileLogger = new ProfileLogger(); @@ -100,11 +102,22 @@ protected AllSupportedFieldsTestCase(MappedFieldType.FieldExtractPreference extr this.indexMode = indexMode; } - protected record NodeInfo(String cluster, String id, boolean snapshot, TransportVersion version, Set roles) {} + protected IndexMode indexMode() { + return indexMode; + } + + protected record NodeInfo( + String cluster, + String id, + boolean snapshot, + TransportVersion version, + Set roles, + Set boundAddress + ) {} private static Map nodeToInfo; - private Map nodeToInfo() throws IOException { + private Map localNodeToInfo() throws IOException { if (nodeToInfo == null) { nodeToInfo = fetchNodeToInfo(client(), null); } @@ -152,7 +165,7 @@ protected boolean supportsNodeAssignment() throws IOException { * Map from node name to information about the node. */ protected Map allNodeToInfo() throws IOException { - return nodeToInfo(); + return localNodeToInfo(); } protected static Map fetchNodeToInfo(RestClient client, String cluster) throws IOException { @@ -164,9 +177,13 @@ protected static Map fetchNodeToInfo(RestClient client, String String id = (String) n.getKey(); Map nodeInfo = (Map) n.getValue(); String nodeName = (String) extractValue(nodeInfo, "name"); + Map http = (Map) extractValue(nodeInfo, "http"); + List unparsedBoundAddress = (List) extractValue(http, "bound_address"); + // The bound address can actually be 2 addresses, one ipv4 and one ipv6; stuff 'em in a set. + Set boundAddress = unparsedBoundAddress.stream().map(s -> HttpHost.create((String) s)).collect(Collectors.toSet()); /* - * Figuring out is a node is a snapshot is kind of tricky. The main version + * Figuring out if a node is a snapshot is kind of tricky. The main version * doesn't include -SNAPSHOT. But ${VERSION}-SNAPSHOT is in the node info * *somewhere*. So we do this silly toString here. */ @@ -178,21 +195,39 @@ protected static Map fetchNodeToInfo(RestClient client, String nodeToInfo.put( nodeName, - new NodeInfo(cluster, id, snapshot, transportVersion, roles.stream().map(Object::toString).collect(Collectors.toSet())) + new NodeInfo( + cluster, + id, + snapshot, + transportVersion, + roles.stream().map(Object::toString).collect(Collectors.toSet()), + boundAddress + ) ); } return nodeToInfo; } + protected static final String ENRICH_POLICY_NAME = "all_fields_policy"; + protected static final String LOOKUP_INDEX_NAME = "all_fields_lookup_index"; + @Before public void createIndices() throws IOException { if (supportsNodeAssignment()) { - for (Map.Entry e : nodeToInfo().entrySet()) { - createIndexForNode(client(), e.getKey(), e.getValue().id()); + for (Map.Entry e : localNodeToInfo().entrySet()) { + createIndexForNode(client(), e.getKey(), e.getValue().id(), indexMode); } } else { - createIndexForNode(client(), null, null); + createIndexForNode(client(), null, null, indexMode); + } + + // We need a single lookup index that has the same name across all clusters, as well as a single enrich policy per cluster. + // We create both only when we're testing LOOKUP mode. + if (indexExists(LOOKUP_INDEX_NAME) == false && indexMode == IndexMode.LOOKUP) { + createAllTypesIndex(client(), LOOKUP_INDEX_NAME, null, indexMode); + createAllTypesDoc(client(), LOOKUP_INDEX_NAME); + createEnrichPolicy(client(), LOOKUP_INDEX_NAME, ENRICH_POLICY_NAME); } } @@ -200,7 +235,7 @@ public void createIndices() throws IOException { * Make sure the test doesn't run on snapshot builds. Release builds only. *

* {@link Build#isSnapshot()} checks if the version under test is a snapshot. - * But! This run test runs against many versions and if *any* are snapshots + * But! This test runs against many versions and if *any* are snapshots * then this will fail. So we check the versions of each node in the cluster too. *

*/ @@ -212,45 +247,143 @@ public void skipSnapshots() throws IOException { } } - // TODO: Also add a test for _tsid once we can determine the minimum transport version of all nodes. public final void testFetchAll() throws IOException { - Map response = esql(""" + doTestFetchAll(fromAllQuery(""" + , _id, _ignored, _index_mode, _score, _source, _version + | LIMIT 1000 + """), allNodeToInfo(), allNodeToInfo()); + } + + public final void testFetchAllEnrich() throws IOException { + assumeTrue("Test only requires the enrich policy (made from a lookup index)", indexMode == IndexMode.LOOKUP); + // The ENRICH is a no-op because it overwrites columns with the same identical data (except that it messes with + // the order of the columns, but we don't assert that). + doTestFetchAll(fromAllQuery(LoggerMessageFormat.format(null, """ , _id, _ignored, _index_mode, _score, _source, _version + | ENRICH _remote:{} ON {} | LIMIT 1000 - """); + """, ENRICH_POLICY_NAME, LOOKUP_ID_FIELD)), allNodeToInfo(), allNodeToInfo()); + } + + public final void testFetchAllLookupJoin() throws IOException { + assumeTrue("Test only requires lookup indices", indexMode == IndexMode.LOOKUP); + // The LOOKUP JOIN is a no-op because it overwrites columns with the same identical data (except that it messes with + // the order of the columns, but we don't assert that). + // We force the lookup join on to the remotes by having a SORT after it. + doTestFetchAll(fromAllQuery(LoggerMessageFormat.format(null, """ + , _id, _ignored, _index_mode, _score, _source, _version + | LOOKUP JOIN {} ON {} + | SORT _id + | LIMIT 1000 + """, LOOKUP_INDEX_NAME, LOOKUP_ID_FIELD)), allNodeToInfo(), allNodeToInfo()); + } + + /** + * Runs the query and expects 1 document per index on the contributing nodes as well as all the columns. + */ + protected final void doTestFetchAll( + String query, + Map nodesContributingIndices, + Map nodesInvolvedInExecution + ) throws IOException { + var responseAndCoordinatorVersion = runQuery(query); + + Map response = responseAndCoordinatorVersion.v1(); + TransportVersion coordinatorVersion = responseAndCoordinatorVersion.v2(); + + assertNoPartialResponse(response); + + List columns = (List) response.get("columns"); + List values = (List) response.get("values"); + + MapMatcher expectedColumns = allTypesColumnsMatcher(coordinatorVersion, minVersion(), indexMode, extractPreference, true, true); + assertMap(nameToType(columns), expectedColumns); + + MapMatcher expectedAllValues = matchesMap(); + for (Map.Entry e : expectedIndices(indexMode, nodesContributingIndices).entrySet()) { + String indexName = e.getKey(); + MapMatcher expectedValues = allTypesValuesMatcher( + coordinatorVersion, + minVersion(), + indexMode, + extractPreference, + true, + true, + indexName + ); + expectedAllValues = expectedAllValues.entry(indexName, expectedValues); + } + assertMap(indexToRow(columns, values), expectedAllValues); + + assertMinimumVersion(minVersion(nodesInvolvedInExecution), responseAndCoordinatorVersion, true, fetchAllIsCrossCluster()); + + profileLogger.clearProfile(); + } + + protected boolean fetchAllIsCrossCluster() { + return false; + } + + protected static void assertNoPartialResponse(Map response) { if ((Boolean) response.get("is_partial")) { throw new AssertionError("partial results: " + response); } - List columns = (List) response.get("columns"); - List values = (List) response.get("values"); + } - MapMatcher expectedColumns = matchesMap(); + protected static MapMatcher allTypesColumnsMatcher( + TransportVersion coordinatorVersion, + TransportVersion minimumVersion, + IndexMode indexMode, + MappedFieldType.FieldExtractPreference extractPreference, + boolean expectMetadataFields, + boolean expectNonEnrichableFields + ) { + MapMatcher expectedColumns = matchesMap().entry(LOOKUP_ID_FIELD, "integer"); for (DataType type : DataType.values()) { if (supportedInIndex(type) == false) { continue; } - expectedColumns = expectedColumns.entry(fieldName(type), expectedType(type)); - } - expectedColumns = expectedColumns.entry("_id", "keyword") - .entry("_ignored", "keyword") - .entry("_index", "keyword") - .entry("_index_mode", "keyword") - .entry("_score", "double") - .entry("_source", "_source") - .entry("_version", "long"); - assertMap(nameToType(columns), expectedColumns); + if (expectNonEnrichableFields == false && supportedInEnrich(type) == false) { + continue; + } + expectedColumns = expectedColumns.entry(fieldName(type), expectedType(type, coordinatorVersion, minimumVersion, indexMode)); + } + if (expectMetadataFields) { + expectedColumns = expectedColumns.entry("_id", "keyword") + .entry("_ignored", "keyword") + .entry("_index", "keyword") + .entry("_index_mode", "keyword") + .entry("_score", "double") + .entry("_source", "_source") + .entry("_version", "long"); + } + return expectedColumns; + } - MapMatcher expectedAllValues = matchesMap(); - for (Map.Entry e : expectedIndices().entrySet()) { - String indexName = e.getKey(); - NodeInfo nodeInfo = e.getValue(); - MapMatcher expectedValues = matchesMap(); - for (DataType type : DataType.values()) { - if (supportedInIndex(type) == false) { - continue; - } - expectedValues = expectedValues.entry(fieldName(type), expectedValue(type, nodeInfo)); + protected static MapMatcher allTypesValuesMatcher( + TransportVersion coordinatorVersion, + TransportVersion minimumVersion, + IndexMode indexMode, + MappedFieldType.FieldExtractPreference extractPreference, + boolean expectMetadataFields, + boolean expectNonEnrichableFields, + String indexName + ) { + MapMatcher expectedValues = matchesMap(); + expectedValues = expectedValues.entry(LOOKUP_ID_FIELD, equalTo(123)); + for (DataType type : DataType.values()) { + if (supportedInIndex(type) == false) { + continue; + } + if (expectNonEnrichableFields == false && supportedInEnrich(type) == false) { + continue; } + expectedValues = expectedValues.entry( + fieldName(type), + expectedValue(type, coordinatorVersion, minimumVersion, indexMode, extractPreference) + ); + } + if (expectMetadataFields) { expectedValues = expectedValues.entry("_id", any(String.class)) .entry("_ignored", nullValue()) .entry("_index", indexName) @@ -258,10 +391,9 @@ public final void testFetchAll() throws IOException { .entry("_score", 0.0) .entry("_source", matchesMap().extraOk()) .entry("_version", 1); - expectedAllValues = expectedAllValues.entry(indexName, expectedValues); } - assertMap(indexToRow(columns, values), expectedAllValues); - profileLogger.clearProfile(); + + return expectedValues; } /** @@ -280,7 +412,10 @@ public final void testFetchDenseVector() throws IOException { | EVAL k = v_l2_norm(f_dense_vector, [1]) // workaround to enable fetching dense_vector """ + request; } - response = esql(request); + var responseAndCoordinatorVersion = runQuery(fromAllQuery(request)); + assertMinimumVersionFromAllQueries(responseAndCoordinatorVersion); + + response = runQuery(fromAllQuery(request)).v1(); if ((Boolean) response.get("is_partial")) { Map clusters = (Map) response.get("_clusters"); Map details = (Map) clusters.get("details"); @@ -319,11 +454,10 @@ public final void testFetchDenseVector() throws IOException { assertMap(nameToType(columns), expectedColumns); MapMatcher expectedAllValues = matchesMap(); - for (Map.Entry e : expectedIndices().entrySet()) { + for (Map.Entry e : expectedIndices(indexMode).entrySet()) { String indexName = e.getKey(); - NodeInfo nodeInfo = e.getValue(); MapMatcher expectedValues = matchesMap(); - expectedValues = expectedValues.entry("f_dense_vector", expectedDenseVector(nodeInfo.version)); + expectedValues = expectedValues.entry("f_dense_vector", matchesList().item(0.5).item(10.0).item(5.9999995)); expectedValues = expectedValues.entry("_index", indexName); expectedAllValues = expectedAllValues.entry(indexName, expectedValues); } @@ -347,7 +481,10 @@ public final void testFetchAggregateMetricDouble() throws IOException { | EVAL junk = TO_AGGREGATE_METRIC_DOUBLE(1) // workaround to enable fetching aggregate_metric_double """ + request; } - response = esql(request); + var responseAndCoordinatorVersion = runQuery(fromAllQuery(request)); + assertMinimumVersionFromAllQueries(responseAndCoordinatorVersion); + + response = runQuery(fromAllQuery(request)).v1(); if ((Boolean) response.get("is_partial")) { Map clusters = (Map) response.get("_clusters"); Map details = (Map) clusters.get("details"); @@ -386,9 +523,8 @@ public final void testFetchAggregateMetricDouble() throws IOException { assertMap(nameToType(columns), expectedColumns); MapMatcher expectedAllValues = matchesMap(); - for (Map.Entry e : expectedIndices().entrySet()) { + for (Map.Entry e : expectedIndices(indexMode).entrySet()) { String indexName = e.getKey(); - NodeInfo nodeInfo = e.getValue(); MapMatcher expectedValues = matchesMap(); expectedValues = expectedValues.entry( "f_aggregate_metric_double", @@ -400,10 +536,112 @@ public final void testFetchAggregateMetricDouble() throws IOException { assertMap(indexToRow(columns, values), expectedAllValues); } - private Map esql(String query) throws IOException { + protected String fromAllQuery(String indexPattern, String restOfQuery) { + return ("FROM " + indexPattern + " METADATA _index").replace("%mode%", indexMode.toString()) + restOfQuery; + } + + protected String fromAllQuery(String restOfQuery) { + return fromAllQuery("*:%mode%*,%mode%*", restOfQuery); + } + + public void testRow() throws IOException { + assumeTrue( + "Test has to run only once, skip on other configurations", + extractPreference == MappedFieldType.FieldExtractPreference.NONE && indexMode == IndexMode.STANDARD + ); + String query = "ROW x = 1 | LIMIT 1"; + var responseAndCoordinatorVersion = runQuery(query); + + assertMinimumVersion(minVersion(localNodeToInfo()), responseAndCoordinatorVersion, false, false); + } + + @SuppressWarnings("unchecked") + public void testRowLookupJoin() throws IOException { + assumeTrue("Test only requires the lookup index", indexMode == IndexMode.LOOKUP); + String query = "ROW " + LOOKUP_ID_FIELD + " = 123 | LOOKUP JOIN " + LOOKUP_INDEX_NAME + " ON " + LOOKUP_ID_FIELD + " | LIMIT 1"; + var responseAndCoordinatorVersion = runQuery(query); + TransportVersion expectedMinimumVersion = minVersion(localNodeToInfo()); + + assertMinimumVersion(expectedMinimumVersion, responseAndCoordinatorVersion, false, false); + + Map response = responseAndCoordinatorVersion.v1(); + TransportVersion coordinatorVersion = responseAndCoordinatorVersion.v2(); + + assertNoPartialResponse(response); + + List columns = (List) response.get("columns"); + List values = (List) response.get("values"); + + MapMatcher expectedColumns = allTypesColumnsMatcher( + coordinatorVersion, + expectedMinimumVersion, + indexMode, + extractPreference, + false, + true + ); + assertMap(nameToType(columns), expectedColumns); + + MapMatcher expectedValues = allTypesValuesMatcher( + coordinatorVersion, + expectedMinimumVersion, + indexMode, + extractPreference, + false, + true, + null + ); + assertMap(nameToValue(names(columns), (List) values.getFirst()), expectedValues); + } + + @SuppressWarnings("unchecked") + public void testRowEnrich() throws IOException { + assumeTrue("Test only requires the enrich policy (made from a lookup index)", indexMode == IndexMode.LOOKUP); + String query = "ROW " + LOOKUP_ID_FIELD + " = 123 | ENRICH " + ENRICH_POLICY_NAME + " ON " + LOOKUP_ID_FIELD + " | LIMIT 1"; + var responseAndCoordinatorVersion = runQuery(query); + Map response = responseAndCoordinatorVersion.v1(); + TransportVersion coordinatorVersion = responseAndCoordinatorVersion.v2(); + TransportVersion expectedMinimumVersion = minVersion(localNodeToInfo()); + + assertMinimumVersion(expectedMinimumVersion, responseAndCoordinatorVersion, false, false); + + assertNoPartialResponse(response); + + List columns = (List) response.get("columns"); + List values = (List) response.get("values"); + + MapMatcher expectedColumns = allTypesColumnsMatcher( + coordinatorVersion, + expectedMinimumVersion, + indexMode, + extractPreference, + false, + false + ); + assertMap(nameToType(columns), expectedColumns); + + MapMatcher expectedValues = allTypesValuesMatcher( + coordinatorVersion, + expectedMinimumVersion, + indexMode, + extractPreference, + false, + false, + null + ); + assertMap(nameToValue(names(columns), (List) values.getFirst()), expectedValues); + } + + /** + * Run the query and return the response and the version of the coordinator. + *

+ * Fails if the response contains any warnings. + */ + @SuppressWarnings("unchecked") + private Tuple, TransportVersion> runQuery(String query) throws IOException { Request request = new Request("POST", "_query"); XContentBuilder body = JsonXContent.contentBuilder().startObject(); - body.field("query", "FROM *:%mode%*,%mode%* METADATA _index".replace("%mode%", indexMode.toString()) + query); + body.field("query", query); { body.startObject("pragma"); if (extractPreference != null) { @@ -417,29 +655,81 @@ private Map esql(String query) throws IOException { body.endObject(); request.setJsonEntity(Strings.toString(body)); - Map response = responseAsMap(client().performRequest(request)); - profileLogger.extractProfile(response, true); - return response; + Response response = client().performRequest(request); + Map responseMap = responseAsMap(response); + HttpHost coordinatorHost = response.getHost(); + NodeInfo coordinator = allNodeToInfo().values().stream().filter(n -> n.boundAddress().contains(coordinatorHost)).findFirst().get(); + TransportVersion coordinatorVersion = coordinator.version(); + + profileLogger.extractProfile(responseMap, true); + return new Tuple<>(responseMap, coordinatorVersion); } - protected void createIndexForNode(RestClient client, String nodeName, String nodeId) throws IOException { - String indexName = indexMode.toString(); - if (nodeName != null) { - indexName += "_" + nodeName.toLowerCase(Locale.ROOT); + protected void assertMinimumVersionFromAllQueries(Tuple, TransportVersion> responseAndCoordinatorVersion) + throws IOException { + assertMinimumVersion(minVersion(), responseAndCoordinatorVersion, true, fetchAllIsCrossCluster()); + } + + /** + * @param expectedMinimumVersion the minimum version of all clusters that participate in the query + * @param performsMainFieldCapsRequest {@code true} for queries that have a {@code FROM} command, so we don't retrieve the minimum + * version from the main field caps response. + */ + @SuppressWarnings("unchecked") + protected void assertMinimumVersion( + TransportVersion expectedMinimumVersion, + Tuple, TransportVersion> responseAndCoordinatorVersion, + boolean performsMainFieldCapsRequest, + boolean isCrossCluster + ) { + var responseMap = responseAndCoordinatorVersion.v1(); + var coordinatorVersion = responseAndCoordinatorVersion.v2(); + + if (coordinatorVersion.supports(ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION)) { + Map profile = (Map) responseMap.get("profile"); + Integer minimumVersion = (Integer) profile.get("minimumTransportVersion"); + assertNotNull(minimumVersion); + int minimumVersionInt = minimumVersion; + if (expectedMinimumVersion.supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV) + || (performsMainFieldCapsRequest == false) + || (isCrossCluster == false)) { + assertEquals(expectedMinimumVersion.id(), minimumVersionInt); + } else { + // If a remote cluster is old enough that it doesn't provide version information in the field caps response, the coordinator + // HAS to assume the oldest compatible version. + // This only applies to multi-cluster tests; if we're looking at a mixed cluster, the coordinator is new enough + // that it's field caps response will include the min cluster version. (Apparently the field caps request is performed + // directly on the coordinator.) + assertEquals(TransportVersion.minimumCompatible().id(), minimumVersionInt); + } } + } + + protected static void createIndexForNode(RestClient client, String nodeName, String nodeId, IndexMode mode) throws IOException { + String indexName = indexName(mode, nodeName); if (false == indexExists(client, indexName)) { - createAllTypesIndex(client, indexName, nodeId); + createAllTypesIndex(client, indexName, nodeId, mode); createAllTypesDoc(client, indexName); } } - private void createAllTypesIndex(RestClient client, String indexName, String nodeId) throws IOException { + protected static String indexName(IndexMode mode, String nodeName) { + String indexName = mode.toString(); + if (nodeName != null) { + indexName += "_" + nodeName.toLowerCase(Locale.ROOT); + } + return indexName; + } + + private static final String LOOKUP_ID_FIELD = "lookup_id"; + + protected static void createAllTypesIndex(RestClient client, String indexName, String nodeId, IndexMode mode) throws IOException { XContentBuilder config = JsonXContent.contentBuilder().startObject(); { config.startObject("settings"); config.startObject("index"); - config.field("mode", indexMode); - if (indexMode == IndexMode.TIME_SERIES) { + config.field("mode", mode); + if (mode == IndexMode.TIME_SERIES) { config.field("routing_path", "f_keyword"); } if (nodeId != null) { @@ -450,14 +740,20 @@ private void createAllTypesIndex(RestClient client, String indexName, String nod } { config.startObject("mappings").startObject("properties"); + + config.startObject(LOOKUP_ID_FIELD); + config.field("type", "integer"); + config.endObject(); + for (DataType type : DataType.values()) { if (supportedInIndex(type) == false) { continue; } config.startObject(fieldName(type)); - typeMapping(indexMode, config, type); + typeMapping(mode, config, type); config.endObject(); } + config.endObject().endObject().endObject(); } Request request = new Request("PUT", indexName); @@ -465,11 +761,11 @@ private void createAllTypesIndex(RestClient client, String indexName, String nod client.performRequest(request); } - private String fieldName(DataType type) { + private static String fieldName(DataType type) { return type == DataType.DATETIME ? "@timestamp" : "f_" + type.esType(); } - private void typeMapping(IndexMode indexMode, XContentBuilder config, DataType type) throws IOException { + private static void typeMapping(IndexMode indexMode, XContentBuilder config, DataType type) throws IOException { switch (type) { case COUNTER_DOUBLE, COUNTER_INTEGER, COUNTER_LONG -> config.field("type", type.esType().replace("counter_", "")) .field("time_series_metric", "counter"); @@ -488,8 +784,10 @@ private void typeMapping(IndexMode indexMode, XContentBuilder config, DataType t } } - private void createAllTypesDoc(RestClient client, String indexName) throws IOException { + protected static void createAllTypesDoc(RestClient client, String indexName) throws IOException { XContentBuilder doc = JsonXContent.contentBuilder().startObject(); + doc.field(LOOKUP_ID_FIELD); + doc.value(123); for (DataType type : DataType.values()) { if (supportedInIndex(type) == false) { continue; @@ -524,8 +822,42 @@ private void createAllTypesDoc(RestClient client, String indexName) throws IOExc client.performRequest(request); } - // This will become dependent on the minimum transport version of all nodes once we can determine that. - private Matcher expectedValue(DataType type, NodeInfo nodeInfo) throws IOException { + protected static void createEnrichPolicy(RestClient client, String indexName, String policyName) throws IOException { + XContentBuilder policyConfig = JsonXContent.contentBuilder().startObject(); + { + policyConfig.startObject("match"); + + policyConfig.field("indices", indexName); + policyConfig.field("match_field", LOOKUP_ID_FIELD); + List enrichFields = new ArrayList<>(); + for (DataType type : DataType.values()) { + if (supportedInIndex(type) == false || supportedInEnrich(type) == false) { + continue; + } + enrichFields.add(fieldName(type)); + } + policyConfig.field("enrich_fields", enrichFields); + + policyConfig.endObject(); + } + policyConfig.endObject(); + + Request request = new Request("PUT", "_enrich/policy/" + policyName); + request.setJsonEntity(Strings.toString(policyConfig)); + client.performRequest(request); + + Request execute = new Request("PUT", "_enrich/policy/" + policyName + "/_execute"); + request.addParameter("wait_for_completion", "true"); + client.performRequest(execute); + } + + private static Matcher expectedValue( + DataType type, + TransportVersion coordinatorVersion, + TransportVersion minimumVersion, + IndexMode indexMode, + MappedFieldType.FieldExtractPreference extractPreference + ) { return switch (type) { case BOOLEAN -> equalTo(true); case COUNTER_LONG, LONG, COUNTER_INTEGER, INTEGER, UNSIGNED_LONG, SHORT, BYTE -> equalTo(1); @@ -538,21 +870,23 @@ private Matcher expectedValue(DataType type, NodeInfo nodeInfo) throws IOExce case DATETIME, DATE_NANOS -> equalTo("2025-01-01T01:00:00.000Z"); case IP -> equalTo("192.168.0.1"); case VERSION -> equalTo("1.0.0-SNAPSHOT"); - case GEO_POINT -> extractPreference == MappedFieldType.FieldExtractPreference.DOC_VALUES || syntheticSourceByDefault() + case GEO_POINT -> extractPreference == MappedFieldType.FieldExtractPreference.DOC_VALUES || syntheticSourceByDefault(indexMode) ? equalTo("POINT (-71.34000004269183 41.1199999647215)") : equalTo("POINT (-71.34 41.12)"); case GEO_SHAPE -> equalTo("POINT (-71.34 41.12)"); case NULL -> nullValue(); case AGGREGATE_METRIC_DOUBLE -> { - if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false - || minVersion().supports(ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION) == false) { + // See expectedType for an explanation + if (coordinatorVersion.supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false + || minimumVersion.supports(ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION) == false) { yield nullValue(); } yield equalTo("{\"min\":-302.5,\"max\":702.3,\"sum\":200.0,\"value_count\":25}"); } case DENSE_VECTOR -> { - if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false - || minVersion().supports(ESQL_DENSE_VECTOR_CREATED_VERSION) == false) { + // See expectedType for an explanation + if (coordinatorVersion.supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false + || minimumVersion.supports(ESQL_DENSE_VECTOR_CREATED_VERSION) == false) { yield nullValue(); } yield equalTo(List.of(0.5, 10.0, 5.9999995)); @@ -561,12 +895,6 @@ private Matcher expectedValue(DataType type, NodeInfo nodeInfo) throws IOExce }; } - private Matcher> expectedDenseVector(TransportVersion version) { - return version.supports(INDEX_SOURCE) // *after* 9.1 - ? matchesList().item(0.5).item(10.0).item(5.9999995) - : matchesList().item(0.04283529).item(0.85670584).item(0.5140235); - } - /** * Is the type supported in indices? */ @@ -585,7 +913,21 @@ private static boolean supportedInIndex(DataType t) { }; } - private Map nameToType(List columns) { + /** + * Is the type supported in enrich policies? + */ + private static boolean supportedInEnrich(DataType t) { + return switch (t) { + // Enrich policies don't work with types that have mandatory fields in the mapping. + // https://github.com/elastic/elasticsearch/issues/127350 + case AGGREGATE_METRIC_DOUBLE, SCALED_FLOAT, + // https://github.com/elastic/elasticsearch/issues/137699 + DENSE_VECTOR -> false; + default -> true; + }; + } + + private static Map nameToType(List columns) { Map result = new TreeMap<>(); for (Object c : columns) { Map map = (Map) c; @@ -594,7 +936,7 @@ private Map nameToType(List columns) { return result; } - private List names(List columns) { + private static List names(List columns) { List result = new ArrayList<>(); for (Object c : columns) { Map map = (Map) c; @@ -603,21 +945,21 @@ private List names(List columns) { return result; } - private Map> indexToRow(List columns, List values) { + private static Map> indexToRow(List columns, List values) { List names = names(columns); - int timestampIdx = names.indexOf("_index"); - if (timestampIdx < 0) { + int indexNameIdx = names.indexOf("_index"); + if (indexNameIdx < 0) { throw new IllegalStateException("query didn't return _index"); } Map> result = new TreeMap<>(); for (Object r : values) { List row = (List) r; - result.put(row.get(timestampIdx).toString(), nameToValue(names, row)); + result.put(row.get(indexNameIdx).toString(), nameToValue(names, row)); } return result; } - private Map nameToValue(List names, List values) { + private static Map nameToValue(List names, List values) { Map result = new TreeMap<>(); for (int i = 0; i < values.size(); i++) { result.put(names.get(i), values.get(i)); @@ -625,8 +967,16 @@ private Map nameToValue(List names, List values) { return result; } - // This will become dependent on the minimum transport version of all nodes once we can determine that. - private Matcher expectedType(DataType type) throws IOException { + private Matcher expectedType(DataType type, TransportVersion coordinatorVersion) throws IOException { + return expectedType(type, coordinatorVersion, minVersion(), indexMode); + } + + private static Matcher expectedType( + DataType type, + TransportVersion coordinatorVersion, + TransportVersion minimumVersion, + IndexMode indexMode + ) { return switch (type) { case COUNTER_DOUBLE, COUNTER_LONG, COUNTER_INTEGER -> { if (indexMode == IndexMode.TIME_SERIES) { @@ -638,20 +988,23 @@ private Matcher expectedType(DataType type) throws IOException { case HALF_FLOAT, SCALED_FLOAT, FLOAT -> equalTo("double"); case NULL -> equalTo("keyword"); case AGGREGATE_METRIC_DOUBLE -> { - // RESOLVE_FIELDS_RESPONSE_USED_TV is newer and technically sufficient to check. - // We also check for ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION for clarity. - // Future data types added here should only require the TV when they were created, - // because it will be after RESOLVE_FIELDS_RESPONSE_USED_TV. - if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false - || minVersion().supports(ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION) == false) { + // 9.2.0 nodes have ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION and support this type + // when they are data nodes, but not as coordinators! + // (Unless the query uses functions that depend on this type, which is a workaround + // for missing version-awareness in 9.2.0, and not considered here.) + // RESOLVE_FIELDS_RESPONSE_USED_TV is newer and marks the point when coordinators + // started to be able to plan for this data type, and will consider it supported if + // all nodes are on ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION or newer. + if (coordinatorVersion.supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false + || minimumVersion.supports(ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION) == false) { yield equalTo("unsupported"); } yield equalTo("aggregate_metric_double"); } case DENSE_VECTOR -> { - logger.error("ADFDAFAF " + minVersion()); - if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false - || minVersion().supports(ESQL_DENSE_VECTOR_CREATED_VERSION) == false) { + // Same dance as for AGGREGATE_METRIC_DOUBLE + if (coordinatorVersion.supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false + || minimumVersion.supports(ESQL_DENSE_VECTOR_CREATED_VERSION) == false) { yield equalTo("unsupported"); } yield equalTo("dense_vector"); @@ -665,37 +1018,55 @@ protected boolean preserveClusterUponCompletion() { return true; } - private boolean syntheticSourceByDefault() { + private static boolean syntheticSourceByDefault(IndexMode indexMode) { return switch (indexMode) { case TIME_SERIES, LOGSDB -> true; case STANDARD, LOOKUP -> false; }; } - private Map expectedIndices() throws IOException { + private Map expectedIndices(IndexMode indexMode) throws IOException { + return expectedIndices(indexMode, allNodeToInfo()); + } + + protected Map expectedIndices(IndexMode indexMode, Map nodeToInfo) throws IOException { Map result = new TreeMap<>(); if (supportsNodeAssignment()) { - for (Map.Entry e : allNodeToInfo().entrySet()) { - String name = indexMode + "_" + e.getKey(); + for (Map.Entry e : nodeToInfo.entrySet()) { + String name = indexName(indexMode, e.getKey()); if (e.getValue().cluster != null) { name = e.getValue().cluster + ":" + name; } result.put(name, e.getValue()); } } else { - for (Map.Entry e : allNodeToInfo().entrySet()) { - String name = indexMode.toString(); + for (Map.Entry e : nodeToInfo.entrySet()) { + String name = indexName(indexMode, null); if (e.getValue().cluster != null) { name = e.getValue().cluster + ":" + name; } // We should only end up with one per cluster - result.put(name, new NodeInfo(e.getValue().cluster, null, e.getValue().snapshot(), e.getValue().version(), null)); + result.put( + name, + new NodeInfo( + e.getValue().cluster, + null, + e.getValue().snapshot(), + e.getValue().version(), + null, + e.getValue().boundAddress() + ) + ); } } return result; } protected TransportVersion minVersion() throws IOException { - return allNodeToInfo().values().stream().map(NodeInfo::version).min(Comparator.naturalOrder()).get(); + return minVersion(allNodeToInfo()); + } + + protected TransportVersion minVersion(Map nodeToInfo) throws IOException { + return nodeToInfo.values().stream().map(NodeInfo::version).min(Comparator.naturalOrder()).get(); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 70bf575b1d809..55470eca87303 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -70,7 +70,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { // Updates to the Cluster occur with the updateCluster method that given the key to map transforms an // old Cluster Object to a new Cluster Object with the remapping function. public final ConcurrentMap clusterInfo; - // Is the clusterInfo map iinitialization in progress? If so, we should not try to serialize it. + // Is the clusterInfo map initialization in progress? If so, we should not try to serialize it. private transient volatile boolean clusterInfoInitializing; // whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present) private final boolean includeCCSMetadata; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index 736cb9fdaa5b2..467108587a14b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -36,6 +36,8 @@ import java.util.Objects; import java.util.Optional; +import static org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver.ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION; + public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.EsqlQueryResponse implements ChunkedToXContentObject, @@ -275,6 +277,11 @@ public Iterator toXContentChunked(ToXContent.Params params })); content.add(ChunkedToXContentHelper.array("drivers", profile.drivers.iterator(), params)); content.add(ChunkedToXContentHelper.array("plans", profile.plans.iterator())); + content.add(ChunkedToXContentHelper.chunk((b, p) -> { + TransportVersion minimumVersion = profile.minimumVersion(); + b.field("minimumTransportVersion", minimumVersion == null ? null : minimumVersion.id()); + return b; + })); content.add(ChunkedToXContentHelper.endObject()); } content.add(ChunkedToXContentHelper.endObject()); @@ -383,14 +390,15 @@ public EsqlResponse responseInternal() { return esqlResponse; } - public record Profile(List drivers, List plans) implements Writeable { + public record Profile(List drivers, List plans, TransportVersion minimumVersion) implements Writeable { public static Profile readFrom(StreamInput in) throws IOException { return new Profile( in.readCollectionAsImmutableList(DriverProfile::readFrom), in.getTransportVersion().supports(ESQL_PROFILE_INCLUDE_PLAN) ? in.readCollectionAsImmutableList(PlanProfile::readFrom) - : List.of() + : List.of(), + in.getTransportVersion().supports(ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION) ? readOptionalTransportVersion(in) : null ); } @@ -400,6 +408,27 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().supports(ESQL_PROFILE_INCLUDE_PLAN)) { out.writeCollection(plans); } + if (out.getTransportVersion().supports(ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION)) { + // When retrieving the profile from an older node, there might be no minimum version attached. + // When writing the profile somewhere else, we need to handle the case that the minimum version is null. + writeOptionalTransportVersion(minimumVersion, out); + } + } + + private static TransportVersion readOptionalTransportVersion(StreamInput in) throws IOException { + if (in.readBoolean()) { + return TransportVersion.readVersion(in); + } + return null; + } + + private static void writeOptionalTransportVersion(@Nullable TransportVersion version, StreamOutput out) throws IOException { + if (version == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + TransportVersion.writeVersion(version, out); + } } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsResponse.java index aaef3da09e2fd..6eaae7b9b5f67 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsResponse.java @@ -18,7 +18,7 @@ import java.io.IOException; public class EsqlResolveFieldsResponse extends ActionResponse { - private static final TransportVersion RESOLVE_FIELDS_RESPONSE_CREATED_TV = TransportVersion.fromName( + public static final TransportVersion RESOLVE_FIELDS_RESPONSE_CREATED_TV = TransportVersion.fromName( "esql_resolve_fields_response_created" ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 0744fd126999d..b57dc1a86d6fd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.esql.enrich; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.ChannelActionListener; @@ -75,6 +76,10 @@ public class EnrichPolicyResolver { private static final String RESOLVE_ACTION_NAME = "cluster:monitor/xpack/enrich/esql/resolve_policy"; + public static final TransportVersion ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION = TransportVersion.fromName( + "esql_use_minimum_version_for_enrich_resolution" + ); + private final ClusterService clusterService; private final IndexResolver indexResolver; private final TransportService transportService; @@ -113,9 +118,17 @@ public static UnresolvedPolicy from(Enrich e) { * * @param enriches the unresolved policies * @param executionInfo the execution info + * @param minimumVersion the minimum transport version of all clusters involved in the query; used for making the resolved mapping + * compatible with all involved clusters in case of CCS. + * (Enrich policy resolution happens separately on each remote cluster.) * @param listener notified with the enrich resolution */ - public void resolvePolicies(List enriches, EsqlExecutionInfo executionInfo, ActionListener listener) { + public void resolvePolicies( + List enriches, + EsqlExecutionInfo executionInfo, + TransportVersion minimumVersion, + ActionListener listener + ) { if (enriches.isEmpty()) { listener.onResponse(new EnrichResolution()); return; @@ -125,6 +138,7 @@ public void resolvePolicies(List enriches, EsqlExecutionInfo executionIn executionInfo.clusterInfo.isEmpty() ? new HashSet<>() : executionInfo.getRunningClusterAliases().collect(toSet()), enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(), executionInfo, + minimumVersion, listener ); } @@ -133,6 +147,7 @@ protected void doResolvePolicies( Set remoteClusters, Collection unresolvedPolicies, EsqlExecutionInfo executionInfo, + TransportVersion minimumVersion, ActionListener listener ) { if (unresolvedPolicies.isEmpty()) { @@ -141,7 +156,7 @@ protected void doResolvePolicies( } final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, executionInfo, listener.map(lookupResponses -> { + lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, executionInfo, minimumVersion, listener.map(lookupResponses -> { final EnrichResolution enrichResolution = new EnrichResolution(); final Map lookupResponsesToProcess = new HashMap<>(); for (Map.Entry entry : lookupResponses.entrySet()) { @@ -305,6 +320,7 @@ private void lookupPolicies( boolean includeLocal, Collection unresolvedPolicies, EsqlExecutionInfo executionInfo, + TransportVersion minimumVersion, ActionListener> listener ) { final Map lookupResponses = ConcurrentCollections.newConcurrentMap(); @@ -324,7 +340,7 @@ public void onResponse(Transport.Connection connection) { transportService.sendRequest( connection, RESOLVE_ACTION_NAME, - new LookupRequest(cluster, remotePolicies), + new LookupRequest(cluster, remotePolicies, minimumVersion), TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>( lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, skipOnFailure, l)), @@ -350,7 +366,7 @@ public void onFailure(Exception e) { transportService.sendRequest( transportService.getLocalNode(), RESOLVE_ACTION_NAME, - new LookupRequest(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, localPolicies), + new LookupRequest(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, localPolicies, minimumVersion), new ActionListenerResponseHandler<>( refs.acquire(resp -> lookupResponses.put(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, resp)), LookupResponse::new, @@ -372,21 +388,36 @@ private void failIfSkipUnavailableFalse(Exception e, boolean skipOnFailure, Acti private static class LookupRequest extends AbstractTransportRequest { private final String clusterAlias; private final Collection policyNames; + // The minimum version of all clusters involved in executing the ESQL query. + final TransportVersion minimumVersion; - LookupRequest(String clusterAlias, Collection policyNames) { + LookupRequest(String clusterAlias, Collection policyNames, TransportVersion minimumVersion) { this.clusterAlias = clusterAlias; this.policyNames = policyNames; + this.minimumVersion = minimumVersion; } LookupRequest(StreamInput in) throws IOException { this.clusterAlias = in.readString(); this.policyNames = in.readStringCollectionAsList(); + if (in.getTransportVersion().supports(ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION)) { + this.minimumVersion = TransportVersion.readVersion(in); + } else { + // An older coordinator contacted us. Let's assume an old version, otherwise we might send back + // types that it can't deserialize. + // (The only versions that know some new types but don't send their transport version here are 9.2.0-9.2.2. + // These types are dense_vector and aggregate_metric_double, and both don't work with ENRICH in these versions, anyway.) + this.minimumVersion = TransportVersion.minimumCompatible(); + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(clusterAlias); out.writeStringCollection(policyNames); + if (out.getTransportVersion().supports(ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION)) { + TransportVersion.writeVersion(minimumVersion, out); + } } } @@ -452,7 +483,7 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas IndexResolver.ALL_FIELDS, null, false, - // Disable aggregate_metric_double and dense_vector until we get version checks in planning + request.minimumVersion, false, false, refs.acquire(indexResult -> { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java index 90a0f5a3a88fe..ecdccf6aecebe 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.execution; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; @@ -26,6 +27,7 @@ import org.elasticsearch.xpack.esql.session.EsqlSession; import org.elasticsearch.xpack.esql.session.IndexResolver; import org.elasticsearch.xpack.esql.session.Result; +import org.elasticsearch.xpack.esql.session.Versioned; import org.elasticsearch.xpack.esql.telemetry.Metrics; import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry; import org.elasticsearch.xpack.esql.telemetry.PlanTelemetryManager; @@ -67,17 +69,19 @@ public PlanExecutor( public void esql( EsqlQueryRequest request, String sessionId, + TransportVersion localClusterMinimumVersion, AnalyzerSettings analyzerSettings, EnrichPolicyResolver enrichPolicyResolver, EsqlExecutionInfo executionInfo, IndicesExpressionGrouper indicesExpressionGrouper, EsqlSession.PlanRunner planRunner, TransportActionServices services, - ActionListener listener + ActionListener> listener ) { final PlanTelemetry planTelemetry = new PlanTelemetry(functionRegistry); final var session = new EsqlSession( sessionId, + localClusterMinimumVersion, analyzerSettings, indexResolver, enrichPolicyResolver, @@ -93,7 +97,7 @@ public void esql( metrics.total(clientId); var begin = System.nanoTime(); - ActionListener executeListener = wrap( + ActionListener> executeListener = wrap( x -> onQuerySuccess(request, listener, x, planTelemetry), ex -> onQueryFailure(request, listener, ex, clientId, planTelemetry, begin) ); @@ -102,7 +106,12 @@ public void esql( ActionListener.run(executeListener, l -> session.execute(request, executionInfo, planRunner, l)); } - private void onQuerySuccess(EsqlQueryRequest request, ActionListener listener, Result x, PlanTelemetry planTelemetry) { + private void onQuerySuccess( + EsqlQueryRequest request, + ActionListener> listener, + Versioned x, + PlanTelemetry planTelemetry + ) { planTelemetryManager.publish(planTelemetry, true); queryLog.onQueryPhase(x, request.query()); listener.onResponse(x); @@ -110,7 +119,7 @@ private void onQuerySuccess(EsqlQueryRequest request, ActionListener lis private void onQueryFailure( EsqlQueryRequest request, - ActionListener listener, + ActionListener> listener, Exception ex, QueryMetric clientId, PlanTelemetry planTelemetry, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index f8d24720231e5..31707c33a126f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.plugin; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.cluster.stats.CCSUsage; @@ -55,6 +56,7 @@ import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.session.EsqlSession.PlanRunner; import org.elasticsearch.xpack.esql.session.Result; +import org.elasticsearch.xpack.esql.session.Versioned; import java.io.IOException; import java.util.ArrayList; @@ -237,6 +239,7 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener columns = result.schema().stream().map(c -> { + private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, boolean profileEnabled, Versioned result) { + var innerResult = result.inner(); + List columns = innerResult.schema().stream().map(c -> { List originalTypes; if (c instanceof UnsupportedAttribute ua) { // Sort the original types so they are easier to test against and prettier. @@ -382,32 +387,36 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, boolea return new ColumnInfoImpl(c.name(), c.dataType().outputType(), originalTypes); }).toList(); EsqlQueryResponse.Profile profile = profileEnabled - ? new EsqlQueryResponse.Profile(result.completionInfo().driverProfiles(), result.completionInfo().planProfiles()) + ? new EsqlQueryResponse.Profile( + innerResult.completionInfo().driverProfiles(), + innerResult.completionInfo().planProfiles(), + result.minimumVersion() + ) : null; if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) { String asyncExecutionId = asyncTask.getExecutionId().getEncoded(); return new EsqlQueryResponse( columns, - result.pages(), - result.completionInfo().documentsFound(), - result.completionInfo().valuesLoaded(), + innerResult.pages(), + innerResult.completionInfo().documentsFound(), + innerResult.completionInfo().valuesLoaded(), profile, request.columnar(), asyncExecutionId, false, request.async(), - result.executionInfo() + innerResult.executionInfo() ); } return new EsqlQueryResponse( columns, - result.pages(), - result.completionInfo().documentsFound(), - result.completionInfo().valuesLoaded(), + innerResult.pages(), + innerResult.completionInfo().documentsFound(), + innerResult.completionInfo().valuesLoaded(), profile, request.columnar(), request.async(), - result.executionInfo() + innerResult.executionInfo() ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLog.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLog.java index cd410c8eba804..8722fa8b1c155 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLog.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLog.java @@ -16,6 +16,7 @@ import org.elasticsearch.index.SlowLogFields; import org.elasticsearch.xcontent.json.JsonStringEncoder; import org.elasticsearch.xpack.esql.session.Result; +import org.elasticsearch.xpack.esql.session.Versioned; import java.nio.charset.StandardCharsets; import java.util.HashMap; @@ -62,12 +63,12 @@ public EsqlQueryLog(ClusterSettings settings, SlowLogFieldProvider slowLogFieldP this.additionalFields = slowLogFieldProvider.create(); } - public void onQueryPhase(Result esqlResult, String query) { - if (esqlResult == null) { + public void onQueryPhase(Versioned esqlResult, String query) { + if (esqlResult.inner() == null) { return; // TODO review, it happens in some tests, not sure if it's a thing also in prod } - long tookInNanos = esqlResult.executionInfo().overallTook().nanos(); - log(() -> Message.of(esqlResult, query, includeUser ? additionalFields.queryFields() : Map.of()), tookInNanos); + long tookInNanos = esqlResult.inner().executionInfo().overallTook().nanos(); + log(() -> Message.of(esqlResult.inner(), query, includeUser ? additionalFields.queryFields() : Map.of()), tookInNanos); } public void onQueryFailure(String query, Exception ex, long tookInNanos) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 35326ed44a826..73c23bab3608f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; @@ -76,9 +77,9 @@ static Map determineUnavailableRemoteClusters( */ abstract static class CssPartialErrorsActionListener implements ActionListener> { private final EsqlExecutionInfo executionInfo; - private final ActionListener listener; + private final ActionListener> listener; - CssPartialErrorsActionListener(EsqlExecutionInfo executionInfo, ActionListener listener) { + CssPartialErrorsActionListener(EsqlExecutionInfo executionInfo, ActionListener> listener) { this.executionInfo = executionInfo; this.listener = listener; } @@ -87,7 +88,12 @@ abstract static class CssPartialErrorsActionListener implements ActionListener( + new Result(Analyzer.NO_FIELDS, Collections.emptyList(), DriverCompletionInfo.EMPTY, executionInfo), + TransportVersion.current() + ) + ); } else { listener.onFailure(e); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index a39dda6223ca4..6cf495131e50a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -118,7 +118,8 @@ public interface PlanRunner { private static final TransportVersion LOOKUP_JOIN_CCS = TransportVersion.fromName("lookup_join_ccs"); private final String sessionId; - private final AnalyzerSettings clusterSettings; + private final TransportVersion localClusterMinimumVersion; + private final AnalyzerSettings analyzerSettings; private final IndexResolver indexResolver; private final EnrichPolicyResolver enrichPolicyResolver; @@ -142,7 +143,8 @@ public interface PlanRunner { public EsqlSession( String sessionId, - AnalyzerSettings clusterSettings, + TransportVersion localClusterMinimumVersion, + AnalyzerSettings analyzerSettings, IndexResolver indexResolver, EnrichPolicyResolver enrichPolicyResolver, PreAnalyzer preAnalyzer, @@ -154,7 +156,8 @@ public EsqlSession( TransportActionServices services ) { this.sessionId = sessionId; - this.clusterSettings = clusterSettings; + this.localClusterMinimumVersion = localClusterMinimumVersion; + this.analyzerSettings = analyzerSettings; this.indexResolver = indexResolver; this.enrichPolicyResolver = enrichPolicyResolver; this.preAnalyzer = preAnalyzer; @@ -178,7 +181,12 @@ public String sessionId() { /** * Execute an ESQL request. */ - public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, PlanRunner planRunner, ActionListener listener) { + public void execute( + EsqlQueryRequest request, + EsqlExecutionInfo executionInfo, + PlanRunner planRunner, + ActionListener> listener + ) { assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH); assert executionInfo != null : "Null EsqlExecutionInfo"; LOGGER.debug("ESQL query:\n{}", request.query()); @@ -190,15 +198,15 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P null, clusterName, request.pragmas(), - clusterSettings.resultTruncationMaxSize(), - clusterSettings.resultTruncationDefaultSize(), + analyzerSettings.resultTruncationMaxSize(), + analyzerSettings.resultTruncationDefaultSize(), request.query(), request.profile(), request.tables(), System.nanoTime(), request.allowPartialResults(), - clusterSettings.timeseriesResultTruncationMaxSize(), - clusterSettings.timeseriesResultTruncationDefaultSize() + analyzerSettings.timeseriesResultTruncationMaxSize(), + analyzerSettings.timeseriesResultTruncationDefaultSize() ); FoldContext foldContext = configuration.newFoldContext(); @@ -246,6 +254,7 @@ public void onResponse(Versioned analyzedPlan) { l ) ) + .>andThen((l, r) -> l.onResponse(new Versioned<>(r, minimumVersion))) .addListener(listener); } } @@ -508,7 +517,11 @@ public void analyzedPlan( assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH); PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed); - PreAnalysisResult result = FieldNameUtils.resolveFieldNames(parsed, preAnalysis.enriches().isEmpty() == false); + // Initialize the PreAnalysisResult with the local cluster's minimum transport version, so our planning will be correct also in + // case of ROW queries. ROW queries can still require inter-node communication (for ENRICH and LOOKUP JOIN execution) with an older + // node in the same cluster; so assuming that all nodes are on the same version as this node will be wrong and may cause bugs. + PreAnalysisResult result = FieldNameUtils.resolveFieldNames(parsed, preAnalysis.enriches().isEmpty() == false) + .withMinimumTransportVersion(localClusterMinimumVersion); EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo); @@ -527,7 +540,13 @@ public void analyzedPlan( }) .andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices().iterator(), r, executionInfo, l)) .andThen((l, r) -> { - enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l.map(r::withEnrichResolution)); + // Do not update PreAnalysisResult.minimumTransportVersion, that's already been determined during main index resolution. + enrichPolicyResolver.resolvePolicies( + preAnalysis.enriches(), + executionInfo, + r.minimumTransportVersion(), + l.map(r::withEnrichResolution) + ); }) .andThen((l, r) -> { inferenceService.inferenceResolver(functionRegistry).resolveInferenceIds(parsed, l.map(r::withInferenceResolution)); @@ -567,12 +586,18 @@ private void preAnalyzeLookupIndex( ThreadPool.Names.SEARCH_COORDINATION, ThreadPool.Names.SYSTEM_READ ); + // No need to update the minimum transport version in the PreAnalysisResult, + // it should already have been determined during the main index resolution. indexResolver.resolveAsMergedMapping( EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, localPattern), result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames, null, false, - // Disable aggregate_metric_double and dense_vector until we get version checks in planning + // We use the minimum version determined in the main index resolution, because for remote LOOKUP JOIN, we're only considering + // remote lookup indices in the field caps request - but the coordinating cluster must be considered, too! + // The main index resolution should already have taken the version of the coordinating cluster into account and this should + // be reflected in result.minimumTransportVersion(). + result.minimumTransportVersion(), false, false, listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution)) @@ -631,6 +656,7 @@ private PreAnalysisResult receiveLookupIndexResolution( + "] mode" ); } + return result.addLookupIndexResolution(index, lookupIndexResolution); } @@ -791,6 +817,16 @@ private void preAnalyzeMainIndicesAndRetrieveMinTransportVersion( default -> requestFilter; }, preAnalysis.indexMode() == IndexMode.TIME_SERIES, + // TODO: In case of subqueries, the different main index resolutions don't know about each other's minimum version. + // This is bad because `FROM (FROM remote1:*) (FROM remote2:*)` can have different minimum versions + // while resolving each subquery's main index pattern. We'll determine the correct overall minimum transport version + // in the end because we keep updating the PreAnalysisResult after each resolution; but the EsIndex objects may be + // inconsistent with this version: + // The main index pattern from a subquery that we resolve first may have a higher min version in the field caps response + // than an index pattern that we resolve later. + // Thus, the EsIndex for `FROM remote1:*` may contain data types that aren't supported on the overall minimum version + // if we only find out that the overall version is actually lower when resolving `FROM remote2:*`. + result.minimumTransportVersion(), preAnalysis.useAggregateMetricDoubleWhenNotSupported(), preAnalysis.useDenseVectorWhenNotSupported(), listener.delegateFailureAndWrap((l, indexResolution) -> { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index fbba2a0b874c8..64a635039e1ac 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -25,7 +25,6 @@ import org.elasticsearch.logging.Logger; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction; -import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsResponse; import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.DateEsField; @@ -84,13 +83,15 @@ public IndexResolver(Client client) { } /** - * Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping. + * Like {@code IndexResolver#resolveIndicesVersioned} + * but simplified and does not pass on the determined minimum transport version to the listener. */ public void resolveAsMergedMapping( String indexWildcard, Set fieldNames, QueryBuilder requestFilter, boolean includeAllDimensions, + TransportVersion minimumVersion, boolean useAggregateMetricDoubleWhenNotSupported, boolean useDenseVectorWhenNotSupported, ActionListener listener @@ -104,6 +105,7 @@ public void resolveAsMergedMapping( fieldNames, requestFilter, includeAllDimensions, + minimumVersion, useAggregateMetricDoubleWhenNotSupported, useDenseVectorWhenNotSupported, ignoreVersion @@ -111,15 +113,37 @@ public void resolveAsMergedMapping( } /** - * Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping. Also retrieves the minimum transport - * version available in the cluster (and remotes). + * Perform a field caps request to resolve a pattern to one mapping (potentially compound, meaning it spans multiple indices). + *

+ * The field caps response contains the minimum transport version of all clusters that apply to the pattern, + * and it is used to deal with previously unsupported data types during resolution. + *

+ * If a field's type is not supported on the minimum version, it will be {@link DataType#UNSUPPORTED}. + *

+ * If the nodes are too old to include their minimum transport version in the field caps response, we'll assume + * {@link TransportVersion#minimumCompatible()}. + *

+ * The {@code minimumVersion} already known so far must be passed in and will be used instead of the minimum version + * from the field caps response if it is lower. During main index resolution, this is the local cluster's minimum version. + * This safeguards against using too new a version in case of {@code FROM remote_only:* | ...} queries that don't have any indices + * on the local cluster. + *

+ * But it's also important for remote {@code ENRICH} resolution, because in CCS enrich policies are resolved on remote clusters, + * so the overall minimum transport version that the coordinating cluster observed must be passed in here to avoid inconsistencies. + *

+ * The overall minimum version is updated using the field caps response and is passed on to the listener. */ public void resolveAsMergedMappingAndRetrieveMinimumVersion( String indexWildcard, Set fieldNames, QueryBuilder requestFilter, boolean includeAllDimensions, + TransportVersion minimumVersion, + // Used for bwc with 9.2.0, which supports aggregate_metric_double but doesn't provide its version in the field + // caps response. We'll just assume the type is supported based on usage in the query to not break compatibility + // with 9.2.0. boolean useAggregateMetricDoubleWhenNotSupported, + // Same as above boolean useDenseVectorWhenNotSupported, ActionListener> listener ) { @@ -127,15 +151,29 @@ public void resolveAsMergedMappingAndRetrieveMinimumVersion( EsqlResolveFieldsAction.TYPE, createFieldCapsRequest(indexWildcard, fieldNames, requestFilter, includeAllDimensions), listener.delegateFailureAndWrap((l, response) -> { + TransportVersion responseMinimumVersion = response.caps().minTransportVersion(); + // Note: Once {@link EsqlResolveFieldsResponse}'s CREATED version is live everywhere + // we can remove this and make sure responseMinimumVersion is non-null. That'll be 10.0-ish. + TransportVersion overallMinimumVersion = responseMinimumVersion == null + ? TransportVersion.minimumCompatible() + : TransportVersion.min(minimumVersion, responseMinimumVersion); + FieldsInfo info = new FieldsInfo( response.caps(), - response.minTransportVersion(), + overallMinimumVersion, Build.current().isSnapshot(), useAggregateMetricDoubleWhenNotSupported, useDenseVectorWhenNotSupported ); - LOGGER.debug("minimum transport version {} {}", response.minTransportVersion(), info.effectiveMinTransportVersion()); - l.onResponse(new Versioned<>(mergedMappings(indexWildcard, info), info.effectiveMinTransportVersion())); + LOGGER.debug( + "previously assumed minimum transport version [{}] updated to effective version [{}]" + + " using field caps response version [{}] for index pattern [{}]", + minimumVersion, + info.minTransportVersion(), + responseMinimumVersion, + indexWildcard + ); + l.onResponse(new Versioned<>(mergedMappings(indexWildcard, info), info.minTransportVersion())); }) ); } @@ -148,6 +186,22 @@ public void resolveAsMergedMappingAndRetrieveMinimumVersion( * is targeting. It doesn't matter if the node is a data node or an ML node or a unicorn, it's transport * version counts. BUT if the query doesn't dispatch to that cluster AT ALL, we don't count the versions * of any nodes in that cluster. + *

+ * If any remote didn't tell us the version we assume + * that it's very, very old. This effectively disables any fields that were created "recently". + * Which is appropriate because those fields are not supported on *almost* all versions that + * don't return the transport version in the response. + *

+ * "Very, very old" above means that there are versions of Elasticsearch that we're wire + * compatible that with that don't support sending the version back. That's anything + * from {@code 8.19.FIRST} to {@code 9.2.0}. "Recently" means any field types we + * added support for after the initial release of ESQL. These fields use + * {@link SupportedVersion#supportedOn} rather than {@link SupportedVersion#SUPPORTED_ON_ALL_NODES}. + * Except for DATE_NANOS. For DATE_NANOS we got lucky/made a mistake. It wasn't widely + * used before ESQL added support for it and we weren't careful about enabling it. So + * queries on mixed version clusters that touch DATE_NANOS will fail. All the types + * added after that, like DENSE_VECTOR, will gracefully disable themselves when talking + * to older nodes. * @param currentBuildIsSnapshot is the current build a snapshot? Note: This is always {@code Build.current().isSnapshot()} in * production but tests need more control * @param useAggregateMetricDoubleWhenNotSupported does the query itself force us to use {@code aggregate_metric_double} fields @@ -166,34 +220,7 @@ public record FieldsInfo( boolean currentBuildIsSnapshot, boolean useAggregateMetricDoubleWhenNotSupported, boolean useDenseVectorWhenNotSupported - ) { - /** - * The {@link #minTransportVersion}, but if any remote didn't tell us the version we assume - * that it's very, very old. This effectively disables any fields that were created "recently". - * Which is appropriate because those fields are not supported on *almost* all versions that - * don't return the transport version in the response. - *

- * "Very, very old" above means that there are versions of Elasticsearch that we're wire - * compatible that with that don't support sending the version back. That's anything - * from {@code 8.19.FIRST} to {@code 9.2.0}. "Recently" means any field types we - * added support for after the initial release of ESQL. These fields use - * {@link SupportedVersion#supportedOn} rather than {@link SupportedVersion#SUPPORTED_ON_ALL_NODES}. - * Except for DATE_NANOS. For DATE_NANOS we got lucky/made a mistake. It wasn't widely - * used before ESQL added support for it and we weren't careful about enabling it. So - * queries on mixed version clusters that touch DATE_NANOS will fail. All the types - * added after that, like DENSE_VECTOR, will gracefully disable themselves when talking - * to older nodes. - *

- *

- * Note: Once {@link EsqlResolveFieldsResponse}'s CREATED version is live everywhere - * we can remove this and make sure {@link #minTransportVersion} is non-null. That'll - * be 10.0-ish. - *

- */ - TransportVersion effectiveMinTransportVersion() { - return minTransportVersion != null ? minTransportVersion : TransportVersion.minimumCompatible(); - } - } + ) {} // public for testing only public static IndexResolution mergedMappings(String indexPattern, FieldsInfo fieldsInfo) { @@ -328,8 +355,7 @@ private static EsField createField( IndexFieldCapabilities first = fcs.get(0); List rest = fcs.subList(1, fcs.size()); DataType type = EsqlDataTypeRegistry.INSTANCE.fromEs(first.type(), first.metricType()); - boolean typeSupported = type.supportedVersion() - .supportedOn(fieldsInfo.effectiveMinTransportVersion(), fieldsInfo.currentBuildIsSnapshot) + boolean typeSupported = type.supportedVersion().supportedOn(fieldsInfo.minTransportVersion(), fieldsInfo.currentBuildIsSnapshot) || switch (type) { case AGGREGATE_METRIC_DOUBLE -> fieldsInfo.useAggregateMetricDoubleWhenNotSupported; case DENSE_VECTOR -> fieldsInfo.useDenseVectorWhenNotSupported; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index b21d11f96cfc0..b59c92a1139b2 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -601,6 +601,7 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { FoldContext foldCtx = FoldContext.small(); EsqlSession session = new EsqlSession( getTestName(), + TransportVersion.current(), queryClusterSettings(), null, null, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java index f62d065518bdf..3983679a5291f 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.compute.operator.AbstractPageMappingOperator; @@ -15,6 +16,7 @@ import org.elasticsearch.compute.operator.OperatorStatus; import org.elasticsearch.compute.operator.PlanProfile; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.esql.EsqlTestUtils; import java.util.List; @@ -26,14 +28,21 @@ protected Writeable.Reader instanceReader() { @Override protected EsqlQueryResponse.Profile createTestInstance() { - return new EsqlQueryResponse.Profile(randomDriverProfiles(), randomPlanProfiles()); + return new EsqlQueryResponse.Profile(randomDriverProfiles(), randomPlanProfiles(), randomMinimumVersion()); } @Override protected EsqlQueryResponse.Profile mutateInstance(EsqlQueryResponse.Profile instance) { - return randomBoolean() - ? new EsqlQueryResponse.Profile(randomValueOtherThan(instance.drivers(), this::randomDriverProfiles), instance.plans()) - : new EsqlQueryResponse.Profile(instance.drivers(), randomValueOtherThan(instance.plans(), this::randomPlanProfiles)); + var drivers = instance.drivers(); + var plans = instance.plans(); + var minimumVersion = instance.minimumVersion(); + + switch (between(0, 2)) { + case 0 -> drivers = randomValueOtherThan(drivers, EsqlQueryResponseProfileTests::randomDriverProfiles); + case 1 -> plans = randomValueOtherThan(plans, EsqlQueryResponseProfileTests::randomPlanProfiles); + case 2 -> minimumVersion = randomValueOtherThan(minimumVersion, EsqlQueryResponseProfileTests::randomMinimumVersion); + } + return new EsqlQueryResponse.Profile(drivers, plans, minimumVersion); } @Override @@ -41,7 +50,7 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry(List.of(AbstractPageMappingOperator.Status.ENTRY)); } - private List randomDriverProfiles() { + private static List randomDriverProfiles() { return randomList( 10, () -> new DriverProfile( @@ -53,20 +62,20 @@ private List randomDriverProfiles() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - randomList(10, this::randomOperatorStatus), + randomList(10, EsqlQueryResponseProfileTests::randomOperatorStatus), DriverSleeps.empty() ) ); } - private List randomPlanProfiles() { + private static List randomPlanProfiles() { return randomList( 10, () -> new PlanProfile(randomIdentifier(), randomIdentifier(), randomIdentifier(), randomAlphanumericOfLength(1024)) ); } - private OperatorStatus randomOperatorStatus() { + private static OperatorStatus randomOperatorStatus() { return new OperatorStatus( randomAlphaOfLength(4), randomBoolean() @@ -79,4 +88,8 @@ private OperatorStatus randomOperatorStatus() { : null ); } + + public static TransportVersion randomMinimumVersion() { + return randomBoolean() ? null : EsqlTestUtils.randomMinimumVersion(); + } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index 77f73f430868e..fee73a591f431 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -9,11 +9,13 @@ import org.apache.lucene.document.InetAddressPoint; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; @@ -980,6 +982,8 @@ private EsqlQueryResponse simple(boolean columnar, boolean async) { } public void testProfileXContent() { + TransportVersion minimumVersion = EsqlQueryResponseProfileTests.randomMinimumVersion(); + try ( EsqlQueryResponse response = new EsqlQueryResponse( List.of(new ColumnInfoImpl("foo", "integer", null)), @@ -1001,14 +1005,15 @@ public void testProfileXContent() { DriverSleeps.empty() ) ), - List.of(new PlanProfile("test", "elasticsearch", "node-1", "plan tree")) + List.of(new PlanProfile("test", "elasticsearch", "node-1", "plan tree")), + minimumVersion ), false, false, null ); ) { - assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(LoggerMessageFormat.format(""" { "documents_found" : 10, "values_loaded" : 100, @@ -1064,9 +1069,10 @@ public void testProfileXContent() { "node_name" : "node-1", "plan" : "plan tree" } - ] + ], + "minimumTransportVersion" : {} } - }""")); + }""", minimumVersion == null ? "null" : minimumVersion.id()))); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java index 9cb735b955d09..e846363e9e8c9 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java @@ -454,7 +454,7 @@ EnrichResolution resolvePolicies(Collection clusters, Collection future = new PlainActionFuture<>(); - super.doResolvePolicies(new HashSet<>(clusters), unresolvedPolicies, esqlExecutionInfo, future); + super.doResolvePolicies(new HashSet<>(clusters), unresolvedPolicies, esqlExecutionInfo, TransportVersion.current(), future); return future.actionGet(30, TimeUnit.SECONDS); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java index b23517dd14088..38d1b8b185a76 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.logging.ESLogMessage; import org.elasticsearch.common.logging.Loggers; @@ -25,6 +26,7 @@ import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.session.Result; +import org.elasticsearch.xpack.esql.session.Versioned; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -106,7 +108,10 @@ public void testPrioritiesOnSuccess() { for (int i = 0; i < actualTook.length; i++) { EsqlExecutionInfo warnQuery = getEsqlExecutionInfo(actualTook[i], actualPlanningTook[i]); - queryLog.onQueryPhase(new Result(List.of(), List.of(), DriverCompletionInfo.EMPTY, warnQuery), query); + queryLog.onQueryPhase( + new Versioned<>(new Result(List.of(), List.of(), DriverCompletionInfo.EMPTY, warnQuery), TransportVersion.current()), + query + ); if (expectedLevel[i] != null) { assertThat(appender.lastEvent(), is(not(nullValue()))); var msg = (ESLogMessage) appender.lastMessage(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java index bf8434e3c11c5..2d94c764f2c20 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.xpack.esql.session.EsqlSession; import org.elasticsearch.xpack.esql.session.IndexResolver; import org.elasticsearch.xpack.esql.session.Result; +import org.elasticsearch.xpack.esql.session.Versioned; import org.junit.After; import org.junit.Before; import org.mockito.stubbing.Answer; @@ -85,7 +86,7 @@ EnrichPolicyResolver mockEnrichResolver() { ActionListener listener = (ActionListener) arguments[arguments.length - 1]; listener.onResponse(new EnrichResolution()); return null; - }).when(enrichResolver).resolvePolicies(any(), any(), any()); + }).when(enrichResolver).resolvePolicies(any(), any(), any(), any()); return enrichResolver; } @@ -173,6 +174,7 @@ public void testFailedMetric() { planExecutor.esql( request, randomAlphaOfLength(10), + TransportVersion.current(), queryClusterSettings(), enrichResolver, new EsqlExecutionInfo(randomBoolean()), @@ -181,7 +183,7 @@ public void testFailedMetric() { EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES, new ActionListener<>() { @Override - public void onResponse(Result result) { + public void onResponse(Versioned result) { fail("this shouldn't happen"); } @@ -203,6 +205,7 @@ public void onFailure(Exception e) { planExecutor.esql( request, randomAlphaOfLength(10), + TransportVersion.current(), queryClusterSettings(), enrichResolver, new EsqlExecutionInfo(randomBoolean()), @@ -211,7 +214,7 @@ public void onFailure(Exception e) { EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES, new ActionListener<>() { @Override - public void onResponse(Result result) {} + public void onResponse(Versioned result) {} @Override public void onFailure(Exception e) { From 040555cdd96fab7dda177c8912e4ac803ac077e4 Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Tue, 2 Dec 2025 15:10:42 +0100 Subject: [PATCH 2/5] Fix comment --- .../org/elasticsearch/xpack/esql/session/IndexResolver.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index 64a635039e1ac..f88a8f74ed4a5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -83,8 +83,8 @@ public IndexResolver(Client client) { } /** - * Like {@code IndexResolver#resolveIndicesVersioned} - * but simplified and does not pass on the determined minimum transport version to the listener. + * Like {@code resolveAsMergedMappingAndRetrieveMinimumVersion} + * but does not pass on the determined minimum transport version to the listener. */ public void resolveAsMergedMapping( String indexWildcard, From fa68b9eacba76b599d9685f11604808164dfcf2a Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Tue, 2 Dec 2025 17:51:29 +0100 Subject: [PATCH 3/5] Align PreAnalysisResult with main Make pre-initialization of minimumTransportVersion consistent with main. --- .../elasticsearch/xpack/esql/session/EsqlSession.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 6cf495131e50a..f48cd8e6da388 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -800,7 +800,6 @@ private void preAnalyzeMainIndicesAndRetrieveMinTransportVersion( // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution listener.onResponse( result.withIndices(IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of()))) - .withMinimumTransportVersion(TransportVersion.current()) ); } else { indexResolver.resolveAsMergedMappingAndRetrieveMinimumVersion( @@ -976,7 +975,8 @@ public record PreAnalysisResult( ) { public PreAnalysisResult(Set fieldNames, Set wildcardJoinIndices) { - this(fieldNames, wildcardJoinIndices, null, new HashMap<>(), null, InferenceResolution.EMPTY, null); + this(fieldNames, wildcardJoinIndices, null, new HashMap<>(), null, InferenceResolution.EMPTY, + TransportVersion.current()); } PreAnalysisResult withIndices(IndexResolution indices) { @@ -1021,6 +1021,12 @@ PreAnalysisResult withInferenceResolution(InferenceResolution inferenceResolutio } PreAnalysisResult withMinimumTransportVersion(TransportVersion minimumTransportVersion) { + if (this.minimumTransportVersion != null) { + if (this.minimumTransportVersion.equals(minimumTransportVersion)) { + return this; + } + minimumTransportVersion = TransportVersion.min(this.minimumTransportVersion, minimumTransportVersion); + } return new PreAnalysisResult( fieldNames, wildcardJoinIndices, From 2688da38fea2904cfb812de07b33f853f45ff08e Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Tue, 2 Dec 2025 18:07:54 +0100 Subject: [PATCH 4/5] Use min version from the correct field caps In 9.2, this was in the forked field caps response, not the original field caps response. --- .../org/elasticsearch/xpack/esql/session/IndexResolver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index f88a8f74ed4a5..449229954e15c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -151,7 +151,7 @@ public void resolveAsMergedMappingAndRetrieveMinimumVersion( EsqlResolveFieldsAction.TYPE, createFieldCapsRequest(indexWildcard, fieldNames, requestFilter, includeAllDimensions), listener.delegateFailureAndWrap((l, response) -> { - TransportVersion responseMinimumVersion = response.caps().minTransportVersion(); + TransportVersion responseMinimumVersion = response.minTransportVersion(); // Note: Once {@link EsqlResolveFieldsResponse}'s CREATED version is live everywhere // we can remove this and make sure responseMinimumVersion is non-null. That'll be 10.0-ish. TransportVersion overallMinimumVersion = responseMinimumVersion == null From f02f1fad3d197b392512e90f5d56de069b14f23f Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 3 Dec 2025 07:16:17 +0000 Subject: [PATCH 5/5] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/xpack/esql/session/EsqlSession.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index f48cd8e6da388..fb9ac5711983a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -975,8 +975,7 @@ public record PreAnalysisResult( ) { public PreAnalysisResult(Set fieldNames, Set wildcardJoinIndices) { - this(fieldNames, wildcardJoinIndices, null, new HashMap<>(), null, InferenceResolution.EMPTY, - TransportVersion.current()); + this(fieldNames, wildcardJoinIndices, null, new HashMap<>(), null, InferenceResolution.EMPTY, TransportVersion.current()); } PreAnalysisResult withIndices(IndexResolution indices) {