-
Notifications
You must be signed in to change notification settings - Fork 25.7k
ESQL: Enable new data types with created version #136327
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
57b63c2
3b5c4e2
ac6efb9
ad8b9b9
fb665c1
8b22742
a7d19df
5016b73
8e2d031
47590a1
0527749
e9b20c8
dd895a1
b86a096
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 136327 | ||
| summary: Enable new data types with created version | ||
| area: ES|QL | ||
| type: enhancement | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.Comparator; | ||
| import java.util.List; | ||
| import java.util.Locale; | ||
| import java.util.Map; | ||
|
|
@@ -43,6 +44,7 @@ | |
| import static org.elasticsearch.test.ListMatcher.matchesList; | ||
| import static org.elasticsearch.test.MapMatcher.assertMap; | ||
| import static org.elasticsearch.test.MapMatcher.matchesMap; | ||
| import static org.elasticsearch.xpack.esql.action.EsqlResolveFieldsResponse.RESOLVE_FIELDS_RESPONSE_CREATED_TV; | ||
| import static org.hamcrest.Matchers.any; | ||
| import static org.hamcrest.Matchers.anyOf; | ||
| import static org.hamcrest.Matchers.containsString; | ||
|
|
@@ -76,11 +78,6 @@ public class AllSupportedFieldsTestCase extends ESRestTestCase { | |
|
|
||
| @ParametersFactory(argumentFormatting = "pref=%s mode=%s") | ||
| public static List<Object[]> args() { | ||
| if (Build.current().isSnapshot()) { | ||
| // We only test behavior in release builds. Snapshot builds will have data types enabled that are still under construction. | ||
| return List.of(); | ||
| } | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved this. |
||
|
|
||
| List<Object[]> args = new ArrayList<>(); | ||
| for (MappedFieldType.FieldExtractPreference extractPreference : Arrays.asList( | ||
| null, | ||
|
|
@@ -102,7 +99,7 @@ protected AllSupportedFieldsTestCase(MappedFieldType.FieldExtractPreference extr | |
| this.indexMode = indexMode; | ||
| } | ||
|
|
||
| protected record NodeInfo(String cluster, String id, TransportVersion version, Set<String> roles) {} | ||
| protected record NodeInfo(String cluster, String id, boolean snapshot, TransportVersion version, Set<String> roles) {} | ||
|
|
||
| private static Map<String, NodeInfo> nodeToInfo; | ||
|
|
||
|
|
@@ -126,6 +123,19 @@ protected boolean fetchDenseVectorAggMetricDoubleIfFns() throws IOException { | |
| return clusterHasCapability("GET", "/_query", List.of(), List.of("DENSE_VECTOR_AGG_METRIC_DOUBLE_IF_FNS")).orElse(false); | ||
| } | ||
|
|
||
| private static Boolean denseVectorAggMetricDoubleIfVersion; | ||
|
|
||
| private boolean denseVectorAggMetricDoubleIfVersion() throws IOException { | ||
| if (denseVectorAggMetricDoubleIfVersion == null) { | ||
| denseVectorAggMetricDoubleIfVersion = fetchDenseVectorAggMetricDoubleIfVersion(); | ||
| } | ||
| return denseVectorAggMetricDoubleIfVersion; | ||
| } | ||
|
|
||
| protected boolean fetchDenseVectorAggMetricDoubleIfVersion() throws IOException { | ||
| return clusterHasCapability("GET", "/_query", List.of(), List.of("DENSE_VECTOR_AGG_METRIC_DOUBLE_IF_VERSION")).orElse(false); | ||
| } | ||
|
|
||
| private static Boolean supportsNodeAssignment; | ||
|
|
||
| protected boolean supportsNodeAssignment() throws IOException { | ||
|
|
@@ -153,11 +163,21 @@ protected static Map<String, NodeInfo> fetchNodeToInfo(RestClient client, String | |
| String id = (String) n.getKey(); | ||
| Map<?, ?> nodeInfo = (Map<?, ?>) n.getValue(); | ||
| String nodeName = (String) extractValue(nodeInfo, "name"); | ||
|
|
||
| /* | ||
| * Figuring out if a node is a snapshot is kind of tricky. The main version | ||
| * doesn't include -SNAPSHOT. But ${VERSION}-SNAPSHOT is in the node info | ||
| * *somewhere*. So we do this silly toString here. | ||
| */ | ||
| String version = (String) extractValue(nodeInfo, "version"); | ||
| boolean snapshot = nodeInfo.toString().contains(version + "-SNAPSHOT"); | ||
|
|
||
| TransportVersion transportVersion = TransportVersion.fromId((Integer) extractValue(nodeInfo, "transport_version")); | ||
| List<?> roles = (List<?>) nodeInfo.get("roles"); | ||
|
|
||
| nodeToInfo.put( | ||
| nodeName, | ||
| new NodeInfo(cluster, id, transportVersion, roles.stream().map(Object::toString).collect(Collectors.toSet())) | ||
| new NodeInfo(cluster, id, snapshot, transportVersion, roles.stream().map(Object::toString).collect(Collectors.toSet())) | ||
| ); | ||
| } | ||
|
|
||
|
|
@@ -175,6 +195,22 @@ public void createIndices() throws IOException { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Make sure the test doesn't run on snapshot builds. Release builds only. | ||
| * <p> | ||
| * {@link Build#isSnapshot()} checks if the version under test is a snapshot. | ||
| * But! This test runs against many versions and if *any* are snapshots | ||
| * then this will fail. So we check the versions of each node in the cluster too. | ||
| * </p> | ||
| */ | ||
| @Before | ||
| public void skipSnapshots() throws IOException { | ||
| assumeFalse("Only supported on production builds", Build.current().isSnapshot()); | ||
| for (NodeInfo n : allNodeToInfo().values()) { | ||
| assumeFalse("Only supported on production builds", n.snapshot()); | ||
| } | ||
| } | ||
|
|
||
| // TODO: Also add a test for _tsid once we can determine the minimum transport version of all nodes. | ||
| public final void testFetchAll() throws IOException { | ||
| Map<String, Object> response = esql(""" | ||
|
|
@@ -212,7 +248,7 @@ public final void testFetchAll() throws IOException { | |
| if (supportedInIndex(type) == false) { | ||
| continue; | ||
| } | ||
| expectedValues = expectedValues.entry(fieldName(type), expectedValue(type)); | ||
| expectedValues = expectedValues.entry(fieldName(type), expectedValue(type, nodeInfo)); | ||
| } | ||
| expectedValues = expectedValues.entry("_id", any(String.class)) | ||
| .entry("_ignored", nullValue()) | ||
|
|
@@ -227,15 +263,23 @@ public final void testFetchAll() throws IOException { | |
| profileLogger.clearProfile(); | ||
| } | ||
|
|
||
| // Tests a workaround and will become obsolete once we can determine the actual minimum transport version of all nodes. | ||
| /** | ||
| * Tests fetching {@code dense_vector} if possible. Uses the {@code dense_vector_agg_metric_double_if_fns} | ||
| * workaround if required. | ||
| */ | ||
| public final void testFetchDenseVector() throws IOException { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could use a similar test for aggregate metric double, no?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| Map<String, Object> response; | ||
| try { | ||
| response = esql(""" | ||
| | EVAL k = v_l2_norm(f_dense_vector, [1]) // workaround to enable fetching dense_vector | ||
| String request = """ | ||
| | KEEP _index, f_dense_vector | ||
| | LIMIT 1000 | ||
| """); | ||
| """; | ||
| if (denseVectorAggMetricDoubleIfVersion() == false) { | ||
| request = """ | ||
| | EVAL k = v_l2_norm(f_dense_vector, [1]) // workaround to enable fetching dense_vector | ||
| """ + request; | ||
| } | ||
| response = esql(request); | ||
| if ((Boolean) response.get("is_partial")) { | ||
| Map<?, ?> clusters = (Map<?, ?>) response.get("_clusters"); | ||
| Map<?, ?> details = (Map<?, ?>) clusters.get("details"); | ||
|
|
@@ -410,7 +454,7 @@ private void createAllTypesDoc(RestClient client, String indexName) throws IOExc | |
| } | ||
|
|
||
| // This will become dependent on the minimum transport version of all nodes once we can determine that. | ||
| private Matcher<?> expectedValue(DataType type) { | ||
| private Matcher<?> expectedValue(DataType type, NodeInfo nodeInfo) throws IOException { | ||
| return switch (type) { | ||
| case BOOLEAN -> equalTo(true); | ||
| case COUNTER_LONG, LONG, COUNTER_INTEGER, INTEGER, UNSIGNED_LONG, SHORT, BYTE -> equalTo(1); | ||
|
|
@@ -429,14 +473,24 @@ private Matcher<?> expectedValue(DataType type) { | |
| case GEO_SHAPE -> equalTo("POINT (-71.34 41.12)"); | ||
| case NULL -> nullValue(); | ||
| case AGGREGATE_METRIC_DOUBLE -> { | ||
| // Currently, we cannot tell if all nodes support it or not so we treat it as unsupported. | ||
| // TODO: Fix this once we know the node versions. | ||
| yield nullValue(); | ||
| /* | ||
| * We need both AGGREGATE_METRIC_DOUBLE_CREATED and RESOLVE_FIELDS_RESPONSE_CREATED_TV | ||
| * but RESOLVE_FIELDS_RESPONSE_CREATED_TV came last so it's enough to check just it. | ||
| */ | ||
| if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV) == false) { | ||
nik9000 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| yield nullValue(); | ||
| } | ||
| yield equalTo("{\"min\":-302.5,\"max\":702.3,\"sum\":200.0,\"value_count\":25}"); | ||
| } | ||
| case DENSE_VECTOR -> { | ||
| // Currently, we cannot tell if all nodes support it or not so we treat it as unsupported. | ||
| // TODO: Fix this once we know the node versions. | ||
| yield nullValue(); | ||
| /* | ||
| * We need both DENSE_VECTOR_CREATED and RESOLVE_FIELDS_RESPONSE_CREATED_TV | ||
| * but RESOLVE_FIELDS_RESPONSE_CREATED_TV came last so it's enough to check just it. | ||
| */ | ||
| if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV) == false) { | ||
| yield nullValue(); | ||
| } | ||
| yield equalTo(List.of(0.5, 10.0, 5.9999995)); | ||
| } | ||
| default -> throw new AssertionError("unsupported field type [" + type + "]"); | ||
| }; | ||
|
|
@@ -507,7 +561,7 @@ private Map<String, Object> nameToValue(List<String> names, List<?> values) { | |
| } | ||
|
|
||
| // This will become dependent on the minimum transport version of all nodes once we can determine that. | ||
| private Matcher<String> expectedType(DataType type) { | ||
| private Matcher<String> expectedType(DataType type) throws IOException { | ||
| return switch (type) { | ||
| case COUNTER_DOUBLE, COUNTER_LONG, COUNTER_INTEGER -> { | ||
| if (indexMode == IndexMode.TIME_SERIES) { | ||
|
|
@@ -518,10 +572,16 @@ private Matcher<String> expectedType(DataType type) { | |
| case BYTE, SHORT -> equalTo("integer"); | ||
| case HALF_FLOAT, SCALED_FLOAT, FLOAT -> equalTo("double"); | ||
| case NULL -> equalTo("keyword"); | ||
| // Currently unsupported without TS command or KNN function | ||
| case AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR -> | ||
| // TODO: Fix this once we know the node versions. | ||
| equalTo("unsupported"); | ||
| case AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR -> { | ||
| /* | ||
| * We need both <type_name>_CREATED and RESOLVE_FIELDS_RESPONSE_CREATED_TV | ||
| * but RESOLVE_FIELDS_RESPONSE_CREATED_TV came last so it's enough to check just it. | ||
| */ | ||
| if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV) == false) { | ||
nik9000 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| yield equalTo("unsupported"); | ||
| } | ||
| yield equalTo(type.esType()); | ||
| } | ||
| default -> equalTo(type.esType()); | ||
| }; | ||
| } | ||
|
|
@@ -555,9 +615,13 @@ private Map<String, NodeInfo> expectedIndices() throws IOException { | |
| name = e.getValue().cluster + ":" + name; | ||
| } | ||
| // We should only end up with one per cluster | ||
| result.put(name, new NodeInfo(e.getValue().cluster, null, e.getValue().version(), null)); | ||
| result.put(name, new NodeInfo(e.getValue().cluster, null, e.getValue().snapshot(), e.getValue().version(), null)); | ||
| } | ||
| } | ||
| return result; | ||
| } | ||
|
|
||
| protected TransportVersion minVersion() throws IOException { | ||
| return allNodeToInfo().values().stream().map(NodeInfo::version).min(Comparator.naturalOrder()).get(); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change effectively adds support for agg metric double and dense vector fields in lookup indices and enrich policies.
Let's add a test case for using all kinds of types as lookup/enrich fields, right? (Not match fields, we have separate tests for that.) That can go in a follow-up; this should be fine to merge without it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍