diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEnrichBasedCrossClusterTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEnrichBasedCrossClusterTestCase.java index de54e7de13b79..7aa775d03f435 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEnrichBasedCrossClusterTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEnrichBasedCrossClusterTestCase.java @@ -87,6 +87,7 @@ protected Settings nodeSettings() { } static final EnrichPolicy hostPolicy = new EnrichPolicy("match", null, List.of("hosts"), "ip", List.of("ip", "os")); + static final EnrichPolicy hostPolicyLocal = new EnrichPolicy("match", null, List.of("hosts_local"), "ip", List.of("ip", "os")); static final EnrichPolicy vendorPolicy = new EnrichPolicy("match", null, List.of("vendors"), "os", List.of("os", "vendor")); @Before @@ -115,18 +116,23 @@ public void setupHostsEnrich() { "Windows" ); for (String cluster : allClusters()) { - Client client = client(cluster); - client.admin().indices().prepareCreate("hosts").setMapping("ip", "type=ip", "os", "type=keyword").get(); - for (Map.Entry h : allHosts.entrySet()) { - client.prepareIndex("hosts").setSource("ip", h.getKey(), "os", h.getValue()).get(); - } - client.admin().indices().prepareRefresh("hosts").get(); - client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts", hostPolicy)) - .actionGet(); - client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts")) - .actionGet(); - assertAcked(client.admin().indices().prepareDelete("hosts")); + initHostsPolicy(client(cluster), "hosts", hostPolicy, allHosts); + } + // create policy on coordinator only + initHostsPolicy(client(), "hosts_local", hostPolicyLocal, allHosts); + } + + private static void initHostsPolicy(Client client, String indexName, EnrichPolicy policy, Map allHosts) { + client.admin().indices().prepareCreate(indexName).setMapping("ip", "type=ip", "os", "type=keyword").get(); + for (Map.Entry h : allHosts.entrySet()) { + client.prepareIndex(indexName).setSource("ip", h.getKey(), "os", h.getValue()).get(); } + client.admin().indices().prepareRefresh(indexName).get(); + client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, indexName, policy)) + .actionGet(); + client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, indexName)) + .actionGet(); + assertAcked(client.admin().indices().prepareDelete(indexName)); } @Before @@ -201,7 +207,7 @@ record Event(long timestamp, String user, String host) { public void wipeEnrichPolicies() { for (String cluster : allClusters()) { cluster(cluster).wipe(Set.of()); - for (String policy : List.of("hosts", "vendors")) { + for (String policy : List.of("hosts", "hosts_local", "vendors")) { if (tolerateErrorsWhenWipingEnrichPolicies()) { try { client(cluster).execute( @@ -226,6 +232,10 @@ static String enrichHosts(Enrich.Mode mode) { return EsqlTestUtils.randomEnrichCommand("hosts", mode, hostPolicy.getMatchField(), hostPolicy.getEnrichFields()); } + static String enrichHostsLocal(Enrich.Mode mode) { + return EsqlTestUtils.randomEnrichCommand("hosts_local", mode, hostPolicyLocal.getMatchField(), hostPolicyLocal.getEnrichFields()); + } + static String enrichVendors(Enrich.Mode mode) { return EsqlTestUtils.randomEnrichCommand("vendors", mode, vendorPolicy.getMatchField(), vendorPolicy.getEnrichFields()); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java index 631e3575d2f60..f848c663d3ca6 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java @@ -110,6 +110,52 @@ public void testWithHostsPolicy() { } } + public void testFromRemotesWithCoordPolicy() { + + String query = "FROM *:events | eval ip= TO_STR(host) | " + + enrichHostsLocal(Enrich.Mode.COORDINATOR) + + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, null)) { + List> rows = getValuesList(resp); + assertThat( + rows, + equalTo( + List.of( + List.of(1L, "Android"), + List.of(2L, "Linux"), + List.of(4L, "MacOS"), + List.of(3L, "Windows"), + List.of(1L, "iOS"), + Arrays.asList(2L, (String) null) + ) + ) + ); + assertTrue(resp.getExecutionInfo().isCrossClusterSearch()); + } + + query = "FROM *:events | eval ip= TO_STR(host) | stats by ip | " + + enrichHostsLocal(Enrich.Mode.COORDINATOR) + + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, null)) { + List> rows = getValuesList(resp); + assertThat( + rows, + equalTo( + List.of( + List.of(1L, "Android"), + List.of(2L, "Linux"), + List.of(2L, "MacOS"), + List.of(2L, "Windows"), + List.of(1L, "iOS"), + Arrays.asList(2L, (String) null) + ) + ) + ); + assertTrue(resp.getExecutionInfo().isCrossClusterSearch()); + } + + } + public void testEnrichHostsAggThenEnrichVendorCoordinator() { Tuple includeCCSMetadata = randomIncludeCCSMetadata(); Boolean requestIncludeMeta = includeCCSMetadata.v1();