Skip to content

Commit fbb297b

Browse files
jowg-amazonsbcd90
andauthored
[Backport 2.x] Add support for remote monitors (#694)
* changes to add support for remote monitors in alerting (#661) Signed-off-by: Subhobrata Dey <sbcd90@gmail.com> * changes to add support for remote monitors in alerting (#662) * changes to add support for remote monitors in alerting Signed-off-by: Subhobrata Dey <sbcd90@gmail.com> * add tests for moved classes Signed-off-by: Subhobrata Dey <sbcd90@gmail.com> --------- Signed-off-by: Subhobrata Dey <sbcd90@gmail.com> * changes to support generic inputs and triggers in remote monitors (#664) Signed-off-by: Subhobrata Dey <sbcd90@gmail.com> * add remote doc level monitor input (#665) Signed-off-by: Subhobrata Dey <sbcd90@gmail.com> * fix serde of RemoteDocLevelMonitorInput (#666) Signed-off-by: Subhobrata Dey <sbcd90@gmail.com> * fix serde for monitor (#692) Signed-off-by: Subhobrata Dey <sbcd90@gmail.com> --------- Signed-off-by: Subhobrata Dey <sbcd90@gmail.com> Co-authored-by: Subhobrata Dey <sbcd90@gmail.com>
1 parent 413ae0f commit fbb297b

30 files changed

+2413
-26
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.commons.alerting.action
7+
8+
import org.opensearch.action.ActionType
9+
10+
class DocLevelMonitorFanOutAction private constructor() : ActionType<DocLevelMonitorFanOutResponse>(NAME, ::DocLevelMonitorFanOutResponse) {
11+
companion object {
12+
val INSTANCE = DocLevelMonitorFanOutAction()
13+
const val NAME = "cluster:admin/opensearch/alerting/monitor/doclevel/fanout"
14+
}
15+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.commons.alerting.action
7+
8+
import org.opensearch.action.ActionRequest
9+
import org.opensearch.action.ActionRequestValidationException
10+
import org.opensearch.commons.alerting.model.IndexExecutionContext
11+
import org.opensearch.commons.alerting.model.Monitor
12+
import org.opensearch.commons.alerting.model.MonitorMetadata
13+
import org.opensearch.commons.alerting.model.WorkflowRunContext
14+
import org.opensearch.core.common.io.stream.StreamInput
15+
import org.opensearch.core.common.io.stream.StreamOutput
16+
import org.opensearch.core.index.shard.ShardId
17+
import org.opensearch.core.xcontent.ToXContent
18+
import org.opensearch.core.xcontent.ToXContentObject
19+
import org.opensearch.core.xcontent.XContentBuilder
20+
import java.io.IOException
21+
22+
class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
23+
val monitor: Monitor
24+
val dryRun: Boolean
25+
val monitorMetadata: MonitorMetadata
26+
val executionId: String
27+
val indexExecutionContext: IndexExecutionContext?
28+
val shardIds: List<ShardId>
29+
val concreteIndicesSeenSoFar: List<String>
30+
val workflowRunContext: WorkflowRunContext?
31+
32+
constructor(
33+
monitor: Monitor,
34+
dryRun: Boolean,
35+
monitorMetadata: MonitorMetadata,
36+
executionId: String,
37+
indexExecutionContext: IndexExecutionContext?,
38+
shardIds: List<ShardId>,
39+
concreteIndicesSeenSoFar: List<String>,
40+
workflowRunContext: WorkflowRunContext?
41+
) : super() {
42+
this.monitor = monitor
43+
this.dryRun = dryRun
44+
this.monitorMetadata = monitorMetadata
45+
this.executionId = executionId
46+
this.indexExecutionContext = indexExecutionContext
47+
this.shardIds = shardIds
48+
this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar
49+
this.workflowRunContext = workflowRunContext
50+
require(false == shardIds.isEmpty()) { }
51+
}
52+
53+
@Throws(IOException::class)
54+
constructor(sin: StreamInput) : this(
55+
monitor = Monitor.readFrom(sin)!!,
56+
dryRun = sin.readBoolean(),
57+
monitorMetadata = MonitorMetadata.readFrom(sin),
58+
executionId = sin.readString(),
59+
shardIds = sin.readList(::ShardId),
60+
concreteIndicesSeenSoFar = sin.readStringList(),
61+
workflowRunContext = if (sin.readBoolean()) {
62+
WorkflowRunContext(sin)
63+
} else { null },
64+
indexExecutionContext = IndexExecutionContext(sin)
65+
)
66+
67+
@Throws(IOException::class)
68+
override fun writeTo(out: StreamOutput) {
69+
monitor.writeTo(out)
70+
out.writeBoolean(dryRun)
71+
monitorMetadata.writeTo(out)
72+
out.writeString(executionId)
73+
out.writeCollection(shardIds)
74+
out.writeStringCollection(concreteIndicesSeenSoFar)
75+
out.writeBoolean(workflowRunContext != null)
76+
workflowRunContext?.writeTo(out)
77+
indexExecutionContext?.writeTo(out)
78+
}
79+
80+
override fun validate(): ActionRequestValidationException? {
81+
var actionValidationException: ActionRequestValidationException? = null
82+
if (shardIds.isEmpty()) {
83+
actionValidationException = ActionRequestValidationException()
84+
actionValidationException.addValidationError("shard_ids is null or empty")
85+
}
86+
return actionValidationException
87+
}
88+
89+
@Throws(IOException::class)
90+
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
91+
builder.startObject()
92+
.field("monitor", monitor)
93+
.field("dry_run", dryRun)
94+
.field("execution_id", executionId)
95+
.field("index_execution_context", indexExecutionContext)
96+
.field("shard_ids", shardIds)
97+
.field("concrete_indices", concreteIndicesSeenSoFar)
98+
.field("workflow_run_context", workflowRunContext)
99+
return builder.endObject()
100+
}
101+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.commons.alerting.action
7+
8+
import org.opensearch.commons.alerting.model.DocumentLevelTriggerRunResult
9+
import org.opensearch.commons.alerting.model.InputRunResults
10+
import org.opensearch.commons.alerting.util.AlertingException
11+
import org.opensearch.core.action.ActionResponse
12+
import org.opensearch.core.common.io.stream.StreamInput
13+
import org.opensearch.core.common.io.stream.StreamOutput
14+
import org.opensearch.core.xcontent.ToXContent
15+
import org.opensearch.core.xcontent.ToXContentObject
16+
import org.opensearch.core.xcontent.XContentBuilder
17+
import java.io.IOException
18+
19+
class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject {
20+
val nodeId: String
21+
val executionId: String
22+
val monitorId: String
23+
val lastRunContexts: MutableMap<String, Any>
24+
val inputResults: InputRunResults
25+
val triggerResults: Map<String, DocumentLevelTriggerRunResult>
26+
val exception: AlertingException?
27+
28+
@Throws(IOException::class)
29+
constructor(sin: StreamInput) : this(
30+
nodeId = sin.readString(),
31+
executionId = sin.readString(),
32+
monitorId = sin.readString(),
33+
lastRunContexts = sin.readMap()!! as MutableMap<String, Any>,
34+
inputResults = InputRunResults.readFrom(sin),
35+
triggerResults = suppressWarning(sin.readMap(StreamInput::readString, DocumentLevelTriggerRunResult::readFrom)),
36+
exception = sin.readException()
37+
)
38+
39+
constructor(
40+
nodeId: String,
41+
executionId: String,
42+
monitorId: String,
43+
lastRunContexts: MutableMap<String, Any>,
44+
inputResults: InputRunResults = InputRunResults(), // partial,
45+
triggerResults: Map<String, DocumentLevelTriggerRunResult> = mapOf(),
46+
exception: AlertingException? = null
47+
) : super() {
48+
this.nodeId = nodeId
49+
this.executionId = executionId
50+
this.monitorId = monitorId
51+
this.lastRunContexts = lastRunContexts
52+
this.inputResults = inputResults
53+
this.triggerResults = triggerResults
54+
this.exception = exception
55+
}
56+
57+
@Throws(IOException::class)
58+
override fun writeTo(out: StreamOutput) {
59+
out.writeString(nodeId)
60+
out.writeString(executionId)
61+
out.writeString(monitorId)
62+
out.writeMap(lastRunContexts)
63+
inputResults.writeTo(out)
64+
out.writeMap(
65+
triggerResults,
66+
StreamOutput::writeString,
67+
{ stream, stats -> stats.writeTo(stream) }
68+
)
69+
out.writeException(exception)
70+
}
71+
72+
@Throws(IOException::class)
73+
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
74+
builder.startObject()
75+
.field("node_id", nodeId)
76+
.field("execution_id", executionId)
77+
.field("monitor_id", monitorId)
78+
.field("last_run_contexts", lastRunContexts)
79+
.field("input_results", inputResults)
80+
.field("trigger_results", triggerResults)
81+
.field("exception", exception)
82+
.endObject()
83+
return builder
84+
}
85+
86+
companion object {
87+
@Suppress("UNCHECKED_CAST")
88+
fun suppressWarning(map: MutableMap<String?, Any?>?): Map<String, DocumentLevelTriggerRunResult> {
89+
return map as Map<String, DocumentLevelTriggerRunResult>
90+
}
91+
}
92+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.commons.alerting.model
7+
8+
import org.opensearch.core.common.io.stream.StreamInput
9+
import org.opensearch.core.common.io.stream.StreamOutput
10+
import org.opensearch.core.xcontent.ToXContent
11+
import org.opensearch.core.xcontent.XContentBuilder
12+
import java.io.IOException
13+
14+
data class BucketLevelTriggerRunResult(
15+
override var triggerName: String,
16+
override var error: Exception? = null,
17+
var aggregationResultBuckets: Map<String, AggregationResultBucket>,
18+
var actionResultsMap: MutableMap<String, MutableMap<String, ActionRunResult>> = mutableMapOf()
19+
) : TriggerRunResult(triggerName, error) {
20+
21+
@Throws(IOException::class)
22+
@Suppress("UNCHECKED_CAST")
23+
constructor(sin: StreamInput) : this(
24+
sin.readString(),
25+
sin.readException() as Exception?, // error
26+
sin.readMap(StreamInput::readString, ::AggregationResultBucket),
27+
sin.readMap() as MutableMap<String, MutableMap<String, ActionRunResult>>
28+
)
29+
30+
override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
31+
return builder
32+
.field(AGG_RESULT_BUCKETS, aggregationResultBuckets)
33+
.field(ACTIONS_RESULTS, actionResultsMap as Map<String, Any>)
34+
}
35+
36+
@Throws(IOException::class)
37+
@Suppress("UNCHECKED_CAST")
38+
override fun writeTo(out: StreamOutput) {
39+
super.writeTo(out)
40+
out.writeMap(aggregationResultBuckets, StreamOutput::writeString) {
41+
valueOut: StreamOutput, aggResultBucket: AggregationResultBucket ->
42+
aggResultBucket.writeTo(valueOut)
43+
}
44+
out.writeMap(actionResultsMap as Map<String, Any>)
45+
}
46+
47+
companion object {
48+
const val AGG_RESULT_BUCKETS = "agg_result_buckets"
49+
const val ACTIONS_RESULTS = "action_results"
50+
51+
@JvmStatic
52+
@Throws(IOException::class)
53+
fun readFrom(sin: StreamInput): TriggerRunResult {
54+
return BucketLevelTriggerRunResult(sin)
55+
}
56+
}
57+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.commons.alerting.model
7+
8+
import org.opensearch.commons.alerting.alerts.AlertError
9+
import org.opensearch.core.common.io.stream.StreamInput
10+
import org.opensearch.core.common.io.stream.StreamOutput
11+
import org.opensearch.core.xcontent.ToXContent
12+
import org.opensearch.core.xcontent.XContentBuilder
13+
import org.opensearch.script.ScriptException
14+
import java.io.IOException
15+
import java.time.Instant
16+
17+
data class ChainedAlertTriggerRunResult(
18+
override var triggerName: String,
19+
var triggered: Boolean,
20+
override var error: Exception?,
21+
var actionResults: MutableMap<String, ActionRunResult> = mutableMapOf(),
22+
val associatedAlertIds: Set<String>
23+
) : TriggerRunResult(triggerName, error) {
24+
25+
@Throws(IOException::class)
26+
@Suppress("UNCHECKED_CAST")
27+
constructor(sin: StreamInput) : this(
28+
triggerName = sin.readString(),
29+
error = sin.readException(),
30+
triggered = sin.readBoolean(),
31+
actionResults = sin.readMap() as MutableMap<String, ActionRunResult>,
32+
associatedAlertIds = sin.readStringList().toSet()
33+
)
34+
35+
override fun alertError(): AlertError? {
36+
if (error != null) {
37+
return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}")
38+
}
39+
for (actionResult in actionResults.values) {
40+
if (actionResult.error != null) {
41+
return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}")
42+
}
43+
}
44+
return null
45+
}
46+
47+
override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
48+
if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error)
49+
return builder
50+
.field("triggered", triggered)
51+
.field("action_results", actionResults as Map<String, ActionRunResult>)
52+
}
53+
54+
@Throws(IOException::class)
55+
override fun writeTo(out: StreamOutput) {
56+
super.writeTo(out)
57+
out.writeBoolean(triggered)
58+
out.writeMap(actionResults as Map<String, ActionRunResult>)
59+
out.writeStringCollection(associatedAlertIds)
60+
}
61+
62+
companion object {
63+
@JvmStatic
64+
@Throws(IOException::class)
65+
fun readFrom(sin: StreamInput): TriggerRunResult {
66+
return ChainedAlertTriggerRunResult(sin)
67+
}
68+
}
69+
}

0 commit comments

Comments
 (0)