Skip to content

Commit 94c1ba4

Browse files
authored
ESQL: Opt in to support for new aggregate_metric_double and dense_vector using query constructs (elastic#135215)
Works around a problem with ESQL planning around new types. Without this, if you have either `aggregate_metric_double` or `dense_vector` fields running on a node running < 9.2.0, ESQL will detect those for queries like `FROM *` and request those fields. The old nodes don't know about the data type. So they'll claim they can't parse the request and fail. That's bad because in a pure 9.1.x cluster `FROM *` would have returned `null` for those fields rather than failing. And, come on, `FROM *` shouldn't fail during an upgrade. And it's not *just* during an upgrade. A mixed remote cluster being sent `FROM remote:*` over cross cluster search will have the same problem if it is 9.1 and the coordinating node is 9.2. The fix we'd *like* to do is get the version of all nodes that'll have the request and only enable the type if all versions support it. That's complex, but it is easy to think about, explain, and handle in the planner. We thought about a different fix - downgrading these fields to `unsupported` on write to an old version - but that's difficult to reason about and quite likely to fail in a long tail of weird ways. This is a simpler version of the "get all the versions and disable unsupported fields" approach that uses a cute hack to not have to fetch the version, which is the complex part. Instead, it scrapes the query for the `TS` command or for functions like `KNN` or `TO_AGGREGATE_METRIC_DOUBLE` and enables these fields if those are there. This works because folks who want `dense_vector` will always be using `KNN` in 9.2.0. And folks who want `aggregate_metric_double` will use `TS`. These things will fail if the entire cluster and all remote nodes aren't on 9.2.0 because they don't exist on those versions. So users will tell us, by writing things that require 9.2.0, if all the nodes are on 9.2.0. We'll implement the transport version based approach in a follow up. But, for now, we'll use this trick. Fixes elastic#135193
1 parent f115d02 commit 94c1ba4

File tree

26 files changed

+1259
-171
lines changed

26 files changed

+1259
-171
lines changed

x-pack/plugin/build.gradle

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,19 @@ tasks.named("yamlRestCompatTestTransform").configure({ task ->
142142
task.skipTest("ml/sparse_vector_search/Search on a sparse_vector field with dots in the field names", "Vectors are no longer returned by default")
143143
task.skipTest("ml/sparse_vector_search/Search on a nested sparse_vector field with dots in the field names and conflicting child fields", "Vectors are no longer returned by default")
144144
task.skipTest("esql/190_lookup_join/lookup-no-key-only-key", "Requires the fix")
145+
task.skipTest("esql/40_tsdb/aggregate_metric_double unsortable", "Extra function required to enable the field type")
146+
task.skipTest("esql/40_tsdb/avg of aggregate_metric_double", "Extra function required to enable the field type")
147+
task.skipTest("esql/40_tsdb/grouping stats on aggregate_metric_double", "Extra function required to enable the field type")
148+
task.skipTest("esql/40_tsdb/render aggregate_metric_double when missing min and max", "Extra function required to enable the field type")
149+
task.skipTest("esql/40_tsdb/render aggregate_metric_double when missing value", "Extra function required to enable the field type")
150+
task.skipTest("esql/40_tsdb/sorting with aggregate_metric_double with partial submetrics", "Extra function required to enable the field type")
151+
task.skipTest("esql/40_tsdb/stats on aggregate_metric_double missing min and max", "Extra function required to enable the field type")
152+
task.skipTest("esql/40_tsdb/to_string aggregate_metric_double", "Extra function required to enable the field type")
153+
task.skipTest("esql/40_tsdb/stats on aggregate_metric_double with partial submetrics", "Extra function required to enable the field type")
154+
task.skipTest("esql/46_downsample/MV_EXPAND on non-MV aggregate metric double", "Extra function required to enable the field type")
155+
task.skipTest("esql/46_downsample/Query stats on downsampled index", "Extra function required to enable the field type")
156+
task.skipTest("esql/46_downsample/Render stats from downsampled index", "Extra function required to enable the field type")
157+
task.skipTest("esql/46_downsample/Sort from multiple indices one with aggregate metric double", "Extra function required to enable the field type")
145158
})
146159

147160
tasks.named('yamlRestCompatTest').configure {
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.core.type;
9+
10+
import org.elasticsearch.TransportVersion;
11+
12+
/**
13+
* Version that supports a {@link DataType}.
14+
*/
15+
public interface CreatedVersion {
16+
boolean supports(TransportVersion version);
17+
18+
CreatedVersion SUPPORTED_ON_ALL_NODES = new CreatedVersion() {
19+
@Override
20+
public boolean supports(TransportVersion version) {
21+
return true;
22+
}
23+
24+
@Override
25+
public String toString() {
26+
return "SupportedOnAllVersions";
27+
}
28+
};
29+
30+
static CreatedVersion supportedOn(TransportVersion createdVersion) {
31+
return new CreatedVersion() {
32+
@Override
33+
public boolean supports(TransportVersion version) {
34+
return version.supports(createdVersion);
35+
}
36+
37+
@Override
38+
public String toString() {
39+
return "SupportedOn[" + createdVersion + "]";
40+
}
41+
};
42+
}
43+
}

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
package org.elasticsearch.xpack.esql.core.type;
88

99
import org.apache.lucene.util.BytesRef;
10+
import org.elasticsearch.TransportVersion;
1011
import org.elasticsearch.common.io.stream.StreamInput;
1112
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.common.io.stream.Writeable;
1214
import org.elasticsearch.common.util.FeatureFlag;
1315
import org.elasticsearch.index.IndexMode;
1416
import org.elasticsearch.index.mapper.SourceFieldMapper;
@@ -32,6 +34,8 @@
3234
import java.util.function.Function;
3335

3436
import static java.util.stream.Collectors.toMap;
37+
import static org.elasticsearch.TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING;
38+
import static org.elasticsearch.TransportVersions.ML_INFERENCE_SAGEMAKER_CHAT_COMPLETION;
3539

3640
/**
3741
* This enum represents data types the ES|QL query processing layer is able to
@@ -140,7 +144,7 @@
140144
* unsupported types.</li>
141145
* </ul>
142146
*/
143-
public enum DataType {
147+
public enum DataType implements Writeable {
144148
/**
145149
* Fields of this type are unsupported by any functions and are always
146150
* rendered as {@code null} in the response.
@@ -306,12 +310,26 @@ public enum DataType {
306310
*/
307311
PARTIAL_AGG(builder().esType("partial_agg").estimatedSize(1024)),
308312

309-
AGGREGATE_METRIC_DOUBLE(builder().esType("aggregate_metric_double").estimatedSize(Double.BYTES * 3 + Integer.BYTES)),
313+
AGGREGATE_METRIC_DOUBLE(
314+
builder().esType("aggregate_metric_double")
315+
.estimatedSize(Double.BYTES * 3 + Integer.BYTES)
316+
.createdVersion(
317+
// Version created just *after* we committed support for aggregate_metric_double
318+
INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING
319+
)
320+
),
310321

311322
/**
312323
* Fields with this type are dense vectors, represented as an array of double values.
313324
*/
314-
DENSE_VECTOR(builder().esType("dense_vector").estimatedSize(4096));
325+
DENSE_VECTOR(
326+
builder().esType("dense_vector")
327+
.estimatedSize(4096)
328+
.createdVersion(
329+
// Version created just *after* we committed support for dense_vector
330+
ML_INFERENCE_SAGEMAKER_CHAT_COMPLETION
331+
)
332+
);
315333

316334
/**
317335
* Types that are actively being built. These types are
@@ -375,6 +393,11 @@ public enum DataType {
375393
*/
376394
private final DataType counter;
377395

396+
/**
397+
* Version that first created this data type.
398+
*/
399+
private final CreatedVersion createdVersion;
400+
378401
DataType(Builder builder) {
379402
String typeString = builder.typeName != null ? builder.typeName : builder.esType;
380403
this.typeName = typeString.toLowerCase(Locale.ROOT);
@@ -387,6 +410,7 @@ public enum DataType {
387410
this.isCounter = builder.isCounter;
388411
this.widenSmallNumeric = builder.widenSmallNumeric;
389412
this.counter = builder.counter;
413+
this.createdVersion = builder.createdVersion;
390414
}
391415

392416
private static final Collection<DataType> TYPES = Arrays.stream(values())
@@ -727,7 +751,20 @@ public DataType counter() {
727751
return counter;
728752
}
729753

754+
@Override
730755
public void writeTo(StreamOutput out) throws IOException {
756+
if (createdVersion.supports(out.getTransportVersion()) == false) {
757+
/*
758+
* TODO when we implement version aware planning flip this to an IllegalStateException
759+
* so we throw a 500 error. It'll be our bug then. Right now it's a sign that the user
760+
* tried to do something like `KNN(dense_vector_field, [1, 2])` against an old node.
761+
* Like, during the rolling upgrade that enables KNN or to a remote cluster that has
762+
* not yet been upgraded.
763+
*/
764+
throw new IllegalArgumentException(
765+
"remote node at version [" + out.getTransportVersion() + "] doesn't understand data type [" + this + "]"
766+
);
767+
}
731768
((PlanStreamOutput) out).writeCachedString(typeName);
732769
}
733770

@@ -779,6 +816,10 @@ public boolean isDate() {
779816
};
780817
}
781818

819+
public CreatedVersion createdVersion() {
820+
return createdVersion;
821+
}
822+
782823
public static DataType suggestedCast(Set<DataType> originalTypes) {
783824
if (originalTypes.isEmpty() || originalTypes.contains(UNSUPPORTED)) {
784825
return null;
@@ -846,6 +887,13 @@ private static class Builder {
846887
*/
847888
private DataType counter;
848889

890+
/**
891+
* The version when this data type was created. We default to the first
892+
* version for which we maintain wire compatibility, which is pretty
893+
* much {@code 8.18.0}.
894+
*/
895+
private CreatedVersion createdVersion = CreatedVersion.SUPPORTED_ON_ALL_NODES;
896+
849897
Builder() {}
850898

851899
Builder esType(String esType) {
@@ -901,5 +949,10 @@ Builder counter(DataType counter) {
901949
this.counter = counter;
902950
return this;
903951
}
952+
953+
Builder createdVersion(TransportVersion createdVersion) {
954+
this.createdVersion = CreatedVersion.supportedOn(createdVersion);
955+
return this;
956+
}
904957
}
905958
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.qa.mixed;
9+
10+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
11+
12+
import org.elasticsearch.index.IndexMode;
13+
import org.elasticsearch.index.mapper.MappedFieldType;
14+
import org.elasticsearch.test.TestClustersThreadFilter;
15+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
16+
import org.elasticsearch.xpack.esql.qa.rest.AllSupportedFieldsTestCase;
17+
import org.junit.ClassRule;
18+
19+
/**
20+
* Fetch all field types in a mixed version cluster, simulating a rolling upgrade.
21+
*/
22+
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
23+
public class AllSupportedFieldsIT extends AllSupportedFieldsTestCase {
24+
@ClassRule
25+
public static ElasticsearchCluster cluster = Clusters.mixedVersionCluster();
26+
27+
public AllSupportedFieldsIT(MappedFieldType.FieldExtractPreference extractPreference, IndexMode indexMode) {
28+
super(extractPreference, indexMode);
29+
}
30+
31+
@Override
32+
protected String getTestRestCluster() {
33+
return cluster.getHttpAddresses();
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.ccq;
9+
10+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
11+
12+
import org.apache.http.HttpHost;
13+
import org.elasticsearch.client.RestClient;
14+
import org.elasticsearch.core.IOUtils;
15+
import org.elasticsearch.index.IndexMode;
16+
import org.elasticsearch.index.mapper.MappedFieldType;
17+
import org.elasticsearch.test.TestClustersThreadFilter;
18+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
19+
import org.elasticsearch.xpack.esql.qa.rest.AllSupportedFieldsTestCase;
20+
import org.junit.AfterClass;
21+
import org.junit.Before;
22+
import org.junit.ClassRule;
23+
import org.junit.rules.RuleChain;
24+
import org.junit.rules.TestRule;
25+
26+
import java.io.IOException;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.TreeMap;
30+
31+
/**
32+
* Fetch all field types via cross cluster search, possible on a different version.
33+
*/
34+
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
35+
public class AllSupportedFieldsIT extends AllSupportedFieldsTestCase {
36+
static ElasticsearchCluster remoteCluster = Clusters.remoteCluster();
37+
static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster);
38+
39+
@ClassRule
40+
public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);
41+
42+
private static RestClient remoteClient;
43+
private static Map<String, NodeInfo> remoteNodeToInfo;
44+
45+
public AllSupportedFieldsIT(MappedFieldType.FieldExtractPreference extractPreference, IndexMode indexMode) {
46+
super(extractPreference, indexMode);
47+
}
48+
49+
@Before
50+
public void createRemoteIndices() throws IOException {
51+
if (supportsNodeAssignment()) {
52+
for (Map.Entry<String, NodeInfo> e : remoteNodeToInfo().entrySet()) {
53+
createIndexForNode(remoteClient(), e.getKey(), e.getValue().id());
54+
}
55+
} else {
56+
createIndexForNode(remoteClient(), null, null);
57+
}
58+
}
59+
60+
private Map<String, NodeInfo> remoteNodeToInfo() throws IOException {
61+
if (remoteNodeToInfo == null) {
62+
remoteNodeToInfo = fetchNodeToInfo(remoteClient(), "remote_cluster");
63+
}
64+
return remoteNodeToInfo;
65+
}
66+
67+
@Override
68+
protected Map<String, NodeInfo> allNodeToInfo() throws IOException {
69+
Map<String, NodeInfo> all = new TreeMap<>();
70+
all.putAll(super.allNodeToInfo());
71+
all.putAll(remoteNodeToInfo());
72+
return all;
73+
}
74+
75+
private RestClient remoteClient() throws IOException {
76+
if (remoteClient == null) {
77+
var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses());
78+
remoteClient = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
79+
}
80+
return remoteClient;
81+
}
82+
83+
@Override
84+
protected String getTestRestCluster() {
85+
return localCluster.getHttpAddresses();
86+
}
87+
88+
@AfterClass
89+
public static void closeRemoteClient() throws IOException {
90+
try {
91+
IOUtils.close(remoteClient);
92+
} finally {
93+
remoteClient = null;
94+
}
95+
}
96+
97+
@Override
98+
protected boolean fetchDenseVectorAggMetricDoubleIfFns() throws IOException {
99+
return super.fetchDenseVectorAggMetricDoubleIfFns()
100+
&& clusterHasCapability(remoteClient(), "GET", "/_query", List.of(), List.of("DENSE_VECTOR_AGG_METRIC_DOUBLE_IF_FNS")).orElse(
101+
false
102+
);
103+
}
104+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.qa.single_node;
9+
10+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
11+
12+
import org.elasticsearch.index.IndexMode;
13+
import org.elasticsearch.index.mapper.MappedFieldType;
14+
import org.elasticsearch.test.TestClustersThreadFilter;
15+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
16+
import org.elasticsearch.xpack.esql.qa.rest.AllSupportedFieldsTestCase;
17+
import org.junit.ClassRule;
18+
19+
/**
20+
* Simple test for fetching all supported field types.
21+
*/
22+
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
23+
public class AllSupportedFieldsIT extends AllSupportedFieldsTestCase {
24+
@ClassRule
25+
public static ElasticsearchCluster cluster = Clusters.testCluster(c -> {});
26+
27+
public AllSupportedFieldsIT(MappedFieldType.FieldExtractPreference extractPreference, IndexMode indexMode) {
28+
super(extractPreference, indexMode);
29+
}
30+
31+
@Override
32+
protected String getTestRestCluster() {
33+
return cluster.getHttpAddresses();
34+
}
35+
}

0 commit comments

Comments
 (0)