Skip to content

Commit 89adec1

Browse files
authored
[ML] Resolve duplicate key exception in GetDatafeedRunningStateAction (#125477)
1 parent fc933d4 commit 89adec1

File tree

3 files changed

+89
-1
lines changed

3 files changed

+89
-1
lines changed

docs/changelog/125477.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 125477
2+
summary: Prevent get datafeeds stats API returning an error when local tasks are slow to stop
3+
area: Machine Learning
4+
type: bug
5+
issues:
6+
- 104160

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,28 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
146146

147147
private final Map<String, RunningState> datafeedRunningState;
148148

149+
private static RunningState selectMostRecentState(RunningState state1, RunningState state2) {
150+
151+
if (state1.searchInterval != null && state2.searchInterval != null) {
152+
return state1.searchInterval.startMs() > state2.searchInterval.startMs() ? state1 : state2;
153+
}
154+
155+
if (state1.searchInterval != null) {
156+
return state1;
157+
}
158+
if (state2.searchInterval != null) {
159+
return state2;
160+
}
161+
162+
return state2;
163+
}
164+
149165
public static Response fromResponses(List<Response> responses) {
150166
return new Response(
151167
responses.stream()
152168
.flatMap(r -> r.datafeedRunningState.entrySet().stream())
153169
.filter(entry -> entry.getValue() != null)
154-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
170+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Response::selectMostRecentState))
155171
);
156172
}
157173

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateActionResponseTests.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import org.elasticsearch.common.io.stream.Writeable;
1010
import org.elasticsearch.test.AbstractWireSerializingTestCase;
1111
import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Response;
12+
import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Response.RunningState;
13+
import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
1214
import org.elasticsearch.xpack.core.ml.datafeed.SearchIntervalTests;
1315

1416
import java.util.function.Function;
@@ -41,4 +43,68 @@ protected Writeable.Reader<Response> instanceReader() {
4143
return Response::new;
4244
}
4345

46+
/**
47+
* Tests merging responses with the same datafeed ID but different running states,
48+
* where both states have a searchInterval with different start times.
49+
* The state with the more recent searchInterval (larger startMs value) should be selected.
50+
*/
51+
public void testMergeWithDuplicateKeysAndDifferentSearchIntervals() {
52+
SearchInterval olderInterval = new SearchInterval(1000L, 2000L);
53+
SearchInterval newerInterval = new SearchInterval(3000L, 4000L);
54+
55+
RunningState olderState = new RunningState(true, true, olderInterval);
56+
RunningState newerState = new RunningState(false, false, newerInterval);
57+
58+
String datafeedId = "test-datafeed";
59+
Response response1 = Response.fromTaskAndState(datafeedId, olderState);
60+
Response response2 = Response.fromTaskAndState(datafeedId, newerState);
61+
62+
Response mergedResponse = Response.fromResponses(java.util.List.of(response1, response2));
63+
64+
assertEquals(newerState, mergedResponse.getRunningState(datafeedId).orElse(null));
65+
66+
mergedResponse = Response.fromResponses(java.util.List.of(response2, response1));
67+
assertEquals(newerState, mergedResponse.getRunningState(datafeedId).orElse(null));
68+
}
69+
70+
/**
71+
* Tests merging responses with the same datafeed ID but different running states,
72+
* where only one state has a searchInterval.
73+
* The state with the searchInterval should be selected, regardless of order.
74+
*/
75+
public void testMergeWithDuplicateKeysWhenOnlyOneHasSearchInterval() {
76+
SearchInterval interval = new SearchInterval(1000L, 2000L);
77+
78+
RunningState stateWithInterval = new RunningState(true, true, interval);
79+
RunningState stateWithoutInterval = new RunningState(false, false, null);
80+
81+
String datafeedId = "test-datafeed";
82+
Response response1 = Response.fromTaskAndState(datafeedId, stateWithInterval);
83+
Response response2 = Response.fromTaskAndState(datafeedId, stateWithoutInterval);
84+
85+
Response mergedResponse = Response.fromResponses(java.util.List.of(response1, response2));
86+
87+
assertEquals(stateWithInterval, mergedResponse.getRunningState(datafeedId).orElse(null));
88+
89+
mergedResponse = Response.fromResponses(java.util.List.of(response2, response1));
90+
assertEquals(stateWithInterval, mergedResponse.getRunningState(datafeedId).orElse(null));
91+
}
92+
93+
/**
94+
* Tests merging responses with the same datafeed ID but different running states,
95+
* where neither state has a searchInterval.
96+
* In this case, the second state in the list should be selected.
97+
*/
98+
public void testMergeWithDuplicateKeysWhenNeitherHasSearchInterval() {
99+
RunningState state1 = new RunningState(true, true, null);
100+
RunningState state2 = new RunningState(false, false, null);
101+
102+
String datafeedId = "test-datafeed";
103+
Response response1 = Response.fromTaskAndState(datafeedId, state1);
104+
Response response2 = Response.fromTaskAndState(datafeedId, state2);
105+
106+
Response mergedResponse = Response.fromResponses(java.util.List.of(response1, response2));
107+
108+
assertEquals(state2, mergedResponse.getRunningState(datafeedId).orElse(null));
109+
}
44110
}

0 commit comments

Comments
 (0)