Skip to content

Commit e3333f5

Browse files
committed
Fix union types in CCS (#128111)
Currently, union types in CCS is broken. For example, FROM *:remote-indices | EVAL port = TO_INT(port) returns all nulls if the types of the port field conflict. This happens because converters are a map of the fully qualified cluster:index -name (defined in MultiTypeEsField), but we are looking up the converter using only the index name, which leads to a wrong or missing converter on remote clusters. Our tests didn't catch this because MultiClusterSpecIT generates the same index for both clusters, allowing the local converter to be used for remote indices.
1 parent c615db2 commit e3333f5

File tree

4 files changed

+37
-1
lines changed

4 files changed

+37
-1
lines changed

docs/changelog/128111.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 128111
2+
summary: Fix union types in CCS
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ public static org.elasticsearch.Version remoteClusterVersion() {
5656
return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT;
5757
}
5858

59+
public static org.elasticsearch.Version bwcVersion() {
60+
org.elasticsearch.Version local = localClusterVersion();
61+
org.elasticsearch.Version remote = remoteClusterVersion();
62+
return local.before(remote) ? local : remote;
63+
}
64+
5965
private static Version distributionVersion(String key) {
6066
final String val = System.getProperty(key);
6167
return val != null ? Version.fromString(val) : Version.CURRENT;

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,4 +1058,28 @@ private void clearSkipUnavailable() {
10581058
.setPersistentSettings(settingsBuilder.build())
10591059
.get();
10601060
}
1061+
public void testMultiTypes() throws Exception {
1062+
Client remoteClient = client(REMOTE_CLUSTER_1);
1063+
int totalDocs = 0;
1064+
for (String type : List.of("integer", "long")) {
1065+
String index = "conflict-index-" + type;
1066+
assertAcked(remoteClient.admin().indices().prepareCreate(index).setMapping("port", "type=" + type));
1067+
int numDocs = between(1, 10);
1068+
for (int i = 0; i < numDocs; i++) {
1069+
remoteClient.prepareIndex(index).setId(Integer.toString(i)).setSource("port", i).get();
1070+
}
1071+
remoteClient.admin().indices().prepareRefresh(index).get();
1072+
totalDocs += numDocs;
1073+
}
1074+
for (String castFunction : List.of("TO_LONG", "TO_INT")) {
1075+
EsqlQueryRequest request = new EsqlQueryRequest();
1076+
request.query("FROM *:conflict-index-* | EVAL port=" + castFunction + "(port) | WHERE port is NOT NULL | STATS COUNT(port)");
1077+
try (EsqlQueryResponse resp = runQuery(request)) {
1078+
List<List<Object>> values = getValuesList(resp);
1079+
assertThat(values, hasSize(1));
1080+
assertThat(values.get(0), hasSize(1));
1081+
assertThat(values.get(0).get(0), equalTo((long) totalDocs));
1082+
}
1083+
}
1084+
}
10611085
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ private BlockLoader getBlockLoaderFor(int shardId, Attribute attr, MappedFieldTy
142142
BlockLoader blockLoader = shardContext.blockLoader(getFieldName(attr), isUnsupported, fieldExtractPreference);
143143
MultiTypeEsField unionTypes = findUnionTypes(attr);
144144
if (unionTypes != null) {
145-
String indexName = shardContext.ctx.index().getName();
145+
// Use the fully qualified name `cluster:index-name` because multiple types are resolved on coordinator with the cluster prefix
146+
String indexName = shardContext.ctx.getFullyQualifiedIndex().getName();
146147
Expression conversion = unionTypes.getConversionExpressionForIndex(indexName);
147148
return conversion == null
148149
? BlockLoader.CONSTANT_NULLS

0 commit comments

Comments
 (0)