Skip to content

Commit 659d7bd

Browse files
dnhatnelasticsearchmachine
andauthored
Fix union types in CCS (#128111) (#128206)
* Fix union types in CCS (#128111) Currently, union types in CCS is broken. For example, FROM *:remote-indices | EVAL port = TO_INT(port) returns all nulls if the types of the port field conflict. This happens because converters are a map of the fully qualified cluster:index -name (defined in MultiTypeEsField), but we are looking up the converter using only the index name, which leads to a wrong or missing converter on remote clusters. Our tests didn't catch this because MultiClusterSpecIT generates the same index for both clusters, allowing the local converter to be used for remote indices. * [CI] Auto commit changes from spotless --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent b82430d commit 659d7bd

File tree

4 files changed

+38
-1
lines changed

4 files changed

+38
-1
lines changed

docs/changelog/128111.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 128111
2+
summary: Fix union types in CCS
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ public static org.elasticsearch.Version remoteClusterVersion() {
5656
return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT;
5757
}
5858

59+
public static org.elasticsearch.Version bwcVersion() {
60+
org.elasticsearch.Version local = localClusterVersion();
61+
org.elasticsearch.Version remote = remoteClusterVersion();
62+
return local.before(remote) ? local : remote;
63+
}
64+
5965
private static Version distributionVersion(String key) {
6066
final String val = System.getProperty(key);
6167
return val != null ? Version.fromString(val) : Version.CURRENT;

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,4 +1058,29 @@ private void clearSkipUnavailable() {
10581058
.setPersistentSettings(settingsBuilder.build())
10591059
.get();
10601060
}
1061+
1062+
public void testMultiTypes() throws Exception {
1063+
Client remoteClient = client(REMOTE_CLUSTER_1);
1064+
int totalDocs = 0;
1065+
for (String type : List.of("integer", "long")) {
1066+
String index = "conflict-index-" + type;
1067+
assertAcked(remoteClient.admin().indices().prepareCreate(index).setMapping("port", "type=" + type));
1068+
int numDocs = between(1, 10);
1069+
for (int i = 0; i < numDocs; i++) {
1070+
remoteClient.prepareIndex(index).setId(Integer.toString(i)).setSource("port", i).get();
1071+
}
1072+
remoteClient.admin().indices().prepareRefresh(index).get();
1073+
totalDocs += numDocs;
1074+
}
1075+
for (String castFunction : List.of("TO_LONG", "TO_INT")) {
1076+
EsqlQueryRequest request = new EsqlQueryRequest();
1077+
request.query("FROM *:conflict-index-* | EVAL port=" + castFunction + "(port) | WHERE port is NOT NULL | STATS COUNT(port)");
1078+
try (EsqlQueryResponse resp = runQuery(request)) {
1079+
List<List<Object>> values = getValuesList(resp);
1080+
assertThat(values, hasSize(1));
1081+
assertThat(values.get(0), hasSize(1));
1082+
assertThat(values.get(0).get(0), equalTo((long) totalDocs));
1083+
}
1084+
}
1085+
}
10611086
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ private BlockLoader getBlockLoaderFor(
136136
DefaultShardContext shardContext = (DefaultShardContext) shardContexts.get(shardId);
137137
BlockLoader blockLoader = shardContext.blockLoader(fieldName, isUnsupported, fieldExtractPreference);
138138
if (unionTypes != null) {
139-
String indexName = shardContext.ctx.index().getName();
139+
// Use the fully qualified name `cluster:index-name` because multiple types are resolved on coordinator with the cluster prefix
140+
String indexName = shardContext.ctx.getFullyQualifiedIndex().getName();
140141
Expression conversion = unionTypes.getConversionExpressionForIndex(indexName);
141142
return conversion == null
142143
? BlockLoader.CONSTANT_NULLS

0 commit comments

Comments
 (0)