Skip to content

Commit 5c9dabd

Browse files
committed
address comments
Signed-off-by: Jackie <jkhanjob@gmail.com>
1 parent 7525e0d commit 5c9dabd

19 files changed

+723
-395
lines changed

build.gradle

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,6 @@ buildscript {
6868
// It is useful to record intermediately information like prediction precision and recall.
6969
// This option turn on log printing during tests.
7070
printLogs = "true" == System.getProperty("test.logs", "false")
71-
72-
// OpenSearch 3.5.0-SNAPSHOT pulls Jackson 2.20.1 for some modules (e.g., Smile, Core), while
73-
// `jackson-annotations` 2.20.x is not published on Maven Central yet.
74-
// Keep core/databind aligned with OpenSearch, and pin annotations to the latest published 2.19.x.
75-
jackson_core_version = System.getProperty("jackson.core.version", "2.20.1")
76-
jackson_databind_version = System.getProperty("jackson.databind.version", "2.20.1")
77-
jackson_annotations_version = System.getProperty("jackson.annotations.version", "2.19.0")
7871
}
7972

8073
repositories {
@@ -169,8 +162,8 @@ dependencies {
169162

170163

171164
// we inherit jackson-core from opensearch core
172-
implementation("com.fasterxml.jackson.core:jackson-databind:${jackson_databind_version}")
173-
implementation("com.fasterxml.jackson.core:jackson-annotations:${jackson_annotations_version}")
165+
implementation("com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}")
166+
implementation("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson_annotations}")
174167

175168
// used for serializing/deserializing rcf models.
176169
implementation 'io.protostuff:protostuff-core:1.8.0'
@@ -277,9 +270,7 @@ configurations.all {
277270
force "junit:junit:4.13.2"
278271

279272
force "com.google.guava:guava:33.4.5-jre" // CVE for 31.1
280-
force("com.fasterxml.jackson.core:jackson-core:${jackson_core_version}")
281-
force("com.fasterxml.jackson.core:jackson-annotations:${jackson_annotations_version}")
282-
force("com.fasterxml.jackson.core:jackson-databind:${jackson_databind_version}")
273+
force("com.fasterxml.jackson.core:jackson-core:${versions.jackson}")
283274
force "org.ow2.asm:asm:9.7.1"
284275
}
285276
}

src/main/java/org/opensearch/ad/InsightsJobProcessor.java

Lines changed: 180 additions & 67 deletions
Large diffs are not rendered by default.

src/main/java/org/opensearch/ad/ml/InsightsGenerator.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
import java.util.Locale;
1717
import java.util.Map;
1818
import java.util.Set;
19-
import java.util.UUID;
2019

21-
import org.opensearch.ad.constant.ADCommonName;
2220
import org.opensearch.ad.correlation.Anomaly;
2321
import org.opensearch.ad.correlation.AnomalyCorrelation;
2422
import org.opensearch.ad.model.AnomalyResult;
@@ -151,7 +149,6 @@ public static XContentBuilder generateInsightsFromClusters(
151149
XContentBuilder builder = XContentFactory.jsonBuilder();
152150
builder.startObject();
153151

154-
builder.field("task_id", "task_" + ADCommonName.INSIGHTS_JOB_NAME + "_" + UUID.randomUUID().toString());
155152
builder.field("window_start", executionStartTime.toEpochMilli());
156153
builder.field("window_end", executionEndTime.toEpochMilli());
157154
builder.field("generated_at", Instant.now().toEpochMilli());

src/main/java/org/opensearch/ad/rest/RestInsightsJobAction.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,10 @@
88
package org.opensearch.ad.rest;
99

1010
import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_REQUEST_TIMEOUT;
11-
import static org.opensearch.timeseries.util.RestHandlerUtils.DETECTOR_ID;
1211
import static org.opensearch.timeseries.util.RestHandlerUtils.FREQUENCY;
13-
import static org.opensearch.timeseries.util.RestHandlerUtils.FROM;
14-
import static org.opensearch.timeseries.util.RestHandlerUtils.INDEX;
15-
import static org.opensearch.timeseries.util.RestHandlerUtils.INSIGHTS_RESULTS;
1612
import static org.opensearch.timeseries.util.RestHandlerUtils.INSIGHTS_START;
1713
import static org.opensearch.timeseries.util.RestHandlerUtils.INSIGHTS_STATUS;
1814
import static org.opensearch.timeseries.util.RestHandlerUtils.INSIGHTS_STOP;
19-
import static org.opensearch.timeseries.util.RestHandlerUtils.SIZE;
2015

2116
import java.io.IOException;
2217
import java.util.List;
@@ -43,7 +38,6 @@
4338
* This class consists of the REST handler to handle request to start, get results, check status, and stop insights job.
4439
* POST /_plugins/_anomaly_detection/insights/_start - Start insights job
4540
* GET /_plugins/_anomaly_detection/insights/_status - Get insights job status
46-
* GET /_plugins/_anomaly_detection/insights/_results - Get latest insights results
4741
* POST /_plugins/_anomaly_detection/insights/_stop - Stop insights job
4842
*/
4943
public class RestInsightsJobAction extends RestJobAction {
@@ -81,11 +75,6 @@ public List<Route> routes() {
8175
new Route(
8276
RestRequest.Method.POST,
8377
String.format(Locale.ROOT, "%s/insights/%s", TimeSeriesAnalyticsPlugin.AD_BASE_URI, INSIGHTS_STOP)
84-
),
85-
// Get insights results
86-
new Route(
87-
RestRequest.Method.GET,
88-
String.format(Locale.ROOT, "%s/insights/%s", TimeSeriesAnalyticsPlugin.AD_BASE_URI, INSIGHTS_RESULTS)
8978
)
9079
);
9180
}
@@ -109,13 +98,6 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
10998
insightsJobRequest = parseStartRequest(request, rawPath);
11099
} else if (rawPath.contains(INSIGHTS_STATUS)) {
111100
insightsJobRequest = new InsightsJobRequest(rawPath);
112-
} else if (rawPath.contains(INSIGHTS_RESULTS)) {
113-
String detectorId = request.param(DETECTOR_ID);
114-
String index = request.param(INDEX);
115-
int from = request.paramAsInt(FROM, 0);
116-
int size = request.paramAsInt(SIZE, 20);
117-
118-
insightsJobRequest = new InsightsJobRequest(detectorId, index, from, size, rawPath);
119101
} else if (rawPath.contains(INSIGHTS_STOP)) {
120102
insightsJobRequest = new InsightsJobRequest(rawPath);
121103
} else {

src/main/java/org/opensearch/ad/rest/handler/InsightsJobActionHandler.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
import org.opensearch.ad.constant.ADCommonName;
3131
import org.opensearch.ad.indices.ADIndexManagement;
3232
import org.opensearch.ad.transport.InsightsJobResponse;
33+
import org.opensearch.common.settings.Settings;
3334
import org.opensearch.common.unit.TimeValue;
3435
import org.opensearch.common.util.concurrent.ThreadContext;
3536
import org.opensearch.common.xcontent.XContentFactory;
37+
import org.opensearch.commons.InjectSecurity;
3638
import org.opensearch.commons.authuser.User;
3739
import org.opensearch.core.action.ActionListener;
3840
import org.opensearch.core.rest.RestStatus;
@@ -63,16 +65,19 @@ public class InsightsJobActionHandler {
6365
private final NamedXContentRegistry xContentRegistry;
6466
private final ADIndexManagement indexManagement;
6567
private final TimeValue requestTimeout;
68+
private final Settings settings;
6669

6770
public InsightsJobActionHandler(
6871
Client client,
6972
NamedXContentRegistry xContentRegistry,
7073
ADIndexManagement indexManagement,
74+
Settings settings,
7175
TimeValue requestTimeout
7276
) {
7377
this.client = client;
7478
this.xContentRegistry = xContentRegistry;
7579
this.indexManagement = indexManagement;
80+
this.settings = settings;
7681
this.requestTimeout = requestTimeout;
7782
}
7883

@@ -88,15 +93,35 @@ public void startInsightsJob(String frequency, ActionListener<InsightsJobRespons
8893
// Get user context from current request (will be stored in job and used during execution)
8994
User user = ParseUtils.getUserContext(client);
9095

91-
// init insights-results index
92-
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
96+
// init insights-results index (customer-owned index). Use user credentials when available.
97+
if (user == null) {
98+
// Security disabled / no user context. Proceed without injection.
9399
indexManagement.initInsightsResultIndexIfAbsent(ActionListener.wrap(createIndexResponse -> {
94-
// create insights job
95-
ensureJobIndexAndCreateJob(frequency, user, listener);
100+
ensureJobIndexAndCreateJob(frequency, null, listener);
96101
}, e -> {
97102
logger.error("Failed to initialize insights result index", e);
98103
listener.onFailure(e);
99104
}));
105+
return;
106+
}
107+
108+
InjectSecurity injectSecurity = new InjectSecurity(ADCommonName.INSIGHTS_JOB_NAME, settings, client.threadPool().getThreadContext());
109+
try {
110+
injectSecurity.inject(user.getName(), user.getRoles());
111+
indexManagement
112+
.initInsightsResultIndexIfAbsent(
113+
ActionListener.runBefore(ActionListener.wrap(createIndexResponse -> {
114+
// create insights job
115+
ensureJobIndexAndCreateJob(frequency, user, listener);
116+
}, e -> {
117+
logger.error("Failed to initialize insights result index", e);
118+
listener.onFailure(e);
119+
}), injectSecurity::close)
120+
);
121+
} catch (Exception e) {
122+
injectSecurity.close();
123+
logger.error("Failed to inject security context for insights result index initialization", e);
124+
listener.onFailure(e);
100125
}
101126
}
102127

src/main/java/org/opensearch/ad/transport/InsightsJobTransportAction.java

Lines changed: 1 addition & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
import org.apache.logging.log4j.LogManager;
1818
import org.apache.logging.log4j.Logger;
19-
import org.opensearch.action.search.SearchRequest;
20-
import org.opensearch.action.search.SearchResponse;
2119
import org.opensearch.action.support.ActionFilters;
2220
import org.opensearch.action.support.HandledTransportAction;
2321
import org.opensearch.ad.constant.ADCommonName;
@@ -29,11 +27,6 @@
2927
import org.opensearch.common.settings.Settings;
3028
import org.opensearch.core.action.ActionListener;
3129
import org.opensearch.core.xcontent.NamedXContentRegistry;
32-
import org.opensearch.index.query.BoolQueryBuilder;
33-
import org.opensearch.index.query.QueryBuilders;
34-
import org.opensearch.search.SearchHit;
35-
import org.opensearch.search.builder.SearchSourceBuilder;
36-
import org.opensearch.search.sort.SortOrder;
3730
import org.opensearch.tasks.Task;
3831
import org.opensearch.timeseries.transport.InsightsJobRequest;
3932
import org.opensearch.transport.TransportService;
@@ -61,6 +54,7 @@ public InsightsJobTransportAction(
6154
client,
6255
xContentRegistry,
6356
indexManagement,
57+
settings,
6458
AnomalyDetectorSettings.AD_REQUEST_TIMEOUT.get(settings)
6559
);
6660
}
@@ -73,8 +67,6 @@ protected void doExecute(Task task, InsightsJobRequest request, ActionListener<I
7367
handleStatusOperation(request, listener);
7468
} else if (request.isStopOperation()) {
7569
handleStopOperation(request, listener);
76-
} else if (request.isResultsOperation()) {
77-
handleResultsOperation(request, listener);
7870
} else {
7971
listener.onFailure(new IllegalArgumentException("Unknown operation"));
8072
}
@@ -93,64 +85,4 @@ private void handleStatusOperation(InsightsJobRequest request, ActionListener<In
9385
private void handleStopOperation(InsightsJobRequest request, ActionListener<InsightsJobResponse> listener) {
9486
jobHandler.stopInsightsJob(listener);
9587
}
96-
97-
private void handleResultsOperation(InsightsJobRequest request, ActionListener<InsightsJobResponse> listener) {
98-
try {
99-
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
100-
101-
if (request.getDetectorId() != null && !request.getDetectorId().isEmpty()) {
102-
boolQuery.must(QueryBuilders.termQuery("doc_detector_ids", request.getDetectorId()));
103-
}
104-
105-
if (request.getIndex() != null && !request.getIndex().isEmpty()) {
106-
boolQuery.must(QueryBuilders.termQuery("doc_indices", request.getIndex()));
107-
}
108-
109-
if (!boolQuery.hasClauses()) {
110-
boolQuery.must(QueryBuilders.matchAllQuery());
111-
}
112-
113-
SearchRequest searchRequest = new SearchRequest(ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS)
114-
.source(
115-
new SearchSourceBuilder()
116-
.query(boolQuery)
117-
.from(request.getFrom())
118-
.size(request.getSize())
119-
.sort("generated_at", SortOrder.DESC)
120-
);
121-
122-
client.search(searchRequest, ActionListener.wrap(searchResponse -> {
123-
long totalHits = searchResponse.getHits().getTotalHits() != null ? searchResponse.getHits().getTotalHits().value() : 0;
124-
log.debug("Search completed, found {} hits", totalHits);
125-
handleSearchResponse(searchResponse, listener);
126-
}, e -> {
127-
if (e.getMessage() != null && e.getMessage().contains("No mapping found")) {
128-
listener.onResponse(new InsightsJobResponse(new ArrayList<>(), 0L));
129-
} else {
130-
log.error("Failed to search insights results", e);
131-
listener.onFailure(e);
132-
}
133-
}));
134-
135-
} catch (Exception e) {
136-
log.error("Error building search request for insights results", e);
137-
listener.onFailure(e);
138-
}
139-
}
140-
141-
private void handleSearchResponse(SearchResponse searchResponse, ActionListener<InsightsJobResponse> listener) {
142-
try {
143-
List<String> results = new ArrayList<>();
144-
145-
for (SearchHit hit : searchResponse.getHits().getHits()) {
146-
results.add(hit.getSourceAsString());
147-
}
148-
149-
long totalHits = searchResponse.getHits().getTotalHits() != null ? searchResponse.getHits().getTotalHits().value() : 0;
150-
listener.onResponse(new InsightsJobResponse(results, totalHits));
151-
} catch (Exception e) {
152-
log.error("Error processing search response", e);
153-
listener.onFailure(e);
154-
}
155-
}
15688
}

src/main/java/org/opensearch/timeseries/indices/IndexManagement.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1253,6 +1253,12 @@ protected void validateIndexMapping(
12531253
boolean correctMapping = true;
12541254
for (String fieldName : expectedFieldConfigs.keySet()) {
12551255
Object expectedField = expectedFieldConfigs.get(fieldName);
1256+
// the field might be a map or map of map
1257+
// example: map: {type=date, format=strict_date_time||epoch_millis}
1258+
// map of map: {type=nested, properties={likelihood={type=double}, value_list={type=nested,
1259+
// properties={data={type=double},
1260+
// feature_id={type=keyword}}}}}
1261+
// if it is a map of map, Object.equals can compare them regardless of order
12561262
if (!actualSchema.containsKey(fieldName)) {
12571263
logger.warn("mapping mismatch due to missing {}", fieldName);
12581264
correctMapping = false;

src/main/java/org/opensearch/timeseries/transport/InsightsJobRequest.java

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
public class InsightsJobRequest extends ActionRequest {
2222

2323
private String frequency;
24-
private String detectorId;
25-
private String index;
2624
private int from;
2725
private int size;
2826
private String rawPath;
@@ -40,23 +38,6 @@ public InsightsJobRequest(String frequency, String rawPath) {
4038
this.size = 20;
4139
}
4240

43-
/**
44-
* Constructor for get results operation with filters
45-
* @param detectorId
46-
* @param index
47-
* @param from
48-
* @param size
49-
* @param rawPath
50-
*/
51-
public InsightsJobRequest(String detectorId, String index, int from, int size, String rawPath) {
52-
super();
53-
this.detectorId = detectorId;
54-
this.index = index;
55-
this.from = from;
56-
this.size = size;
57-
this.rawPath = rawPath;
58-
}
59-
6041
/**
6142
* Constructor for stop operation
6243
* @param rawPath
@@ -71,8 +52,6 @@ public InsightsJobRequest(String rawPath) {
7152
public InsightsJobRequest(StreamInput in) throws IOException {
7253
super(in);
7354
this.frequency = in.readOptionalString();
74-
this.detectorId = in.readOptionalString();
75-
this.index = in.readOptionalString();
7655
this.from = in.readInt();
7756
this.size = in.readInt();
7857
this.rawPath = in.readString();
@@ -82,8 +61,6 @@ public InsightsJobRequest(StreamInput in) throws IOException {
8261
public void writeTo(StreamOutput out) throws IOException {
8362
super.writeTo(out);
8463
out.writeOptionalString(frequency);
85-
out.writeOptionalString(detectorId);
86-
out.writeOptionalString(index);
8764
out.writeInt(from);
8865
out.writeInt(size);
8966
out.writeString(rawPath);
@@ -93,33 +70,13 @@ public void writeTo(StreamOutput out) throws IOException {
9370
public ActionRequestValidationException validate() {
9471
ActionRequestValidationException validationException = null;
9572

96-
if (rawPath != null && rawPath.contains("_results")) {
97-
if (from < 0) {
98-
validationException = new ActionRequestValidationException();
99-
validationException.addValidationError("from parameter must be non-negative");
100-
}
101-
if (size <= 0) {
102-
if (validationException == null) {
103-
validationException = new ActionRequestValidationException();
104-
}
105-
validationException.addValidationError("size parameter must be positive");
106-
}
107-
}
10873
return validationException;
10974
}
11075

11176
public String getFrequency() {
11277
return frequency;
11378
}
11479

115-
public String getDetectorId() {
116-
return detectorId;
117-
}
118-
119-
public String getIndex() {
120-
return index;
121-
}
122-
12380
public int getFrom() {
12481
return from;
12582
}
@@ -143,8 +100,4 @@ public boolean isStatusOperation() {
143100
public boolean isStopOperation() {
144101
return rawPath != null && rawPath.contains("_stop");
145102
}
146-
147-
public boolean isResultsOperation() {
148-
return rawPath != null && rawPath.contains("_results");
149-
}
150103
}

src/main/resources/mappings/insights-results.json

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@
44
"schema_version": 1
55
},
66
"properties": {
7-
"task_id": {
8-
"type": "keyword"
9-
},
107
"window_start": {
118
"type": "date",
129
"format": "strict_date_time||epoch_millis"

0 commit comments

Comments
 (0)