@@ -87,6 +87,7 @@ protected Settings nodeSettings() {
87
87
}
88
88
89
89
static final EnrichPolicy hostPolicy = new EnrichPolicy ("match" , null , List .of ("hosts" ), "ip" , List .of ("ip" , "os" ));
90
+ static final EnrichPolicy hostPolicyLocal = new EnrichPolicy ("match" , null , List .of ("hosts_local" ), "ip" , List .of ("ip" , "os" ));
90
91
static final EnrichPolicy vendorPolicy = new EnrichPolicy ("match" , null , List .of ("vendors" ), "os" , List .of ("os" , "vendor" ));
91
92
92
93
@ Before
@@ -115,18 +116,23 @@ public void setupHostsEnrich() {
115
116
"Windows"
116
117
);
117
118
for (String cluster : allClusters ()) {
118
- Client client = client (cluster );
119
- client .admin ().indices ().prepareCreate ("hosts" ).setMapping ("ip" , "type=ip" , "os" , "type=keyword" ).get ();
120
- for (Map .Entry <String , String > h : allHosts .entrySet ()) {
121
- client .prepareIndex ("hosts" ).setSource ("ip" , h .getKey (), "os" , h .getValue ()).get ();
122
- }
123
- client .admin ().indices ().prepareRefresh ("hosts" ).get ();
124
- client .execute (PutEnrichPolicyAction .INSTANCE , new PutEnrichPolicyAction .Request (TEST_REQUEST_TIMEOUT , "hosts" , hostPolicy ))
125
- .actionGet ();
126
- client .execute (ExecuteEnrichPolicyAction .INSTANCE , new ExecuteEnrichPolicyAction .Request (TEST_REQUEST_TIMEOUT , "hosts" ))
127
- .actionGet ();
128
- assertAcked (client .admin ().indices ().prepareDelete ("hosts" ));
119
+ initHostsPolicy (client (cluster ), "hosts" , hostPolicy , allHosts );
120
+ }
121
+ // create policy on coordinator only
122
+ initHostsPolicy (client (), "hosts_local" , hostPolicyLocal , allHosts );
123
+ }
124
+
125
+ private static void initHostsPolicy (Client client , String indexName , EnrichPolicy policy , Map <String , String > allHosts ) {
126
+ client .admin ().indices ().prepareCreate (indexName ).setMapping ("ip" , "type=ip" , "os" , "type=keyword" ).get ();
127
+ for (Map .Entry <String , String > h : allHosts .entrySet ()) {
128
+ client .prepareIndex (indexName ).setSource ("ip" , h .getKey (), "os" , h .getValue ()).get ();
129
129
}
130
+ client .admin ().indices ().prepareRefresh (indexName ).get ();
131
+ client .execute (PutEnrichPolicyAction .INSTANCE , new PutEnrichPolicyAction .Request (TEST_REQUEST_TIMEOUT , indexName , policy ))
132
+ .actionGet ();
133
+ client .execute (ExecuteEnrichPolicyAction .INSTANCE , new ExecuteEnrichPolicyAction .Request (TEST_REQUEST_TIMEOUT , indexName ))
134
+ .actionGet ();
135
+ assertAcked (client .admin ().indices ().prepareDelete (indexName ));
130
136
}
131
137
132
138
@ Before
@@ -201,7 +207,7 @@ record Event(long timestamp, String user, String host) {
201
207
public void wipeEnrichPolicies () {
202
208
for (String cluster : allClusters ()) {
203
209
cluster (cluster ).wipe (Set .of ());
204
- for (String policy : List .of ("hosts" , "vendors" )) {
210
+ for (String policy : List .of ("hosts" , "hosts_local" , " vendors" )) {
205
211
if (tolerateErrorsWhenWipingEnrichPolicies ()) {
206
212
try {
207
213
client (cluster ).execute (
@@ -226,6 +232,10 @@ static String enrichHosts(Enrich.Mode mode) {
226
232
return EsqlTestUtils .randomEnrichCommand ("hosts" , mode , hostPolicy .getMatchField (), hostPolicy .getEnrichFields ());
227
233
}
228
234
235
+ static String enrichHostsLocal (Enrich .Mode mode ) {
236
+ return EsqlTestUtils .randomEnrichCommand ("hosts_local" , mode , hostPolicyLocal .getMatchField (), hostPolicyLocal .getEnrichFields ());
237
+ }
238
+
229
239
static String enrichVendors (Enrich .Mode mode ) {
230
240
return EsqlTestUtils .randomEnrichCommand ("vendors" , mode , vendorPolicy .getMatchField (), vendorPolicy .getEnrichFields ());
231
241
}
0 commit comments