Skip to content

Commit fd151de

Browse files
PPL Alerting: Delete Monitor, More V1/V2 Separation (#1968)
* PPL Alerting: Delete Monitor, More V1/V2 Separation Signed-off-by: Dennis Toepker <[email protected]> * making v1 v2 separation error messaging more actionable Signed-off-by: Dennis Toepker <[email protected]> --------- Signed-off-by: Dennis Toepker <[email protected]> Co-authored-by: Dennis Toepker <[email protected]>
1 parent 819599f commit fd151de

File tree

15 files changed

+482
-75
lines changed

15 files changed

+482
-75
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import org.opensearch.alerting.action.GetEmailGroupAction
1414
import org.opensearch.alerting.action.GetRemoteIndexesAction
1515
import org.opensearch.alerting.action.SearchEmailAccountAction
1616
import org.opensearch.alerting.action.SearchEmailGroupAction
17+
import org.opensearch.alerting.actionv2.DeleteMonitorV2Action
1718
import org.opensearch.alerting.actionv2.GetMonitorV2Action
1819
import org.opensearch.alerting.actionv2.IndexMonitorV2Action
1920
import org.opensearch.alerting.actionv2.SearchMonitorV2Action
@@ -55,6 +56,7 @@ import org.opensearch.alerting.resthandler.RestSearchAlertingCommentAction
5556
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
5657
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
5758
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
59+
import org.opensearch.alerting.resthandlerv2.RestDeleteMonitorV2Action
5860
import org.opensearch.alerting.resthandlerv2.RestGetMonitorV2Action
5961
import org.opensearch.alerting.resthandlerv2.RestIndexMonitorV2Action
6062
import org.opensearch.alerting.resthandlerv2.RestSearchMonitorV2Action
@@ -90,6 +92,7 @@ import org.opensearch.alerting.transport.TransportSearchAlertingCommentAction
9092
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
9193
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
9294
import org.opensearch.alerting.transport.TransportSearchMonitorAction
95+
import org.opensearch.alerting.transportv2.TransportDeleteMonitorV2Action
9396
import org.opensearch.alerting.transportv2.TransportGetMonitorV2Action
9497
import org.opensearch.alerting.transportv2.TransportIndexMonitorV2Action
9598
import org.opensearch.alerting.transportv2.TransportSearchMonitorV2Action
@@ -233,6 +236,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
233236

234237
// Alerting V2
235238
RestIndexMonitorV2Action(),
239+
RestDeleteMonitorV2Action(),
236240
RestGetMonitorV2Action(),
237241
RestSearchMonitorV2Action(settings, clusterService),
238242
)
@@ -273,6 +277,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
273277
ActionPlugin.ActionHandler(IndexMonitorV2Action.INSTANCE, TransportIndexMonitorV2Action::class.java),
274278
ActionPlugin.ActionHandler(GetMonitorV2Action.INSTANCE, TransportGetMonitorV2Action::class.java),
275279
ActionPlugin.ActionHandler(SearchMonitorV2Action.INSTANCE, TransportSearchMonitorV2Action::class.java),
280+
ActionPlugin.ActionHandler(DeleteMonitorV2Action.INSTANCE, TransportDeleteMonitorV2Action::class.java),
276281
)
277282
}
278283

alerting/src/main/kotlin/org/opensearch/alerting/AlertingV2Utils.kt

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import org.apache.lucene.search.TotalHits
99
import org.apache.lucene.search.TotalHits.Relation
1010
import org.opensearch.action.search.SearchResponse
1111
import org.opensearch.action.search.ShardSearchFailure
12+
import org.opensearch.alerting.AlertingPlugin.Companion.MONITOR_BASE_URI
13+
import org.opensearch.alerting.AlertingPlugin.Companion.MONITOR_V2_BASE_URI
1214
import org.opensearch.alerting.modelv2.MonitorV2
1315
import org.opensearch.commons.alerting.model.Monitor
1416
import org.opensearch.commons.alerting.model.ScheduledJob
@@ -27,9 +29,16 @@ object AlertingV2Utils {
2729
// returns the exception to pass into actionListener.onFailure if not.
2830
fun validateMonitorV1(scheduledJob: ScheduledJob): Exception? {
2931
if (scheduledJob is MonitorV2) {
30-
return IllegalStateException("The ID given corresponds to a V2 Monitor, but a V1 Monitor was expected")
32+
return IllegalStateException(
33+
"The ID given corresponds to an Alerting V2 Monitor, but a V1 Monitor was expected. " +
34+
"If you wish to operate on a V1 Monitor (e.g. Per Query, Per Document, etc), please use " +
35+
"the Alerting V1 APIs with endpoint prefix: $MONITOR_BASE_URI."
36+
)
3137
} else if (scheduledJob !is Monitor && scheduledJob !is Workflow) {
32-
return IllegalStateException("The ID given corresponds to a scheduled job of unknown type: ${scheduledJob.javaClass.name}")
38+
return IllegalStateException(
39+
"The ID given corresponds to a scheduled job of unknown type: ${scheduledJob.javaClass.name}. " +
40+
"Please validate the ID and ensure it corresponds to a valid Monitor."
41+
)
3342
}
3443
return null
3544
}
@@ -38,9 +47,16 @@ object AlertingV2Utils {
3847
// returns the exception to pass into actionListener.onFailure if not.
3948
fun validateMonitorV2(scheduledJob: ScheduledJob): Exception? {
4049
if (scheduledJob is Monitor || scheduledJob is Workflow) {
41-
return IllegalStateException("The ID given corresponds to a V1 Monitor, but a V2 Monitor was expected")
50+
return IllegalStateException(
51+
"The ID given corresponds to an Alerting V1 Monitor, but a V2 Monitor was expected. " +
52+
"If you wish to operate on a V2 Monitor (e.g. PPL Monitor), please use " +
53+
"the Alerting V2 APIs with endpoint prefix: $MONITOR_V2_BASE_URI."
54+
)
4255
} else if (scheduledJob !is MonitorV2) {
43-
return IllegalStateException("The ID given corresponds to a scheduled job of unknown type: ${scheduledJob.javaClass.name}")
56+
return IllegalStateException(
57+
"The ID given corresponds to a scheduled job of unknown type: ${scheduledJob.javaClass.name}. " +
58+
"Please validate the ID and ensure it corresponds to a valid Monitor."
59+
)
4460
}
4561
return null
4662
}

alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest
1111
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse
1212
import org.opensearch.action.search.SearchRequest
1313
import org.opensearch.action.search.SearchResponse
14+
import org.opensearch.alerting.AlertingV2Utils.validateMonitorV1
1415
import org.opensearch.alerting.opensearchapi.suspendUntil
1516
import org.opensearch.common.xcontent.LoggingDeprecationHandler
1617
import org.opensearch.common.xcontent.XContentType
@@ -132,7 +133,12 @@ class WorkflowService(
132133
xContentRegistry,
133134
LoggingDeprecationHandler.INSTANCE, hit.sourceAsString
134135
).use { hitsParser ->
135-
val monitor = ScheduledJob.parse(hitsParser, hit.id, hit.version) as Monitor
136+
val scheduledJob = ScheduledJob.parse(hitsParser, hit.id, hit.version)
137+
validateMonitorV1(scheduledJob)?.let {
138+
throw OpenSearchException(it)
139+
}
140+
141+
val monitor = scheduledJob as Monitor
136142
monitors.add(monitor)
137143
}
138144
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.actionv2
7+
8+
import org.opensearch.action.ActionType
9+
10+
class DeleteMonitorV2Action private constructor() : ActionType<DeleteMonitorV2Response>(NAME, ::DeleteMonitorV2Response) {
11+
companion object {
12+
val INSTANCE = DeleteMonitorV2Action()
13+
const val NAME = "cluster:admin/opensearch/alerting/v2/monitor/delete"
14+
}
15+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.actionv2
7+
8+
import org.opensearch.action.ActionRequest
9+
import org.opensearch.action.ActionRequestValidationException
10+
import org.opensearch.action.support.WriteRequest
11+
import org.opensearch.core.common.io.stream.StreamInput
12+
import org.opensearch.core.common.io.stream.StreamOutput
13+
import java.io.IOException
14+
15+
class DeleteMonitorV2Request : ActionRequest {
16+
val monitorV2Id: String
17+
val refreshPolicy: WriteRequest.RefreshPolicy
18+
19+
constructor(monitorV2Id: String, refreshPolicy: WriteRequest.RefreshPolicy) : super() {
20+
this.monitorV2Id = monitorV2Id
21+
this.refreshPolicy = refreshPolicy
22+
}
23+
24+
@Throws(IOException::class)
25+
constructor(sin: StreamInput) : this(
26+
monitorV2Id = sin.readString(),
27+
refreshPolicy = WriteRequest.RefreshPolicy.readFrom(sin)
28+
)
29+
30+
override fun validate(): ActionRequestValidationException? {
31+
return null
32+
}
33+
34+
@Throws(IOException::class)
35+
override fun writeTo(out: StreamOutput) {
36+
out.writeString(monitorV2Id)
37+
refreshPolicy.writeTo(out)
38+
}
39+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.actionv2
7+
8+
import org.opensearch.commons.alerting.util.IndexUtils
9+
import org.opensearch.commons.notifications.action.BaseResponse
10+
import org.opensearch.core.common.io.stream.StreamInput
11+
import org.opensearch.core.common.io.stream.StreamOutput
12+
import org.opensearch.core.xcontent.ToXContent
13+
import org.opensearch.core.xcontent.XContentBuilder
14+
15+
class DeleteMonitorV2Response : BaseResponse {
16+
var id: String
17+
var version: Long
18+
19+
constructor(
20+
id: String,
21+
version: Long
22+
) : super() {
23+
this.id = id
24+
this.version = version
25+
}
26+
27+
constructor(sin: StreamInput) : this(
28+
sin.readString(), // id
29+
sin.readLong() // version
30+
)
31+
32+
override fun writeTo(out: StreamOutput) {
33+
out.writeString(id)
34+
out.writeLong(version)
35+
}
36+
37+
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
38+
return builder.startObject()
39+
.field(IndexUtils._ID, id)
40+
.field(IndexUtils._VERSION, version)
41+
.endObject()
42+
}
43+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.resthandlerv2
7+
8+
import org.apache.logging.log4j.LogManager
9+
import org.apache.logging.log4j.Logger
10+
import org.opensearch.action.support.WriteRequest.RefreshPolicy
11+
import org.opensearch.alerting.AlertingPlugin
12+
import org.opensearch.alerting.actionv2.DeleteMonitorV2Action
13+
import org.opensearch.alerting.actionv2.DeleteMonitorV2Request
14+
import org.opensearch.alerting.util.REFRESH
15+
import org.opensearch.rest.BaseRestHandler
16+
import org.opensearch.rest.RestHandler.Route
17+
import org.opensearch.rest.RestRequest
18+
import org.opensearch.rest.RestRequest.Method.DELETE
19+
import org.opensearch.rest.action.RestToXContentListener
20+
import org.opensearch.transport.client.node.NodeClient
21+
import java.io.IOException
22+
23+
private val log: Logger = LogManager.getLogger(RestDeleteMonitorV2Action::class.java)
24+
25+
class RestDeleteMonitorV2Action : BaseRestHandler() {
26+
27+
override fun getName(): String {
28+
return "delete_monitor_v2_action"
29+
}
30+
31+
override fun routes(): List<Route> {
32+
return mutableListOf(
33+
Route(
34+
DELETE,
35+
"${AlertingPlugin.MONITOR_V2_BASE_URI}/{monitorV2Id}"
36+
)
37+
)
38+
}
39+
40+
@Throws(IOException::class)
41+
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
42+
val monitorV2Id = request.param("monitorV2Id")
43+
log.info("${request.method()} ${AlertingPlugin.MONITOR_V2_BASE_URI}/$monitorV2Id")
44+
45+
val refreshPolicy = RefreshPolicy.parse(request.param(REFRESH, RefreshPolicy.IMMEDIATE.value))
46+
val deleteMonitorV2Request = DeleteMonitorV2Request(monitorV2Id, refreshPolicy)
47+
48+
return RestChannelConsumer { channel ->
49+
client.execute(DeleteMonitorV2Action.INSTANCE, deleteMonitorV2Request, RestToXContentListener(channel))
50+
}
51+
}
52+
}

alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.opensearch.action.support.IndicesOptions
2222
import org.opensearch.action.support.WriteRequest.RefreshPolicy
2323
import org.opensearch.action.support.clustermanager.AcknowledgedResponse
2424
import org.opensearch.alerting.MonitorMetadataService
25+
import org.opensearch.alerting.actionv2.DeleteMonitorV2Response
2526
import org.opensearch.alerting.core.lock.LockModel
2627
import org.opensearch.alerting.core.lock.LockService
2728
import org.opensearch.alerting.opensearchapi.suspendUntil
@@ -74,6 +75,19 @@ object DeleteMonitorService :
7475
return DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)
7576
}
7677

78+
/**
79+
* Deletes the monitorV2, which does not come with other metadata and queries
80+
* like doc level monitors
81+
* @param monitorV2Id monitorV2 ID to be deleted
82+
* @param refreshPolicy
83+
*/
84+
suspend fun deleteMonitorV2(monitorV2Id: String, refreshPolicy: RefreshPolicy): DeleteMonitorV2Response {
85+
val deleteResponse = deleteMonitor(monitorV2Id, refreshPolicy)
86+
deleteLock(monitorV2Id)
87+
return DeleteMonitorV2Response(deleteResponse.id, deleteResponse.version)
88+
}
89+
90+
// both Alerting v1 and v2 workflows flow through this function
7791
private suspend fun deleteMonitor(monitorId: String, refreshPolicy: RefreshPolicy): DeleteResponse {
7892
val deleteMonitorRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
7993
.setRefreshPolicy(refreshPolicy)
@@ -167,7 +181,12 @@ object DeleteMonitorService :
167181
}
168182

169183
private suspend fun deleteLock(monitor: Monitor) {
170-
client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(monitor.id), it) }
184+
deleteLock(monitor.id)
185+
}
186+
187+
// both Alerting v1 and v2 workflows flow through this function
188+
private suspend fun deleteLock(monitorId: String) {
189+
client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(monitorId), it) }
171190
}
172191

173192
/**

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import org.opensearch.action.get.GetResponse
1616
import org.opensearch.action.support.ActionFilters
1717
import org.opensearch.action.support.HandledTransportAction
1818
import org.opensearch.action.support.WriteRequest.RefreshPolicy
19+
import org.opensearch.alerting.AlertingV2Utils.validateMonitorV1
1920
import org.opensearch.alerting.opensearchapi.suspendUntil
2021
import org.opensearch.alerting.service.DeleteMonitorService
2122
import org.opensearch.alerting.settings.AlertingSettings
@@ -87,7 +88,7 @@ class TransportDeleteMonitorAction @Inject constructor(
8788
) {
8889
suspend fun resolveUserAndStart(refreshPolicy: RefreshPolicy) {
8990
try {
90-
val monitor = getMonitor()
91+
val monitor = getMonitor() ?: return // null means there was an issue retrieving the Monitor
9192

9293
val canDelete = user == null || !doFilterForUser(user) ||
9394
checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)
@@ -115,11 +116,11 @@ class TransportDeleteMonitorAction @Inject constructor(
115116
}
116117
}
117118

118-
private suspend fun getMonitor(): Monitor {
119+
private suspend fun getMonitor(): Monitor? {
119120
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
120121

121122
val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) }
122-
if (getResponse.isExists == false) {
123+
if (!getResponse.isExists) {
123124
actionListener.onFailure(
124125
AlertingException.wrap(
125126
OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)
@@ -130,7 +131,16 @@ class TransportDeleteMonitorAction @Inject constructor(
130131
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
131132
getResponse.sourceAsBytesRef, XContentType.JSON
132133
)
133-
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
134+
val scheduledJob = ScheduledJob.parse(xcp, getResponse.id, getResponse.version)
135+
136+
validateMonitorV1(scheduledJob)?.let {
137+
actionListener.onFailure(AlertingException.wrap(it))
138+
return null
139+
}
140+
141+
val monitor = scheduledJob as Monitor
142+
143+
return monitor
134144
}
135145
}
136146
}

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.opensearch.action.search.SearchResponse
2323
import org.opensearch.action.support.ActionFilters
2424
import org.opensearch.action.support.HandledTransportAction
2525
import org.opensearch.action.support.WriteRequest.RefreshPolicy
26+
import org.opensearch.alerting.AlertingV2Utils.validateMonitorV1
2627
import org.opensearch.alerting.core.lock.LockModel
2728
import org.opensearch.alerting.core.lock.LockService
2829
import org.opensearch.alerting.opensearchapi.addFilter
@@ -299,7 +300,13 @@ class TransportDeleteWorkflowAction @Inject constructor(
299300
xContentRegistry,
300301
LoggingDeprecationHandler.INSTANCE, hit.sourceAsString
301302
).use { hitsParser ->
302-
val monitor = ScheduledJob.parse(hitsParser, hit.id, hit.version) as Monitor
303+
val scheduledJob = ScheduledJob.parse(hitsParser, hit.id, hit.version)
304+
305+
validateMonitorV1(scheduledJob)?.let {
306+
throw OpenSearchException(it)
307+
}
308+
309+
val monitor = scheduledJob as Monitor
303310
deletableMonitors.add(monitor)
304311
}
305312
}
@@ -327,12 +334,17 @@ class TransportDeleteWorkflowAction @Inject constructor(
327334
)
328335
}
329336

330-
private fun parseWorkflow(getResponse: GetResponse): Workflow {
337+
private fun parseWorkflow(getResponse: GetResponse): Workflow? {
331338
val xcp = XContentHelper.createParser(
332339
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
333340
getResponse.sourceAsBytesRef, XContentType.JSON
334341
)
335-
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Workflow
342+
val scheduledJob = ScheduledJob.parse(xcp, getResponse.id, getResponse.version)
343+
validateMonitorV1(scheduledJob)?.let {
344+
actionListener.onFailure(AlertingException.wrap(it))
345+
return null
346+
}
347+
return scheduledJob as Workflow
336348
}
337349

338350
private suspend fun deleteWorkflow(deleteRequest: DeleteRequest): DeleteResponse {

0 commit comments

Comments
 (0)