| 
9 | 9 | import org.elasticsearch.common.io.stream.Writeable;  | 
10 | 10 | import org.elasticsearch.test.AbstractWireSerializingTestCase;  | 
11 | 11 | import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Response;  | 
 | 12 | +import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Response.RunningState;  | 
 | 13 | +import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;  | 
12 | 14 | import org.elasticsearch.xpack.core.ml.datafeed.SearchIntervalTests;  | 
13 | 15 | 
 
  | 
14 | 16 | import java.util.function.Function;  | 
@@ -41,4 +43,68 @@ protected Writeable.Reader<Response> instanceReader() {  | 
41 | 43 |         return Response::new;  | 
42 | 44 |     }  | 
43 | 45 | 
 
  | 
 | 46 | +    /**  | 
 | 47 | +     * Tests merging responses with the same datafeed ID but different running states,  | 
 | 48 | +     * where both states have a searchInterval with different start times.  | 
 | 49 | +     * The state with the more recent searchInterval (larger startMs value) should be selected.  | 
 | 50 | +     */  | 
 | 51 | +    public void testMergeWithDuplicateKeysAndDifferentSearchIntervals() {  | 
 | 52 | +        SearchInterval olderInterval = new SearchInterval(1000L, 2000L);  | 
 | 53 | +        SearchInterval newerInterval = new SearchInterval(3000L, 4000L);  | 
 | 54 | + | 
 | 55 | +        RunningState olderState = new RunningState(true, true, olderInterval);  | 
 | 56 | +        RunningState newerState = new RunningState(false, false, newerInterval);  | 
 | 57 | + | 
 | 58 | +        String datafeedId = "test-datafeed";  | 
 | 59 | +        Response response1 = Response.fromTaskAndState(datafeedId, olderState);  | 
 | 60 | +        Response response2 = Response.fromTaskAndState(datafeedId, newerState);  | 
 | 61 | + | 
 | 62 | +        Response mergedResponse = Response.fromResponses(java.util.List.of(response1, response2));  | 
 | 63 | + | 
 | 64 | +        assertEquals(newerState, mergedResponse.getRunningState(datafeedId).orElse(null));  | 
 | 65 | + | 
 | 66 | +        mergedResponse = Response.fromResponses(java.util.List.of(response2, response1));  | 
 | 67 | +        assertEquals(newerState, mergedResponse.getRunningState(datafeedId).orElse(null));  | 
 | 68 | +    }  | 
 | 69 | + | 
 | 70 | +    /**  | 
 | 71 | +     * Tests merging responses with the same datafeed ID but different running states,  | 
 | 72 | +     * where only one state has a searchInterval.  | 
 | 73 | +     * The state with the searchInterval should be selected, regardless of order.  | 
 | 74 | +     */  | 
 | 75 | +    public void testMergeWithDuplicateKeysWhenOnlyOneHasSearchInterval() {  | 
 | 76 | +        SearchInterval interval = new SearchInterval(1000L, 2000L);  | 
 | 77 | + | 
 | 78 | +        RunningState stateWithInterval = new RunningState(true, true, interval);  | 
 | 79 | +        RunningState stateWithoutInterval = new RunningState(false, false, null);  | 
 | 80 | + | 
 | 81 | +        String datafeedId = "test-datafeed";  | 
 | 82 | +        Response response1 = Response.fromTaskAndState(datafeedId, stateWithInterval);  | 
 | 83 | +        Response response2 = Response.fromTaskAndState(datafeedId, stateWithoutInterval);  | 
 | 84 | + | 
 | 85 | +        Response mergedResponse = Response.fromResponses(java.util.List.of(response1, response2));  | 
 | 86 | + | 
 | 87 | +        assertEquals(stateWithInterval, mergedResponse.getRunningState(datafeedId).orElse(null));  | 
 | 88 | + | 
 | 89 | +        mergedResponse = Response.fromResponses(java.util.List.of(response2, response1));  | 
 | 90 | +        assertEquals(stateWithInterval, mergedResponse.getRunningState(datafeedId).orElse(null));  | 
 | 91 | +    }  | 
 | 92 | + | 
 | 93 | +    /**  | 
 | 94 | +     * Tests merging responses with the same datafeed ID but different running states,  | 
 | 95 | +     * where neither state has a searchInterval.  | 
 | 96 | +     * In this case, the second state in the list should be selected.  | 
 | 97 | +     */  | 
 | 98 | +    public void testMergeWithDuplicateKeysWhenNeitherHasSearchInterval() {  | 
 | 99 | +        RunningState state1 = new RunningState(true, true, null);  | 
 | 100 | +        RunningState state2 = new RunningState(false, false, null);  | 
 | 101 | + | 
 | 102 | +        String datafeedId = "test-datafeed";  | 
 | 103 | +        Response response1 = Response.fromTaskAndState(datafeedId, state1);  | 
 | 104 | +        Response response2 = Response.fromTaskAndState(datafeedId, state2);  | 
 | 105 | + | 
 | 106 | +        Response mergedResponse = Response.fromResponses(java.util.List.of(response1, response2));  | 
 | 107 | + | 
 | 108 | +        assertEquals(state2, mergedResponse.getRunningState(datafeedId).orElse(null));  | 
 | 109 | +    }  | 
44 | 110 | }  | 
0 commit comments