diff --git a/docs/changelog/137431.yaml b/docs/changelog/137431.yaml new file mode 100644 index 0000000000000..451fc5eabc1a7 --- /dev/null +++ b/docs/changelog/137431.yaml @@ -0,0 +1,5 @@ +pr: 137431 +summary: Fix enrich and lookup join resolution based on min transport version +area: ES|QL +type: bug +issues: [] diff --git a/server/src/main/resources/transport/definitions/referable/esql_use_minimum_version_for_enrich_resolution.csv b/server/src/main/resources/transport/definitions/referable/esql_use_minimum_version_for_enrich_resolution.csv new file mode 100644 index 0000000000000..58fae920e0a63 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/esql_use_minimum_version_for_enrich_resolution.csv @@ -0,0 +1 @@ +9231000,9185011 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index 482517b9a61d6..106690c0c4247 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -aggregate_metric_double_typed_block,9227000 +esql_use_minimum_version_for_enrich_resolution,9231000 diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 5ddd925e4e803..8d8a714451d79 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -2733,7 +2733,8 @@ protected static MapMatcher getProfileMatcher() { .entry("query", instanceOf(Map.class)) .entry("planning", instanceOf(Map.class)) .entry("drivers", instanceOf(List.class)) - .entry("plans", instanceOf(List.class)); + .entry("plans", instanceOf(List.class)) + .entry("minimumTransportVersion", instanceOf(Integer.class)); } protected static MapMatcher getResultMatcher(boolean includePartial, boolean includeDocumentsFound) { 
diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java index bff8ab8c7037f..7a8c020d413eb 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java @@ -15,6 +15,7 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; +import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.util.PlanStreamInput; import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput; @@ -751,13 +752,9 @@ public DataType counter() { public void writeTo(StreamOutput out) throws IOException { if (supportedVersion.supportedOn(out.getTransportVersion(), Build.current().isSnapshot()) == false) { /* - * TODO when we implement version aware planning flip this to an IllegalStateException - * so we throw a 500 error. It'll be our bug then. Right now it's a sign that the user - * tried to do something like `KNN(dense_vector_field, [1, 2])` against an old node. - * Like, during the rolling upgrade that enables KNN or to a remote cluster that has - * not yet been upgraded. + * Throw a 500 error - this is a bug, we failed to account for an old node during planning. 
*/ - throw new IllegalArgumentException( + throw new QlIllegalArgumentException( "remote node at version [" + out.getTransportVersion() + "] doesn't understand data type [" + this + "]" ); } diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/AllSupportedFieldsIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/AllSupportedFieldsIT.java index fc0763f7d0777..43ba02c166304 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/AllSupportedFieldsIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/AllSupportedFieldsIT.java @@ -50,10 +50,18 @@ public AllSupportedFieldsIT(MappedFieldType.FieldExtractPreference extractPrefer public void createRemoteIndices() throws IOException { if (supportsNodeAssignment()) { for (Map.Entry e : remoteNodeToInfo().entrySet()) { - createIndexForNode(remoteClient(), e.getKey(), e.getValue().id()); + createIndexForNode(remoteClient(), e.getKey(), e.getValue().id(), indexMode()); } } else { - createIndexForNode(remoteClient(), null, null); + createIndexForNode(remoteClient(), null, null, indexMode()); + } + + // We need a single lookup index that has the same name across all clusters, as well as a single enrich policy per cluster. + // We create both only when we're testing LOOKUP mode. 
+ if (indexExists(remoteClient(), LOOKUP_INDEX_NAME) == false && indexMode() == IndexMode.LOOKUP) { + createAllTypesIndex(remoteClient(), LOOKUP_INDEX_NAME, null, indexMode()); + createAllTypesDoc(remoteClient(), LOOKUP_INDEX_NAME); + createEnrichPolicy(remoteClient(), LOOKUP_INDEX_NAME, ENRICH_POLICY_NAME); } } @@ -101,4 +109,16 @@ && clusterHasCapability(remoteClient(), "GET", "/_query", List.of(), List.of("DE false ); } + + @Override + protected boolean fetchAllIsCrossCluster() { + return true; + } + + public final void testFetchAllOnlyFromRemotes() throws IOException { + doTestFetchAll(fromAllQuery("*:%mode%*", """ + , _id, _ignored, _index_mode, _score, _source, _version + | LIMIT 1000 + """), remoteNodeToInfo(), allNodeToInfo()); + } } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java index a82ecd16334ac..d11d1cbf55043 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java @@ -365,6 +365,7 @@ private void testPushQuery( .entry("plans", instanceOf(List.class)) .entry("planning", matchesMap().extraOk()) .entry("query", matchesMap().extraOk()) + .entry("minimumTransportVersion", instanceOf(Integer.class)) ), matchesList().item(matchesMap().entry("name", "test").entry("type", anyOf(equalTo("text"), equalTo("keyword")))), equalTo(found ? 
List.of(List.of(value)) : List.of()) diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java index 73cc14c195269..5b1adfdbd6e8c 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java @@ -122,6 +122,7 @@ private void testQuery(Double percent, String query, int documentsFound, boolean .entry("plans", instanceOf(List.class)) .entry("planning", matchesMap().extraOk()) .entry("query", matchesMap().extraOk()) + .entry("minimumTransportVersion", instanceOf(Integer.class)) ) .extraOk() ); diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java index c7ae359993bb4..3c142dbd09424 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java @@ -9,17 +9,19 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.http.HttpHost; import org.apache.http.util.EntityUtils; import org.elasticsearch.Build; import org.elasticsearch.TransportVersion; import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.logging.LoggerMessageFormat; 
+import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.MappedFieldType; -import org.elasticsearch.logging.LogManager; -import org.elasticsearch.logging.Logger; import org.elasticsearch.test.MapMatcher; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xcontent.XContentBuilder; @@ -44,10 +46,11 @@ import static org.elasticsearch.test.ListMatcher.matchesList; import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; +import static org.elasticsearch.xpack.esql.action.EsqlResolveFieldsResponse.RESOLVE_FIELDS_RESPONSE_CREATED_TV; import static org.elasticsearch.xpack.esql.action.EsqlResolveFieldsResponse.RESOLVE_FIELDS_RESPONSE_USED_TV; import static org.elasticsearch.xpack.esql.core.type.DataType.DataTypesTransportVersions.ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION; import static org.elasticsearch.xpack.esql.core.type.DataType.DataTypesTransportVersions.ESQL_DENSE_VECTOR_CREATED_VERSION; -import static org.elasticsearch.xpack.esql.core.type.DataType.DataTypesTransportVersions.INDEX_SOURCE; +import static org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver.ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION; import static org.hamcrest.Matchers.any; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; @@ -72,7 +75,6 @@ * load constant field values and have simple mappings. 
*/ public class AllSupportedFieldsTestCase extends ESRestTestCase { - private static final Logger logger = LogManager.getLogger(FieldExtractorTestCase.class); @Rule(order = Integer.MIN_VALUE) public ProfileLogger profileLogger = new ProfileLogger(); @@ -100,11 +102,22 @@ protected AllSupportedFieldsTestCase(MappedFieldType.FieldExtractPreference extr this.indexMode = indexMode; } - protected record NodeInfo(String cluster, String id, boolean snapshot, TransportVersion version, Set roles) {} + protected IndexMode indexMode() { + return indexMode; + } + + protected record NodeInfo( + String cluster, + String id, + boolean snapshot, + TransportVersion version, + Set roles, + Set boundAddress + ) {} private static Map nodeToInfo; - private Map nodeToInfo() throws IOException { + private Map localNodeToInfo() throws IOException { if (nodeToInfo == null) { nodeToInfo = fetchNodeToInfo(client(), null); } @@ -152,7 +165,7 @@ protected boolean supportsNodeAssignment() throws IOException { * Map from node name to information about the node. */ protected Map allNodeToInfo() throws IOException { - return nodeToInfo(); + return localNodeToInfo(); } protected static Map fetchNodeToInfo(RestClient client, String cluster) throws IOException { @@ -164,9 +177,13 @@ protected static Map fetchNodeToInfo(RestClient client, String String id = (String) n.getKey(); Map nodeInfo = (Map) n.getValue(); String nodeName = (String) extractValue(nodeInfo, "name"); + Map http = (Map) extractValue(nodeInfo, "http"); + List unparsedBoundAddress = (List) extractValue(http, "bound_address"); + // The bound address can actually be 2 addresses, one ipv4 and one ipv6; stuff 'em in a set. + Set boundAddress = unparsedBoundAddress.stream().map(s -> HttpHost.create((String) s)).collect(Collectors.toSet()); /* - * Figuring out is a node is a snapshot is kind of tricky. The main version + * Figuring out if a node is a snapshot is kind of tricky. The main version * doesn't include -SNAPSHOT. 
But ${VERSION}-SNAPSHOT is in the node info * *somewhere*. So we do this silly toString here. */ @@ -178,21 +195,39 @@ protected static Map fetchNodeToInfo(RestClient client, String nodeToInfo.put( nodeName, - new NodeInfo(cluster, id, snapshot, transportVersion, roles.stream().map(Object::toString).collect(Collectors.toSet())) + new NodeInfo( + cluster, + id, + snapshot, + transportVersion, + roles.stream().map(Object::toString).collect(Collectors.toSet()), + boundAddress + ) ); } return nodeToInfo; } + protected static final String ENRICH_POLICY_NAME = "all_fields_policy"; + protected static final String LOOKUP_INDEX_NAME = "all_fields_lookup_index"; + @Before public void createIndices() throws IOException { if (supportsNodeAssignment()) { - for (Map.Entry e : nodeToInfo().entrySet()) { - createIndexForNode(client(), e.getKey(), e.getValue().id()); + for (Map.Entry e : localNodeToInfo().entrySet()) { + createIndexForNode(client(), e.getKey(), e.getValue().id(), indexMode); } } else { - createIndexForNode(client(), null, null); + createIndexForNode(client(), null, null, indexMode); + } + + // We need a single lookup index that has the same name across all clusters, as well as a single enrich policy per cluster. + // We create both only when we're testing LOOKUP mode. + if (indexExists(LOOKUP_INDEX_NAME) == false && indexMode == IndexMode.LOOKUP) { + createAllTypesIndex(client(), LOOKUP_INDEX_NAME, null, indexMode); + createAllTypesDoc(client(), LOOKUP_INDEX_NAME); + createEnrichPolicy(client(), LOOKUP_INDEX_NAME, ENRICH_POLICY_NAME); } } @@ -200,7 +235,7 @@ public void createIndices() throws IOException { * Make sure the test doesn't run on snapshot builds. Release builds only. *

* {@link Build#isSnapshot()} checks if the version under test is a snapshot. - * But! This run test runs against many versions and if *any* are snapshots + * But! This test runs against many versions and if *any* are snapshots * then this will fail. So we check the versions of each node in the cluster too. *

*/ @@ -212,45 +247,143 @@ public void skipSnapshots() throws IOException { } } - // TODO: Also add a test for _tsid once we can determine the minimum transport version of all nodes. public final void testFetchAll() throws IOException { - Map response = esql(""" + doTestFetchAll(fromAllQuery(""" + , _id, _ignored, _index_mode, _score, _source, _version + | LIMIT 1000 + """), allNodeToInfo(), allNodeToInfo()); + } + + public final void testFetchAllEnrich() throws IOException { + assumeTrue("Test only requires the enrich policy (made from a lookup index)", indexMode == IndexMode.LOOKUP); + // The ENRICH is a no-op because it overwrites columns with the same identical data (except that it messes with + // the order of the columns, but we don't assert that). + doTestFetchAll(fromAllQuery(LoggerMessageFormat.format(null, """ , _id, _ignored, _index_mode, _score, _source, _version + | ENRICH _remote:{} ON {} | LIMIT 1000 - """); + """, ENRICH_POLICY_NAME, LOOKUP_ID_FIELD)), allNodeToInfo(), allNodeToInfo()); + } + + public final void testFetchAllLookupJoin() throws IOException { + assumeTrue("Test only requires lookup indices", indexMode == IndexMode.LOOKUP); + // The LOOKUP JOIN is a no-op because it overwrites columns with the same identical data (except that it messes with + // the order of the columns, but we don't assert that). + // We force the lookup join on to the remotes by having a SORT after it. + doTestFetchAll(fromAllQuery(LoggerMessageFormat.format(null, """ + , _id, _ignored, _index_mode, _score, _source, _version + | LOOKUP JOIN {} ON {} + | SORT _id + | LIMIT 1000 + """, LOOKUP_INDEX_NAME, LOOKUP_ID_FIELD)), allNodeToInfo(), allNodeToInfo()); + } + + /** + * Runs the query and expects 1 document per index on the contributing nodes as well as all the columns. 
+ */ + protected final void doTestFetchAll( + String query, + Map nodesContributingIndices, + Map nodesInvolvedInExecution + ) throws IOException { + var responseAndCoordinatorVersion = runQuery(query); + + Map response = responseAndCoordinatorVersion.v1(); + TransportVersion coordinatorVersion = responseAndCoordinatorVersion.v2(); + + assertNoPartialResponse(response); + + List columns = (List) response.get("columns"); + List values = (List) response.get("values"); + + MapMatcher expectedColumns = allTypesColumnsMatcher(coordinatorVersion, minVersion(), indexMode, extractPreference, true, true); + assertMap(nameToType(columns), expectedColumns); + + MapMatcher expectedAllValues = matchesMap(); + for (Map.Entry e : expectedIndices(indexMode, nodesContributingIndices).entrySet()) { + String indexName = e.getKey(); + MapMatcher expectedValues = allTypesValuesMatcher( + coordinatorVersion, + minVersion(), + indexMode, + extractPreference, + true, + true, + indexName + ); + expectedAllValues = expectedAllValues.entry(indexName, expectedValues); + } + assertMap(indexToRow(columns, values), expectedAllValues); + + assertMinimumVersion(minVersion(nodesInvolvedInExecution), responseAndCoordinatorVersion, true, fetchAllIsCrossCluster()); + + profileLogger.clearProfile(); + } + + protected boolean fetchAllIsCrossCluster() { + return false; + } + + protected static void assertNoPartialResponse(Map response) { if ((Boolean) response.get("is_partial")) { throw new AssertionError("partial results: " + response); } - List columns = (List) response.get("columns"); - List values = (List) response.get("values"); + } - MapMatcher expectedColumns = matchesMap(); + protected static MapMatcher allTypesColumnsMatcher( + TransportVersion coordinatorVersion, + TransportVersion minimumVersion, + IndexMode indexMode, + MappedFieldType.FieldExtractPreference extractPreference, + boolean expectMetadataFields, + boolean expectNonEnrichableFields + ) { + MapMatcher expectedColumns = 
matchesMap().entry(LOOKUP_ID_FIELD, "integer"); for (DataType type : DataType.values()) { if (supportedInIndex(type) == false) { continue; } - expectedColumns = expectedColumns.entry(fieldName(type), expectedType(type)); - } - expectedColumns = expectedColumns.entry("_id", "keyword") - .entry("_ignored", "keyword") - .entry("_index", "keyword") - .entry("_index_mode", "keyword") - .entry("_score", "double") - .entry("_source", "_source") - .entry("_version", "long"); - assertMap(nameToType(columns), expectedColumns); + if (expectNonEnrichableFields == false && supportedInEnrich(type) == false) { + continue; + } + expectedColumns = expectedColumns.entry(fieldName(type), expectedType(type, coordinatorVersion, minimumVersion, indexMode)); + } + if (expectMetadataFields) { + expectedColumns = expectedColumns.entry("_id", "keyword") + .entry("_ignored", "keyword") + .entry("_index", "keyword") + .entry("_index_mode", "keyword") + .entry("_score", "double") + .entry("_source", "_source") + .entry("_version", "long"); + } + return expectedColumns; + } - MapMatcher expectedAllValues = matchesMap(); - for (Map.Entry e : expectedIndices().entrySet()) { - String indexName = e.getKey(); - NodeInfo nodeInfo = e.getValue(); - MapMatcher expectedValues = matchesMap(); - for (DataType type : DataType.values()) { - if (supportedInIndex(type) == false) { - continue; - } - expectedValues = expectedValues.entry(fieldName(type), expectedValue(type, nodeInfo)); + protected static MapMatcher allTypesValuesMatcher( + TransportVersion coordinatorVersion, + TransportVersion minimumVersion, + IndexMode indexMode, + MappedFieldType.FieldExtractPreference extractPreference, + boolean expectMetadataFields, + boolean expectNonEnrichableFields, + String indexName + ) { + MapMatcher expectedValues = matchesMap(); + expectedValues = expectedValues.entry(LOOKUP_ID_FIELD, equalTo(123)); + for (DataType type : DataType.values()) { + if (supportedInIndex(type) == false) { + continue; + } + if 
(expectNonEnrichableFields == false && supportedInEnrich(type) == false) { + continue; } + expectedValues = expectedValues.entry( + fieldName(type), + expectedValue(type, coordinatorVersion, minimumVersion, indexMode, extractPreference) + ); + } + if (expectMetadataFields) { expectedValues = expectedValues.entry("_id", any(String.class)) .entry("_ignored", nullValue()) .entry("_index", indexName) @@ -258,10 +391,9 @@ public final void testFetchAll() throws IOException { .entry("_score", 0.0) .entry("_source", matchesMap().extraOk()) .entry("_version", 1); - expectedAllValues = expectedAllValues.entry(indexName, expectedValues); } - assertMap(indexToRow(columns, values), expectedAllValues); - profileLogger.clearProfile(); + + return expectedValues; } /** @@ -280,7 +412,10 @@ public final void testFetchDenseVector() throws IOException { | EVAL k = v_l2_norm(f_dense_vector, [1]) // workaround to enable fetching dense_vector """ + request; } - response = esql(request); + var responseAndCoordinatorVersion = runQuery(fromAllQuery(request)); + assertMinimumVersionFromAllQueries(responseAndCoordinatorVersion); + + response = runQuery(fromAllQuery(request)).v1(); if ((Boolean) response.get("is_partial")) { Map clusters = (Map) response.get("_clusters"); Map details = (Map) clusters.get("details"); @@ -319,11 +454,10 @@ public final void testFetchDenseVector() throws IOException { assertMap(nameToType(columns), expectedColumns); MapMatcher expectedAllValues = matchesMap(); - for (Map.Entry e : expectedIndices().entrySet()) { + for (Map.Entry e : expectedIndices(indexMode).entrySet()) { String indexName = e.getKey(); - NodeInfo nodeInfo = e.getValue(); MapMatcher expectedValues = matchesMap(); - expectedValues = expectedValues.entry("f_dense_vector", expectedDenseVector(nodeInfo.version)); + expectedValues = expectedValues.entry("f_dense_vector", matchesList().item(0.5).item(10.0).item(5.9999995)); expectedValues = expectedValues.entry("_index", indexName); expectedAllValues 
= expectedAllValues.entry(indexName, expectedValues); } @@ -347,7 +481,10 @@ public final void testFetchAggregateMetricDouble() throws IOException { | EVAL junk = TO_AGGREGATE_METRIC_DOUBLE(1) // workaround to enable fetching aggregate_metric_double """ + request; } - response = esql(request); + var responseAndCoordinatorVersion = runQuery(fromAllQuery(request)); + assertMinimumVersionFromAllQueries(responseAndCoordinatorVersion); + + response = runQuery(fromAllQuery(request)).v1(); if ((Boolean) response.get("is_partial")) { Map clusters = (Map) response.get("_clusters"); Map details = (Map) clusters.get("details"); @@ -386,9 +523,8 @@ public final void testFetchAggregateMetricDouble() throws IOException { assertMap(nameToType(columns), expectedColumns); MapMatcher expectedAllValues = matchesMap(); - for (Map.Entry e : expectedIndices().entrySet()) { + for (Map.Entry e : expectedIndices(indexMode).entrySet()) { String indexName = e.getKey(); - NodeInfo nodeInfo = e.getValue(); MapMatcher expectedValues = matchesMap(); expectedValues = expectedValues.entry( "f_aggregate_metric_double", @@ -400,10 +536,112 @@ public final void testFetchAggregateMetricDouble() throws IOException { assertMap(indexToRow(columns, values), expectedAllValues); } - private Map esql(String query) throws IOException { + protected String fromAllQuery(String indexPattern, String restOfQuery) { + return ("FROM " + indexPattern + " METADATA _index").replace("%mode%", indexMode.toString()) + restOfQuery; + } + + protected String fromAllQuery(String restOfQuery) { + return fromAllQuery("*:%mode%*,%mode%*", restOfQuery); + } + + public void testRow() throws IOException { + assumeTrue( + "Test has to run only once, skip on other configurations", + extractPreference == MappedFieldType.FieldExtractPreference.NONE && indexMode == IndexMode.STANDARD + ); + String query = "ROW x = 1 | LIMIT 1"; + var responseAndCoordinatorVersion = runQuery(query); + + assertMinimumVersion(minVersion(localNodeToInfo()), 
responseAndCoordinatorVersion, false, false); + } + + @SuppressWarnings("unchecked") + public void testRowLookupJoin() throws IOException { + assumeTrue("Test only requires the lookup index", indexMode == IndexMode.LOOKUP); + String query = "ROW " + LOOKUP_ID_FIELD + " = 123 | LOOKUP JOIN " + LOOKUP_INDEX_NAME + " ON " + LOOKUP_ID_FIELD + " | LIMIT 1"; + var responseAndCoordinatorVersion = runQuery(query); + TransportVersion expectedMinimumVersion = minVersion(localNodeToInfo()); + + assertMinimumVersion(expectedMinimumVersion, responseAndCoordinatorVersion, false, false); + + Map response = responseAndCoordinatorVersion.v1(); + TransportVersion coordinatorVersion = responseAndCoordinatorVersion.v2(); + + assertNoPartialResponse(response); + + List columns = (List) response.get("columns"); + List values = (List) response.get("values"); + + MapMatcher expectedColumns = allTypesColumnsMatcher( + coordinatorVersion, + expectedMinimumVersion, + indexMode, + extractPreference, + false, + true + ); + assertMap(nameToType(columns), expectedColumns); + + MapMatcher expectedValues = allTypesValuesMatcher( + coordinatorVersion, + expectedMinimumVersion, + indexMode, + extractPreference, + false, + true, + null + ); + assertMap(nameToValue(names(columns), (List) values.getFirst()), expectedValues); + } + + @SuppressWarnings("unchecked") + public void testRowEnrich() throws IOException { + assumeTrue("Test only requires the enrich policy (made from a lookup index)", indexMode == IndexMode.LOOKUP); + String query = "ROW " + LOOKUP_ID_FIELD + " = 123 | ENRICH " + ENRICH_POLICY_NAME + " ON " + LOOKUP_ID_FIELD + " | LIMIT 1"; + var responseAndCoordinatorVersion = runQuery(query); + Map response = responseAndCoordinatorVersion.v1(); + TransportVersion coordinatorVersion = responseAndCoordinatorVersion.v2(); + TransportVersion expectedMinimumVersion = minVersion(localNodeToInfo()); + + assertMinimumVersion(expectedMinimumVersion, responseAndCoordinatorVersion, false, false); + + 
assertNoPartialResponse(response); + + List columns = (List) response.get("columns"); + List values = (List) response.get("values"); + + MapMatcher expectedColumns = allTypesColumnsMatcher( + coordinatorVersion, + expectedMinimumVersion, + indexMode, + extractPreference, + false, + false + ); + assertMap(nameToType(columns), expectedColumns); + + MapMatcher expectedValues = allTypesValuesMatcher( + coordinatorVersion, + expectedMinimumVersion, + indexMode, + extractPreference, + false, + false, + null + ); + assertMap(nameToValue(names(columns), (List) values.getFirst()), expectedValues); + } + + /** + * Run the query and return the response and the version of the coordinator. + *

+ * Fails if the response contains any warnings. + */ + @SuppressWarnings("unchecked") + private Tuple, TransportVersion> runQuery(String query) throws IOException { Request request = new Request("POST", "_query"); XContentBuilder body = JsonXContent.contentBuilder().startObject(); - body.field("query", "FROM *:%mode%*,%mode%* METADATA _index".replace("%mode%", indexMode.toString()) + query); + body.field("query", query); { body.startObject("pragma"); if (extractPreference != null) { @@ -417,29 +655,81 @@ private Map esql(String query) throws IOException { body.endObject(); request.setJsonEntity(Strings.toString(body)); - Map response = responseAsMap(client().performRequest(request)); - profileLogger.extractProfile(response, true); - return response; + Response response = client().performRequest(request); + Map responseMap = responseAsMap(response); + HttpHost coordinatorHost = response.getHost(); + NodeInfo coordinator = allNodeToInfo().values().stream().filter(n -> n.boundAddress().contains(coordinatorHost)).findFirst().get(); + TransportVersion coordinatorVersion = coordinator.version(); + + profileLogger.extractProfile(responseMap, true); + return new Tuple<>(responseMap, coordinatorVersion); } - protected void createIndexForNode(RestClient client, String nodeName, String nodeId) throws IOException { - String indexName = indexMode.toString(); - if (nodeName != null) { - indexName += "_" + nodeName.toLowerCase(Locale.ROOT); + protected void assertMinimumVersionFromAllQueries(Tuple, TransportVersion> responseAndCoordinatorVersion) + throws IOException { + assertMinimumVersion(minVersion(), responseAndCoordinatorVersion, true, fetchAllIsCrossCluster()); + } + + /** + * @param expectedMinimumVersion the minimum version of all clusters that participate in the query + * @param performsMainFieldCapsRequest {@code true} for queries that have a {@code FROM} command, so we don't retrieve the minimum + * version from the main field caps response. 
+ */ + @SuppressWarnings("unchecked") + protected void assertMinimumVersion( + TransportVersion expectedMinimumVersion, + Tuple, TransportVersion> responseAndCoordinatorVersion, + boolean performsMainFieldCapsRequest, + boolean isCrossCluster + ) { + var responseMap = responseAndCoordinatorVersion.v1(); + var coordinatorVersion = responseAndCoordinatorVersion.v2(); + + if (coordinatorVersion.supports(ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION)) { + Map profile = (Map) responseMap.get("profile"); + Integer minimumVersion = (Integer) profile.get("minimumTransportVersion"); + assertNotNull(minimumVersion); + int minimumVersionInt = minimumVersion; + if (expectedMinimumVersion.supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV) + || (performsMainFieldCapsRequest == false) + || (isCrossCluster == false)) { + assertEquals(expectedMinimumVersion.id(), minimumVersionInt); + } else { + // If a remote cluster is old enough that it doesn't provide version information in the field caps response, the coordinator + // HAS to assume the oldest compatible version. + // This only applies to multi-cluster tests; if we're looking at a mixed cluster, the coordinator is new enough + // that its field caps response will include the min cluster version. (Apparently the field caps request is performed + // directly on the coordinator.) 
+ assertEquals(TransportVersion.minimumCompatible().id(), minimumVersionInt); + } } + } + + protected static void createIndexForNode(RestClient client, String nodeName, String nodeId, IndexMode mode) throws IOException { + String indexName = indexName(mode, nodeName); if (false == indexExists(client, indexName)) { - createAllTypesIndex(client, indexName, nodeId); + createAllTypesIndex(client, indexName, nodeId, mode); createAllTypesDoc(client, indexName); } } - private void createAllTypesIndex(RestClient client, String indexName, String nodeId) throws IOException { + protected static String indexName(IndexMode mode, String nodeName) { + String indexName = mode.toString(); + if (nodeName != null) { + indexName += "_" + nodeName.toLowerCase(Locale.ROOT); + } + return indexName; + } + + private static final String LOOKUP_ID_FIELD = "lookup_id"; + + protected static void createAllTypesIndex(RestClient client, String indexName, String nodeId, IndexMode mode) throws IOException { XContentBuilder config = JsonXContent.contentBuilder().startObject(); { config.startObject("settings"); config.startObject("index"); - config.field("mode", indexMode); - if (indexMode == IndexMode.TIME_SERIES) { + config.field("mode", mode); + if (mode == IndexMode.TIME_SERIES) { config.field("routing_path", "f_keyword"); } if (nodeId != null) { @@ -450,14 +740,20 @@ private void createAllTypesIndex(RestClient client, String indexName, String nod } { config.startObject("mappings").startObject("properties"); + + config.startObject(LOOKUP_ID_FIELD); + config.field("type", "integer"); + config.endObject(); + for (DataType type : DataType.values()) { if (supportedInIndex(type) == false) { continue; } config.startObject(fieldName(type)); - typeMapping(indexMode, config, type); + typeMapping(mode, config, type); config.endObject(); } + config.endObject().endObject().endObject(); } Request request = new Request("PUT", indexName); @@ -465,11 +761,11 @@ private void createAllTypesIndex(RestClient client, 
String indexName, String nod client.performRequest(request); } - private String fieldName(DataType type) { + private static String fieldName(DataType type) { return type == DataType.DATETIME ? "@timestamp" : "f_" + type.esType(); } - private void typeMapping(IndexMode indexMode, XContentBuilder config, DataType type) throws IOException { + private static void typeMapping(IndexMode indexMode, XContentBuilder config, DataType type) throws IOException { switch (type) { case COUNTER_DOUBLE, COUNTER_INTEGER, COUNTER_LONG -> config.field("type", type.esType().replace("counter_", "")) .field("time_series_metric", "counter"); @@ -488,8 +784,10 @@ private void typeMapping(IndexMode indexMode, XContentBuilder config, DataType t } } - private void createAllTypesDoc(RestClient client, String indexName) throws IOException { + protected static void createAllTypesDoc(RestClient client, String indexName) throws IOException { XContentBuilder doc = JsonXContent.contentBuilder().startObject(); + doc.field(LOOKUP_ID_FIELD); + doc.value(123); for (DataType type : DataType.values()) { if (supportedInIndex(type) == false) { continue; @@ -524,8 +822,42 @@ private void createAllTypesDoc(RestClient client, String indexName) throws IOExc client.performRequest(request); } - // This will become dependent on the minimum transport version of all nodes once we can determine that. 
- private Matcher expectedValue(DataType type, NodeInfo nodeInfo) throws IOException { + protected static void createEnrichPolicy(RestClient client, String indexName, String policyName) throws IOException { + XContentBuilder policyConfig = JsonXContent.contentBuilder().startObject(); + { + policyConfig.startObject("match"); + + policyConfig.field("indices", indexName); + policyConfig.field("match_field", LOOKUP_ID_FIELD); + List enrichFields = new ArrayList<>(); + for (DataType type : DataType.values()) { + if (supportedInIndex(type) == false || supportedInEnrich(type) == false) { + continue; + } + enrichFields.add(fieldName(type)); + } + policyConfig.field("enrich_fields", enrichFields); + + policyConfig.endObject(); + } + policyConfig.endObject(); + + Request request = new Request("PUT", "_enrich/policy/" + policyName); + request.setJsonEntity(Strings.toString(policyConfig)); + client.performRequest(request); + + Request execute = new Request("PUT", "_enrich/policy/" + policyName + "/_execute"); + request.addParameter("wait_for_completion", "true"); + client.performRequest(execute); + } + + private static Matcher expectedValue( + DataType type, + TransportVersion coordinatorVersion, + TransportVersion minimumVersion, + IndexMode indexMode, + MappedFieldType.FieldExtractPreference extractPreference + ) { return switch (type) { case BOOLEAN -> equalTo(true); case COUNTER_LONG, LONG, COUNTER_INTEGER, INTEGER, UNSIGNED_LONG, SHORT, BYTE -> equalTo(1); @@ -538,21 +870,23 @@ private Matcher expectedValue(DataType type, NodeInfo nodeInfo) throws IOExce case DATETIME, DATE_NANOS -> equalTo("2025-01-01T01:00:00.000Z"); case IP -> equalTo("192.168.0.1"); case VERSION -> equalTo("1.0.0-SNAPSHOT"); - case GEO_POINT -> extractPreference == MappedFieldType.FieldExtractPreference.DOC_VALUES || syntheticSourceByDefault() + case GEO_POINT -> extractPreference == MappedFieldType.FieldExtractPreference.DOC_VALUES || syntheticSourceByDefault(indexMode) ? 
equalTo("POINT (-71.34000004269183 41.1199999647215)") : equalTo("POINT (-71.34 41.12)"); case GEO_SHAPE -> equalTo("POINT (-71.34 41.12)"); case NULL -> nullValue(); case AGGREGATE_METRIC_DOUBLE -> { - if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false - || minVersion().supports(ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION) == false) { + // See expectedType for an explanation + if (coordinatorVersion.supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false + || minimumVersion.supports(ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION) == false) { yield nullValue(); } yield equalTo("{\"min\":-302.5,\"max\":702.3,\"sum\":200.0,\"value_count\":25}"); } case DENSE_VECTOR -> { - if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false - || minVersion().supports(ESQL_DENSE_VECTOR_CREATED_VERSION) == false) { + // See expectedType for an explanation + if (coordinatorVersion.supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false + || minimumVersion.supports(ESQL_DENSE_VECTOR_CREATED_VERSION) == false) { yield nullValue(); } yield equalTo(List.of(0.5, 10.0, 5.9999995)); @@ -561,12 +895,6 @@ private Matcher expectedValue(DataType type, NodeInfo nodeInfo) throws IOExce }; } - private Matcher> expectedDenseVector(TransportVersion version) { - return version.supports(INDEX_SOURCE) // *after* 9.1 - ? matchesList().item(0.5).item(10.0).item(5.9999995) - : matchesList().item(0.04283529).item(0.85670584).item(0.5140235); - } - /** * Is the type supported in indices? */ @@ -585,7 +913,21 @@ private static boolean supportedInIndex(DataType t) { }; } - private Map nameToType(List columns) { + /** + * Is the type supported in enrich policies? + */ + private static boolean supportedInEnrich(DataType t) { + return switch (t) { + // Enrich policies don't work with types that have mandatory fields in the mapping. 
+ // https://github.com/elastic/elasticsearch/issues/127350 + case AGGREGATE_METRIC_DOUBLE, SCALED_FLOAT, + // https://github.com/elastic/elasticsearch/issues/137699 + DENSE_VECTOR -> false; + default -> true; + }; + } + + private static Map nameToType(List columns) { Map result = new TreeMap<>(); for (Object c : columns) { Map map = (Map) c; @@ -594,7 +936,7 @@ private Map nameToType(List columns) { return result; } - private List names(List columns) { + private static List names(List columns) { List result = new ArrayList<>(); for (Object c : columns) { Map map = (Map) c; @@ -603,21 +945,21 @@ private List names(List columns) { return result; } - private Map> indexToRow(List columns, List values) { + private static Map> indexToRow(List columns, List values) { List names = names(columns); - int timestampIdx = names.indexOf("_index"); - if (timestampIdx < 0) { + int indexNameIdx = names.indexOf("_index"); + if (indexNameIdx < 0) { throw new IllegalStateException("query didn't return _index"); } Map> result = new TreeMap<>(); for (Object r : values) { List row = (List) r; - result.put(row.get(timestampIdx).toString(), nameToValue(names, row)); + result.put(row.get(indexNameIdx).toString(), nameToValue(names, row)); } return result; } - private Map nameToValue(List names, List values) { + private static Map nameToValue(List names, List values) { Map result = new TreeMap<>(); for (int i = 0; i < values.size(); i++) { result.put(names.get(i), values.get(i)); @@ -625,8 +967,16 @@ private Map nameToValue(List names, List values) { return result; } - // This will become dependent on the minimum transport version of all nodes once we can determine that. 
- private Matcher expectedType(DataType type) throws IOException { + private Matcher expectedType(DataType type, TransportVersion coordinatorVersion) throws IOException { + return expectedType(type, coordinatorVersion, minVersion(), indexMode); + } + + private static Matcher expectedType( + DataType type, + TransportVersion coordinatorVersion, + TransportVersion minimumVersion, + IndexMode indexMode + ) { return switch (type) { case COUNTER_DOUBLE, COUNTER_LONG, COUNTER_INTEGER -> { if (indexMode == IndexMode.TIME_SERIES) { @@ -638,20 +988,23 @@ private Matcher expectedType(DataType type) throws IOException { case HALF_FLOAT, SCALED_FLOAT, FLOAT -> equalTo("double"); case NULL -> equalTo("keyword"); case AGGREGATE_METRIC_DOUBLE -> { - // RESOLVE_FIELDS_RESPONSE_USED_TV is newer and technically sufficient to check. - // We also check for ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION for clarity. - // Future data types added here should only require the TV when they were created, - // because it will be after RESOLVE_FIELDS_RESPONSE_USED_TV. - if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false - || minVersion().supports(ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION) == false) { + // 9.2.0 nodes have ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION and support this type + // when they are data nodes, but not as coordinators! + // (Unless the query uses functions that depend on this type, which is a workaround + // for missing version-awareness in 9.2.0, and not considered here.) + // RESOLVE_FIELDS_RESPONSE_USED_TV is newer and marks the point when coordinators + // started to be able to plan for this data type, and will consider it supported if + // all nodes are on ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION or newer. 
+ if (coordinatorVersion.supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false + || minimumVersion.supports(ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION) == false) { yield equalTo("unsupported"); } yield equalTo("aggregate_metric_double"); } case DENSE_VECTOR -> { - logger.error("ADFDAFAF " + minVersion()); - if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false - || minVersion().supports(ESQL_DENSE_VECTOR_CREATED_VERSION) == false) { + // Same dance as for AGGREGATE_METRIC_DOUBLE + if (coordinatorVersion.supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false + || minimumVersion.supports(ESQL_DENSE_VECTOR_CREATED_VERSION) == false) { yield equalTo("unsupported"); } yield equalTo("dense_vector"); @@ -665,37 +1018,55 @@ protected boolean preserveClusterUponCompletion() { return true; } - private boolean syntheticSourceByDefault() { + private static boolean syntheticSourceByDefault(IndexMode indexMode) { return switch (indexMode) { case TIME_SERIES, LOGSDB -> true; case STANDARD, LOOKUP -> false; }; } - private Map expectedIndices() throws IOException { + private Map expectedIndices(IndexMode indexMode) throws IOException { + return expectedIndices(indexMode, allNodeToInfo()); + } + + protected Map expectedIndices(IndexMode indexMode, Map nodeToInfo) throws IOException { Map result = new TreeMap<>(); if (supportsNodeAssignment()) { - for (Map.Entry e : allNodeToInfo().entrySet()) { - String name = indexMode + "_" + e.getKey(); + for (Map.Entry e : nodeToInfo.entrySet()) { + String name = indexName(indexMode, e.getKey()); if (e.getValue().cluster != null) { name = e.getValue().cluster + ":" + name; } result.put(name, e.getValue()); } } else { - for (Map.Entry e : allNodeToInfo().entrySet()) { - String name = indexMode.toString(); + for (Map.Entry e : nodeToInfo.entrySet()) { + String name = indexName(indexMode, null); if (e.getValue().cluster != null) { name = e.getValue().cluster + ":" + name; } // We should only end up with one per cluster - result.put(name, 
new NodeInfo(e.getValue().cluster, null, e.getValue().snapshot(), e.getValue().version(), null)); + result.put( + name, + new NodeInfo( + e.getValue().cluster, + null, + e.getValue().snapshot(), + e.getValue().version(), + null, + e.getValue().boundAddress() + ) + ); } } return result; } protected TransportVersion minVersion() throws IOException { - return allNodeToInfo().values().stream().map(NodeInfo::version).min(Comparator.naturalOrder()).get(); + return minVersion(allNodeToInfo()); + } + + protected TransportVersion minVersion(Map nodeToInfo) throws IOException { + return nodeToInfo.values().stream().map(NodeInfo::version).min(Comparator.naturalOrder()).get(); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index dea9b2d8da746..fb1e6f28204ff 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -69,7 +69,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { // Updates to the Cluster occur with the updateCluster method that given the key to map transforms an // old Cluster Object to a new Cluster Object with the remapping function. public final ConcurrentMap clusterInfo; - // Is the clusterInfo map iinitialization in progress? If so, we should not try to serialize it. + // Is the clusterInfo map initialization in progress? If so, we should not try to serialize it. 
private transient volatile boolean clusterInfoInitializing; // whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present) private final boolean includeCCSMetadata; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index 736cb9fdaa5b2..467108587a14b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -36,6 +36,8 @@ import java.util.Objects; import java.util.Optional; +import static org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver.ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION; + public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.EsqlQueryResponse implements ChunkedToXContentObject, @@ -275,6 +277,11 @@ public Iterator toXContentChunked(ToXContent.Params params })); content.add(ChunkedToXContentHelper.array("drivers", profile.drivers.iterator(), params)); content.add(ChunkedToXContentHelper.array("plans", profile.plans.iterator())); + content.add(ChunkedToXContentHelper.chunk((b, p) -> { + TransportVersion minimumVersion = profile.minimumVersion(); + b.field("minimumTransportVersion", minimumVersion == null ? 
null : minimumVersion.id()); + return b; + })); content.add(ChunkedToXContentHelper.endObject()); } content.add(ChunkedToXContentHelper.endObject()); @@ -383,14 +390,15 @@ public EsqlResponse responseInternal() { return esqlResponse; } - public record Profile(List drivers, List plans) implements Writeable { + public record Profile(List drivers, List plans, TransportVersion minimumVersion) implements Writeable { public static Profile readFrom(StreamInput in) throws IOException { return new Profile( in.readCollectionAsImmutableList(DriverProfile::readFrom), in.getTransportVersion().supports(ESQL_PROFILE_INCLUDE_PLAN) ? in.readCollectionAsImmutableList(PlanProfile::readFrom) - : List.of() + : List.of(), + in.getTransportVersion().supports(ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION) ? readOptionalTransportVersion(in) : null ); } @@ -400,6 +408,27 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().supports(ESQL_PROFILE_INCLUDE_PLAN)) { out.writeCollection(plans); } + if (out.getTransportVersion().supports(ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION)) { + // When retrieving the profile from an older node, there might be no minimum version attached. + // When writing the profile somewhere else, we need to handle the case that the minimum version is null. 
+ writeOptionalTransportVersion(minimumVersion, out); + } + } + + private static TransportVersion readOptionalTransportVersion(StreamInput in) throws IOException { + if (in.readBoolean()) { + return TransportVersion.readVersion(in); + } + return null; + } + + private static void writeOptionalTransportVersion(@Nullable TransportVersion version, StreamOutput out) throws IOException { + if (version == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + TransportVersion.writeVersion(version, out); + } } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsResponse.java index aaef3da09e2fd..6eaae7b9b5f67 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsResponse.java @@ -18,7 +18,7 @@ import java.io.IOException; public class EsqlResolveFieldsResponse extends ActionResponse { - private static final TransportVersion RESOLVE_FIELDS_RESPONSE_CREATED_TV = TransportVersion.fromName( + public static final TransportVersion RESOLVE_FIELDS_RESPONSE_CREATED_TV = TransportVersion.fromName( "esql_resolve_fields_response_created" ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 0744fd126999d..b57dc1a86d6fd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.esql.enrich; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.TransportVersion; import 
org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.ChannelActionListener; @@ -75,6 +76,10 @@ public class EnrichPolicyResolver { private static final String RESOLVE_ACTION_NAME = "cluster:monitor/xpack/enrich/esql/resolve_policy"; + public static final TransportVersion ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION = TransportVersion.fromName( + "esql_use_minimum_version_for_enrich_resolution" + ); + private final ClusterService clusterService; private final IndexResolver indexResolver; private final TransportService transportService; @@ -113,9 +118,17 @@ public static UnresolvedPolicy from(Enrich e) { * * @param enriches the unresolved policies * @param executionInfo the execution info + * @param minimumVersion the minimum transport version of all clusters involved in the query; used for making the resolved mapping + * compatible with all involved clusters in case of CCS. + * (Enrich policy resolution happens separately on each remote cluster.) * @param listener notified with the enrich resolution */ - public void resolvePolicies(List enriches, EsqlExecutionInfo executionInfo, ActionListener listener) { + public void resolvePolicies( + List enriches, + EsqlExecutionInfo executionInfo, + TransportVersion minimumVersion, + ActionListener listener + ) { if (enriches.isEmpty()) { listener.onResponse(new EnrichResolution()); return; @@ -125,6 +138,7 @@ public void resolvePolicies(List enriches, EsqlExecutionInfo executionIn executionInfo.clusterInfo.isEmpty() ? 
new HashSet<>() : executionInfo.getRunningClusterAliases().collect(toSet()), enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(), executionInfo, + minimumVersion, listener ); } @@ -133,6 +147,7 @@ protected void doResolvePolicies( Set remoteClusters, Collection unresolvedPolicies, EsqlExecutionInfo executionInfo, + TransportVersion minimumVersion, ActionListener listener ) { if (unresolvedPolicies.isEmpty()) { @@ -141,7 +156,7 @@ protected void doResolvePolicies( } final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, executionInfo, listener.map(lookupResponses -> { + lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, executionInfo, minimumVersion, listener.map(lookupResponses -> { final EnrichResolution enrichResolution = new EnrichResolution(); final Map lookupResponsesToProcess = new HashMap<>(); for (Map.Entry entry : lookupResponses.entrySet()) { @@ -305,6 +320,7 @@ private void lookupPolicies( boolean includeLocal, Collection unresolvedPolicies, EsqlExecutionInfo executionInfo, + TransportVersion minimumVersion, ActionListener> listener ) { final Map lookupResponses = ConcurrentCollections.newConcurrentMap(); @@ -324,7 +340,7 @@ public void onResponse(Transport.Connection connection) { transportService.sendRequest( connection, RESOLVE_ACTION_NAME, - new LookupRequest(cluster, remotePolicies), + new LookupRequest(cluster, remotePolicies, minimumVersion), TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>( lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, skipOnFailure, l)), @@ -350,7 +366,7 @@ public void onFailure(Exception e) { transportService.sendRequest( transportService.getLocalNode(), RESOLVE_ACTION_NAME, - new LookupRequest(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, localPolicies), + new 
LookupRequest(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, localPolicies, minimumVersion), new ActionListenerResponseHandler<>( refs.acquire(resp -> lookupResponses.put(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, resp)), LookupResponse::new, @@ -372,21 +388,36 @@ private void failIfSkipUnavailableFalse(Exception e, boolean skipOnFailure, Acti private static class LookupRequest extends AbstractTransportRequest { private final String clusterAlias; private final Collection policyNames; + // The minimum version of all clusters involved in executing the ESQL query. + final TransportVersion minimumVersion; - LookupRequest(String clusterAlias, Collection policyNames) { + LookupRequest(String clusterAlias, Collection policyNames, TransportVersion minimumVersion) { this.clusterAlias = clusterAlias; this.policyNames = policyNames; + this.minimumVersion = minimumVersion; } LookupRequest(StreamInput in) throws IOException { this.clusterAlias = in.readString(); this.policyNames = in.readStringCollectionAsList(); + if (in.getTransportVersion().supports(ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION)) { + this.minimumVersion = TransportVersion.readVersion(in); + } else { + // An older coordinator contacted us. Let's assume an old version, otherwise we might send back + // types that it can't deserialize. + // (The only versions that know some new types but don't send their transport version here are 9.2.0-9.2.2. + // These types are dense_vector and aggregate_metric_double, and both don't work with ENRICH in these versions, anyway.) 
+ this.minimumVersion = TransportVersion.minimumCompatible(); + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(clusterAlias); out.writeStringCollection(policyNames); + if (out.getTransportVersion().supports(ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION)) { + TransportVersion.writeVersion(minimumVersion, out); + } } } @@ -452,7 +483,7 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas IndexResolver.ALL_FIELDS, null, false, - // Disable aggregate_metric_double and dense_vector until we get version checks in planning + request.minimumVersion, false, false, refs.acquire(indexResult -> { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java index 90a0f5a3a88fe..ecdccf6aecebe 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.execution; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; @@ -26,6 +27,7 @@ import org.elasticsearch.xpack.esql.session.EsqlSession; import org.elasticsearch.xpack.esql.session.IndexResolver; import org.elasticsearch.xpack.esql.session.Result; +import org.elasticsearch.xpack.esql.session.Versioned; import org.elasticsearch.xpack.esql.telemetry.Metrics; import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry; import org.elasticsearch.xpack.esql.telemetry.PlanTelemetryManager; @@ -67,17 +69,19 @@ public PlanExecutor( public void esql( EsqlQueryRequest request, String sessionId, + TransportVersion localClusterMinimumVersion, AnalyzerSettings analyzerSettings, EnrichPolicyResolver 
enrichPolicyResolver, EsqlExecutionInfo executionInfo, IndicesExpressionGrouper indicesExpressionGrouper, EsqlSession.PlanRunner planRunner, TransportActionServices services, - ActionListener listener + ActionListener> listener ) { final PlanTelemetry planTelemetry = new PlanTelemetry(functionRegistry); final var session = new EsqlSession( sessionId, + localClusterMinimumVersion, analyzerSettings, indexResolver, enrichPolicyResolver, @@ -93,7 +97,7 @@ public void esql( metrics.total(clientId); var begin = System.nanoTime(); - ActionListener executeListener = wrap( + ActionListener> executeListener = wrap( x -> onQuerySuccess(request, listener, x, planTelemetry), ex -> onQueryFailure(request, listener, ex, clientId, planTelemetry, begin) ); @@ -102,7 +106,12 @@ public void esql( ActionListener.run(executeListener, l -> session.execute(request, executionInfo, planRunner, l)); } - private void onQuerySuccess(EsqlQueryRequest request, ActionListener listener, Result x, PlanTelemetry planTelemetry) { + private void onQuerySuccess( + EsqlQueryRequest request, + ActionListener> listener, + Versioned x, + PlanTelemetry planTelemetry + ) { planTelemetryManager.publish(planTelemetry, true); queryLog.onQueryPhase(x, request.query()); listener.onResponse(x); @@ -110,7 +119,7 @@ private void onQuerySuccess(EsqlQueryRequest request, ActionListener lis private void onQueryFailure( EsqlQueryRequest request, - ActionListener listener, + ActionListener> listener, Exception ex, QueryMetric clientId, PlanTelemetry planTelemetry, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index f8d24720231e5..31707c33a126f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java 
@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.plugin; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.cluster.stats.CCSUsage; @@ -55,6 +56,7 @@ import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.session.EsqlSession.PlanRunner; import org.elasticsearch.xpack.esql.session.Result; +import org.elasticsearch.xpack.esql.session.Versioned; import java.io.IOException; import java.util.ArrayList; @@ -237,6 +239,7 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener columns = result.schema().stream().map(c -> { + private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, boolean profileEnabled, Versioned result) { + var innerResult = result.inner(); + List columns = innerResult.schema().stream().map(c -> { List originalTypes; if (c instanceof UnsupportedAttribute ua) { // Sort the original types so they are easier to test against and prettier. @@ -382,32 +387,36 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, boolea return new ColumnInfoImpl(c.name(), c.dataType().outputType(), originalTypes); }).toList(); EsqlQueryResponse.Profile profile = profileEnabled - ? new EsqlQueryResponse.Profile(result.completionInfo().driverProfiles(), result.completionInfo().planProfiles()) + ? 
new EsqlQueryResponse.Profile( + innerResult.completionInfo().driverProfiles(), + innerResult.completionInfo().planProfiles(), + result.minimumVersion() + ) : null; if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) { String asyncExecutionId = asyncTask.getExecutionId().getEncoded(); return new EsqlQueryResponse( columns, - result.pages(), - result.completionInfo().documentsFound(), - result.completionInfo().valuesLoaded(), + innerResult.pages(), + innerResult.completionInfo().documentsFound(), + innerResult.completionInfo().valuesLoaded(), profile, request.columnar(), asyncExecutionId, false, request.async(), - result.executionInfo() + innerResult.executionInfo() ); } return new EsqlQueryResponse( columns, - result.pages(), - result.completionInfo().documentsFound(), - result.completionInfo().valuesLoaded(), + innerResult.pages(), + innerResult.completionInfo().documentsFound(), + innerResult.completionInfo().valuesLoaded(), profile, request.columnar(), request.async(), - result.executionInfo() + innerResult.executionInfo() ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLog.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLog.java index cd410c8eba804..8722fa8b1c155 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLog.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLog.java @@ -16,6 +16,7 @@ import org.elasticsearch.index.SlowLogFields; import org.elasticsearch.xcontent.json.JsonStringEncoder; import org.elasticsearch.xpack.esql.session.Result; +import org.elasticsearch.xpack.esql.session.Versioned; import java.nio.charset.StandardCharsets; import java.util.HashMap; @@ -62,12 +63,12 @@ public EsqlQueryLog(ClusterSettings settings, SlowLogFieldProvider slowLogFieldP this.additionalFields = slowLogFieldProvider.create(); } - public void onQueryPhase(Result esqlResult, 
String query) { - if (esqlResult == null) { + public void onQueryPhase(Versioned esqlResult, String query) { + if (esqlResult.inner() == null) { return; // TODO review, it happens in some tests, not sure if it's a thing also in prod } - long tookInNanos = esqlResult.executionInfo().overallTook().nanos(); - log(() -> Message.of(esqlResult, query, includeUser ? additionalFields.queryFields() : Map.of()), tookInNanos); + long tookInNanos = esqlResult.inner().executionInfo().overallTook().nanos(); + log(() -> Message.of(esqlResult.inner(), query, includeUser ? additionalFields.queryFields() : Map.of()), tookInNanos); } public void onQueryFailure(String query, Exception ex, long tookInNanos) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 35326ed44a826..73c23bab3608f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; @@ -76,9 +77,9 @@ static Map determineUnavailableRemoteClusters( */ abstract static class CssPartialErrorsActionListener implements ActionListener> { private final EsqlExecutionInfo executionInfo; - private final ActionListener listener; + private final ActionListener> listener; - CssPartialErrorsActionListener(EsqlExecutionInfo executionInfo, ActionListener listener) { + CssPartialErrorsActionListener(EsqlExecutionInfo executionInfo, ActionListener> listener) { this.executionInfo = 
executionInfo; this.listener = listener; } @@ -87,7 +88,12 @@ abstract static class CssPartialErrorsActionListener implements ActionListener( + new Result(Analyzer.NO_FIELDS, Collections.emptyList(), DriverCompletionInfo.EMPTY, executionInfo), + TransportVersion.current() + ) + ); } else { listener.onFailure(e); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index a39dda6223ca4..fb9ac5711983a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -118,7 +118,8 @@ public interface PlanRunner { private static final TransportVersion LOOKUP_JOIN_CCS = TransportVersion.fromName("lookup_join_ccs"); private final String sessionId; - private final AnalyzerSettings clusterSettings; + private final TransportVersion localClusterMinimumVersion; + private final AnalyzerSettings analyzerSettings; private final IndexResolver indexResolver; private final EnrichPolicyResolver enrichPolicyResolver; @@ -142,7 +143,8 @@ public interface PlanRunner { public EsqlSession( String sessionId, - AnalyzerSettings clusterSettings, + TransportVersion localClusterMinimumVersion, + AnalyzerSettings analyzerSettings, IndexResolver indexResolver, EnrichPolicyResolver enrichPolicyResolver, PreAnalyzer preAnalyzer, @@ -154,7 +156,8 @@ public EsqlSession( TransportActionServices services ) { this.sessionId = sessionId; - this.clusterSettings = clusterSettings; + this.localClusterMinimumVersion = localClusterMinimumVersion; + this.analyzerSettings = analyzerSettings; this.indexResolver = indexResolver; this.enrichPolicyResolver = enrichPolicyResolver; this.preAnalyzer = preAnalyzer; @@ -178,7 +181,12 @@ public String sessionId() { /** * Execute an ESQL request. 
*/ - public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, PlanRunner planRunner, ActionListener listener) { + public void execute( + EsqlQueryRequest request, + EsqlExecutionInfo executionInfo, + PlanRunner planRunner, + ActionListener> listener + ) { assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH); assert executionInfo != null : "Null EsqlExecutionInfo"; LOGGER.debug("ESQL query:\n{}", request.query()); @@ -190,15 +198,15 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P null, clusterName, request.pragmas(), - clusterSettings.resultTruncationMaxSize(), - clusterSettings.resultTruncationDefaultSize(), + analyzerSettings.resultTruncationMaxSize(), + analyzerSettings.resultTruncationDefaultSize(), request.query(), request.profile(), request.tables(), System.nanoTime(), request.allowPartialResults(), - clusterSettings.timeseriesResultTruncationMaxSize(), - clusterSettings.timeseriesResultTruncationDefaultSize() + analyzerSettings.timeseriesResultTruncationMaxSize(), + analyzerSettings.timeseriesResultTruncationDefaultSize() ); FoldContext foldContext = configuration.newFoldContext(); @@ -246,6 +254,7 @@ public void onResponse(Versioned analyzedPlan) { l ) ) + .>andThen((l, r) -> l.onResponse(new Versioned<>(r, minimumVersion))) .addListener(listener); } } @@ -508,7 +517,11 @@ public void analyzedPlan( assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH); PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed); - PreAnalysisResult result = FieldNameUtils.resolveFieldNames(parsed, preAnalysis.enriches().isEmpty() == false); + // Initialize the PreAnalysisResult with the local cluster's minimum transport version, so our planning will be correct also in + // case of ROW queries. 
ROW queries can still require inter-node communication (for ENRICH and LOOKUP JOIN execution) with an older + // node in the same cluster; so assuming that all nodes are on the same version as this node will be wrong and may cause bugs. + PreAnalysisResult result = FieldNameUtils.resolveFieldNames(parsed, preAnalysis.enriches().isEmpty() == false) + .withMinimumTransportVersion(localClusterMinimumVersion); EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo); @@ -527,7 +540,13 @@ public void analyzedPlan( }) .andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices().iterator(), r, executionInfo, l)) .andThen((l, r) -> { - enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l.map(r::withEnrichResolution)); + // Do not update PreAnalysisResult.minimumTransportVersion, that's already been determined during main index resolution. + enrichPolicyResolver.resolvePolicies( + preAnalysis.enriches(), + executionInfo, + r.minimumTransportVersion(), + l.map(r::withEnrichResolution) + ); }) .andThen((l, r) -> { inferenceService.inferenceResolver(functionRegistry).resolveInferenceIds(parsed, l.map(r::withInferenceResolution)); @@ -567,12 +586,18 @@ private void preAnalyzeLookupIndex( ThreadPool.Names.SEARCH_COORDINATION, ThreadPool.Names.SYSTEM_READ ); + // No need to update the minimum transport version in the PreAnalysisResult, + // it should already have been determined during the main index resolution. indexResolver.resolveAsMergedMapping( EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, localPattern), result.wildcardJoinIndices().contains(localPattern) ? 
IndexResolver.ALL_FIELDS : result.fieldNames, null, false, - // Disable aggregate_metric_double and dense_vector until we get version checks in planning + // We use the minimum version determined in the main index resolution, because for remote LOOKUP JOIN, we're only considering + // remote lookup indices in the field caps request - but the coordinating cluster must be considered, too! + // The main index resolution should already have taken the version of the coordinating cluster into account and this should + // be reflected in result.minimumTransportVersion(). + result.minimumTransportVersion(), false, false, listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution)) @@ -631,6 +656,7 @@ private PreAnalysisResult receiveLookupIndexResolution( + "] mode" ); } + return result.addLookupIndexResolution(index, lookupIndexResolution); } @@ -774,7 +800,6 @@ private void preAnalyzeMainIndicesAndRetrieveMinTransportVersion( // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution listener.onResponse( result.withIndices(IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of()))) - .withMinimumTransportVersion(TransportVersion.current()) ); } else { indexResolver.resolveAsMergedMappingAndRetrieveMinimumVersion( @@ -791,6 +816,16 @@ private void preAnalyzeMainIndicesAndRetrieveMinTransportVersion( default -> requestFilter; }, preAnalysis.indexMode() == IndexMode.TIME_SERIES, + // TODO: In case of subqueries, the different main index resolutions don't know about each other's minimum version. + // This is bad because `FROM (FROM remote1:*) (FROM remote2:*)` can have different minimum versions + // while resolving each subquery's main index pattern. 
We'll determine the correct overall minimum transport version + // in the end because we keep updating the PreAnalysisResult after each resolution; but the EsIndex objects may be + // inconsistent with this version: + // The main index pattern from a subquery that we resolve first may have a higher min version in the field caps response + // than an index pattern that we resolve later. + // Thus, the EsIndex for `FROM remote1:*` may contain data types that aren't supported on the overall minimum version + // if we only find out that the overall version is actually lower when resolving `FROM remote2:*`. + result.minimumTransportVersion(), preAnalysis.useAggregateMetricDoubleWhenNotSupported(), preAnalysis.useDenseVectorWhenNotSupported(), listener.delegateFailureAndWrap((l, indexResolution) -> { @@ -940,7 +975,7 @@ public record PreAnalysisResult( ) { public PreAnalysisResult(Set fieldNames, Set wildcardJoinIndices) { - this(fieldNames, wildcardJoinIndices, null, new HashMap<>(), null, InferenceResolution.EMPTY, null); + this(fieldNames, wildcardJoinIndices, null, new HashMap<>(), null, InferenceResolution.EMPTY, TransportVersion.current()); } PreAnalysisResult withIndices(IndexResolution indices) { @@ -985,6 +1020,12 @@ PreAnalysisResult withInferenceResolution(InferenceResolution inferenceResolutio } PreAnalysisResult withMinimumTransportVersion(TransportVersion minimumTransportVersion) { + if (this.minimumTransportVersion != null) { + if (this.minimumTransportVersion.equals(minimumTransportVersion)) { + return this; + } + minimumTransportVersion = TransportVersion.min(this.minimumTransportVersion, minimumTransportVersion); + } return new PreAnalysisResult( fieldNames, wildcardJoinIndices, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index fbba2a0b874c8..449229954e15c 100644 --- 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -25,7 +25,6 @@ import org.elasticsearch.logging.Logger; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction; -import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsResponse; import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.DateEsField; @@ -84,13 +83,15 @@ public IndexResolver(Client client) { } /** - * Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping. + * Like {@code resolveAsMergedMappingAndRetrieveMinimumVersion} + * but does not pass on the determined minimum transport version to the listener. */ public void resolveAsMergedMapping( String indexWildcard, Set fieldNames, QueryBuilder requestFilter, boolean includeAllDimensions, + TransportVersion minimumVersion, boolean useAggregateMetricDoubleWhenNotSupported, boolean useDenseVectorWhenNotSupported, ActionListener listener @@ -104,6 +105,7 @@ public void resolveAsMergedMapping( fieldNames, requestFilter, includeAllDimensions, + minimumVersion, useAggregateMetricDoubleWhenNotSupported, useDenseVectorWhenNotSupported, ignoreVersion @@ -111,15 +113,37 @@ public void resolveAsMergedMapping( } /** - * Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping. Also retrieves the minimum transport - * version available in the cluster (and remotes). + * Perform a field caps request to resolve a pattern to one mapping (potentially compound, meaning it spans multiple indices). + *

+ * The field caps response contains the minimum transport version of all clusters that apply to the pattern, + * and it is used to deal with previously unsupported data types during resolution. + *

+ * If a field's type is not supported on the minimum version, it will be {@link DataType#UNSUPPORTED}. + *

+ * If the nodes are too old to include their minimum transport version in the field caps response, we'll assume + * {@link TransportVersion#minimumCompatible()}. + *

+ * The {@code minimumVersion} already known so far must be passed in and will be used instead of the minimum version + * from the field caps response if it is lower. During main index resolution, this is the local cluster's minimum version. + * This safeguards against using too new a version in case of {@code FROM remote_only:* | ...} queries that don't have any indices + * on the local cluster. + *

+ * But it's also important for remote {@code ENRICH} resolution, because in CCS enrich policies are resolved on remote clusters, + * so the overall minimum transport version that the coordinating cluster observed must be passed in here to avoid inconsistencies. + *

+ * The overall minimum version is updated using the field caps response and is passed on to the listener. */ public void resolveAsMergedMappingAndRetrieveMinimumVersion( String indexWildcard, Set fieldNames, QueryBuilder requestFilter, boolean includeAllDimensions, + TransportVersion minimumVersion, + // Used for bwc with 9.2.0, which supports aggregate_metric_double but doesn't provide its version in the field + // caps response. We'll just assume the type is supported based on usage in the query to not break compatibility + // with 9.2.0. boolean useAggregateMetricDoubleWhenNotSupported, + // Same as above boolean useDenseVectorWhenNotSupported, ActionListener> listener ) { @@ -127,15 +151,29 @@ public void resolveAsMergedMappingAndRetrieveMinimumVersion( EsqlResolveFieldsAction.TYPE, createFieldCapsRequest(indexWildcard, fieldNames, requestFilter, includeAllDimensions), listener.delegateFailureAndWrap((l, response) -> { + TransportVersion responseMinimumVersion = response.minTransportVersion(); + // Note: Once {@link EsqlResolveFieldsResponse}'s CREATED version is live everywhere + // we can remove this and make sure responseMinimumVersion is non-null. That'll be 10.0-ish. + TransportVersion overallMinimumVersion = responseMinimumVersion == null + ? 
TransportVersion.minimumCompatible() + : TransportVersion.min(minimumVersion, responseMinimumVersion); + FieldsInfo info = new FieldsInfo( response.caps(), - response.minTransportVersion(), + overallMinimumVersion, Build.current().isSnapshot(), useAggregateMetricDoubleWhenNotSupported, useDenseVectorWhenNotSupported ); - LOGGER.debug("minimum transport version {} {}", response.minTransportVersion(), info.effectiveMinTransportVersion()); - l.onResponse(new Versioned<>(mergedMappings(indexWildcard, info), info.effectiveMinTransportVersion())); + LOGGER.debug( + "previously assumed minimum transport version [{}] updated to effective version [{}]" + + " using field caps response version [{}] for index pattern [{}]", + minimumVersion, + info.minTransportVersion(), + responseMinimumVersion, + indexWildcard + ); + l.onResponse(new Versioned<>(mergedMappings(indexWildcard, info), info.minTransportVersion())); }) ); } @@ -148,6 +186,22 @@ public void resolveAsMergedMappingAndRetrieveMinimumVersion( * is targeting. It doesn't matter if the node is a data node or an ML node or a unicorn, it's transport * version counts. BUT if the query doesn't dispatch to that cluster AT ALL, we don't count the versions * of any nodes in that cluster. + *

+ * If any remote didn't tell us the version we assume + * that it's very, very old. This effectively disables any fields that were created "recently". + * Which is appropriate because those fields are not supported on *almost* all versions that + * don't return the transport version in the response. + *

+ * "Very, very old" above means that there are versions of Elasticsearch that we're wire + * compatible that with that don't support sending the version back. That's anything + * from {@code 8.19.FIRST} to {@code 9.2.0}. "Recently" means any field types we + * added support for after the initial release of ESQL. These fields use + * {@link SupportedVersion#supportedOn} rather than {@link SupportedVersion#SUPPORTED_ON_ALL_NODES}. + * Except for DATE_NANOS. For DATE_NANOS we got lucky/made a mistake. It wasn't widely + * used before ESQL added support for it and we weren't careful about enabling it. So + * queries on mixed version clusters that touch DATE_NANOS will fail. All the types + * added after that, like DENSE_VECTOR, will gracefully disable themselves when talking + * to older nodes. * @param currentBuildIsSnapshot is the current build a snapshot? Note: This is always {@code Build.current().isSnapshot()} in * production but tests need more control * @param useAggregateMetricDoubleWhenNotSupported does the query itself force us to use {@code aggregate_metric_double} fields @@ -166,34 +220,7 @@ public record FieldsInfo( boolean currentBuildIsSnapshot, boolean useAggregateMetricDoubleWhenNotSupported, boolean useDenseVectorWhenNotSupported - ) { - /** - * The {@link #minTransportVersion}, but if any remote didn't tell us the version we assume - * that it's very, very old. This effectively disables any fields that were created "recently". - * Which is appropriate because those fields are not supported on *almost* all versions that - * don't return the transport version in the response. - *

- * "Very, very old" above means that there are versions of Elasticsearch that we're wire - * compatible that with that don't support sending the version back. That's anything - * from {@code 8.19.FIRST} to {@code 9.2.0}. "Recently" means any field types we - * added support for after the initial release of ESQL. These fields use - * {@link SupportedVersion#supportedOn} rather than {@link SupportedVersion#SUPPORTED_ON_ALL_NODES}. - * Except for DATE_NANOS. For DATE_NANOS we got lucky/made a mistake. It wasn't widely - * used before ESQL added support for it and we weren't careful about enabling it. So - * queries on mixed version clusters that touch DATE_NANOS will fail. All the types - * added after that, like DENSE_VECTOR, will gracefully disable themselves when talking - * to older nodes. - *

- *

- * Note: Once {@link EsqlResolveFieldsResponse}'s CREATED version is live everywhere - * we can remove this and make sure {@link #minTransportVersion} is non-null. That'll - * be 10.0-ish. - *

- */ - TransportVersion effectiveMinTransportVersion() { - return minTransportVersion != null ? minTransportVersion : TransportVersion.minimumCompatible(); - } - } + ) {} // public for testing only public static IndexResolution mergedMappings(String indexPattern, FieldsInfo fieldsInfo) { @@ -328,8 +355,7 @@ private static EsField createField( IndexFieldCapabilities first = fcs.get(0); List rest = fcs.subList(1, fcs.size()); DataType type = EsqlDataTypeRegistry.INSTANCE.fromEs(first.type(), first.metricType()); - boolean typeSupported = type.supportedVersion() - .supportedOn(fieldsInfo.effectiveMinTransportVersion(), fieldsInfo.currentBuildIsSnapshot) + boolean typeSupported = type.supportedVersion().supportedOn(fieldsInfo.minTransportVersion(), fieldsInfo.currentBuildIsSnapshot) || switch (type) { case AGGREGATE_METRIC_DOUBLE -> fieldsInfo.useAggregateMetricDoubleWhenNotSupported; case DENSE_VECTOR -> fieldsInfo.useDenseVectorWhenNotSupported; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index b21d11f96cfc0..b59c92a1139b2 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -601,6 +601,7 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { FoldContext foldCtx = FoldContext.small(); EsqlSession session = new EsqlSession( getTestName(), + TransportVersion.current(), queryClusterSettings(), null, null, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java index f62d065518bdf..3983679a5291f 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java +++ 
b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.compute.operator.AbstractPageMappingOperator; @@ -15,6 +16,7 @@ import org.elasticsearch.compute.operator.OperatorStatus; import org.elasticsearch.compute.operator.PlanProfile; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.esql.EsqlTestUtils; import java.util.List; @@ -26,14 +28,21 @@ protected Writeable.Reader instanceReader() { @Override protected EsqlQueryResponse.Profile createTestInstance() { - return new EsqlQueryResponse.Profile(randomDriverProfiles(), randomPlanProfiles()); + return new EsqlQueryResponse.Profile(randomDriverProfiles(), randomPlanProfiles(), randomMinimumVersion()); } @Override protected EsqlQueryResponse.Profile mutateInstance(EsqlQueryResponse.Profile instance) { - return randomBoolean() - ? 
new EsqlQueryResponse.Profile(randomValueOtherThan(instance.drivers(), this::randomDriverProfiles), instance.plans()) - : new EsqlQueryResponse.Profile(instance.drivers(), randomValueOtherThan(instance.plans(), this::randomPlanProfiles)); + var drivers = instance.drivers(); + var plans = instance.plans(); + var minimumVersion = instance.minimumVersion(); + + switch (between(0, 2)) { + case 0 -> drivers = randomValueOtherThan(drivers, EsqlQueryResponseProfileTests::randomDriverProfiles); + case 1 -> plans = randomValueOtherThan(plans, EsqlQueryResponseProfileTests::randomPlanProfiles); + case 2 -> minimumVersion = randomValueOtherThan(minimumVersion, EsqlQueryResponseProfileTests::randomMinimumVersion); + } + return new EsqlQueryResponse.Profile(drivers, plans, minimumVersion); } @Override @@ -41,7 +50,7 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry(List.of(AbstractPageMappingOperator.Status.ENTRY)); } - private List randomDriverProfiles() { + private static List randomDriverProfiles() { return randomList( 10, () -> new DriverProfile( @@ -53,20 +62,20 @@ private List randomDriverProfiles() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - randomList(10, this::randomOperatorStatus), + randomList(10, EsqlQueryResponseProfileTests::randomOperatorStatus), DriverSleeps.empty() ) ); } - private List randomPlanProfiles() { + private static List randomPlanProfiles() { return randomList( 10, () -> new PlanProfile(randomIdentifier(), randomIdentifier(), randomIdentifier(), randomAlphanumericOfLength(1024)) ); } - private OperatorStatus randomOperatorStatus() { + private static OperatorStatus randomOperatorStatus() { return new OperatorStatus( randomAlphaOfLength(4), randomBoolean() @@ -79,4 +88,8 @@ private OperatorStatus randomOperatorStatus() { : null ); } + + public static TransportVersion randomMinimumVersion() { + return randomBoolean() ? 
null : EsqlTestUtils.randomMinimumVersion(); + } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index 77f73f430868e..fee73a591f431 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -9,11 +9,13 @@ import org.apache.lucene.document.InetAddressPoint; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; @@ -980,6 +982,8 @@ private EsqlQueryResponse simple(boolean columnar, boolean async) { } public void testProfileXContent() { + TransportVersion minimumVersion = EsqlQueryResponseProfileTests.randomMinimumVersion(); + try ( EsqlQueryResponse response = new EsqlQueryResponse( List.of(new ColumnInfoImpl("foo", "integer", null)), @@ -1001,14 +1005,15 @@ public void testProfileXContent() { DriverSleeps.empty() ) ), - List.of(new PlanProfile("test", "elasticsearch", "node-1", "plan tree")) + List.of(new PlanProfile("test", "elasticsearch", "node-1", "plan tree")), + minimumVersion ), false, false, null ); ) { - assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(LoggerMessageFormat.format(""" { "documents_found" : 10, "values_loaded" : 
100, @@ -1064,9 +1069,10 @@ public void testProfileXContent() { "node_name" : "node-1", "plan" : "plan tree" } - ] + ], + "minimumTransportVersion" : {} } - }""")); + }""", minimumVersion == null ? "null" : minimumVersion.id()))); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java index 9cb735b955d09..e846363e9e8c9 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java @@ -454,7 +454,7 @@ EnrichResolution resolvePolicies(Collection clusters, Collection future = new PlainActionFuture<>(); - super.doResolvePolicies(new HashSet<>(clusters), unresolvedPolicies, esqlExecutionInfo, future); + super.doResolvePolicies(new HashSet<>(clusters), unresolvedPolicies, esqlExecutionInfo, TransportVersion.current(), future); return future.actionGet(30, TimeUnit.SECONDS); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java index b23517dd14088..38d1b8b185a76 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.logging.ESLogMessage; import org.elasticsearch.common.logging.Loggers; @@ -25,6 +26,7 @@ import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import 
org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.session.Result; +import org.elasticsearch.xpack.esql.session.Versioned; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -106,7 +108,10 @@ public void testPrioritiesOnSuccess() { for (int i = 0; i < actualTook.length; i++) { EsqlExecutionInfo warnQuery = getEsqlExecutionInfo(actualTook[i], actualPlanningTook[i]); - queryLog.onQueryPhase(new Result(List.of(), List.of(), DriverCompletionInfo.EMPTY, warnQuery), query); + queryLog.onQueryPhase( + new Versioned<>(new Result(List.of(), List.of(), DriverCompletionInfo.EMPTY, warnQuery), TransportVersion.current()), + query + ); if (expectedLevel[i] != null) { assertThat(appender.lastEvent(), is(not(nullValue()))); var msg = (ESLogMessage) appender.lastMessage(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java index bf8434e3c11c5..2d94c764f2c20 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.xpack.esql.session.EsqlSession; import org.elasticsearch.xpack.esql.session.IndexResolver; import org.elasticsearch.xpack.esql.session.Result; +import org.elasticsearch.xpack.esql.session.Versioned; import org.junit.After; import org.junit.Before; import org.mockito.stubbing.Answer; @@ -85,7 +86,7 @@ EnrichPolicyResolver mockEnrichResolver() { ActionListener listener = (ActionListener) arguments[arguments.length - 1]; listener.onResponse(new EnrichResolution()); return null; - }).when(enrichResolver).resolvePolicies(any(), any(), any()); + }).when(enrichResolver).resolvePolicies(any(), any(), any(), any()); return enrichResolver; } @@ -173,6 +174,7 @@ public 
void testFailedMetric() { planExecutor.esql( request, randomAlphaOfLength(10), + TransportVersion.current(), queryClusterSettings(), enrichResolver, new EsqlExecutionInfo(randomBoolean()), @@ -181,7 +183,7 @@ public void testFailedMetric() { EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES, new ActionListener<>() { @Override - public void onResponse(Result result) { + public void onResponse(Versioned result) { fail("this shouldn't happen"); } @@ -203,6 +205,7 @@ public void onFailure(Exception e) { planExecutor.esql( request, randomAlphaOfLength(10), + TransportVersion.current(), queryClusterSettings(), enrichResolver, new EsqlExecutionInfo(randomBoolean()), @@ -211,7 +214,7 @@ public void onFailure(Exception e) { EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES, new ActionListener<>() { @Override - public void onResponse(Result result) {} + public void onResponse(Versioned result) {} @Override public void onFailure(Exception e) {