Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/128111.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128111
summary: Fix union types in CCS
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ public static org.elasticsearch.Version remoteClusterVersion() {
return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT;
}

public static org.elasticsearch.Version bwcVersion() {
org.elasticsearch.Version local = localClusterVersion();
org.elasticsearch.Version remote = remoteClusterVersion();
return local.before(remote) ? local : remote;
}

private static Version distributionVersion(String key) {
final String val = System.getProperty(key);
return val != null ? Version.fromString(val) : Version.CURRENT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {

private static TestFeatureService remoteFeaturesService;
private static RestClient remoteClusterClient;
private static DataLocation dataLocation = null;

@ParametersFactory(argumentFormatting = "%2$s.%3$s")
public static List<Object[]> readScriptSpec() throws Exception {
Expand Down Expand Up @@ -188,8 +189,9 @@ protected RestClient buildClient(Settings settings, HttpHost[] localHosts) throw
*/
static RestClient twoClients(RestClient localClient, RestClient remoteClient) throws IOException {
RestClient twoClients = mock(RestClient.class);
assertNotNull("data location was set", dataLocation);
// write to a single cluster for now due to the precision of some functions such as avg and tests related to updates
final RestClient bulkClient = randomFrom(localClient, remoteClient);
final RestClient bulkClient = dataLocation == DataLocation.REMOTE_ONLY ? remoteClient : randomFrom(localClient, remoteClient);
when(twoClients.performRequest(any())).then(invocation -> {
Request request = invocation.getArgument(0);
String endpoint = request.getEndpoint();
Expand All @@ -214,6 +216,11 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th
return twoClients;
}

enum DataLocation {
REMOTE_ONLY,
ANY_CLUSTER
}

static Request[] cloneRequests(Request orig, int numClones) throws IOException {
Request[] clones = new Request[numClones];
for (int i = 0; i < clones.length; i++) {
Expand All @@ -238,26 +245,33 @@ static Request[] cloneRequests(Request orig, int numClones) throws IOException {
* Convert FROM employees ... => FROM *:employees,employees
*/
static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCase testCase) {
if (dataLocation == null) {
dataLocation = randomFrom(DataLocation.values());
}
String query = testCase.query;
String[] commands = query.split("\\|");
String first = commands[0].trim();
if (commands[0].toLowerCase(Locale.ROOT).startsWith("from")) {
String[] parts = commands[0].split("(?i)metadata");
assert parts.length >= 1 : parts;
String fromStatement = parts[0];

String[] localIndices = fromStatement.substring("FROM ".length()).split(",");
String remoteIndices = Arrays.stream(localIndices)
.map(index -> "*:" + index.trim() + "," + index.trim())
.collect(Collectors.joining(","));
var newFrom = "FROM " + remoteIndices + " " + commands[0].substring(fromStatement.length());
String index = fromStatement.substring("FROM ".length());
if (canUseRemoteIndicesOnly() && randomBoolean()) {
index = remoteIndices(index);
} else {
index = index + "," + remoteIndices(index);
}
var newFrom = "FROM " + index + " " + commands[0].substring(fromStatement.length());
testCase.query = newFrom + query.substring(first.length());
}
if (commands[0].toLowerCase(Locale.ROOT).startsWith("ts ")) {
String[] parts = commands[0].split("\\s+");
assert parts.length >= 2 : commands[0];
String[] indices = parts[1].split(",");
parts[1] = Arrays.stream(indices).map(index -> "*:" + index + "," + index).collect(Collectors.joining(","));
if (canUseRemoteIndicesOnly() && randomBoolean()) {
parts[1] = remoteIndices(parts[1]);
} else {
parts[1] = parts[1] + "," + remoteIndices(parts[1]);
}
String newNewMetrics = String.join(" ", parts);
testCase.query = newNewMetrics + query.substring(first.length());
}
Expand All @@ -274,6 +288,15 @@ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCas
return testCase;
}

static String remoteIndices(String localIndices) {
String[] parts = localIndices.split(",");
return Arrays.stream(parts).map(index -> "*:" + index.trim()).collect(Collectors.joining(","));
}

static boolean canUseRemoteIndicesOnly() {
return dataLocation == DataLocation.REMOTE_ONLY && Clusters.bwcVersion().onOrAfter(Version.V_9_1_0);
}

static boolean hasIndexMetadata(String query) {
String[] commands = query.split("\\|");
if (commands[0].trim().toLowerCase(Locale.ROOT).startsWith("from")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -991,4 +991,28 @@ void populateRemoteIndicesFail(String clusterAlias, String indexName, int numSha
remoteClient.admin().indices().prepareRefresh(indexName).get();
}

public void testMultiTypes() throws Exception {
Client remoteClient = client(REMOTE_CLUSTER_1);
int totalDocs = 0;
for (String type : List.of("integer", "long")) {
String index = "conflict-index-" + type;
assertAcked(remoteClient.admin().indices().prepareCreate(index).setMapping("port", "type=" + type));
int numDocs = between(1, 10);
for (int i = 0; i < numDocs; i++) {
remoteClient.prepareIndex(index).setId(Integer.toString(i)).setSource("port", i).get();
}
remoteClient.admin().indices().prepareRefresh(index).get();
totalDocs += numDocs;
}
for (String castFunction : List.of("TO_LONG", "TO_INT")) {
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM *:conflict-index-* | EVAL port=" + castFunction + "(port) | WHERE port is NOT NULL | STATS COUNT(port)");
try (EsqlQueryResponse resp = runQuery(request)) {
List<List<Object>> values = getValuesList(resp);
assertThat(values, hasSize(1));
assertThat(values.get(0), hasSize(1));
assertThat(values.get(0).get(0), equalTo((long) totalDocs));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ private BlockLoader getBlockLoaderFor(int shardId, Attribute attr, MappedFieldTy
BlockLoader blockLoader = shardContext.blockLoader(getFieldName(attr), isUnsupported, fieldExtractPreference);
MultiTypeEsField unionTypes = findUnionTypes(attr);
if (unionTypes != null) {
String indexName = shardContext.ctx.index().getName();
// Use the fully qualified name `cluster:index-name` because multiple types are resolved on coordinator with the cluster prefix
String indexName = shardContext.ctx.getFullyQualifiedIndex().getName();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix.

Expression conversion = unionTypes.getConversionExpressionForIndex(indexName);
return conversion == null
? BlockLoader.CONSTANT_NULLS
Expand Down
Loading