Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9204000,9185005
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.2.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
min_transport_version,9185004
esql_resolve_fields_response_used,9185005
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
esql_resolve_fields_response_removed_min_tv,9203000
esql_resolve_fields_response_used,9204000
Original file line number Diff line number Diff line change
Expand Up @@ -984,8 +984,13 @@ Builder underConstruction() {
}
}

private static class DataTypesTransportVersions {
public static class DataTypesTransportVersions {

/**
* The first transport version after the PR that introduced geotile/geohash/geohex, resp.
* after 9.1. We didn't require transport versions at that point in time, as geotile/hash/hex require
* using specific functions to even occur in query plans.
*/
public static final TransportVersion INDEX_SOURCE = TransportVersion.fromName("index_source");

public static final TransportVersion ESQL_DENSE_VECTOR_CREATED_VERSION = TransportVersion.fromName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@
import static org.elasticsearch.test.ListMatcher.matchesList;
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.elasticsearch.xpack.esql.action.EsqlResolveFieldsResponse.RESOLVE_FIELDS_RESPONSE_CREATED_TV;
import static org.elasticsearch.xpack.esql.action.EsqlResolveFieldsResponse.RESOLVE_FIELDS_RESPONSE_USED_TV;
import static org.elasticsearch.xpack.esql.core.type.DataType.DataTypesTransportVersions.ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION;
import static org.elasticsearch.xpack.esql.core.type.DataType.DataTypesTransportVersions.ESQL_DENSE_VECTOR_CREATED_VERSION;
import static org.elasticsearch.xpack.esql.core.type.DataType.DataTypesTransportVersions.INDEX_SOURCE;
import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
Expand All @@ -71,8 +74,6 @@
public class AllSupportedFieldsTestCase extends ESRestTestCase {
private static final Logger logger = LogManager.getLogger(FieldExtractorTestCase.class);

private static final TransportVersion INDEX_SOURCE = TransportVersion.fromName("index_source");

@Rule(order = Integer.MIN_VALUE)
public ProfileLogger profileLogger = new ProfileLogger();

Expand Down Expand Up @@ -329,6 +330,76 @@ public final void testFetchDenseVector() throws IOException {
assertMap(indexToRow(columns, values), expectedAllValues);
}

/**
* Tests fetching {@code aggregate_metric_double} if possible. Uses the {@code dense_vector_agg_metric_double_if_fns}
* work around if required.
*/
public final void testFetchAggregateMetricDouble() throws IOException {
Map<String, Object> response;
try {
String request = """
| EVAL strjunk = TO_STRING(f_aggregate_metric_double)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed? There's more junk below to trigger the workaround, and this is not kept.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave a comment - yeah, it is.

| KEEP _index, f_aggregate_metric_double
| LIMIT 1000
""";
if (denseVectorAggMetricDoubleIfVersion() == false) {
request = """
| EVAL junk = TO_AGGREGATE_METRIC_DOUBLE(1) // workaround to enable fetching aggregate_metric_double
""" + request;
}
response = esql(request);
if ((Boolean) response.get("is_partial")) {
Map<?, ?> clusters = (Map<?, ?>) response.get("_clusters");
Map<?, ?> details = (Map<?, ?>) clusters.get("details");

boolean foundError = false;
for (Map.Entry<?, ?> cluster : details.entrySet()) {
String failures = cluster.getValue().toString();
if (denseVectorAggMetricDoubleIfFns()) {
throw new AssertionError("should correctly fetch the aggregate_metric_double: " + failures);
}
foundError |= failures.contains("doesn't understand data type [AGGREGATE_METRIC_DOUBLE]");
}
assertTrue("didn't find errors: " + details, foundError);
return;
}
} catch (ResponseException e) {
if (denseVectorAggMetricDoubleIfFns()) {
throw new AssertionError("should correctly fetch the aggregate_metric_double", e);
}
assertThat(
"old version should fail with this error",
EntityUtils.toString(e.getResponse().getEntity()),
anyOf(
containsString("Unknown function [TO_AGGREGATE_METRIC_DOUBLE]"),
containsString("Cannot use field [f_aggregate_metric_double] with unsupported type"),
containsString("doesn't understand data type [AGGREGATE_METRIC_DOUBLE]")
)
);
// Failure is expected and fine
return;
}
List<?> columns = (List<?>) response.get("columns");
List<?> values = (List<?>) response.get("values");

MapMatcher expectedColumns = matchesMap().entry("f_aggregate_metric_double", "aggregate_metric_double").entry("_index", "keyword");
assertMap(nameToType(columns), expectedColumns);

MapMatcher expectedAllValues = matchesMap();
for (Map.Entry<String, NodeInfo> e : expectedIndices().entrySet()) {
String indexName = e.getKey();
NodeInfo nodeInfo = e.getValue();
MapMatcher expectedValues = matchesMap();
expectedValues = expectedValues.entry(
"f_aggregate_metric_double",
"{\"min\":-302.5,\"max\":702.3,\"sum\":200.0,\"value_count\":25}"
);
expectedValues = expectedValues.entry("_index", indexName);
expectedAllValues = expectedAllValues.entry(indexName, expectedValues);
}
assertMap(indexToRow(columns, values), expectedAllValues);
}

private Map<String, Object> esql(String query) throws IOException {
Request request = new Request("POST", "_query");
XContentBuilder body = JsonXContent.contentBuilder().startObject();
Expand Down Expand Up @@ -473,21 +544,15 @@ private Matcher<?> expectedValue(DataType type, NodeInfo nodeInfo) throws IOExce
case GEO_SHAPE -> equalTo("POINT (-71.34 41.12)");
case NULL -> nullValue();
case AGGREGATE_METRIC_DOUBLE -> {
/*
* We need both AGGREGATE_METRIC_DOUBLE_CREATED and RESOLVE_FIELDS_RESPONSE_CREATED_TV
* but RESOLVE_FIELDS_RESPONSE_CREATED_TV came last so it's enough to check just it.
*/
if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV) == false) {
if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false
|| minVersion().supports(ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION) == false) {
yield nullValue();
}
yield equalTo("{\"min\":-302.5,\"max\":702.3,\"sum\":200.0,\"value_count\":25}");
}
case DENSE_VECTOR -> {
/*
* We need both DENSE_VECTOR_CREATED and RESOLVE_FIELDS_RESPONSE_CREATED_TV
* but RESOLVE_FIELDS_RESPONSE_CREATED_TV came last so it's enough to check just it.
*/
if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV) == false) {
if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false
|| minVersion().supports(ESQL_DENSE_VECTOR_CREATED_VERSION) == false) {
yield nullValue();
}
yield equalTo(List.of(0.5, 10.0, 5.9999995));
Expand Down Expand Up @@ -574,15 +639,24 @@ private Matcher<String> expectedType(DataType type) throws IOException {
case BYTE, SHORT -> equalTo("integer");
case HALF_FLOAT, SCALED_FLOAT, FLOAT -> equalTo("double");
case NULL -> equalTo("keyword");
case AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR -> {
/*
* We need both <type_name>_CREATED and RESOLVE_FIELDS_RESPONSE_CREATED_TV
* but RESOLVE_FIELDS_RESPONSE_CREATED_TV came last so it's enough to check just it.
*/
if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV) == false) {
case AGGREGATE_METRIC_DOUBLE -> {
// RESOLVE_FIELDS_RESPONSE_USED_TV is newer and technically sufficient to check.
// We also check for ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION for clarity.
// Future data types added here should only require the TV when they were created,
// because it will be after RESOLVE_FIELDS_RESPONSE_USED_TV.
if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false
|| minVersion().supports(ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION) == false) {
yield equalTo("unsupported");
}
yield equalTo("aggregate_metric_double");
}
case DENSE_VECTOR -> {
logger.error("ADFDAFAF " + minVersion());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leftover?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup. will zap

if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false
|| minVersion().supports(ESQL_DENSE_VECTOR_CREATED_VERSION) == false) {
yield equalTo("unsupported");
}
yield equalTo(type.esType());
yield equalTo("dense_vector");
}
default -> equalTo(type.esType());
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,31 @@
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.type.DataType;

import java.io.IOException;

public class EsqlResolveFieldsResponse extends ActionResponse {
public static final TransportVersion RESOLVE_FIELDS_RESPONSE_CREATED_TV = TransportVersion.fromName(
private static final TransportVersion RESOLVE_FIELDS_RESPONSE_CREATED_TV = TransportVersion.fromName(
"esql_resolve_fields_response_created"
);
public static final TransportVersion RESOLVE_FIELDS_RESPONSE_REMOVED_MIN_TV = TransportVersion.fromName(
"esql_resolve_fields_response_removed_min_tv"
);

/**
* Marks when we started using the minimum transport version to determine whether a data type is supported on all nodes.
* This is about the coordinator - data nodes will be able to respond with the correct data as long as they're on the
* transport version required for the respective data types. See {@link DataType#supportedVersion()}.
* <p>
* Note: this is in 9.2.1, but not 9.2.0 - in 9.2.0 we resorted to workarounds to sometimes enable {@link DataType#DENSE_VECTOR} and
* {@link DataType#AGGREGATE_METRIC_DOUBLE}, even though 9.2.0 nodes already support these types.
* <p>
* This means that mixed clusters with a 9.2.1 coordinator and 9.2.0 data nodes will properly support these types,
* but a 9.2.0 coordinator with 9.2.1+ nodes will still require the workaround.
*/
public static final TransportVersion RESOLVE_FIELDS_RESPONSE_USED_TV = TransportVersion.fromName("esql_resolve_fields_response_used");

private final FieldCapabilitiesResponse caps;

public EsqlResolveFieldsResponse(FieldCapabilitiesResponse caps) {
Expand Down