1111import org .elasticsearch .search .SearchHit ;
1212import org .elasticsearch .search .sort .SortOrder ;
1313import org .elasticsearch .xpack .core .ml .action .GetRecordsAction ;
14+ import org .elasticsearch .xpack .core .ml .action .PutFilterAction ;
1415import org .elasticsearch .xpack .core .ml .action .UpdateFilterAction ;
1516import org .elasticsearch .xpack .core .ml .annotations .Annotation ;
1617import org .elasticsearch .xpack .core .ml .job .config .AnalysisConfig ;
@@ -98,7 +99,7 @@ public void testCondition() throws Exception {
9899
99100 // push the data for the first half buckets
100101 postData (job .getId (), joinBetween (0 , data .size () / 2 , data ));
101- closeJob (job .getId ());
102+ flushJob (job .getId (), true );
102103
103104 List <AnomalyRecord > records = getRecords (job .getId ());
104105 // remove records that are not anomalies
@@ -116,18 +117,35 @@ public void testCondition() throws Exception {
116117 JobUpdate .Builder update = new JobUpdate .Builder (job .getId ());
117118 update .setDetectorUpdates (Arrays .asList (new JobUpdate .DetectorUpdate (0 , null , Arrays .asList (newRule ))));
118119 updateJob (job .getId (), update .build ());
120+ // Wait until the notification that the job was updated is indexed
121+ assertBusy (
122+ () -> assertResponse (
123+ prepareSearch (NotificationsIndex .NOTIFICATIONS_INDEX ).setSize (1 )
124+ .addSort ("timestamp" , SortOrder .DESC )
125+ .setQuery (
126+ QueryBuilders .boolQuery ()
127+ .filter (QueryBuilders .termQuery ("job_id" , job .getId ()))
128+ .filter (QueryBuilders .termQuery ("level" , "info" ))
129+ ),
130+ searchResponse -> {
131+ SearchHit [] hits = searchResponse .getHits ().getHits ();
132+ assertThat (hits .length , equalTo (1 ));
133+ assertThat ((String ) hits [0 ].getSourceAsMap ().get ("message" ), containsString ("Job updated: [detectors]" ));
134+ }
135+ )
136+ );
119137 }
120138
121139 // push second half
122- openJob (job .getId ());
123140 postData (job .getId (), joinBetween (data .size () / 2 , data .size (), data ));
124- closeJob (job .getId ());
141+ flushJob (job .getId (), true );
125142
126143 GetRecordsAction .Request recordsAfterFirstHalf = new GetRecordsAction .Request (job .getId ());
127144 recordsAfterFirstHalf .setStart (String .valueOf (firstRecordTimestamp + 1 ));
128145 records = getRecords (recordsAfterFirstHalf );
129146 assertThat ("records were " + records , (int ) (records .stream ().filter (r -> r .getProbability () < 0.01 ).count ()), equalTo (1 ));
130147 assertThat (records .get (0 ).getByFieldValue (), equalTo ("low" ));
148+ closeJob (job .getId ());
131149 }
132150
133151 public void testScope () throws Exception {
@@ -242,7 +260,7 @@ public void testScope() throws Exception {
242260 closeJob (job .getId ());
243261 }
244262
245- public void testScopeAndCondition () throws IOException {
263+ public void testScopeAndCondition () throws Exception {
246264 // We have 2 IPs and they're both safe-listed.
247265 List <String > ips = Arrays .asList ("111.111.111.111" , "222.222.222.222" );
248266 MlFilter safeIps = MlFilter .builder ("safe_ips" ).setItems (ips ).build ();
@@ -298,11 +316,112 @@ public void testScopeAndCondition() throws IOException {
298316 }
299317
300318 postData (job .getId (), joinBetween (0 , data .size (), data ));
301- closeJob (job .getId ());
319+ flushJob (job .getId (), true );
302320
303321 List <AnomalyRecord > records = getRecords (job .getId ());
304322 assertThat (records .size (), equalTo (1 ));
305323 assertThat (records .get (0 ).getOverFieldValue (), equalTo ("222.222.222.222" ));
324+
325+ // Remove "111.111.111.111" from the "safe_ips" filter
326+ List <String > addedIps = Arrays .asList ();
327+ List <String > removedIps = Arrays .asList ("111.111.111.111" );
328+ PutFilterAction .Response updatedFilter = updateMlFilter ("safe_ips" , addedIps , removedIps );
329+ // Wait until the notification that the filter was updated is indexed
330+ assertBusy (
331+ () -> assertResponse (
332+ prepareSearch (NotificationsIndex .NOTIFICATIONS_INDEX ).setSize (1 )
333+ .addSort ("timestamp" , SortOrder .DESC )
334+ .setQuery (
335+ QueryBuilders .boolQuery ()
336+ .filter (QueryBuilders .termQuery ("job_id" , job .getId ()))
337+ .filter (QueryBuilders .termQuery ("level" , "info" ))
338+ ),
339+ searchResponse -> {
340+ SearchHit [] hits = searchResponse .getHits ().getHits ();
341+ assertThat (hits .length , equalTo (1 ));
342+ assertThat (
343+ (String ) hits [0 ].getSourceAsMap ().get ("message" ),
344+ containsString ("Filter [safe_ips] has been modified; removed items: ['111.111.111.111']" )
345+ );
346+ }
347+ )
348+ );
349+ MlFilter updatedSafeIps = MlFilter .builder ("safe_ips" ).setItems (Arrays .asList ("222.222.222.222" )).build ();
350+ assertThat (updatedFilter .getFilter (), equalTo (updatedSafeIps ));
351+
352+ data .clear ();
353+ // Now send anomalous count of 9 for 111.111.111.111
354+ for (int i = 0 ; i < 9 ; i ++) {
355+ data .add (createIpRecord (timestamp , "111.111.111.111" ));
356+ }
357+
358+ // Some more normal buckets
359+ for (int bucket = 0 ; bucket < 3 ; bucket ++) {
360+ for (String ip : ips ) {
361+ data .add (createIpRecord (timestamp , ip ));
362+ }
363+ timestamp += TimeValue .timeValueHours (1 ).getMillis ();
364+ }
365+
366+ postData (job .getId (), joinBetween (0 , data .size (), data ));
367+ flushJob (job .getId (), true );
368+
369+ records = getRecords (job .getId ());
370+ assertThat (records .size (), equalTo (2 ));
371+ assertThat (records .get (0 ).getOverFieldValue (), equalTo ("222.222.222.222" ));
372+ assertThat (records .get (1 ).getOverFieldValue (), equalTo ("111.111.111.111" ));
373+
374+ {
375+ // Update detection rules such that it now applies only to actual values > 10.0
376+ DetectionRule newRule = new DetectionRule .Builder (
377+ Arrays .asList (new RuleCondition (RuleCondition .AppliesTo .ACTUAL , Operator .GT , 10.0 ))
378+ ).build ();
379+ JobUpdate .Builder update = new JobUpdate .Builder (job .getId ());
380+ update .setDetectorUpdates (Arrays .asList (new JobUpdate .DetectorUpdate (0 , null , Arrays .asList (newRule ))));
381+ updateJob (job .getId (), update .build ());
382+ // Wait until the notification that the job was updated is indexed
383+ assertBusy (
384+ () -> assertResponse (
385+ prepareSearch (NotificationsIndex .NOTIFICATIONS_INDEX ).setSize (1 )
386+ .addSort ("timestamp" , SortOrder .DESC )
387+ .setQuery (
388+ QueryBuilders .boolQuery ()
389+ .filter (QueryBuilders .termQuery ("job_id" , job .getId ()))
390+ .filter (QueryBuilders .termQuery ("level" , "info" ))
391+ ),
392+ searchResponse -> {
393+ SearchHit [] hits = searchResponse .getHits ().getHits ();
394+ assertThat (hits .length , equalTo (1 ));
395+ assertThat ((String ) hits [0 ].getSourceAsMap ().get ("message" ), containsString ("Job updated: [detectors]" ));
396+ }
397+ )
398+ );
399+ }
400+
401+ data .clear ();
402+ // Now send anomalous count of 10 for 222.222.222.222
403+ for (int i = 0 ; i < 10 ; i ++) {
404+ data .add (createIpRecord (timestamp , "222.222.222.222" ));
405+ }
406+
407+ // Some more normal buckets
408+ for (int bucket = 0 ; bucket < 3 ; bucket ++) {
409+ for (String ip : ips ) {
410+ data .add (createIpRecord (timestamp , ip ));
411+ }
412+ timestamp += TimeValue .timeValueHours (1 ).getMillis ();
413+ }
414+
415+ postData (job .getId (), joinBetween (0 , data .size (), data ));
416+
417+ closeJob (job .getId ());
418+
419+ // The anomalous records should not have changed.
420+ records = getRecords (job .getId ());
421+ assertThat (records .size (), equalTo (2 ));
422+ assertThat (records .get (0 ).getOverFieldValue (), equalTo ("222.222.222.222" ));
423+ assertThat (records .get (1 ).getOverFieldValue (), equalTo ("111.111.111.111" ));
424+
306425 }
307426
308427 public void testForceTimeShiftAction () throws Exception {
0 commit comments