Skip to content

Commit 972228d

Browse files
Fix NPE in anomaly localization (#280)
* Fix NPE in anomaly localization Signed-off-by: jackiehanyang <[email protected]> * Change the way of finding if an index exists in anomaly localization Signed-off-by: jackiehanyang <[email protected]> * Fix failed unit tests in anomaly localizer Signed-off-by: jackiehanyang <[email protected]> * Support index pattern check for localization Signed-off-by: jackiehanyang <[email protected]> * Update the logic for checking if pattern exist for localization Signed-off-by: jackiehanyang <[email protected]>
1 parent 1e15c88 commit 972228d

File tree

3 files changed

+167
-17
lines changed

3 files changed

+167
-17
lines changed

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/anomalylocalization/AnomalyLocalizerImpl.java

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,12 @@
2626
import org.opensearch.action.search.MultiSearchResponse;
2727
import org.opensearch.action.search.SearchRequest;
2828
import org.opensearch.action.search.SearchResponse;
29+
import org.opensearch.action.support.IndicesOptions;
2930
import org.opensearch.client.Client;
31+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
32+
import org.opensearch.cluster.service.ClusterService;
3033
import org.opensearch.common.settings.Settings;
34+
import org.opensearch.index.IndexNotFoundException;
3135
import org.opensearch.index.query.BoolQueryBuilder;
3236
import org.opensearch.index.query.QueryBuilders;
3337
import org.opensearch.index.query.RangeQueryBuilder;
@@ -73,6 +77,8 @@ public class AnomalyLocalizerImpl implements AnomalyLocalizer, Executable {
7377

7478
private final Client client;
7579
private final Settings settings;
80+
private final ClusterService clusterService;
81+
private final IndexNameExpressionResolver indexNameExpressionResolver;
7682

7783
/**
7884
* Constructor.
@@ -82,9 +88,13 @@ public class AnomalyLocalizerImpl implements AnomalyLocalizer, Executable {
8288
*/
8389
public AnomalyLocalizerImpl(
8490
Client client,
85-
Settings settings) {
91+
Settings settings,
92+
ClusterService clusterService,
93+
IndexNameExpressionResolver indexNameExpressionResolver) {
8694
this.client = client;
8795
this.settings = settings;
96+
this.clusterService = clusterService;
97+
this.indexNameExpressionResolver = indexNameExpressionResolver;
8898
}
8999

90100
/**
@@ -125,18 +135,32 @@ private void onOverallAggregatesResponse(MultiSearchResponse response, AnomalyLo
125135
LocalizationTimeBuckets timeBuckets, ActionListener<AnomalyLocalizationOutput> listener) {
126136
AnomalyLocalizationOutput.Result result = new AnomalyLocalizationOutput.Result();
127137
List<Map.Entry<Long, Long>> intervals = timeBuckets.getAllIntervals();
128-
for (int i = 0; i < intervals.size(); i++) {
129-
double value = getDoubleValue((SingleValue) response.getResponses()[i].getResponse().getAggregations().get(agg.getName()));
130-
131-
AnomalyLocalizationOutput.Bucket bucket = new AnomalyLocalizationOutput.Bucket();
132-
bucket.setStartTime(intervals.get(i).getKey());
133-
bucket.setEndTime(intervals.get(i).getValue());
134-
bucket.setOverallAggValue(value);
135-
result.getBuckets().add(bucket);
136-
}
137138

138-
output.getResults().put(agg.getName(), result);
139-
getLocalizedEntities(input, agg, result, output, listener);
139+
if (isIndexExist(input.getIndexName())) {
140+
for (int i = 0; i < intervals.size(); i++) {
141+
double value = getDoubleValue((SingleValue) response.getResponses()[i].getResponse().getAggregations().get(agg.getName()));
142+
143+
AnomalyLocalizationOutput.Bucket bucket = new AnomalyLocalizationOutput.Bucket();
144+
bucket.setStartTime(intervals.get(i).getKey());
145+
bucket.setEndTime(intervals.get(i).getValue());
146+
bucket.setOverallAggValue(value);
147+
result.getBuckets().add(bucket);
148+
}
149+
output.getResults().put(agg.getName(), result);
150+
getLocalizedEntities(input, agg, result, output, listener);
151+
} else {
152+
log.info("index: {} does not exist", input.getIndexName());
153+
listener.onFailure(new IndexNotFoundException("Failed to find index: " + input.getIndexName()));
154+
}
155+
}
156+
157+
private boolean isIndexExist(String indexName) {
158+
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(),
159+
IndicesOptions.lenientExpandOpen(), indexName);
160+
if (concreteIndices == null || concreteIndices.length == 0) {
161+
return false;
162+
}
163+
return Arrays.stream(concreteIndices).anyMatch(index -> clusterService.state().metadata().hasIndex(index));
140164
}
141165

142166
/**

ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/anomalylocalization/AnomalyLocalizerImplTests.java

Lines changed: 130 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,39 @@
88
import java.util.Arrays;
99
import java.util.Collections;
1010
import java.util.HashMap;
11+
import java.util.HashSet;
1112
import java.util.Map;
1213
import java.util.Optional;
14+
import java.util.Set;
15+
import java.util.concurrent.atomic.AtomicInteger;
1316

17+
import org.junit.AfterClass;
1418
import org.junit.Before;
19+
import org.junit.Rule;
1520
import org.junit.Test;
21+
import org.junit.rules.ExpectedException;
1622
import org.mockito.ArgumentCaptor;
1723
import org.mockito.Mock;
1824
import org.mockito.MockitoAnnotations;
25+
import org.opensearch.Version;
1926
import org.opensearch.action.ActionListener;
2027
import org.opensearch.action.search.MultiSearchResponse;
2128
import org.opensearch.action.search.SearchResponse;
29+
import org.opensearch.action.support.IndicesOptions;
2230
import org.opensearch.client.Client;
31+
import org.opensearch.cluster.ClusterName;
32+
import org.opensearch.cluster.ClusterState;
33+
import org.opensearch.cluster.metadata.IndexMetadata;
34+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
35+
import org.opensearch.cluster.metadata.Metadata;
36+
import org.opensearch.cluster.node.DiscoveryNode;
37+
import org.opensearch.cluster.node.DiscoveryNodeRole;
38+
import org.opensearch.cluster.node.DiscoveryNodes;
39+
import org.opensearch.cluster.service.ClusterService;
40+
import org.opensearch.common.collect.ImmutableOpenMap;
2341
import org.opensearch.common.settings.Settings;
42+
import org.opensearch.common.transport.TransportAddress;
43+
import org.opensearch.index.IndexNotFoundException;
2444
import org.opensearch.index.query.QueryBuilder;
2545
import org.opensearch.ml.common.input.execute.anomalylocalization.AnomalyLocalizationInput;
2646
import org.opensearch.ml.common.output.execute.anomalylocalization.AnomalyLocalizationOutput;
@@ -32,10 +52,13 @@
3252
import org.opensearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
3353

3454
import static org.junit.Assert.assertEquals;
55+
import static org.mockito.ArgumentMatchers.anyString;
56+
import static org.mockito.ArgumentMatchers.eq;
3557
import static org.mockito.Mockito.any;
3658
import static org.mockito.Mockito.doAnswer;
3759
import static org.mockito.Mockito.doReturn;
3860
import static org.mockito.Mockito.mock;
61+
import static org.mockito.Mockito.spy;
3962
import static org.mockito.Mockito.verify;
4063
import static org.mockito.Mockito.when;
4164

@@ -47,6 +70,15 @@ public class AnomalyLocalizerImplTests {
4770
@Mock
4871
private ActionListener<AnomalyLocalizationOutput> outputListener;
4972

73+
@Mock
74+
private ClusterService clusterService;
75+
76+
@Mock
77+
private IndexNameExpressionResolver indexNameExpressionResolver;
78+
79+
@Rule
80+
public ExpectedException exceptionRule = ExpectedException.none();
81+
5082
private Settings settings;
5183

5284
private AnomalyLocalizerImpl anomalyLocalizer;
@@ -71,13 +103,26 @@ public class AnomalyLocalizerImplTests {
71103
private AnomalyLocalizationOutput.Bucket expectedBucketOne;
72104
private AnomalyLocalizationOutput.Bucket expectedBucketTwo;
73105
private AnomalyLocalizationOutput.Entity entity;
106+
private static final AtomicInteger portGenerator = new AtomicInteger();
107+
ClusterState testState;
108+
String clusterName = "test cluster";
109+
DiscoveryNode node;
110+
String[] IndicesOptions;
111+
String[] invalidIndicesOptions;
74112

75113
@Before
76114
@SuppressWarnings("unchecked")
77115
public void setup() {
78116
MockitoAnnotations.openMocks(this);
79117
settings = Settings.builder().build();
80-
anomalyLocalizer = new AnomalyLocalizerImpl(client, settings);
118+
testState = setupTestClusterState();
119+
IndicesOptions = new String[]{"indexName"};
120+
invalidIndicesOptions = new String[]{};
121+
anomalyLocalizer = spy(
122+
new AnomalyLocalizerImpl(client,
123+
settings,
124+
clusterService,
125+
indexNameExpressionResolver));
81126

82127
input = new AnomalyLocalizationInput(indexName, Arrays.asList(attributeFieldNameOne), Arrays.asList(agg), timeFieldName,
83128
startTime, endTime,
@@ -200,8 +245,17 @@ public void setup() {
200245
expectedOutput.getResults().put(agg.getName(), result);
201246
}
202247

248+
@AfterClass
249+
public static void resetPortCounter() {
250+
portGenerator.set(0);
251+
}
252+
203253
@Test
204254
public void testGetLocalizedResultsGivenNoAnomaly() {
255+
when(clusterService.state()).thenReturn(testState);
256+
when(indexNameExpressionResolver.concreteIndexNames(any(ClusterState.class),
257+
any(IndicesOptions.class), anyString()))
258+
.thenReturn(IndicesOptions);
205259
anomalyLocalizer.getLocalizationResults(input, outputListener);
206260

207261
ArgumentCaptor<AnomalyLocalizationOutput> outputCaptor = ArgumentCaptor.forClass(AnomalyLocalizationOutput.class);
@@ -213,6 +267,10 @@ public void testGetLocalizedResultsGivenNoAnomaly() {
213267
@Test
214268
public void testGetLocalizedResultsGivenAnomaly() {
215269
when(valueThree.value()).thenReturn(Double.NaN);
270+
when(clusterService.state()).thenReturn(testState);
271+
when(indexNameExpressionResolver.concreteIndexNames(any(ClusterState.class),
272+
any(IndicesOptions.class), anyString()))
273+
.thenReturn(IndicesOptions);
216274
input = new AnomalyLocalizationInput(indexName, Arrays.asList(attributeFieldNameOne), Arrays.asList(agg), timeFieldName,
217275
startTime, endTime,
218276
minTimeInterval, numOutput, Optional.of(1L), Optional.of(mock(QueryBuilder.class)));
@@ -234,6 +292,39 @@ public void testGetLocalizedResultsForInvalidTimeRange() {
234292
anomalyLocalizer.getLocalizationResults(input, outputListener);
235293
}
236294

295+
@Test
296+
public void testGetLocalizedResultsForInvalidIndexName() {
297+
input = new AnomalyLocalizationInput("invalid", Arrays.asList(attributeFieldNameOne), Arrays.asList(agg), timeFieldName,
298+
startTime, endTime,
299+
minTimeInterval, numOutput, Optional.of(1L), Optional.of(mock(QueryBuilder.class)));
300+
testState = setupTestClusterState();
301+
when(clusterService.state()).thenReturn(testState);
302+
when(indexNameExpressionResolver.concreteIndexNames(any(ClusterState.class),
303+
any(IndicesOptions.class), anyString()))
304+
.thenReturn(invalidIndicesOptions);
305+
anomalyLocalizer.getLocalizationResults(input, outputListener);
306+
ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
307+
verify(outputListener).onFailure(argumentCaptor.capture());
308+
assertEquals(IndexNotFoundException.class, argumentCaptor.getValue().getClass());
309+
}
310+
311+
@Test
312+
public void testGetLocalizedResultsGivenIndexPattern() {
313+
input = new AnomalyLocalizationInput("index*", Arrays.asList(attributeFieldNameOne), Arrays.asList(agg), timeFieldName,
314+
startTime, endTime,
315+
minTimeInterval, numOutput, Optional.of(1L), Optional.of(mock(QueryBuilder.class)));
316+
when(clusterService.state()).thenReturn(testState);
317+
when(indexNameExpressionResolver.concreteIndexNames(any(ClusterState.class),
318+
any(IndicesOptions.class), eq("index*")))
319+
.thenReturn(IndicesOptions);
320+
anomalyLocalizer.getLocalizationResults(input, outputListener);
321+
322+
ArgumentCaptor<AnomalyLocalizationOutput> outputCaptor = ArgumentCaptor.forClass(AnomalyLocalizationOutput.class);
323+
verify(outputListener).onResponse(outputCaptor.capture());
324+
AnomalyLocalizationOutput actualOutput = outputCaptor.getValue();
325+
assertEquals(expectedOutput, actualOutput);
326+
}
327+
237328
@Test
238329
@SuppressWarnings("unchecked")
239330
public void testGetLocalizedResultsForSearchFailure() {
@@ -256,7 +347,10 @@ public void testGetLocalizedResultsOverallDecrease() {
256347
when(valueOne.value()).thenReturn(10.);
257348
when(valueTwo.value()).thenReturn(0.);
258349
when(valueThree.value()).thenReturn(11.);
259-
350+
when(clusterService.state()).thenReturn(testState);
351+
when(indexNameExpressionResolver.concreteIndexNames(any(ClusterState.class),
352+
any(IndicesOptions.class), anyString()))
353+
.thenReturn(IndicesOptions);
260354
anomalyLocalizer.getLocalizationResults(input, outputListener);
261355

262356
ArgumentCaptor<AnomalyLocalizationOutput> outputCaptor = ArgumentCaptor.forClass(AnomalyLocalizationOutput.class);
@@ -274,7 +368,10 @@ public void testGetLocalizedResultsOverallDecrease() {
274368
public void testGetLocalizedResultsOverallUnchange() {
275369
when(valueOne.value()).thenReturn(0.);
276370
when(valueTwo.value()).thenReturn(0.);
277-
371+
when(clusterService.state()).thenReturn(testState);
372+
when(indexNameExpressionResolver.concreteIndexNames(any(ClusterState.class),
373+
any(IndicesOptions.class), anyString()))
374+
.thenReturn(IndicesOptions);
278375
anomalyLocalizer.getLocalizationResults(input, outputListener);
279376

280377
ArgumentCaptor<AnomalyLocalizationOutput> outputCaptor = ArgumentCaptor.forClass(AnomalyLocalizationOutput.class);
@@ -292,7 +389,10 @@ public void testGetLocalizedResultsFilterEntity() {
292389
input = new AnomalyLocalizationInput(indexName, Arrays.asList(attributeFieldNameOne), Arrays.asList(agg), timeFieldName,
293390
startTime, endTime,
294391
minTimeInterval, 2, Optional.empty(), Optional.empty());
295-
392+
when(clusterService.state()).thenReturn(testState);
393+
when(indexNameExpressionResolver.concreteIndexNames(any(ClusterState.class),
394+
any(IndicesOptions.class), anyString()))
395+
.thenReturn(IndicesOptions);
296396
anomalyLocalizer.getLocalizationResults(input, outputListener);
297397

298398
ArgumentCaptor<AnomalyLocalizationOutput> outputCaptor = ArgumentCaptor.forClass(AnomalyLocalizationOutput.class);
@@ -303,6 +403,10 @@ public void testGetLocalizedResultsFilterEntity() {
303403

304404
@Test
305405
public void testExecuteSucceed() {
406+
when(clusterService.state()).thenReturn(testState);
407+
when(indexNameExpressionResolver.concreteIndexNames(any(ClusterState.class),
408+
any(IndicesOptions.class), anyString()))
409+
.thenReturn(IndicesOptions);
306410
AnomalyLocalizationOutput actualOutput = (AnomalyLocalizationOutput) anomalyLocalizer.execute(input);
307411

308412
assertEquals(expectedOutput, actualOutput);
@@ -326,6 +430,28 @@ public void testExecuteInterrupted() {
326430
Thread.currentThread().interrupt();
327431
anomalyLocalizer.execute(input);
328432
}
433+
434+
private ClusterState setupTestClusterState() {
435+
Set<DiscoveryNodeRole> roleSet = new HashSet<>();
436+
roleSet.add(DiscoveryNodeRole.DATA_ROLE);
437+
node = new DiscoveryNode("node",
438+
new TransportAddress(TransportAddress.META_ADDRESS, portGenerator.incrementAndGet()),
439+
new HashMap<>(), roleSet,
440+
Version.CURRENT);
441+
Metadata metadata = new Metadata.Builder()
442+
.indices(ImmutableOpenMap
443+
.<String, IndexMetadata>builder()
444+
.fPut(indexName, IndexMetadata.builder("test")
445+
.settings(Settings.builder()
446+
.put("index.number_of_shards", 1)
447+
.put("index.number_of_replicas", 1)
448+
.put("index.version.created", Version.CURRENT.id))
449+
.build())
450+
.build()).build();
451+
return new ClusterState(new ClusterName(clusterName), 123l, "111111",
452+
metadata, null, DiscoveryNodes.builder().add(node).build(),
453+
null, null, 0, false);
454+
}
329455
}
330456

331457

plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ public Collection<Object> createComponents(
217217
LocalSampleCalculator localSampleCalculator = new LocalSampleCalculator(client, settings);
218218
MLEngineClassLoader.register(FunctionName.LOCAL_SAMPLE_CALCULATOR, localSampleCalculator);
219219

220-
AnomalyLocalizerImpl anomalyLocalizer = new AnomalyLocalizerImpl(client, settings);
220+
AnomalyLocalizerImpl anomalyLocalizer = new AnomalyLocalizerImpl(client, settings, clusterService, indexNameExpressionResolver);
221221
MLEngineClassLoader.register(FunctionName.ANOMALY_LOCALIZATION, anomalyLocalizer);
222222

223223
MLSearchHandler mlSearchHandler = new MLSearchHandler(client, xContentRegistry);

0 commit comments

Comments
 (0)