Skip to content

Commit 5ef9b55

Browse files
authored
Adds Chained alerts triggers for workflows (#456)
* adds models for chained alert trigger and chained alerts (#426) Signed-off-by: Surya Sashank Nistala <[email protected]> * fix execution id field in alerts (#429) Signed-off-by: Surya Sashank Nistala <[email protected]> * accept workflow argument in chained alert constructor (#435) Signed-off-by: Surya Sashank Nistala <[email protected]> * add tests for chained alert Signed-off-by: Surya Sashank Nistala <[email protected]> --------- Signed-off-by: Surya Sashank Nistala <[email protected]>
1 parent 1484bff commit 5ef9b55

File tree

11 files changed

+311
-22
lines changed

11 files changed

+311
-22
lines changed

src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ data class Alert(
3838
val errorHistory: List<AlertError>,
3939
val severity: String,
4040
val actionExecutionResults: List<ActionExecutionResult>,
41-
val aggregationResultBucket: AggregationResultBucket? = null
41+
val aggregationResultBucket: AggregationResultBucket? = null,
42+
val executionId: String? = null,
4243
) : Writeable, ToXContent {
4344

4445
init {
@@ -47,6 +48,24 @@ data class Alert(
4748
}
4849
}
4950

51+
constructor(
52+
startTime: Instant,
53+
lastNotificationTime: Instant?,
54+
state: State = State.ACTIVE,
55+
errorMessage: String? = null,
56+
schemaVersion: Int = NO_SCHEMA_VERSION,
57+
executionId: String,
58+
chainedAlertTrigger: ChainedAlertTrigger,
59+
workflow: Workflow
60+
) : this(
61+
monitorId = NO_ID, monitorName = "", monitorVersion = NO_VERSION, monitorUser = workflow.user,
62+
triggerId = chainedAlertTrigger.id, triggerName = chainedAlertTrigger.name, state = state, startTime = startTime,
63+
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = emptyList(),
64+
severity = chainedAlertTrigger.severity, actionExecutionResults = emptyList(), schemaVersion = schemaVersion,
65+
aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList(),
66+
executionId = executionId
67+
)
68+
5069
constructor(
5170
monitor: Monitor,
5271
trigger: QueryLevelTrigger,
@@ -56,13 +75,15 @@ data class Alert(
5675
errorMessage: String? = null,
5776
errorHistory: List<AlertError> = mutableListOf(),
5877
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
59-
schemaVersion: Int = NO_SCHEMA_VERSION
78+
schemaVersion: Int = NO_SCHEMA_VERSION,
79+
executionId: String? = null
6080
) : this(
6181
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
6282
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
6383
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
6484
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
65-
aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList()
85+
aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList(),
86+
executionId = executionId
6687
)
6788

6889
constructor(
@@ -75,13 +96,15 @@ data class Alert(
7596
errorHistory: List<AlertError> = mutableListOf(),
7697
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
7798
schemaVersion: Int = NO_SCHEMA_VERSION,
78-
findingIds: List<String> = emptyList()
99+
findingIds: List<String> = emptyList(),
100+
executionId: String? = null
79101
) : this(
80102
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
81103
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
82104
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
83105
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
84-
aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = emptyList()
106+
aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = emptyList(),
107+
executionId = executionId
85108
)
86109

87110
constructor(
@@ -95,13 +118,15 @@ data class Alert(
95118
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
96119
schemaVersion: Int = NO_SCHEMA_VERSION,
97120
aggregationResultBucket: AggregationResultBucket,
98-
findingIds: List<String> = emptyList()
121+
findingIds: List<String> = emptyList(),
122+
executionId: String? = null
99123
) : this(
100124
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
101125
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
102126
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
103127
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
104-
aggregationResultBucket = aggregationResultBucket, findingIds = findingIds, relatedDocIds = emptyList()
128+
aggregationResultBucket = aggregationResultBucket, findingIds = findingIds, relatedDocIds = emptyList(),
129+
executionId = executionId
105130
)
106131

107132
constructor(
@@ -116,13 +141,15 @@ data class Alert(
116141
errorMessage: String? = null,
117142
errorHistory: List<AlertError> = mutableListOf(),
118143
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
119-
schemaVersion: Int = NO_SCHEMA_VERSION
144+
schemaVersion: Int = NO_SCHEMA_VERSION,
145+
executionId: String? = null
120146
) : this(
121147
id = id, monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
122148
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
123149
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
124150
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
125-
aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = relatedDocIds
151+
aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = relatedDocIds,
152+
executionId = executionId
126153
)
127154

128155
constructor(
@@ -171,7 +198,8 @@ data class Alert(
171198
errorHistory = sin.readList(::AlertError),
172199
severity = sin.readString(),
173200
actionExecutionResults = sin.readList(::ActionExecutionResult),
174-
aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null
201+
aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null,
202+
executionId = sin.readOptionalString()
175203
)
176204

177205
fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED)
@@ -205,6 +233,7 @@ data class Alert(
205233
} else {
206234
out.writeBoolean(false)
207235
}
236+
out.writeOptionalString(executionId)
208237
}
209238

210239
companion object {
@@ -229,6 +258,7 @@ data class Alert(
229258
const val ALERT_HISTORY_FIELD = "alert_history"
230259
const val SEVERITY_FIELD = "severity"
231260
const val ACTION_EXECUTION_RESULTS_FIELD = "action_execution_results"
261+
const val EXECUTION_ID_FIELD = "execution_id"
232262
const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS
233263
const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH
234264
const val NO_ID = ""
@@ -254,6 +284,7 @@ data class Alert(
254284
var lastNotificationTime: Instant? = null
255285
var acknowledgedTime: Instant? = null
256286
var errorMessage: String? = null
287+
var executionId: String? = null
257288
val errorHistory: MutableList<AlertError> = mutableListOf()
258289
val actionExecutionResults: MutableList<ActionExecutionResult> = mutableListOf()
259290
var aggAlertBucket: AggregationResultBucket? = null
@@ -288,6 +319,7 @@ data class Alert(
288319
LAST_NOTIFICATION_TIME_FIELD -> lastNotificationTime = xcp.instant()
289320
ACKNOWLEDGED_TIME_FIELD -> acknowledgedTime = xcp.instant()
290321
ERROR_MESSAGE_FIELD -> errorMessage = xcp.textOrNull()
322+
EXECUTION_ID_FIELD -> executionId = xcp.textOrNull()
291323
ALERT_HISTORY_FIELD -> {
292324
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
293325
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
@@ -323,7 +355,7 @@ data class Alert(
323355
lastNotificationTime = lastNotificationTime, acknowledgedTime = acknowledgedTime,
324356
errorMessage = errorMessage, errorHistory = errorHistory, severity = severity,
325357
actionExecutionResults = actionExecutionResults, aggregationResultBucket = aggAlertBucket, findingIds = findingIds,
326-
relatedDocIds = relatedDocIds
358+
relatedDocIds = relatedDocIds, executionId = executionId
327359
)
328360
}
329361

@@ -349,6 +381,7 @@ data class Alert(
349381
.field(SCHEMA_VERSION_FIELD, schemaVersion)
350382
.field(MONITOR_VERSION_FIELD, monitorVersion)
351383
.field(MONITOR_NAME_FIELD, monitorName)
384+
.field(EXECUTION_ID_FIELD, executionId)
352385

353386
if (!secure) {
354387
builder.optionalUserField(MONITOR_USER_FIELD, monitorUser)
@@ -379,6 +412,7 @@ data class Alert(
379412
ALERT_VERSION_FIELD to version,
380413
END_TIME_FIELD to endTime?.toEpochMilli(),
381414
ERROR_MESSAGE_FIELD to errorMessage,
415+
EXECUTION_ID_FIELD to executionId,
382416
LAST_NOTIFICATION_TIME_FIELD to lastNotificationTime?.toEpochMilli(),
383417
SEVERITY_FIELD to severity,
384418
START_TIME_FIELD to startTime.toEpochMilli(),
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package org.opensearch.commons.alerting.model
2+
3+
import org.opensearch.common.CheckedFunction
4+
import org.opensearch.common.UUIDs
5+
import org.opensearch.common.io.stream.StreamInput
6+
import org.opensearch.common.io.stream.StreamOutput
7+
import org.opensearch.common.xcontent.XContentParserUtils
8+
import org.opensearch.commons.alerting.model.Trigger.Companion.ACTIONS_FIELD
9+
import org.opensearch.commons.alerting.model.Trigger.Companion.ID_FIELD
10+
import org.opensearch.commons.alerting.model.Trigger.Companion.NAME_FIELD
11+
import org.opensearch.commons.alerting.model.Trigger.Companion.SEVERITY_FIELD
12+
import org.opensearch.commons.alerting.model.action.Action
13+
import org.opensearch.core.ParseField
14+
import org.opensearch.core.xcontent.NamedXContentRegistry
15+
import org.opensearch.core.xcontent.ToXContent
16+
import org.opensearch.core.xcontent.XContentBuilder
17+
import org.opensearch.core.xcontent.XContentParser
18+
import org.opensearch.script.Script
19+
import java.io.IOException
20+
21+
data class ChainedAlertTrigger(
22+
override val id: String = UUIDs.base64UUID(),
23+
override val name: String,
24+
override val severity: String,
25+
override val actions: List<Action>,
26+
val condition: Script
27+
) : Trigger {
28+
29+
@Throws(IOException::class)
30+
constructor(sin: StreamInput) : this(
31+
sin.readString(), // id
32+
sin.readString(), // name
33+
sin.readString(), // severity
34+
sin.readList(::Action), // actions
35+
Script(sin)
36+
)
37+
38+
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
39+
builder.startObject()
40+
.startObject(CHAINED_ALERT_TRIGGER_FIELD)
41+
.field(ID_FIELD, id)
42+
.field(NAME_FIELD, name)
43+
.field(SEVERITY_FIELD, severity)
44+
.startObject(CONDITION_FIELD)
45+
.field(SCRIPT_FIELD, condition)
46+
.endObject()
47+
.field(ACTIONS_FIELD, actions.toTypedArray())
48+
.endObject()
49+
.endObject()
50+
return builder
51+
}
52+
53+
override fun name(): String {
54+
return CHAINED_ALERT_TRIGGER_FIELD
55+
}
56+
57+
/** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */
58+
fun asTemplateArg(): Map<String, Any> {
59+
return mapOf(
60+
ID_FIELD to id,
61+
NAME_FIELD to name,
62+
SEVERITY_FIELD to severity,
63+
ACTIONS_FIELD to actions.map { it.asTemplateArg() }
64+
)
65+
}
66+
67+
@Throws(IOException::class)
68+
override fun writeTo(out: StreamOutput) {
69+
out.writeString(id)
70+
out.writeString(name)
71+
out.writeString(severity)
72+
out.writeCollection(actions)
73+
condition.writeTo(out)
74+
}
75+
76+
companion object {
77+
const val CHAINED_ALERT_TRIGGER_FIELD = "chained_alert_trigger"
78+
const val CONDITION_FIELD = "condition"
79+
const val SCRIPT_FIELD = "script"
80+
const val QUERY_IDS_FIELD = "query_ids"
81+
82+
val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(
83+
Trigger::class.java,
84+
ParseField(CHAINED_ALERT_TRIGGER_FIELD),
85+
CheckedFunction { parseInner(it) }
86+
)
87+
88+
@JvmStatic
89+
@Throws(IOException::class)
90+
fun parseInner(xcp: XContentParser): ChainedAlertTrigger {
91+
var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified
92+
lateinit var name: String
93+
lateinit var severity: String
94+
lateinit var condition: Script
95+
val actions: MutableList<Action> = mutableListOf()
96+
97+
if (xcp.currentToken() != XContentParser.Token.START_OBJECT && xcp.currentToken() != XContentParser.Token.FIELD_NAME) {
98+
XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.tokenLocation)
99+
}
100+
101+
// If the parser began on START_OBJECT, move to the next token so that the while loop enters on
102+
// the fieldName (or END_OBJECT if it's empty).
103+
if (xcp.currentToken() == XContentParser.Token.START_OBJECT) xcp.nextToken()
104+
105+
while (xcp.currentToken() != XContentParser.Token.END_OBJECT) {
106+
val fieldName = xcp.currentName()
107+
108+
xcp.nextToken()
109+
when (fieldName) {
110+
ID_FIELD -> id = xcp.text()
111+
NAME_FIELD -> name = xcp.text()
112+
SEVERITY_FIELD -> severity = xcp.text()
113+
CONDITION_FIELD -> {
114+
xcp.nextToken()
115+
condition = Script.parse(xcp)
116+
require(condition.lang == Script.DEFAULT_SCRIPT_LANG) {
117+
"Invalid script language. Allowed languages are [${Script.DEFAULT_SCRIPT_LANG}]"
118+
}
119+
xcp.nextToken()
120+
}
121+
ACTIONS_FIELD -> {
122+
XContentParserUtils.ensureExpectedToken(
123+
XContentParser.Token.START_ARRAY,
124+
xcp.currentToken(),
125+
xcp
126+
)
127+
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
128+
actions.add(Action.parse(xcp))
129+
}
130+
}
131+
}
132+
xcp.nextToken()
133+
}
134+
135+
return ChainedAlertTrigger(
136+
name = requireNotNull(name) { "Trigger name is null" },
137+
severity = requireNotNull(severity) { "Trigger severity is null" },
138+
condition = requireNotNull(condition) { "Trigger condition is null" },
139+
actions = requireNotNull(actions) { "Trigger actions are null" },
140+
id = requireNotNull(id) { "Trigger id is null." }
141+
)
142+
}
143+
144+
@JvmStatic
145+
@Throws(IOException::class)
146+
fun readFrom(sin: StreamInput): ChainedAlertTrigger {
147+
return ChainedAlertTrigger(sin)
148+
}
149+
}
150+
}

src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ interface Trigger : BaseModel {
1313
DOCUMENT_LEVEL_TRIGGER(DocumentLevelTrigger.DOCUMENT_LEVEL_TRIGGER_FIELD),
1414
QUERY_LEVEL_TRIGGER(QueryLevelTrigger.QUERY_LEVEL_TRIGGER_FIELD),
1515
BUCKET_LEVEL_TRIGGER(BucketLevelTrigger.BUCKET_LEVEL_TRIGGER_FIELD),
16-
NOOP_TRIGGER(NoOpTrigger.NOOP_TRIGGER_FIELD);
16+
NOOP_TRIGGER(NoOpTrigger.NOOP_TRIGGER_FIELD),
17+
CHAINED_ALERT_TRIGGER(ChainedAlertTrigger.CHAINED_ALERT_TRIGGER_FIELD);
1718

1819
override fun toString(): String {
1920
return value
@@ -53,6 +54,7 @@ interface Trigger : BaseModel {
5354
Type.QUERY_LEVEL_TRIGGER -> QueryLevelTrigger(sin)
5455
Type.BUCKET_LEVEL_TRIGGER -> BucketLevelTrigger(sin)
5556
Type.DOCUMENT_LEVEL_TRIGGER -> DocumentLevelTrigger(sin)
57+
Type.CHAINED_ALERT_TRIGGER -> ChainedAlertTrigger(sin)
5658
// This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns
5759
// enum can be null in Java
5860
else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger")

0 commit comments

Comments
 (0)