|
50 | 50 | import org.elasticsearch.test.InternalTestCluster;
|
51 | 51 | import org.elasticsearch.test.NodeRoles;
|
52 | 52 | import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
|
| 53 | +import org.elasticsearch.test.transport.MockTransportService; |
53 | 54 | import org.elasticsearch.transport.TransportActionProxy;
|
54 | 55 | import org.elasticsearch.transport.TransportService;
|
55 | 56 | import org.elasticsearch.xcontent.XContentParser;
|
|
62 | 63 | import java.util.Map;
|
63 | 64 | import java.util.concurrent.CountDownLatch;
|
64 | 65 | import java.util.concurrent.TimeUnit;
|
| 66 | +import java.util.concurrent.atomic.AtomicInteger; |
65 | 67 | import java.util.concurrent.atomic.AtomicReference;
|
66 | 68 | import java.util.stream.Collectors;
|
67 | 69 |
|
@@ -446,6 +448,64 @@ public void testLookupFields() throws Exception {
|
446 | 448 | assertThat(hit2.field("to").getValues(), contains(Map.of("name", List.of("Local B")), Map.of("name", List.of("Local C"))));
|
447 | 449 | });
|
448 | 450 | }
|
| 451 | + // Search locally, but lookup fields on remote clusters |
| 452 | + { |
| 453 | + final String remoteLookupFields = """ |
| 454 | + { |
| 455 | + "from": { |
| 456 | + "type": "lookup", |
| 457 | + "target_index": "cluster_a:users", |
| 458 | + "input_field": "from_user", |
| 459 | + "target_field": "_id", |
| 460 | + "fetch_fields": ["name"] |
| 461 | + }, |
| 462 | + "to": { |
| 463 | + "type": "lookup", |
| 464 | + "target_index": "cluster_a:users", |
| 465 | + "input_field": "to_user", |
| 466 | + "target_field": "_id", |
| 467 | + "fetch_fields": ["name"] |
| 468 | + } |
| 469 | + } |
| 470 | + """; |
| 471 | + final Map<String, Object> remoteRuntimeMappings; |
| 472 | + try (XContentParser parser = createParser(JsonXContent.jsonXContent, remoteLookupFields)) { |
| 473 | + remoteRuntimeMappings = parser.map(); |
| 474 | + } |
| 475 | + AtomicInteger searchSearchRequests = new AtomicInteger(0); |
| 476 | + for (TransportService ts : cluster("cluster_a").getInstances(TransportService.class)) { |
| 477 | + MockTransportService transportService = (MockTransportService) ts; |
| 478 | + transportService.addRequestHandlingBehavior(TransportSearchShardsAction.NAME, (handler, request, channel, task) -> { |
| 479 | + handler.messageReceived(request, channel, task); |
| 480 | + searchSearchRequests.incrementAndGet(); |
| 481 | + }); |
| 482 | + } |
| 483 | + for (boolean ccsMinimizeRoundtrips : List.of(true, false)) { |
| 484 | + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(new TermQueryBuilder("to_user", "c")) |
| 485 | + .runtimeMappings(remoteRuntimeMappings) |
| 486 | + .sort(new FieldSortBuilder("duration")) |
| 487 | + .fetchField("from") |
| 488 | + .fetchField("to"); |
| 489 | + SearchRequest request = new SearchRequest("local_calls").source(searchSourceBuilder); |
| 490 | + request.setCcsMinimizeRoundtrips(ccsMinimizeRoundtrips); |
| 491 | + assertResponse(client().search(request), response -> { |
| 492 | + assertHitCount(response, 1); |
| 493 | + SearchHit hit = response.getHits().getHits()[0]; |
| 494 | + assertThat(hit.getIndex(), equalTo("local_calls")); |
| 495 | + assertThat(hit.field("from").getValues(), contains(Map.of("name", List.of("Remote A")))); |
| 496 | + assertThat( |
| 497 | + hit.field("to").getValues(), |
| 498 | + contains(Map.of("name", List.of("Remote B")), Map.of("name", List.of("Remote C"))) |
| 499 | + ); |
| 500 | + }); |
| 501 | + if (ccsMinimizeRoundtrips) { |
| 502 | + assertThat(searchSearchRequests.get(), equalTo(0)); |
| 503 | + } else { |
| 504 | + assertThat(searchSearchRequests.get(), greaterThan(0)); |
| 505 | + searchSearchRequests.set(0); |
| 506 | + } |
| 507 | + } |
| 508 | + } |
449 | 509 | }
|
450 | 510 |
|
451 | 511 | @Override
|
|
0 commit comments