Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
64021b5
initial commit
jackiehanyang Nov 6, 2025
45e795f
Merge remote-tracking branch 'origin/main' into tmp/merge-main-into-i…
jackiehanyang Nov 10, 2025
b14da81
Introduce Insights API
jackiehanyang Nov 11, 2025
ed68a6d
spotless apply
jackiehanyang Nov 11, 2025
200d461
add change log
jackiehanyang Nov 11, 2025
671e103
fix
jackiehanyang Nov 11, 2025
77fcda9
make timestamp more user friendly when reading
jackiehanyang Nov 11, 2025
3d50179
fix forbidden api violation by avoiding default-locale usage
jackiehanyang Nov 11, 2025
49018e0
add more tests
jackiehanyang Nov 12, 2025
816d69b
use stashed context when starting insights job
jackiehanyang Nov 20, 2025
1356920
add one time insights job run
jackiehanyang Nov 22, 2025
b731aef
do immediate one time run job right after starting insights job
jackiehanyang Nov 23, 2025
1ebe2a8
address comments
jackiehanyang Nov 26, 2025
be01bc9
Merge remote-tracking branch 'origin/main' into insights
jackiehanyang Jan 23, 2026
7525e0d
replace ml-commons metrics correlation algorithm with temporal overla…
jackiehanyang Jan 26, 2026
85db8bb
address comments
jackiehanyang Jan 27, 2026
bb524d1
add more tests
jackiehanyang Jan 27, 2026
83f21d7
address comments
jackiehanyang Jan 29, 2026
92914b5
Merge remote-tracking branch 'origin/main' into insights
jackiehanyang Jan 29, 2026
fd0f1d9
fix security test
jackiehanyang Feb 1, 2026
4d6ef59
Merge branch 'main' into insights
jackiehanyang Feb 2, 2026
be03e63
remove usage of pluginClient
jackiehanyang Feb 4, 2026
b380dc1
Merge branch 'main' into insights
jackiehanyang Feb 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

### Features
- Correlating Anomalies via Temporal Overlap Similarity ([#1641](https://github.com/opensearch-project/anomaly-detection/pull/1641))
- Introduce Insights API ([1610](https://github.com/opensearch-project/anomaly-detection/pull/1610))

### Enhancements
### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1193,4 +1193,4 @@ tasks.withType(AbstractPublishToMaven) {
onlyIf("Publishing only ZIP distributions") {
predicate.get()
}
}
}
1,014 changes: 1,014 additions & 0 deletions src/main/java/org/opensearch/ad/InsightsJobProcessor.java

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions src/main/java/org/opensearch/ad/constant/ADCommonName.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class ADCommonName {
// The alias of the index in which to write AD result history
public static final String ANOMALY_RESULT_INDEX_ALIAS = ".opendistro-anomaly-results";

// The insights result index alias
public static final String INSIGHTS_RESULT_INDEX_ALIAS = "opensearch-ad-plugin-insights";

// ======================================
// Anomaly Detector name for X-Opaque-Id header
// ======================================
Expand Down Expand Up @@ -72,4 +75,10 @@ public class ADCommonName {
public static final String DUMMY_AD_RESULT_ID = "dummy_ad_result_id";
public static final String DUMMY_DETECTOR_ID = "dummy_detector_id";
public static final String CUSTOM_RESULT_INDEX_PREFIX = "opensearch-ad-plugin-result-";

// ======================================
// Insights job
// ======================================
// The AD Insights job name
public static final String INSIGHTS_JOB_NAME = "ad_insights_job";
}
7 changes: 6 additions & 1 deletion src/main/java/org/opensearch/ad/indices/ADIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ public enum ADIndex implements TimeSeriesIndex {
ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getCheckpointMappings)
),
STATE(ADCommonName.DETECTION_STATE_INDEX, false, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getStateMappings)),
CUSTOM_RESULT(CUSTOM_RESULT_INDEX, true, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getResultMappings)),;
CUSTOM_RESULT(CUSTOM_RESULT_INDEX, true, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getResultMappings)),
CUSTOM_INSIGHTS_RESULT(
ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS,
true,
ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getInsightsResultMappings)
);

private final String indexName;
// whether we use an alias for the index
Expand Down
126 changes: 126 additions & 0 deletions src/main/java/org/opensearch/ad/indices/ADIndexManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,35 @@
import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_DETECTION_STATE_INDEX_MAPPING_FILE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_RESULTS_INDEX_MAPPING_FILE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.CHECKPOINT_INDEX_MAPPING_FILE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.INSIGHTS_RESULT_INDEX_MAPPING_FILE;

import java.io.IOException;
import java.util.EnumMap;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.indices.IndexManagement;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.transport.client.Client;
Expand All @@ -54,6 +60,9 @@
public class ADIndexManagement extends IndexManagement<ADIndex> {
private static final Logger logger = LogManager.getLogger(ADIndexManagement.class);

// cache the insights result mapping configs to avoid repeated parsing
private volatile Map<String, Object> INSIGHTS_RESULT_FIELD_CONFIGS;

// The index name pattern to query all the AD result history indices
public static final String AD_RESULT_HISTORY_INDEX_PATTERN = "<.opendistro-anomaly-results-history-{now/d}-1>";

Expand Down Expand Up @@ -142,6 +151,58 @@ public static String getFlattenedResultMappings() throws IOException {
return objectMapper.writeValueAsString(mapping);
}

/**
* Get insights result index mapping json content.
*
* @return insights result index mapping
* @throws IOException IOException if mapping file can't be read correctly
*/
public static String getInsightsResultMappings() throws IOException {
return getMappings(INSIGHTS_RESULT_INDEX_MAPPING_FILE);
}

private void initInsightsResultMapping() throws IOException {
if (INSIGHTS_RESULT_FIELD_CONFIGS != null) {
return;
}

String mappingJson = getInsightsResultMappings();
Map<String, Object> asMap = XContentHelper.convertToMap(new BytesArray(mappingJson), false, XContentType.JSON).v2();
Object properties = asMap.get(CommonName.PROPERTIES);
if (properties instanceof Map) {
INSIGHTS_RESULT_FIELD_CONFIGS = (Map<String, Object>) properties;
} else {
logger.error("Fail to read insights result mapping file.");
}
}

/**
* Validate insights result index mapping against the insights mapping file.
*
* @param resultIndexOrAlias insights result index or alias
* @param thenDo listener returns true if insights result index mapping is valid
*/
public void validateInsightsResultIndexMapping(String resultIndexOrAlias, ActionListener<Boolean> thenDo) {
getConcreteIndex(resultIndexOrAlias, ActionListener.wrap(concreteIndex -> {
if (concreteIndex == null) {
thenDo.onResponse(false);
return;
}
try {
initInsightsResultMapping();
if (INSIGHTS_RESULT_FIELD_CONFIGS == null) {
// failed to populate the field
thenDo.onResponse(false);
return;
}
validateIndexMapping(concreteIndex, INSIGHTS_RESULT_FIELD_CONFIGS, "insights result index", thenDo);
} catch (Exception e) {
logger.error("Failed to validate insights result index mapping for index " + concreteIndex, e);
thenDo.onResponse(false);
}
}, thenDo::onFailure));
}

/**
* Get anomaly detector state index mapping json content.
*
Expand Down Expand Up @@ -213,6 +274,52 @@ public void initDefaultResultIndexDirectly(ActionListener<CreateIndexResponse> a
);
}

/**
* Check if insights result index alias exists.
*
* @return true if insights result index alias exists
*/
public boolean doesInsightsResultIndexExist() {
return doesAliasExist(ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS);
}

/**
* Create insights result index directly.
* Uses the same rollover pattern as custom result indices.
*
* @param actionListener action called after create index
*/
public void initInsightsResultIndexDirectly(ActionListener<CreateIndexResponse> actionListener) {
try {
String insightsResultIndexPattern = getRolloverIndexPattern(ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS);
String mapping = getInsightsResultMappings();

CreateIndexRequest request = new CreateIndexRequest(insightsResultIndexPattern)
.mapping(mapping, XContentType.JSON)
.alias(new Alias(ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS).writeIndex(true));

request.settings(Settings.builder().put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, customResultIndexAutoExpandReplica));

adminClient.indices().create(request, actionListener);
} catch (IOException e) {
logger.error("Failed to init insights result index", e);
actionListener.onFailure(e);
}
}

/**
* Create insights result index if it does not exist.
*
* @param actionListener action called after create index
*/
public void initInsightsResultIndexIfAbsent(ActionListener<CreateIndexResponse> actionListener) {
if (!doesInsightsResultIndexExist()) {
initInsightsResultIndexDirectly(actionListener);
} else {
actionListener.onResponse(null);
}
}

/**
* Create the state index.
*
Expand Down Expand Up @@ -252,12 +359,31 @@ public void initCheckpointIndex(ActionListener<CreateIndexResponse> actionListen

@Override
protected void rolloverAndDeleteHistoryIndex() {
// rollover anomaly result index
rolloverAndDeleteHistoryIndex(
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
ALL_AD_RESULTS_INDEX_PATTERN,
AD_RESULT_HISTORY_INDEX_PATTERN,
ADIndex.RESULT
);

// rollover insights result index
rolloverAndDeleteInsightsHistoryIndex();
}

/**
* rollover and delete old insights result indices.
* Uses same retention policy as system result index.
*/
protected void rolloverAndDeleteInsightsHistoryIndex() {
if (doesInsightsResultIndexExist()) {
rolloverAndDeleteHistoryIndex(
ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS,
getAllHistoryIndexPattern(ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS),
getRolloverIndexPattern(ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS),
ADIndex.CUSTOM_INSIGHTS_RESULT
);
}
}

/**
Expand Down
Loading
Loading