Merged
Changes from 6 commits
13 changes: 0 additions & 13 deletions x-pack/plugin/build.gradle
@@ -143,19 +143,6 @@ tasks.named("yamlRestCompatTestTransform").configure({ task ->
task.skipTest("ml/sparse_vector_search/Search on a sparse_vector field with dots in the field names", "Vectors are no longer returned by default")
task.skipTest("ml/sparse_vector_search/Search on a nested sparse_vector field with dots in the field names and conflicting child fields", "Vectors are no longer returned by default")
task.skipTest("esql/190_lookup_join/lookup-no-key-only-key", "Requires the fix")
task.skipTest("esql/40_tsdb/aggregate_metric_double unsortable", "Extra function required to enable the field type")
task.skipTest("esql/40_tsdb/avg of aggregate_metric_double", "Extra function required to enable the field type")
task.skipTest("esql/40_tsdb/grouping stats on aggregate_metric_double", "Extra function required to enable the field type")
task.skipTest("esql/40_tsdb/render aggregate_metric_double when missing min and max", "Extra function required to enable the field type")
task.skipTest("esql/40_tsdb/render aggregate_metric_double when missing value", "Extra function required to enable the field type")
task.skipTest("esql/40_tsdb/sorting with aggregate_metric_double with partial submetrics", "Extra function required to enable the field type")
task.skipTest("esql/40_tsdb/stats on aggregate_metric_double missing min and max", "Extra function required to enable the field type")
task.skipTest("esql/40_tsdb/to_string aggregate_metric_double", "Extra function required to enable the field type")
task.skipTest("esql/40_tsdb/stats on aggregate_metric_double with partial submetrics", "Extra function required to enable the field type")
task.skipTest("esql/46_downsample/MV_EXPAND on non-MV aggregate metric double", "Extra function required to enable the field type")
task.skipTest("esql/46_downsample/Query stats on downsampled index", "Extra function required to enable the field type")
task.skipTest("esql/46_downsample/Render stats from downsampled index", "Extra function required to enable the field type")
task.skipTest("esql/46_downsample/Sort from multiple indices one with aggregate metric double", "Extra function required to enable the field type")
})

tasks.named('yamlRestCompatTest').configure {
@@ -7,6 +7,7 @@
package org.elasticsearch.xpack.esql.core.type;

import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.Build;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -744,7 +745,7 @@ public DataType counter() {

@Override
public void writeTo(StreamOutput out) throws IOException {
-if (supportedVersion.supports(out.getTransportVersion()) == false) {
+if (supportedVersion.supports(out.getTransportVersion(), Build.current().isSnapshot()) == false) {
/*
* TODO when we implement version aware planning flip this to an IllegalStateException
* so we throw a 500 error. It'll be our bug then. Right now it's a sign that the user
@@ -11,15 +11,15 @@
import org.elasticsearch.TransportVersion;

public interface SupportedVersion {
-boolean supports(TransportVersion version);
+boolean supports(TransportVersion version, boolean currentBuildIsSnapshot);

default boolean supportedLocally() {
-return supports(TransportVersion.current());
+return supports(TransportVersion.current(), Build.current().isSnapshot());
}

SupportedVersion SUPPORTED_ON_ALL_NODES = new SupportedVersion() {
@Override
-public boolean supports(TransportVersion version) {
+public boolean supports(TransportVersion version, boolean currentBuildIsSnapshot) {
return true;
}

@@ -56,8 +56,8 @@ public String toString() {
// Check usage of this constant to be sure.
SupportedVersion UNDER_CONSTRUCTION = new SupportedVersion() {
@Override
-public boolean supports(TransportVersion version) {
-return Build.current().isSnapshot();
+public boolean supports(TransportVersion version, boolean currentBuildIsSnapshot) {
+return currentBuildIsSnapshot;
}

@Override
@@ -76,8 +76,8 @@ public String toString() {
static SupportedVersion supportedOn(TransportVersion supportedVersion) {
return new SupportedVersion() {
@Override
-public boolean supports(TransportVersion version) {
-return version.supports(supportedVersion) || Build.current().isSnapshot();
+public boolean supports(TransportVersion version, boolean currentBuildIsSnapshot) {
+return version.supports(supportedVersion) || currentBuildIsSnapshot;
}

@Override
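One practical effect of threading the snapshot flag through as a parameter, instead of reading Build.current() inside each implementation, is that both build flavors become testable deterministically. A minimal sketch, not part of this change; the test class name is hypothetical and SupportedVersion's package is assumed:

import org.elasticsearch.TransportVersion;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.esql.core.type.SupportedVersion; // package assumed

public class SupportedVersionSnapshotFlagTests extends ESTestCase {
    public void testUnderConstructionGatedBySnapshotFlag() {
        TransportVersion current = TransportVersion.current();
        // Under-construction types are visible on snapshot builds...
        assertTrue(SupportedVersion.UNDER_CONSTRUCTION.supports(current, true));
        // ...and hidden on release builds, regardless of the transport version.
        assertFalse(SupportedVersion.UNDER_CONSTRUCTION.supports(current, false));
    }
}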
Contributor:
This change effectively adds support for agg metric double and dense vector fields in lookup indices and enrich policies.

Let's add a test case for using all kinds of types as lookup/enrich fields, right? (Not match fields; we have separate tests for that.) In a follow-up, that is; this should be fine to merge without.
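As a rough sketch of what that follow-up could exercise (the index, key, and field names are hypothetical, the lookup index would need lookup mode, and esql() is assumed to be the query helper this test class already uses):

public void testAllTypesAsLookupFields() throws IOException {
    // Join against a lookup index that carries one field per supported type,
    // then keep the joined fields so they must survive field extraction.
    Map<String, Object> response = esql("""
        | LOOKUP JOIN my_lookup_idx ON join_key
        | KEEP join_key, f_dense_vector, f_aggregate_metric_double
        | LIMIT 10
        """);
    assertThat(response, hasKey("columns"));
}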

Member Author:
👍

@@ -32,6 +32,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -43,6 +44,7 @@
import static org.elasticsearch.test.ListMatcher.matchesList;
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
+import static org.elasticsearch.xpack.esql.action.EsqlResolveFieldsResponse.RESOLVE_FIELDS_RESPONSE_CREATED_TV;
import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
@@ -76,11 +78,6 @@ public class AllSupportedFieldsTestCase extends ESRestTestCase {

@ParametersFactory(argumentFormatting = "pref=%s mode=%s")
public static List<Object[]> args() {
-if (Build.current().isSnapshot()) {
-// We only test behavior in release builds. Snapshot builds will have data types enabled that are still under construction.
-return List.of();
-}
Member Author:
Moved this.


List<Object[]> args = new ArrayList<>();
for (MappedFieldType.FieldExtractPreference extractPreference : Arrays.asList(
null,
@@ -102,7 +99,7 @@ protected AllSupportedFieldsTestCase(MappedFieldType.FieldExtractPreference extr
this.indexMode = indexMode;
}

-protected record NodeInfo(String cluster, String id, TransportVersion version, Set<String> roles) {}
+protected record NodeInfo(String cluster, String id, boolean snapshot, TransportVersion version, Set<String> roles) {}

private static Map<String, NodeInfo> nodeToInfo;

@@ -126,6 +123,19 @@ protected boolean fetchDenseVectorAggMetricDoubleIfFns() throws IOException {
return clusterHasCapability("GET", "/_query", List.of(), List.of("DENSE_VECTOR_AGG_METRIC_DOUBLE_IF_FNS")).orElse(false);
}

+private static Boolean denseVectorAggMetricDoubleIfVersion;
+
+private boolean denseVectorAggMetricDoubleIfVersion() throws IOException {
+if (denseVectorAggMetricDoubleIfVersion == null) {
+denseVectorAggMetricDoubleIfVersion = fetchDenseVectorAggMetricDoubleIfVersion();
+}
+return denseVectorAggMetricDoubleIfVersion;
+}
+
+protected boolean fetchDenseVectorAggMetricDoubleIfVersion() throws IOException {
+return clusterHasCapability("GET", "/_query", List.of(), List.of("DENSE_VECTOR_AGG_METRIC_DOUBLE_IF_VERSION")).orElse(false);
+}

private static Boolean supportsNodeAssignment;

protected boolean supportsNodeAssignment() throws IOException {
@@ -154,10 +164,12 @@ protected static Map<String, NodeInfo> fetchNodeToInfo(RestClient client, String
Map<?, ?> nodeInfo = (Map<?, ?>) n.getValue();
String nodeName = (String) extractValue(nodeInfo, "name");
TransportVersion transportVersion = TransportVersion.fromId((Integer) extractValue(nodeInfo, "transport_version"));
String version = (String) extractValue(nodeInfo, "version");
boolean snapshot = nodeInfo.toString().contains(version + "-SNAPSHOT");
List<?> roles = (List<?>) nodeInfo.get("roles");
nodeToInfo.put(
nodeName,
-new NodeInfo(cluster, id, transportVersion, roles.stream().map(Object::toString).collect(Collectors.toSet()))
+new NodeInfo(cluster, id, snapshot, transportVersion, roles.stream().map(Object::toString).collect(Collectors.toSet()))
);
}

@@ -175,6 +187,14 @@ public void createIndices() throws IOException {
}
}

+@Before
+public void onlyOnSnapshot() throws IOException {
Contributor:
The name seems to suggest the opposite of the actual method body?

Suggested change:
-public void onlyOnSnapshot() throws IOException {
+public void onlyOnProductionBuilds() throws IOException {

Contributor:
Interesting! I thought that running tests in release mode would use release builds of all bwc versions as well. Is that not the case?

Can we add a comment to explain this? Also, should we double-check whether our release tests should always use release builds of bwc versions?

Member Author:
I'll add a comment, yeah. If you run:

./gradlew -p x-pack/plugin/esql/qa/server/multi-clusters/ clean v9.1.5#newToOld -Dbuild.snapshot=false -Dlicense.key="x-pack/license-tools/src/test/resources/public.key" -Dtests.jvm.argline="-Dbuild.snapshot=false" -Dtests.class='*All*'

while 9.1.5 is the tip of the 9.1 branch, then we'll compile 9.1 and run it as a snapshot.

I'll ask around if this is intended.

Member Author:
> I'll ask around if this is intended.

Well, not "intended", but not something we're likely to fix right now.

assumeFalse("Only supported on production builds", Build.current().isSnapshot());
for (NodeInfo n : allNodeToInfo().values()) {
assumeFalse("Only supported on production builds", n.snapshot());
}
}

// TODO: Also add a test for _tsid once we can determine the minimum transport version of all nodes.
public final void testFetchAll() throws IOException {
Map<String, Object> response = esql("""
@@ -212,7 +232,7 @@ public final void testFetchAll() throws IOException {
if (supportedInIndex(type) == false) {
continue;
}
-expectedValues = expectedValues.entry(fieldName(type), expectedValue(type));
+expectedValues = expectedValues.entry(fieldName(type), expectedValue(type, nodeInfo));
}
expectedValues = expectedValues.entry("_id", any(String.class))
.entry("_ignored", nullValue())
@@ -227,15 +247,23 @@
profileLogger.clearProfile();
}

-// Tests a workaround and will become obsolete once we can determine the actual minimum transport version of all nodes.
+/**
+ * Tests fetching {@code dense_vector} if possible. Uses the {@code dense_vector_agg_metric_double_if_fns}
+ * workaround if required.
+ */
public final void testFetchDenseVector() throws IOException {
Contributor:
We could use a similar test for aggregate metric double, no?

Member Author:
👍
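A minimal sketch of that agreed follow-up, shaped like testFetchDenseVector; the field name follows the f_ naming convention used in this class, and the assertion is simplified (not from this PR):

public final void testFetchAggregateMetricDouble() throws IOException {
    // No EVAL workaround here: rely on the version/capability checks instead.
    Map<String, Object> response = esql("""
        | KEEP _index, f_aggregate_metric_double
        | LIMIT 1000
        """);
    assertThat(response, hasKey("columns"));
}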

Map<String, Object> response;
try {
response = esql("""
| EVAL k = v_l2_norm(f_dense_vector, [1]) // workaround to enable fetching dense_vector
String request = """
| KEEP _index, f_dense_vector
| LIMIT 1000
""");
""";
if (denseVectorAggMetricDoubleIfVersion() == false) {
request = """
| EVAL k = v_l2_norm(f_dense_vector, [1]) // workaround to enable fetching dense_vector
""" + request;
}
response = esql(request);
if ((Boolean) response.get("is_partial")) {
Map<?, ?> clusters = (Map<?, ?>) response.get("_clusters");
Map<?, ?> details = (Map<?, ?>) clusters.get("details");
@@ -410,7 +438,7 @@ private void createAllTypesDoc(RestClient client, String indexName) throws IOExc
}

// This will become dependent on the minimum transport version of all nodes once we can determine that.
-private Matcher<?> expectedValue(DataType type) {
+private Matcher<?> expectedValue(DataType type, NodeInfo nodeInfo) throws IOException {
return switch (type) {
case BOOLEAN -> equalTo(true);
case COUNTER_LONG, LONG, COUNTER_INTEGER, INTEGER, UNSIGNED_LONG, SHORT, BYTE -> equalTo(1);
@@ -429,14 +457,16 @@ private Matcher<?> expectedValue(DataType type) {
case GEO_SHAPE -> equalTo("POINT (-71.34 41.12)");
case NULL -> nullValue();
case AGGREGATE_METRIC_DOUBLE -> {
-// Currently, we cannot tell if all nodes support it or not so we treat it as unsupported.
-// TODO: Fix this once we know the node versions.
-yield nullValue();
+if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV) == false) {
+yield nullValue();
+}
+yield equalTo("{\"min\":-302.5,\"max\":702.3,\"sum\":200.0,\"value_count\":25}");
}
case DENSE_VECTOR -> {
-// Currently, we cannot tell if all nodes support it or not so we treat it as unsupported.
-// TODO: Fix this once we know the node versions.
-yield nullValue();
+if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV) == false) {
+yield nullValue();
+}
+yield equalTo(List.of(0.5, 10.0, 5.9999995));
}
default -> throw new AssertionError("unsupported field type [" + type + "]");
};
@@ -507,7 +537,7 @@ private Map<String, Object> nameToValue(List<String> names, List<?> values) {
}

// This will become dependent on the minimum transport version of all nodes once we can determine that.
-private Matcher<String> expectedType(DataType type) {
+private Matcher<String> expectedType(DataType type) throws IOException {
return switch (type) {
case COUNTER_DOUBLE, COUNTER_LONG, COUNTER_INTEGER -> {
if (indexMode == IndexMode.TIME_SERIES) {
@@ -518,10 +548,12 @@ private Matcher<String> expectedType(DataType type) {
case BYTE, SHORT -> equalTo("integer");
case HALF_FLOAT, SCALED_FLOAT, FLOAT -> equalTo("double");
case NULL -> equalTo("keyword");
-// Currently unsupported without TS command or KNN function
-case AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR ->
-// TODO: Fix this once we know the node versions.
-equalTo("unsupported");
+case AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR -> {
+if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV) == false) {
+yield equalTo("unsupported");
+}
+yield equalTo(type.esType());
+}
default -> equalTo(type.esType());
};
}
@@ -555,9 +587,13 @@ private Map<String, NodeInfo> expectedIndices() throws IOException {
name = e.getValue().cluster + ":" + name;
}
// We should only end up with one per cluster
-result.put(name, new NodeInfo(e.getValue().cluster, null, e.getValue().version(), null));
+result.put(name, new NodeInfo(e.getValue().cluster, null, e.getValue().snapshot(), e.getValue().version(), null));
}
}
return result;
}

+protected TransportVersion minVersion() throws IOException {
+return allNodeToInfo().values().stream().map(NodeInfo::version).min(Comparator.naturalOrder()).get();
+}
}
@@ -1,9 +1,8 @@
retrieveBitVectorData
-required_capability: dense_vector_field_type_released
+required_capability: dense_vector_agg_metric_double_if_version
required_capability: l2_norm_vector_similarity_function

FROM dense_vector
-| EVAL k = v_l2_norm(bit_vector, [1,2]) // workaround to enable fetching dense_vector
| KEEP id, bit_vector
| SORT id
;
@@ -16,11 +15,11 @@ id:l | bit_vector:dense_vector
;

denseBitVectorWithEval
-required_capability: dense_vector_field_type_released
+required_capability: dense_vector_agg_metric_double_if_version
required_capability: l2_norm_vector_similarity_function

FROM dense_vector
-| EVAL v = bit_vector, k = v_l2_norm(bit_vector, [1,2]) // workaround to enable fetching dense_vector
+| EVAL v = bit_vector
| KEEP id, v
| SORT id
;
@@ -33,14 +32,13 @@ id:l | v:dense_vector
;

denseBitVectorWithRenameAndDrop
-required_capability: dense_vector_field_type_released
+required_capability: dense_vector_agg_metric_double_if_version
required_capability: l2_norm_vector_similarity_function

FROM dense_vector
| EVAL v = bit_vector
-| EVAL k = v_l2_norm(bit_vector, [1,2]) // workaround to enable fetching dense_vector
| RENAME v AS new_vector
-| DROP float_vector, byte_vector, bit_vector, k
+| DROP float_vector, byte_vector, bit_vector
| SORT id
;

@@ -1,9 +1,8 @@
retrieveByteVectorData
-required_capability: dense_vector_field_type_released
+required_capability: dense_vector_agg_metric_double_if_version
required_capability: l2_norm_vector_similarity_function

FROM dense_vector
-| EVAL k = v_l2_norm(byte_vector, [1,2,3]) // workaround to enable fetching dense_vector
| KEEP id, byte_vector
| SORT id
;
@@ -16,12 +15,11 @@ id:l | byte_vector:dense_vector
;

denseByteVectorWithEval
-required_capability: dense_vector_field_type_released
+required_capability: dense_vector_agg_metric_double_if_version
required_capability: l2_norm_vector_similarity_function

FROM dense_vector
| EVAL v = byte_vector
-| EVAL k = v_l2_norm(byte_vector, [1,2,3]) // workaround to enable fetching dense_vector
| KEEP id, v
| SORT id
;
@@ -34,14 +32,13 @@ id:l | v:dense_vector
;

denseByteVectorWithRenameAndDrop
-required_capability: dense_vector_field_type_released
+required_capability: dense_vector_agg_metric_double_if_version
required_capability: l2_norm_vector_similarity_function

FROM dense_vector
| EVAL v = byte_vector
-| EVAL k = v_l2_norm(byte_vector, [1,2,3]) // workaround to enable fetching dense_vector
| RENAME v AS new_vector
-| DROP float_vector, byte_vector, bit_vector, k
+| DROP float_vector, byte_vector, bit_vector
| SORT id
;
