Skip to content

Commit c2561b5

Browse files
authored
Fix union types in CCS (#128111)
Currently, union types in CCS is broken. For example, FROM *:remote-indices | EVAL port = TO_INT(port) returns all nulls if the types of the port field conflict. This happens because converters are a map of the fully qualified cluster:index -name (defined in MultiTypeEsField), but we are looking up the converter using only the index name, which leads to a wrong or missing converter on remote clusters. Our tests didn't catch this because MultiClusterSpecIT generates the same index for both clusters, allowing the local converter to be used for remote indices.
1 parent 3594ade commit c2561b5

File tree

5 files changed

+67
-7
lines changed

5 files changed

+67
-7
lines changed

docs/changelog/128111.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 128111
2+
summary: Fix union types in CCS
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@ public static org.elasticsearch.Version remoteClusterVersion() {
6767
return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT;
6868
}
6969

70+
public static org.elasticsearch.Version bwcVersion() {
71+
org.elasticsearch.Version local = localClusterVersion();
72+
org.elasticsearch.Version remote = remoteClusterVersion();
73+
return local.before(remote) ? local : remote;
74+
}
75+
7076
private static Version distributionVersion(String key) {
7177
final String val = System.getProperty(key);
7278
return val != null ? Version.fromString(val) : Version.CURRENT;

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
7676

7777
private static TestFeatureService remoteFeaturesService;
7878
private static RestClient remoteClusterClient;
79+
private static DataLocation dataLocation = null;
7980

8081
@ParametersFactory(argumentFormatting = "%2$s.%3$s")
8182
public static List<Object[]> readScriptSpec() throws Exception {
@@ -188,8 +189,9 @@ protected RestClient buildClient(Settings settings, HttpHost[] localHosts) throw
188189
*/
189190
static RestClient twoClients(RestClient localClient, RestClient remoteClient) throws IOException {
190191
RestClient twoClients = mock(RestClient.class);
192+
assertNotNull("data location was set", dataLocation);
191193
// write to a single cluster for now due to the precision of some functions such as avg and tests related to updates
192-
final RestClient bulkClient = randomFrom(localClient, remoteClient);
194+
final RestClient bulkClient = dataLocation == DataLocation.REMOTE_ONLY ? remoteClient : randomFrom(localClient, remoteClient);
193195
when(twoClients.performRequest(any())).then(invocation -> {
194196
Request request = invocation.getArgument(0);
195197
String endpoint = request.getEndpoint();
@@ -214,6 +216,11 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th
214216
return twoClients;
215217
}
216218

219+
enum DataLocation {
220+
REMOTE_ONLY,
221+
ANY_CLUSTER
222+
}
223+
217224
static Request[] cloneRequests(Request orig, int numClones) throws IOException {
218225
Request[] clones = new Request[numClones];
219226
for (int i = 0; i < clones.length; i++) {
@@ -238,26 +245,37 @@ static Request[] cloneRequests(Request orig, int numClones) throws IOException {
238245
* Convert FROM employees ... => FROM *:employees,employees
239246
*/
240247
static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCase testCase) {
248+
if (dataLocation == null) {
249+
dataLocation = randomFrom(DataLocation.values());
250+
}
241251
String query = testCase.query;
242252
String[] commands = query.split("\\|");
243253
String first = commands[0].trim();
244254
if (commands[0].toLowerCase(Locale.ROOT).startsWith("from")) {
245255
String[] parts = commands[0].split("(?i)metadata");
246256
assert parts.length >= 1 : parts;
247257
String fromStatement = parts[0];
248-
249258
String[] localIndices = fromStatement.substring("FROM ".length()).split(",");
250-
String remoteIndices = Arrays.stream(localIndices)
251-
.map(index -> "*:" + index.trim() + "," + index.trim())
252-
.collect(Collectors.joining(","));
259+
final String remoteIndices;
260+
if (canUseRemoteIndicesOnly() && randomBoolean()) {
261+
remoteIndices = Arrays.stream(localIndices).map(index -> "*:" + index.trim()).collect(Collectors.joining(","));
262+
} else {
263+
remoteIndices = Arrays.stream(localIndices)
264+
.map(index -> "*:" + index.trim() + "," + index.trim())
265+
.collect(Collectors.joining(","));
266+
}
253267
var newFrom = "FROM " + remoteIndices + " " + commands[0].substring(fromStatement.length());
254268
testCase.query = newFrom + query.substring(first.length());
255269
}
256270
if (commands[0].toLowerCase(Locale.ROOT).startsWith("ts ")) {
257271
String[] parts = commands[0].split("\\s+");
258272
assert parts.length >= 2 : commands[0];
259273
String[] indices = parts[1].split(",");
260-
parts[1] = Arrays.stream(indices).map(index -> "*:" + index + "," + index).collect(Collectors.joining(","));
274+
if (canUseRemoteIndicesOnly() && randomBoolean()) {
275+
parts[1] = Arrays.stream(indices).map(index -> "*:" + index.trim()).collect(Collectors.joining(","));
276+
} else {
277+
parts[1] = Arrays.stream(indices).map(index -> "*:" + index.trim() + "," + index.trim()).collect(Collectors.joining(","));
278+
}
261279
String newNewMetrics = String.join(" ", parts);
262280
testCase.query = newNewMetrics + query.substring(first.length());
263281
}
@@ -274,6 +292,12 @@ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCas
274292
return testCase;
275293
}
276294

295+
static boolean canUseRemoteIndicesOnly() {
296+
// If the data is indexed only into the remote cluster, we can query only the remote indices.
297+
// However, due to the union types bug in CCS, we must include the local indices in versions without the fix.
298+
return dataLocation == DataLocation.REMOTE_ONLY && Clusters.bwcVersion().onOrAfter(Version.V_9_1_0);
299+
}
300+
277301
static boolean hasIndexMetadata(String query) {
278302
String[] commands = query.split("\\|");
279303
if (commands[0].trim().toLowerCase(Locale.ROOT).startsWith("from")) {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -991,4 +991,28 @@ void populateRemoteIndicesFail(String clusterAlias, String indexName, int numSha
991991
remoteClient.admin().indices().prepareRefresh(indexName).get();
992992
}
993993

994+
public void testMultiTypes() throws Exception {
995+
Client remoteClient = client(REMOTE_CLUSTER_1);
996+
int totalDocs = 0;
997+
for (String type : List.of("integer", "long")) {
998+
String index = "conflict-index-" + type;
999+
assertAcked(remoteClient.admin().indices().prepareCreate(index).setMapping("port", "type=" + type));
1000+
int numDocs = between(1, 10);
1001+
for (int i = 0; i < numDocs; i++) {
1002+
remoteClient.prepareIndex(index).setId(Integer.toString(i)).setSource("port", i).get();
1003+
}
1004+
remoteClient.admin().indices().prepareRefresh(index).get();
1005+
totalDocs += numDocs;
1006+
}
1007+
for (String castFunction : List.of("TO_LONG", "TO_INT")) {
1008+
EsqlQueryRequest request = new EsqlQueryRequest();
1009+
request.query("FROM *:conflict-index-* | EVAL port=" + castFunction + "(port) | WHERE port is NOT NULL | STATS COUNT(port)");
1010+
try (EsqlQueryResponse resp = runQuery(request)) {
1011+
List<List<Object>> values = getValuesList(resp);
1012+
assertThat(values, hasSize(1));
1013+
assertThat(values.get(0), hasSize(1));
1014+
assertThat(values.get(0).get(0), equalTo((long) totalDocs));
1015+
}
1016+
}
1017+
}
9941018
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,8 @@ private BlockLoader getBlockLoaderFor(int shardId, Attribute attr, MappedFieldTy
157157
BlockLoader blockLoader = shardContext.blockLoader(getFieldName(attr), isUnsupported, fieldExtractPreference);
158158
MultiTypeEsField unionTypes = findUnionTypes(attr);
159159
if (unionTypes != null) {
160-
String indexName = shardContext.ctx.index().getName();
160+
// Use the fully qualified name `cluster:index-name` because multiple types are resolved on coordinator with the cluster prefix
161+
String indexName = shardContext.ctx.getFullyQualifiedIndex().getName();
161162
Expression conversion = unionTypes.getConversionExpressionForIndex(indexName);
162163
return conversion == null
163164
? BlockLoader.CONSTANT_NULLS

0 commit comments

Comments
 (0)