@@ -87,6 +87,7 @@ protected Settings nodeSettings() {
8787 }
8888
8989 static final EnrichPolicy hostPolicy = new EnrichPolicy ("match" , null , List .of ("hosts" ), "ip" , List .of ("ip" , "os" ));
90+ static final EnrichPolicy hostPolicyLocal = new EnrichPolicy ("match" , null , List .of ("hosts_local" ), "ip" , List .of ("ip" , "os" ));
9091 static final EnrichPolicy vendorPolicy = new EnrichPolicy ("match" , null , List .of ("vendors" ), "os" , List .of ("os" , "vendor" ));
9192
9293 @ Before
@@ -115,18 +116,23 @@ public void setupHostsEnrich() {
115116 "Windows"
116117 );
117118 for (String cluster : allClusters ()) {
118- Client client = client (cluster );
119- client .admin ().indices ().prepareCreate ("hosts" ).setMapping ("ip" , "type=ip" , "os" , "type=keyword" ).get ();
120- for (Map .Entry <String , String > h : allHosts .entrySet ()) {
121- client .prepareIndex ("hosts" ).setSource ("ip" , h .getKey (), "os" , h .getValue ()).get ();
122- }
123- client .admin ().indices ().prepareRefresh ("hosts" ).get ();
124- client .execute (PutEnrichPolicyAction .INSTANCE , new PutEnrichPolicyAction .Request (TEST_REQUEST_TIMEOUT , "hosts" , hostPolicy ))
125- .actionGet ();
126- client .execute (ExecuteEnrichPolicyAction .INSTANCE , new ExecuteEnrichPolicyAction .Request (TEST_REQUEST_TIMEOUT , "hosts" ))
127- .actionGet ();
128- assertAcked (client .admin ().indices ().prepareDelete ("hosts" ));
119+ initHostsPolicy (client (cluster ), "hosts" , hostPolicy , allHosts );
120+ }
121+ // create policy on coordinator only
122+ initHostsPolicy (client (), "hosts_local" , hostPolicyLocal , allHosts );
123+ }
124+
125+ private static void initHostsPolicy (Client client , String indexName , EnrichPolicy policy , Map <String , String > allHosts ) {
126+ client .admin ().indices ().prepareCreate (indexName ).setMapping ("ip" , "type=ip" , "os" , "type=keyword" ).get ();
127+ for (Map .Entry <String , String > h : allHosts .entrySet ()) {
128+ client .prepareIndex (indexName ).setSource ("ip" , h .getKey (), "os" , h .getValue ()).get ();
129129 }
130+ client .admin ().indices ().prepareRefresh (indexName ).get ();
131+ client .execute (PutEnrichPolicyAction .INSTANCE , new PutEnrichPolicyAction .Request (TEST_REQUEST_TIMEOUT , indexName , policy ))
132+ .actionGet ();
133+ client .execute (ExecuteEnrichPolicyAction .INSTANCE , new ExecuteEnrichPolicyAction .Request (TEST_REQUEST_TIMEOUT , indexName ))
134+ .actionGet ();
135+ assertAcked (client .admin ().indices ().prepareDelete (indexName ));
130136 }
131137
132138 @ Before
@@ -201,7 +207,7 @@ record Event(long timestamp, String user, String host) {
201207 public void wipeEnrichPolicies () {
202208 for (String cluster : allClusters ()) {
203209 cluster (cluster ).wipe (Set .of ());
204- for (String policy : List .of ("hosts" , "vendors" )) {
210+ for (String policy : List .of ("hosts" , "hosts_local" , " vendors" )) {
205211 if (tolerateErrorsWhenWipingEnrichPolicies ()) {
206212 try {
207213 client (cluster ).execute (
@@ -226,6 +232,10 @@ static String enrichHosts(Enrich.Mode mode) {
226232 return EsqlTestUtils .randomEnrichCommand ("hosts" , mode , hostPolicy .getMatchField (), hostPolicy .getEnrichFields ());
227233 }
228234
235+ static String enrichHostsLocal (Enrich .Mode mode ) {
236+ return EsqlTestUtils .randomEnrichCommand ("hosts_local" , mode , hostPolicyLocal .getMatchField (), hostPolicyLocal .getEnrichFields ());
237+ }
238+
229239 static String enrichVendors (Enrich .Mode mode ) {
230240 return EsqlTestUtils .randomEnrichCommand ("vendors" , mode , vendorPolicy .getMatchField (), vendorPolicy .getEnrichFields ());
231241 }
0 commit comments