Skip to content

Commit d088934

Browse files
authored
Adds workflows model and workflow actions for Alerting Plugin (#439)
* Adds workflows model and workflow actions for Alerting Plugin (#436) Signed-off-by: Surya Sashank Nistala <[email protected]> * adds tests for workflow request and response objects Signed-off-by: Surya Sashank Nistala <[email protected]> --------- Signed-off-by: Surya Sashank Nistala <[email protected]>
1 parent 85c9b35 commit d088934

27 files changed

+1847
-4
lines changed

src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@ import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse
1414
import org.opensearch.commons.alerting.action.AlertingActions
1515
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
1616
import org.opensearch.commons.alerting.action.DeleteMonitorResponse
17+
import org.opensearch.commons.alerting.action.DeleteWorkflowRequest
18+
import org.opensearch.commons.alerting.action.DeleteWorkflowResponse
1719
import org.opensearch.commons.alerting.action.GetAlertsRequest
1820
import org.opensearch.commons.alerting.action.GetAlertsResponse
1921
import org.opensearch.commons.alerting.action.GetFindingsRequest
2022
import org.opensearch.commons.alerting.action.GetFindingsResponse
23+
import org.opensearch.commons.alerting.action.GetWorkflowRequest
24+
import org.opensearch.commons.alerting.action.GetWorkflowResponse
2125
import org.opensearch.commons.alerting.action.IndexMonitorRequest
2226
import org.opensearch.commons.alerting.action.IndexMonitorResponse
27+
import org.opensearch.commons.alerting.action.IndexWorkflowRequest
28+
import org.opensearch.commons.alerting.action.IndexWorkflowResponse
2329
import org.opensearch.commons.alerting.action.PublishFindingsRequest
2430
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
2531
import org.opensearch.commons.notifications.action.BaseResponse
@@ -55,6 +61,7 @@ object AlertingPluginInterface {
5561
}
5662
)
5763
}
64+
5865
fun deleteMonitor(
5966
client: NodeClient,
6067
request: DeleteMonitorRequest,
@@ -73,6 +80,49 @@ object AlertingPluginInterface {
7380
)
7481
}
7582

83+
/**
84+
* Index monitor interface.
85+
* @param client Node client for making transport action
86+
* @param request The request object
87+
* @param namedWriteableRegistry Registry for building aggregations
88+
* @param listener The listener for getting response
89+
*/
90+
fun indexWorkflow(
91+
client: NodeClient,
92+
request: IndexWorkflowRequest,
93+
listener: ActionListener<IndexWorkflowResponse>
94+
) {
95+
client.execute(
96+
AlertingActions.INDEX_WORKFLOW_ACTION_TYPE,
97+
request,
98+
wrapActionListener(listener) { response ->
99+
recreateObject(response) {
100+
IndexWorkflowResponse(
101+
it
102+
)
103+
}
104+
}
105+
)
106+
}
107+
108+
fun deleteWorkflow(
109+
client: NodeClient,
110+
request: DeleteWorkflowRequest,
111+
listener: ActionListener<DeleteWorkflowResponse>
112+
) {
113+
client.execute(
114+
AlertingActions.DELETE_WORKFLOW_ACTION_TYPE,
115+
request,
116+
wrapActionListener(listener) { response ->
117+
recreateObject(response) {
118+
DeleteWorkflowResponse(
119+
it
120+
)
121+
}
122+
}
123+
)
124+
}
125+
76126
/**
77127
* Get Alerts interface.
78128
* @param client Node client for making transport action
@@ -97,6 +147,30 @@ object AlertingPluginInterface {
97147
)
98148
}
99149

150+
/**
151+
* Get Workflow interface.
152+
* @param client Node client for making transport action
153+
* @param request The request object
154+
* @param listener The listener for getting response
155+
*/
156+
fun getWorkflow(
157+
client: NodeClient,
158+
request: GetWorkflowRequest,
159+
listener: ActionListener<GetWorkflowResponse>
160+
) {
161+
client.execute(
162+
AlertingActions.GET_WORKFLOW_ACTION_TYPE,
163+
request,
164+
wrapActionListener(listener) { response ->
165+
recreateObject(response) {
166+
GetWorkflowResponse(
167+
it
168+
)
169+
}
170+
}
171+
)
172+
}
173+
100174
/**
101175
* Get Findings interface.
102176
* @param client Node client for making transport action

src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@ import org.opensearch.action.ActionType
88

99
object AlertingActions {
1010
const val INDEX_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/write"
11+
const val INDEX_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/write"
1112
const val GET_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/get"
13+
const val GET_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/get"
1214
const val DELETE_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/delete"
15+
const val DELETE_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/delete"
1316
const val GET_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/get"
1417
const val ACKNOWLEDGE_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/ack"
1518
const val SUBSCRIBE_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/subscribe"
@@ -18,12 +21,22 @@ object AlertingActions {
1821
val INDEX_MONITOR_ACTION_TYPE =
1922
ActionType(INDEX_MONITOR_ACTION_NAME, ::IndexMonitorResponse)
2023
@JvmField
24+
val INDEX_WORKFLOW_ACTION_TYPE =
25+
ActionType(INDEX_WORKFLOW_ACTION_NAME, ::IndexWorkflowResponse)
26+
@JvmField
2127
val GET_ALERTS_ACTION_TYPE =
2228
ActionType(GET_ALERTS_ACTION_NAME, ::GetAlertsResponse)
2329
@JvmField
30+
val GET_WORKFLOW_ACTION_TYPE =
31+
ActionType(GET_WORKFLOW_ACTION_NAME, ::GetWorkflowResponse)
32+
33+
@JvmField
2434
val DELETE_MONITOR_ACTION_TYPE =
2535
ActionType(DELETE_MONITOR_ACTION_NAME, ::DeleteMonitorResponse)
2636
@JvmField
37+
val DELETE_WORKFLOW_ACTION_TYPE =
38+
ActionType(DELETE_WORKFLOW_ACTION_NAME, ::DeleteWorkflowResponse)
39+
@JvmField
2740
val GET_FINDINGS_ACTION_TYPE =
2841
ActionType(GET_FINDINGS_ACTION_NAME, ::GetFindingsResponse)
2942
@JvmField
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package org.opensearch.commons.alerting.action
2+
3+
import org.opensearch.action.ActionRequest
4+
import org.opensearch.action.ActionRequestValidationException
5+
import org.opensearch.common.io.stream.StreamInput
6+
import org.opensearch.common.io.stream.StreamOutput
7+
import java.io.IOException
8+
9+
class DeleteWorkflowRequest : ActionRequest {
10+
11+
val workflowId: String
12+
/**
13+
* Flag that indicates whether the delegate monitors should be deleted or not.
14+
* If the flag is set to true, Delegate monitors will be deleted only in the case when they are part of the specified workflow and no other.
15+
*/
16+
val deleteDelegateMonitors: Boolean?
17+
18+
constructor(workflowId: String, deleteDelegateMonitors: Boolean?) : super() {
19+
this.workflowId = workflowId
20+
this.deleteDelegateMonitors = deleteDelegateMonitors
21+
}
22+
23+
@Throws(IOException::class)
24+
constructor(sin: StreamInput) : this(
25+
workflowId = sin.readString(),
26+
deleteDelegateMonitors = sin.readOptionalBoolean()
27+
)
28+
29+
override fun validate(): ActionRequestValidationException? {
30+
return null
31+
}
32+
33+
@Throws(IOException::class)
34+
override fun writeTo(out: StreamOutput) {
35+
out.writeString(workflowId)
36+
out.writeOptionalBoolean(deleteDelegateMonitors)
37+
}
38+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package org.opensearch.commons.alerting.action
2+
3+
import org.opensearch.common.io.stream.StreamInput
4+
import org.opensearch.common.io.stream.StreamOutput
5+
import org.opensearch.commons.alerting.util.IndexUtils
6+
import org.opensearch.commons.notifications.action.BaseResponse
7+
import org.opensearch.core.xcontent.ToXContent
8+
import org.opensearch.core.xcontent.XContentBuilder
9+
10+
class DeleteWorkflowResponse : BaseResponse {
11+
var id: String
12+
var version: Long
13+
var nonDeletedMonitors: List<String>? = null
14+
15+
constructor(
16+
id: String,
17+
version: Long,
18+
nonDeletedMonitors: List<String>? = null
19+
) : super() {
20+
this.id = id
21+
this.version = version
22+
this.nonDeletedMonitors = nonDeletedMonitors
23+
}
24+
25+
constructor(sin: StreamInput) : this(
26+
sin.readString(), // id
27+
sin.readLong(), // version
28+
sin.readOptionalStringList()
29+
)
30+
31+
override fun writeTo(out: StreamOutput) {
32+
out.writeString(id)
33+
out.writeLong(version)
34+
out.writeOptionalStringCollection(nonDeletedMonitors)
35+
}
36+
37+
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
38+
return builder.startObject()
39+
.field(IndexUtils._ID, id)
40+
.field(IndexUtils._VERSION, version)
41+
.field(NON_DELETED_MONITORS, nonDeletedMonitors)
42+
.endObject()
43+
}
44+
45+
companion object {
46+
const val NON_DELETED_MONITORS = "NON_DELETED_MONITORS"
47+
}
48+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.commons.alerting.action
7+
8+
import org.opensearch.action.ActionRequest
9+
import org.opensearch.action.ActionRequestValidationException
10+
import org.opensearch.common.io.stream.StreamInput
11+
import org.opensearch.common.io.stream.StreamOutput
12+
import org.opensearch.rest.RestRequest
13+
import java.io.IOException
14+
15+
class GetWorkflowRequest : ActionRequest {
16+
val workflowId: String
17+
val method: RestRequest.Method
18+
19+
constructor(
20+
workflowId: String,
21+
method: RestRequest.Method
22+
) : super() {
23+
this.workflowId = workflowId
24+
this.method = method
25+
}
26+
27+
@Throws(IOException::class)
28+
constructor(sin: StreamInput) : this(
29+
sin.readString(), // workflowId
30+
sin.readEnum(RestRequest.Method::class.java) // method
31+
)
32+
33+
override fun validate(): ActionRequestValidationException? {
34+
return null
35+
}
36+
37+
@Throws(IOException::class)
38+
override fun writeTo(out: StreamOutput) {
39+
out.writeString(workflowId)
40+
out.writeEnum(method)
41+
}
42+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.commons.alerting.action
7+
8+
import org.opensearch.common.io.stream.StreamInput
9+
import org.opensearch.common.io.stream.StreamOutput
10+
import org.opensearch.commons.alerting.model.Workflow
11+
import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID
12+
import org.opensearch.commons.alerting.util.IndexUtils.Companion._PRIMARY_TERM
13+
import org.opensearch.commons.alerting.util.IndexUtils.Companion._SEQ_NO
14+
import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION
15+
import org.opensearch.commons.notifications.action.BaseResponse
16+
import org.opensearch.core.xcontent.ToXContent
17+
import org.opensearch.core.xcontent.XContentBuilder
18+
import org.opensearch.rest.RestStatus
19+
import java.io.IOException
20+
21+
class GetWorkflowResponse : BaseResponse {
22+
var id: String
23+
var version: Long
24+
var seqNo: Long
25+
var primaryTerm: Long
26+
private var status: RestStatus
27+
var workflow: Workflow?
28+
29+
constructor(
30+
id: String,
31+
version: Long,
32+
seqNo: Long,
33+
primaryTerm: Long,
34+
status: RestStatus,
35+
workflow: Workflow?
36+
) : super() {
37+
this.id = id
38+
this.version = version
39+
this.seqNo = seqNo
40+
this.primaryTerm = primaryTerm
41+
this.status = status
42+
this.workflow = workflow
43+
}
44+
45+
@Throws(IOException::class)
46+
constructor(sin: StreamInput) : this(
47+
sin.readString(), // id
48+
sin.readLong(), // version
49+
sin.readLong(), // seqNo
50+
sin.readLong(), // primaryTerm
51+
sin.readEnum(RestStatus::class.java), // RestStatus
52+
if (sin.readBoolean()) {
53+
Workflow.readFrom(sin) // monitor
54+
} else null
55+
)
56+
57+
@Throws(IOException::class)
58+
override fun writeTo(out: StreamOutput) {
59+
out.writeString(id)
60+
out.writeLong(version)
61+
out.writeLong(seqNo)
62+
out.writeLong(primaryTerm)
63+
out.writeEnum(status)
64+
if (workflow != null) {
65+
out.writeBoolean(true)
66+
workflow?.writeTo(out)
67+
} else {
68+
out.writeBoolean(false)
69+
}
70+
}
71+
72+
@Throws(IOException::class)
73+
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
74+
builder.startObject()
75+
.field(_ID, id)
76+
.field(_VERSION, version)
77+
.field(_SEQ_NO, seqNo)
78+
.field(_PRIMARY_TERM, primaryTerm)
79+
if (workflow != null)
80+
builder.field("workflow", workflow)
81+
82+
return builder.endObject()
83+
}
84+
85+
override fun getStatus(): RestStatus {
86+
return this.status
87+
}
88+
}

0 commit comments

Comments
 (0)