Skip to content

Commit 96338a8

Browse files
committed
Merge branch 'update_hkmeanstest' of github.com:john-wagster/elasticsearch into update_hkmeanstest
2 parents 526a969 + 9cd17aa commit 96338a8

File tree

32 files changed

+1313
-176
lines changed

32 files changed

+1313
-176
lines changed

server/src/main/java/org/elasticsearch/cluster/NotMasterException.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,20 @@
99
package org.elasticsearch.cluster;
1010

1111
import org.elasticsearch.ElasticsearchException;
12+
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
13+
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
1214
import org.elasticsearch.common.io.stream.StreamInput;
1315

1416
import java.io.IOException;
1517

1618
/**
17-
* Exception which indicates that an operation failed because the node stopped being the elected master.
19+
* Exception indicating that a cluster state update operation failed because the node stopped being the elected master.
20+
* Since this exception is thrown prior to the cluster state publication, it should only be used when the cluster state update
21+
* <i>definitely</i> did not happen, and there is no possibility the next master committed the cluster state update.
22+
*
23+
* This is different from {@link FailedToCommitClusterStateException}.
24+
*
25+
* This exception is retryable within {@link TransportMasterNodeAction}.
1826
*/
1927
public class NotMasterException extends ElasticsearchException {
2028

server/src/main/java/org/elasticsearch/cluster/coordination/FailedToCommitClusterStateException.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,20 @@
99
package org.elasticsearch.cluster.coordination;
1010

1111
import org.elasticsearch.ElasticsearchException;
12+
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
13+
import org.elasticsearch.cluster.NotMasterException;
1214
import org.elasticsearch.common.io.stream.StreamInput;
1315

1416
import java.io.IOException;
1517

1618
/**
1719
* Thrown when a cluster state publication fails to commit the new cluster state. If publication fails then a new master is elected but the
18-
* update might or might not take effect, depending on whether or not the newly-elected master accepted the published state that failed to
19-
* be committed.
20+
* update might or might not take effect, depending on whether the newly-elected master accepted the published state that failed to
21+
* be committed. This exception should only be used when there is <i>ambiguity</i> whether a state update took effect or not.
22+
*
23+
* This is different from {@link NotMasterException} where we know for certain that a state update never took effect.
24+
*
25+
* This exception is retryable within {@link TransportMasterNodeAction}.
2026
*
2127
* See {@link ClusterStatePublisher} for more details.
2228
*/

x-pack/plugin/build.gradle

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,19 @@ tasks.named("yamlRestCompatTestTransform").configure({ task ->
142142
task.skipTest("ml/sparse_vector_search/Search on a sparse_vector field with dots in the field names", "Vectors are no longer returned by default")
143143
task.skipTest("ml/sparse_vector_search/Search on a nested sparse_vector field with dots in the field names and conflicting child fields", "Vectors are no longer returned by default")
144144
task.skipTest("esql/190_lookup_join/lookup-no-key-only-key", "Requires the fix")
145+
task.skipTest("esql/40_tsdb/aggregate_metric_double unsortable", "Extra function required to enable the field type")
146+
task.skipTest("esql/40_tsdb/avg of aggregate_metric_double", "Extra function required to enable the field type")
147+
task.skipTest("esql/40_tsdb/grouping stats on aggregate_metric_double", "Extra function required to enable the field type")
148+
task.skipTest("esql/40_tsdb/render aggregate_metric_double when missing min and max", "Extra function required to enable the field type")
149+
task.skipTest("esql/40_tsdb/render aggregate_metric_double when missing value", "Extra function required to enable the field type")
150+
task.skipTest("esql/40_tsdb/sorting with aggregate_metric_double with partial submetrics", "Extra function required to enable the field type")
151+
task.skipTest("esql/40_tsdb/stats on aggregate_metric_double missing min and max", "Extra function required to enable the field type")
152+
task.skipTest("esql/40_tsdb/to_string aggregate_metric_double", "Extra function required to enable the field type")
153+
task.skipTest("esql/40_tsdb/stats on aggregate_metric_double with partial submetrics", "Extra function required to enable the field type")
154+
task.skipTest("esql/46_downsample/MV_EXPAND on non-MV aggregate metric double", "Extra function required to enable the field type")
155+
task.skipTest("esql/46_downsample/Query stats on downsampled index", "Extra function required to enable the field type")
156+
task.skipTest("esql/46_downsample/Render stats from downsampled index", "Extra function required to enable the field type")
157+
task.skipTest("esql/46_downsample/Sort from multiple indices one with aggregate metric double", "Extra function required to enable the field type")
145158
})
146159

147160
tasks.named('yamlRestCompatTest').configure {
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.core.type;
9+
10+
import org.elasticsearch.TransportVersion;
11+
12+
/**
13+
* Version that supports a {@link DataType}.
14+
*/
15+
public interface CreatedVersion {
16+
boolean supports(TransportVersion version);
17+
18+
CreatedVersion SUPPORTED_ON_ALL_NODES = new CreatedVersion() {
19+
@Override
20+
public boolean supports(TransportVersion version) {
21+
return true;
22+
}
23+
24+
@Override
25+
public String toString() {
26+
return "SupportedOnAllVersions";
27+
}
28+
};
29+
30+
static CreatedVersion supportedOn(TransportVersion createdVersion) {
31+
return new CreatedVersion() {
32+
@Override
33+
public boolean supports(TransportVersion version) {
34+
return version.supports(createdVersion);
35+
}
36+
37+
@Override
38+
public String toString() {
39+
return "SupportedOn[" + createdVersion + "]";
40+
}
41+
};
42+
}
43+
}

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
package org.elasticsearch.xpack.esql.core.type;
88

99
import org.apache.lucene.util.BytesRef;
10+
import org.elasticsearch.TransportVersion;
1011
import org.elasticsearch.common.io.stream.StreamInput;
1112
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.common.io.stream.Writeable;
1214
import org.elasticsearch.common.util.FeatureFlag;
1315
import org.elasticsearch.index.IndexMode;
1416
import org.elasticsearch.index.mapper.SourceFieldMapper;
@@ -32,6 +34,8 @@
3234
import java.util.function.Function;
3335

3436
import static java.util.stream.Collectors.toMap;
37+
import static org.elasticsearch.TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING;
38+
import static org.elasticsearch.TransportVersions.ML_INFERENCE_SAGEMAKER_CHAT_COMPLETION;
3539

3640
/**
3741
* This enum represents data types the ES|QL query processing layer is able to
@@ -140,7 +144,7 @@
140144
* unsupported types.</li>
141145
* </ul>
142146
*/
143-
public enum DataType {
147+
public enum DataType implements Writeable {
144148
/**
145149
* Fields of this type are unsupported by any functions and are always
146150
* rendered as {@code null} in the response.
@@ -306,12 +310,26 @@ public enum DataType {
306310
*/
307311
PARTIAL_AGG(builder().esType("partial_agg").estimatedSize(1024)),
308312

309-
AGGREGATE_METRIC_DOUBLE(builder().esType("aggregate_metric_double").estimatedSize(Double.BYTES * 3 + Integer.BYTES)),
313+
AGGREGATE_METRIC_DOUBLE(
314+
builder().esType("aggregate_metric_double")
315+
.estimatedSize(Double.BYTES * 3 + Integer.BYTES)
316+
.createdVersion(
317+
// Version created just *after* we committed support for aggregate_metric_double
318+
INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING
319+
)
320+
),
310321

311322
/**
312323
* Fields with this type are dense vectors, represented as an array of double values.
313324
*/
314-
DENSE_VECTOR(builder().esType("dense_vector").estimatedSize(4096));
325+
DENSE_VECTOR(
326+
builder().esType("dense_vector")
327+
.estimatedSize(4096)
328+
.createdVersion(
329+
// Version created just *after* we committed support for dense_vector
330+
ML_INFERENCE_SAGEMAKER_CHAT_COMPLETION
331+
)
332+
);
315333

316334
/**
317335
* Types that are actively being built. These types are
@@ -375,6 +393,11 @@ public enum DataType {
375393
*/
376394
private final DataType counter;
377395

396+
/**
397+
* Version that first created this data type.
398+
*/
399+
private final CreatedVersion createdVersion;
400+
378401
DataType(Builder builder) {
379402
String typeString = builder.typeName != null ? builder.typeName : builder.esType;
380403
this.typeName = typeString.toLowerCase(Locale.ROOT);
@@ -387,6 +410,7 @@ public enum DataType {
387410
this.isCounter = builder.isCounter;
388411
this.widenSmallNumeric = builder.widenSmallNumeric;
389412
this.counter = builder.counter;
413+
this.createdVersion = builder.createdVersion;
390414
}
391415

392416
private static final Collection<DataType> TYPES = Arrays.stream(values())
@@ -727,7 +751,20 @@ public DataType counter() {
727751
return counter;
728752
}
729753

754+
@Override
730755
public void writeTo(StreamOutput out) throws IOException {
756+
if (createdVersion.supports(out.getTransportVersion()) == false) {
757+
/*
758+
* TODO when we implement version aware planning flip this to an IllegalStateException
759+
* so we throw a 500 error. It'll be our bug then. Right now it's a sign that the user
760+
* tried to do something like `KNN(dense_vector_field, [1, 2])` against an old node.
761+
* Like, during the rolling upgrade that enables KNN or to a remote cluster that has
762+
* not yet been upgraded.
763+
*/
764+
throw new IllegalArgumentException(
765+
"remote node at version [" + out.getTransportVersion() + "] doesn't understand data type [" + this + "]"
766+
);
767+
}
731768
((PlanStreamOutput) out).writeCachedString(typeName);
732769
}
733770

@@ -779,6 +816,10 @@ public boolean isDate() {
779816
};
780817
}
781818

819+
public CreatedVersion createdVersion() {
820+
return createdVersion;
821+
}
822+
782823
public static DataType suggestedCast(Set<DataType> originalTypes) {
783824
if (originalTypes.isEmpty() || originalTypes.contains(UNSUPPORTED)) {
784825
return null;
@@ -846,6 +887,13 @@ private static class Builder {
846887
*/
847888
private DataType counter;
848889

890+
/**
891+
* The version when this data type was created. We default to the first
892+
* version for which we maintain wire compatibility, which is pretty
893+
* much {@code 8.18.0}.
894+
*/
895+
private CreatedVersion createdVersion = CreatedVersion.SUPPORTED_ON_ALL_NODES;
896+
849897
Builder() {}
850898

851899
Builder esType(String esType) {
@@ -901,5 +949,10 @@ Builder counter(DataType counter) {
901949
this.counter = counter;
902950
return this;
903951
}
952+
953+
Builder createdVersion(TransportVersion createdVersion) {
954+
this.createdVersion = CreatedVersion.supportedOn(createdVersion);
955+
return this;
956+
}
904957
}
905958
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.qa.mixed;
9+
10+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
11+
12+
import org.elasticsearch.index.IndexMode;
13+
import org.elasticsearch.index.mapper.MappedFieldType;
14+
import org.elasticsearch.test.TestClustersThreadFilter;
15+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
16+
import org.elasticsearch.xpack.esql.qa.rest.AllSupportedFieldsTestCase;
17+
import org.junit.ClassRule;
18+
19+
/**
20+
* Fetch all field types in a mixed version cluster, simulating a rolling upgrade.
21+
*/
22+
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
23+
public class AllSupportedFieldsIT extends AllSupportedFieldsTestCase {
24+
@ClassRule
25+
public static ElasticsearchCluster cluster = Clusters.mixedVersionCluster();
26+
27+
public AllSupportedFieldsIT(MappedFieldType.FieldExtractPreference extractPreference, IndexMode indexMode) {
28+
super(extractPreference, indexMode);
29+
}
30+
31+
@Override
32+
protected String getTestRestCluster() {
33+
return cluster.getHttpAddresses();
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.ccq;
9+
10+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
11+
12+
import org.apache.http.HttpHost;
13+
import org.elasticsearch.client.RestClient;
14+
import org.elasticsearch.core.IOUtils;
15+
import org.elasticsearch.index.IndexMode;
16+
import org.elasticsearch.index.mapper.MappedFieldType;
17+
import org.elasticsearch.test.TestClustersThreadFilter;
18+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
19+
import org.elasticsearch.xpack.esql.qa.rest.AllSupportedFieldsTestCase;
20+
import org.junit.AfterClass;
21+
import org.junit.Before;
22+
import org.junit.ClassRule;
23+
import org.junit.rules.RuleChain;
24+
import org.junit.rules.TestRule;
25+
26+
import java.io.IOException;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.TreeMap;
30+
31+
/**
32+
* Fetch all field types via cross cluster search, possible on a different version.
33+
*/
34+
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
35+
public class AllSupportedFieldsIT extends AllSupportedFieldsTestCase {
36+
static ElasticsearchCluster remoteCluster = Clusters.remoteCluster();
37+
static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster);
38+
39+
@ClassRule
40+
public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);
41+
42+
private static RestClient remoteClient;
43+
private static Map<String, NodeInfo> remoteNodeToInfo;
44+
45+
public AllSupportedFieldsIT(MappedFieldType.FieldExtractPreference extractPreference, IndexMode indexMode) {
46+
super(extractPreference, indexMode);
47+
}
48+
49+
@Before
50+
public void createRemoteIndices() throws IOException {
51+
if (supportsNodeAssignment()) {
52+
for (Map.Entry<String, NodeInfo> e : remoteNodeToInfo().entrySet()) {
53+
createIndexForNode(remoteClient(), e.getKey(), e.getValue().id());
54+
}
55+
} else {
56+
createIndexForNode(remoteClient(), null, null);
57+
}
58+
}
59+
60+
private Map<String, NodeInfo> remoteNodeToInfo() throws IOException {
61+
if (remoteNodeToInfo == null) {
62+
remoteNodeToInfo = fetchNodeToInfo(remoteClient(), "remote_cluster");
63+
}
64+
return remoteNodeToInfo;
65+
}
66+
67+
@Override
68+
protected Map<String, NodeInfo> allNodeToInfo() throws IOException {
69+
Map<String, NodeInfo> all = new TreeMap<>();
70+
all.putAll(super.allNodeToInfo());
71+
all.putAll(remoteNodeToInfo());
72+
return all;
73+
}
74+
75+
private RestClient remoteClient() throws IOException {
76+
if (remoteClient == null) {
77+
var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses());
78+
remoteClient = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
79+
}
80+
return remoteClient;
81+
}
82+
83+
@Override
84+
protected String getTestRestCluster() {
85+
return localCluster.getHttpAddresses();
86+
}
87+
88+
@AfterClass
89+
public static void closeRemoteClient() throws IOException {
90+
try {
91+
IOUtils.close(remoteClient);
92+
} finally {
93+
remoteClient = null;
94+
}
95+
}
96+
97+
@Override
98+
protected boolean fetchDenseVectorAggMetricDoubleIfFns() throws IOException {
99+
return super.fetchDenseVectorAggMetricDoubleIfFns()
100+
&& clusterHasCapability(remoteClient(), "GET", "/_query", List.of(), List.of("DENSE_VECTOR_AGG_METRIC_DOUBLE_IF_FNS")).orElse(
101+
false
102+
);
103+
}
104+
}

0 commit comments

Comments
 (0)