Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/124913.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 124913
summary: Report `original_types`
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_THREAD_NAME_IN_DRIVER_PROFILE = def(9_027_0_00);
public static final TransportVersion INFERENCE_CONTEXT = def(9_028_0_00);
public static final TransportVersion ML_INFERENCE_DEEPSEEK = def(9_029_00_0);
public static final TransportVersion ESQL_REPORT_ORIGINAL_TYPES = def(9_030_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,14 @@ public String nodeString() {
}

protected abstract String label();

/**
* If this field is unsupported this contains the underlying ES types. If there
* is a type conflict this will have many elements, some or all of which may
* be actually supported types.
*/
@Nullable
public List<String> originalTypes() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Attribute != Field
This information makes sense only in the case of unsupported fields (potentially union types in the future) and thus makes sense on FieldAttribute & co (such as UnsupportedAttribute) alone.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean.

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
*/
package org.elasticsearch.xpack.esql.core.type;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
Expand All @@ -23,32 +27,39 @@
*/
public class UnsupportedEsField extends EsField {

private final String originalType;
private final List<String> originalTypes;
private final String inherited; // for fields belonging to parents (or grandparents) that have an unsupported type

public UnsupportedEsField(String name, String originalType) {
this(name, originalType, null, new TreeMap<>());
public UnsupportedEsField(String name, List<String> originalTypes) {
this(name, originalTypes, null, new TreeMap<>());
}

public UnsupportedEsField(String name, String originalType, String inherited, Map<String, EsField> properties) {
public UnsupportedEsField(String name, List<String> originalTypes, String inherited, Map<String, EsField> properties) {
super(name, DataType.UNSUPPORTED, properties, false);
this.originalType = originalType;
this.originalTypes = originalTypes;
this.inherited = inherited;
}

public UnsupportedEsField(StreamInput in) throws IOException {
this(
readCachedStringWithVersionCheck(in),
readCachedStringWithVersionCheck(in),
in.readOptionalString(),
in.readImmutableMap(EsField::readFrom)
);
this(readCachedStringWithVersionCheck(in), readOriginalTypes(in), in.readOptionalString(), in.readImmutableMap(EsField::readFrom));
}

private static List<String> readOriginalTypes(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_REPORT_ORIGINAL_TYPES)) {
return in.readCollectionAsList(i -> ((PlanStreamInput) i).readCachedString());
} else {
return List.of(readCachedStringWithVersionCheck(in).split(","));
}
}

@Override
public void writeContent(StreamOutput out) throws IOException {
writeCachedStringWithVersionCheck(out, getName());
writeCachedStringWithVersionCheck(out, getOriginalType());
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_REPORT_ORIGINAL_TYPES)) {
out.writeCollection(getOriginalTypes(), (o, s) -> ((PlanStreamOutput) o).writeCachedString(s));
} else {
writeCachedStringWithVersionCheck(out, String.join(",", getOriginalTypes()));
}
out.writeOptionalString(getInherited());
out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
}
Expand All @@ -57,8 +68,8 @@ public String getWriteableName() {
return "UnsupportedEsField";
}

public String getOriginalType() {
return originalType;
public List<String> getOriginalTypes() {
return originalTypes;
}

public String getInherited() {
Expand All @@ -81,11 +92,11 @@ public boolean equals(Object o) {
return false;
}
UnsupportedEsField that = (UnsupportedEsField) o;
return Objects.equals(originalType, that.originalType) && Objects.equals(inherited, that.inherited);
return Objects.equals(originalTypes, that.originalTypes) && Objects.equals(inherited, that.inherited);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), originalType, inherited);
return Objects.hash(super.hashCode(), originalTypes, inherited);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,13 @@ public void testAliasToInt() throws IOException {
}

public void testFlattenedUnsupported() throws IOException {
assumeOriginalTypesReported();
new Test("flattened").createIndex("test", "flattened");
index("test", """
{"flattened": {"a": "foo"}}""");
Map<String, Object> result = runEsql("FROM test* | LIMIT 2");

assertResultMap(result, List.of(columnInfo("flattened", "unsupported")), List.of(matchesList().item(null)));
assertResultMap(result, List.of(unsupportedColumnInfo("flattened", "flattened")), List.of(matchesList().item(null)));
}

public void testEmptyMapping() throws IOException {
Expand Down Expand Up @@ -685,6 +686,7 @@ public void testByteFieldWithIntSubfieldTooBig() throws IOException {
* </pre>.
*/
public void testIncompatibleTypes() throws IOException {
assumeOriginalTypesReported();
keywordTest().createIndex("test1", "f");
index("test1", """
{"f": "f1"}""");
Expand All @@ -693,7 +695,11 @@ public void testIncompatibleTypes() throws IOException {
{"f": 1}""");

Map<String, Object> result = runEsql("FROM test*");
assertResultMap(result, List.of(columnInfo("f", "unsupported")), List.of(matchesList().item(null), matchesList().item(null)));
assertResultMap(
result,
List.of(unsupportedColumnInfo("f", "keyword", "long")),
List.of(matchesList().item(null), matchesList().item(null))
);
ResponseException e = expectThrows(ResponseException.class, () -> runEsql("FROM test* | SORT f | LIMIT 3"));
String err = EntityUtils.toString(e.getResponse().getEntity());
assertThat(
Expand Down Expand Up @@ -754,10 +760,7 @@ public void testDistinctInEachIndex() throws IOException {
* </pre>.
*/
public void testMergeKeywordAndObject() throws IOException {
assumeTrue(
"order of fields in error message inconsistent before 8.14",
getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
);
assumeOriginalTypesReported();
keywordTest().createIndex("test1", "file");
index("test1", """
{"file": "f1"}""");
Expand Down Expand Up @@ -793,7 +796,7 @@ public void testMergeKeywordAndObject() throws IOException {
Map<String, Object> result = runEsql("FROM test* | SORT file.raw | LIMIT 2");
assertResultMap(
result,
List.of(columnInfo("file", "unsupported"), columnInfo("file.raw", "keyword")),
List.of(unsupportedColumnInfo("file", "keyword", "object"), columnInfo("file.raw", "keyword")),
List.of(matchesList().item(null).item("o2"), matchesList().item(null).item(null))
);
}
Expand All @@ -813,6 +816,7 @@ public void testMergeKeywordAndObject() throws IOException {
* </pre>.
*/
public void testPropagateUnsupportedToSubFields() throws IOException {
assumeOriginalTypesReported();
createIndex("test", index -> {
index.startObject("properties");
index.startObject("f");
Expand All @@ -838,7 +842,7 @@ public void testPropagateUnsupportedToSubFields() throws IOException {
Map<String, Object> result = runEsql("FROM test* | LIMIT 2");
assertResultMap(
result,
List.of(columnInfo("f", "unsupported"), columnInfo("f.raw", "unsupported")),
List.of(unsupportedColumnInfo("f", "ip_range"), unsupportedColumnInfo("f.raw", "ip_range")),
List.of(matchesList().item(null).item(null))
);
}
Expand All @@ -863,10 +867,7 @@ public void testPropagateUnsupportedToSubFields() throws IOException {
* </pre>.
*/
public void testMergeUnsupportedAndObject() throws IOException {
assumeTrue(
"order of fields in error message inconsistent before 8.14",
getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
);
assumeOriginalTypesReported();
createIndex("test1", index -> {
index.startObject("properties");
index.startObject("f").field("type", "ip_range").endObject();
Expand Down Expand Up @@ -901,7 +902,7 @@ public void testMergeUnsupportedAndObject() throws IOException {
Map<String, Object> result = runEsql("FROM test* | LIMIT 2");
assertResultMap(
result,
List.of(columnInfo("f", "unsupported"), columnInfo("f.raw", "unsupported")),
List.of(unsupportedColumnInfo("f", "ip_range"), unsupportedColumnInfo("f.raw", "ip_range")),
List.of(matchesList().item(null).item(null), matchesList().item(null).item(null))
);
}
Expand Down Expand Up @@ -954,10 +955,7 @@ public void testIntegerDocValuesConflict() throws IOException {
* In an ideal world we'd promote the {@code integer} to an {@code long} and just go.
*/
public void testLongIntegerConflict() throws IOException {
assumeTrue(
"order of fields in error message inconsistent before 8.14",
getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
);
assumeOriginalTypesReported();
longTest().sourceMode(SourceMode.DEFAULT).createIndex("test1", "emp_no");
index("test1", """
{"emp_no": 1}""");
Expand All @@ -976,7 +974,11 @@ public void testLongIntegerConflict() throws IOException {
);

Map<String, Object> result = runEsql("FROM test* | LIMIT 2");
assertResultMap(result, List.of(columnInfo("emp_no", "unsupported")), List.of(matchesList().item(null), matchesList().item(null)));
assertResultMap(
result,
List.of(unsupportedColumnInfo("emp_no", "integer", "long")),
List.of(matchesList().item(null), matchesList().item(null))
);
}

/**
Expand All @@ -996,10 +998,7 @@ public void testLongIntegerConflict() throws IOException {
* In an ideal world we'd promote the {@code short} to an {@code integer} and just go.
*/
public void testIntegerShortConflict() throws IOException {
assumeTrue(
"order of fields in error message inconsistent before 8.14",
getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
);
assumeOriginalTypesReported();
intTest().sourceMode(SourceMode.DEFAULT).createIndex("test1", "emp_no");
index("test1", """
{"emp_no": 1}""");
Expand All @@ -1018,7 +1017,11 @@ public void testIntegerShortConflict() throws IOException {
);

Map<String, Object> result = runEsql("FROM test* | LIMIT 2");
assertResultMap(result, List.of(columnInfo("emp_no", "unsupported")), List.of(matchesList().item(null), matchesList().item(null)));
assertResultMap(
result,
List.of(unsupportedColumnInfo("emp_no", "integer", "short")),
List.of(matchesList().item(null), matchesList().item(null))
);
}

/**
Expand All @@ -1044,10 +1047,7 @@ public void testIntegerShortConflict() throws IOException {
* </pre>.
*/
public void testTypeConflictInObject() throws IOException {
assumeTrue(
"order of fields in error message inconsistent before 8.14",
getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
);
assumeOriginalTypesReported();
createIndex("test1", empNoInObject("integer"));
index("test1", """
{"foo": {"emp_no": 1}}""");
Expand All @@ -1056,7 +1056,10 @@ public void testTypeConflictInObject() throws IOException {
{"foo": {"emp_no": "cat"}}""");

Map<String, Object> result = runEsql("FROM test* | LIMIT 3");
assertMap(result, getResultMatcher(result).entry("columns", List.of(columnInfo("foo.emp_no", "unsupported"))).extraOk());
assertMap(
result,
getResultMatcher(result).entry("columns", List.of(unsupportedColumnInfo("foo.emp_no", "integer", "keyword"))).extraOk()
);

ResponseException e = expectThrows(ResponseException.class, () -> runEsql("FROM test* | SORT foo.emp_no | LIMIT 3"));
String err = EntityUtils.toString(e.getResponse().getEntity());
Expand Down Expand Up @@ -1366,6 +1369,12 @@ private void assumeIndexResolverNestedFieldsNameClashFixed() throws IOException
);
}

private void assumeOriginalTypesReported() throws IOException {
var capsName = EsqlCapabilities.Cap.REPORT_ORIGINAL_TYPES.name().toLowerCase(Locale.ROOT);
boolean requiredClusterCapability = clusterHasCapability("POST", "/_query", List.of(), List.of(capsName)).orElse(false);
assumeTrue("This test makes sense for versions that report original types", requiredClusterCapability);
}

private CheckedConsumer<XContentBuilder, IOException> empNoInObject(String empNoType) {
return index -> {
index.startObject("properties");
Expand Down Expand Up @@ -1701,6 +1710,10 @@ private static Map<String, Object> columnInfo(String name, String type) {
return Map.of("name", name, "type", type);
}

private static Map<String, Object> unsupportedColumnInfo(String name, String... originalTypes) {
return Map.of("name", name, "type", "unsupported", "original_types", List.of(originalTypes));
}

private static void index(String name, String... docs) throws IOException {
Request request = new Request("POST", "/" + name + "/_bulk");
request.addParameter("refresh", "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.InputStream;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -110,7 +111,7 @@ private static void walkMapping(String name, Object value, Map<String, EsField>
field = DateEsField.dateEsField(name, properties, docValues);
} else if (esDataType == UNSUPPORTED) {
String type = content.get("type").toString();
field = new UnsupportedEsField(name, type, null, properties);
field = new UnsupportedEsField(name, List.of(type), null, properties);
propagateUnsupportedType(name, type, properties);
} else {
field = new EsField(name, esDataType, properties, docValues);
Expand Down Expand Up @@ -165,9 +166,9 @@ public static void propagateUnsupportedType(String inherited, String originalTyp
UnsupportedEsField u;
if (field instanceof UnsupportedEsField) {
u = (UnsupportedEsField) field;
u = new UnsupportedEsField(u.getName(), originalType, inherited, u.getProperties());
u = new UnsupportedEsField(u.getName(), List.of(originalType), inherited, u.getProperties());
} else {
u = new UnsupportedEsField(field.getName(), originalType, inherited, field.getProperties());
u = new UnsupportedEsField(field.getName(), List.of(originalType), inherited, field.getProperties());
}
entry.setValue(u);
propagateUnsupportedType(inherited, originalType, u.getProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testBasicAsyncExecution() throws Exception {
try (var finalResponse = future.get()) {
assertThat(finalResponse, notNullValue());
assertThat(finalResponse.isRunning(), is(false));
assertThat(finalResponse.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long"))));
assertThat(finalResponse.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long", null))));
assertThat(getValuesList(finalResponse).size(), equalTo(1));
}

Expand All @@ -103,7 +103,7 @@ public void testBasicAsyncExecution() throws Exception {
try (var finalResponse = again.get()) {
assertThat(finalResponse, notNullValue());
assertThat(finalResponse.isRunning(), is(false));
assertThat(finalResponse.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long"))));
assertThat(finalResponse.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long", null))));
assertThat(getValuesList(finalResponse).size(), equalTo(1));
}

Expand Down Expand Up @@ -231,7 +231,7 @@ private void testFinishingBeforeTimeout(boolean keepOnCompletion) {

try (var response = request.execute().actionGet(60, TimeUnit.SECONDS)) {
assertThat(response.isRunning(), is(false));
assertThat(response.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long"))));
assertThat(response.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long", null))));
assertThat(getValuesList(response).size(), equalTo(1));

if (keepOnCompletion) {
Expand All @@ -244,7 +244,7 @@ private void testFinishingBeforeTimeout(boolean keepOnCompletion) {
try (var resp = future.actionGet(60, TimeUnit.SECONDS)) {
assertThat(resp.asyncExecutionId().get(), equalTo(id));
assertThat(resp.isRunning(), is(false));
assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long"))));
assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long", null))));
assertThat(getValuesList(resp).size(), equalTo(1));
}
} else {
Expand Down
Loading