|
12 | 12 | import org.elasticsearch.common.settings.Settings; |
13 | 13 | import org.elasticsearch.test.junit.annotations.TestLogging; |
14 | 14 | import org.elasticsearch.xpack.esql.VerificationException; |
| 15 | +import org.elasticsearch.xpack.esql.core.type.DataType; |
15 | 16 |
|
16 | 17 | import java.io.IOException; |
17 | 18 | import java.util.List; |
@@ -72,17 +73,43 @@ public void testLookupJoinAcrossClusters() throws IOException { |
72 | 73 |
|
73 | 74 | populateLookupIndex(LOCAL_CLUSTER, "values_lookup2", 5); |
74 | 75 | populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup2", 5); |
75 | | - // FIXME: this currently does not work |
76 | 76 | try ( |
77 | 77 | EsqlQueryResponse resp = runQuery( |
78 | 78 | "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key " |
79 | | - + "| LOOKUP JOIN values_lookup2 ON lookup_tag", |
| 79 | + + "| LOOKUP JOIN values_lookup2 ON lookup_key", |
80 | 80 | randomBoolean() |
81 | 81 | ) |
82 | 82 | ) { |
83 | 83 | List<List<Object>> values = getValuesList(resp); |
84 | 84 | assertThat(values, hasSize(20)); |
| 85 | + } |
85 | 86 |
|
| 87 | + try ( |
| 88 | + EsqlQueryResponse resp = runQuery( |
| 89 | + "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key " |
| 90 | + + "| STATS c = count(*) BY lookup_name | SORT c", |
| 91 | + randomBoolean() |
| 92 | + ) |
| 93 | + ) { |
| 94 | + List<List<Object>> values = getValuesList(resp); |
| 95 | + // 0-9 + null + 16 |
| 96 | + assertThat(values, hasSize(12)); |
| 97 | + for (var row : values) { |
| 98 | + if (row.get(1) == null) { |
| 99 | + assertThat((Long) row.get(0), equalTo(5L)); // null |
| 100 | + } else { |
| 101 | + assertThat((String) row.get(1), containsString("lookup_")); |
| 102 | + if (row.get(1).equals("lookup_0") |
| 103 | + || row.get(1).equals("lookup_1") |
| 104 | + || row.get(1).equals("lookup_4") |
| 105 | + || row.get(1).equals("lookup_9")) { |
| 106 | + // squares |
| 107 | + assertThat((Long) row.get(0), equalTo(2L)); |
| 108 | + } else { |
| 109 | + assertThat((Long) row.get(0), equalTo(1L)); |
| 110 | + } |
| 111 | + } |
| 112 | + } |
86 | 113 | } |
87 | 114 | } |
88 | 115 |
|
@@ -292,6 +319,40 @@ public void testLookupJoinIndexMode() throws IOException { |
292 | 319 | ); |
293 | 320 | } |
294 | 321 |
|
| 322 | + public void testLookupJoinFieldTypes() throws IOException { |
| 323 | + setupClusters(2); |
| 324 | + populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10); |
| 325 | + populateLookupIndexKeyword(REMOTE_CLUSTER_1, "values_lookup", 10); |
| 326 | + |
| 327 | + setSkipUnavailable(REMOTE_CLUSTER_1, true); |
| 328 | + var ex = expectThrows( |
| 329 | + VerificationException.class, |
| 330 | + () -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) |
| 331 | + ); |
| 332 | + assertThat( |
| 333 | + ex.getMessage(), |
| 334 | + containsString( |
| 335 | + "Cannot use field [lookup_key] due to ambiguities being mapped as [2] incompatible types:" |
| 336 | + + " [keyword] in [cluster-a:values_lookup], [long] in [values_lookup]" |
| 337 | + ) |
| 338 | + ); |
| 339 | + |
| 340 | + try ( |
| 341 | + EsqlQueryResponse resp = runQuery( |
| 342 | + "FROM logs-*,c*:logs-* | EVAL lookup_name = v::keyword | LOOKUP JOIN values_lookup ON lookup_name", |
| 343 | + randomBoolean() |
| 344 | + ) |
| 345 | + ) { |
| 346 | + var columns = resp.columns(); |
| 347 | + assertThat(columns, hasSize(9)); |
| 348 | + var keyColumn = columns.stream().filter(c -> c.name().equals("lookup_key")).findFirst(); |
| 349 | + assertTrue(keyColumn.isPresent()); |
| 350 | + assertThat(keyColumn.get().type(), equalTo(DataType.UNSUPPORTED)); |
| 351 | + assertThat(keyColumn.get().originalTypes(), hasItems("keyword", "long")); |
| 352 | + } |
| 353 | + |
| 354 | + } |
| 355 | + |
295 | 356 | protected Map<String, Object> setupClustersAndLookups() throws IOException { |
296 | 357 | var setupData = setupClusters(2); |
297 | 358 | populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10); |
@@ -325,6 +386,32 @@ protected void populateLookupIndex(String clusterAlias, String indexName, int nu |
325 | 386 | client.admin().indices().prepareRefresh(indexName).get(); |
326 | 387 | } |
327 | 388 |
|
| 389 | + protected void populateLookupIndexKeyword(String clusterAlias, String indexName, int numDocs) { |
| 390 | + Client client = client(clusterAlias); |
| 391 | + String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias; |
| 392 | + String field_tag = Strings.isEmpty(clusterAlias) ? "local_tag" : "remote_tag"; |
| 393 | + assertAcked( |
| 394 | + client.admin() |
| 395 | + .indices() |
| 396 | + .prepareCreate(indexName) |
| 397 | + .setSettings(Settings.builder().put("index.mode", "lookup")) |
| 398 | + .setMapping( |
| 399 | + "lookup_key", |
| 400 | + "type=keyword", |
| 401 | + "lookup_name", |
| 402 | + "type=keyword", |
| 403 | + "lookup_tag", |
| 404 | + "type=keyword", |
| 405 | + field_tag, |
| 406 | + "type=keyword" |
| 407 | + ) |
| 408 | + ); |
| 409 | + for (int i = 0; i < numDocs; i++) { |
| 410 | + client.prepareIndex(indexName).setSource("lookup_key", i, "lookup_name", "lookup_" + i, "lookup_tag", tag, field_tag, i).get(); |
| 411 | + } |
| 412 | + client.admin().indices().prepareRefresh(indexName).get(); |
| 413 | + } |
| 414 | + |
328 | 415 | private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) { |
329 | 416 | assertNotNull(executionInfo); |
330 | 417 | assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); |
|
0 commit comments