Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

### Features
- Correlating Anomalies via Temporal Overlap Similarity ([#1641](https://github.com/opensearch-project/anomaly-detection/pull/1641))
- Introduce Insights API ([1610](https://github.com/opensearch-project/anomaly-detection/pull/1610))

### Enhancements
### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1196,4 +1196,4 @@ tasks.withType(AbstractPublishToMaven) {
onlyIf("Publishing only ZIP distributions") {
predicate.get()
}
}
}
1,019 changes: 1,019 additions & 0 deletions src/main/java/org/opensearch/ad/InsightsJobProcessor.java

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions src/main/java/org/opensearch/ad/constant/ADCommonName.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class ADCommonName {
// The alias of the index in which to write AD result history
public static final String ANOMALY_RESULT_INDEX_ALIAS = ".opendistro-anomaly-results";

// The insights result index alias
public static final String INSIGHTS_RESULT_INDEX_ALIAS = "opensearch-ad-plugin-insights";

// ======================================
// Anomaly Detector name for X-Opaque-Id header
// ======================================
Expand Down Expand Up @@ -72,4 +75,10 @@ public class ADCommonName {
public static final String DUMMY_AD_RESULT_ID = "dummy_ad_result_id";
public static final String DUMMY_DETECTOR_ID = "dummy_detector_id";
public static final String CUSTOM_RESULT_INDEX_PREFIX = "opensearch-ad-plugin-result-";

// ======================================
// Insights job
// ======================================
// The AD Insights job name
public static final String INSIGHTS_JOB_NAME = "ad_insights_job";
}
7 changes: 6 additions & 1 deletion src/main/java/org/opensearch/ad/indices/ADIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ public enum ADIndex implements TimeSeriesIndex {
ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getCheckpointMappings)
),
STATE(ADCommonName.DETECTION_STATE_INDEX, false, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getStateMappings)),
CUSTOM_RESULT(CUSTOM_RESULT_INDEX, true, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getResultMappings)),;
CUSTOM_RESULT(CUSTOM_RESULT_INDEX, true, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getResultMappings)),
CUSTOM_INSIGHTS_RESULT(
ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS,
true,
ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getInsightsResultMappings)
);

private final String indexName;
// whether we use an alias for the index
Expand Down
126 changes: 126 additions & 0 deletions src/main/java/org/opensearch/ad/indices/ADIndexManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,35 @@
import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_DETECTION_STATE_INDEX_MAPPING_FILE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_RESULTS_INDEX_MAPPING_FILE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.CHECKPOINT_INDEX_MAPPING_FILE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.INSIGHTS_RESULT_INDEX_MAPPING_FILE;

import java.io.IOException;
import java.util.EnumMap;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.indices.IndexManagement;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.transport.client.Client;
Expand All @@ -54,6 +60,9 @@
public class ADIndexManagement extends IndexManagement<ADIndex> {
private static final Logger logger = LogManager.getLogger(ADIndexManagement.class);

// cache the insights result mapping configs to avoid repeated parsing
private volatile Map<String, Object> INSIGHTS_RESULT_FIELD_CONFIGS;

// The index name pattern to query all the AD result history indices
public static final String AD_RESULT_HISTORY_INDEX_PATTERN = "<.opendistro-anomaly-results-history-{now/d}-1>";

Expand Down Expand Up @@ -142,6 +151,58 @@ public static String getFlattenedResultMappings() throws IOException {
return objectMapper.writeValueAsString(mapping);
}

/**
* Get insights result index mapping json content.
*
* @return insights result index mapping
* @throws IOException IOException if mapping file can't be read correctly
*/
public static String getInsightsResultMappings() throws IOException {
return getMappings(INSIGHTS_RESULT_INDEX_MAPPING_FILE);
}

private void initInsightsResultMapping() throws IOException {
if (INSIGHTS_RESULT_FIELD_CONFIGS != null) {
return;
}

String mappingJson = getInsightsResultMappings();
Map<String, Object> asMap = XContentHelper.convertToMap(new BytesArray(mappingJson), false, XContentType.JSON).v2();
Object properties = asMap.get(CommonName.PROPERTIES);
if (properties instanceof Map) {
INSIGHTS_RESULT_FIELD_CONFIGS = (Map<String, Object>) properties;
} else {
logger.error("Fail to read insights result mapping file.");
}
}

/**
* Validate insights result index mapping against the insights mapping file.
*
* @param resultIndexOrAlias insights result index or alias
* @param thenDo listener returns true if insights result index mapping is valid
*/
public void validateInsightsResultIndexMapping(String resultIndexOrAlias, ActionListener<Boolean> thenDo) {
getConcreteIndex(resultIndexOrAlias, ActionListener.wrap(concreteIndex -> {
if (concreteIndex == null) {
thenDo.onResponse(false);
return;
}
try {
initInsightsResultMapping();
if (INSIGHTS_RESULT_FIELD_CONFIGS == null) {
// failed to populate the field
thenDo.onResponse(false);
return;
}
validateIndexMapping(concreteIndex, INSIGHTS_RESULT_FIELD_CONFIGS, "insights result index", thenDo);
} catch (Exception e) {
logger.error("Failed to validate insights result index mapping for index " + concreteIndex, e);
thenDo.onResponse(false);
}
}, thenDo::onFailure));
}

/**
* Get anomaly detector state index mapping json content.
*
Expand Down Expand Up @@ -213,6 +274,52 @@ public void initDefaultResultIndexDirectly(ActionListener<CreateIndexResponse> a
);
}

/**
* Check if insights result index alias exists.
*
* @return true if insights result index alias exists
*/
public boolean doesInsightsResultIndexExist() {
return doesAliasExist(ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS);
}

/**
* Create insights result index directly.
* Uses the same rollover pattern as custom result indices.
*
* @param actionListener action called after create index
*/
public void initInsightsResultIndexDirectly(ActionListener<CreateIndexResponse> actionListener) {
try {
String insightsResultIndexPattern = getRolloverIndexPattern(ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS);
String mapping = getInsightsResultMappings();

CreateIndexRequest request = new CreateIndexRequest(insightsResultIndexPattern)
.mapping(mapping, XContentType.JSON)
.alias(new Alias(ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS).writeIndex(true));

request.settings(Settings.builder().put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, customResultIndexAutoExpandReplica));

adminClient.indices().create(request, actionListener);
} catch (IOException e) {
logger.error("Failed to init insights result index", e);
actionListener.onFailure(e);
}
}

/**
* Create insights result index if it does not exist.
*
* @param actionListener action called after create index
*/
public void initInsightsResultIndexIfAbsent(ActionListener<CreateIndexResponse> actionListener) {
if (!doesInsightsResultIndexExist()) {
initInsightsResultIndexDirectly(actionListener);
} else {
actionListener.onResponse(null);
}
}

/**
* Create the state index.
*
Expand Down Expand Up @@ -252,12 +359,31 @@ public void initCheckpointIndex(ActionListener<CreateIndexResponse> actionListen

@Override
protected void rolloverAndDeleteHistoryIndex() {
// rollover anomaly result index
rolloverAndDeleteHistoryIndex(
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
ALL_AD_RESULTS_INDEX_PATTERN,
AD_RESULT_HISTORY_INDEX_PATTERN,
ADIndex.RESULT
);

// rollover insights result index
rolloverAndDeleteInsightsHistoryIndex();
}

/**
* rollover and delete old insights result indices.
* Uses same retention policy as system result index.
*/
protected void rolloverAndDeleteInsightsHistoryIndex() {
if (doesInsightsResultIndexExist()) {
rolloverAndDeleteHistoryIndex(
ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS,
getAllHistoryIndexPattern(ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS),
getRolloverIndexPattern(ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS),
ADIndex.CUSTOM_INSIGHTS_RESULT
);
}
}

/**
Expand Down
Loading
Loading