Skip to content

Commit d9614ba

Browse files
PPL Alerting: Get Alerts and Alert Lifecycle (#1972)
* PPL Alerting: Get Alerts and Alert Lifecycle Signed-off-by: Dennis Toepker <[email protected]> * updating scheduled jobs schema version in alert indices IT Signed-off-by: Dennis Toepker <[email protected]> * enabling alerting v2 by default Signed-off-by: Dennis Toepker <[email protected]> * making get alerts response fields snake cased Signed-off-by: Dennis Toepker <[email protected]> * removing unused get alerts params and adding serde tests for alertsv2 request and response Signed-off-by: Dennis Toepker <[email protected]> * removing misleading comments Signed-off-by: Dennis Toepker <[email protected]> * minor changes Signed-off-by: Dennis Toepker <[email protected]> * changed some setting names to go under alerting prefix, and optimizing alert expire logic to search for expired alerts in OS query instead of scanning Alerts for expiration in memory Signed-off-by: Dennis Toepker <[email protected]> --------- Signed-off-by: Dennis Toepker <[email protected]> Co-authored-by: Dennis Toepker <[email protected]>
1 parent fd151de commit d9614ba

File tree

23 files changed

+1901
-38
lines changed

23 files changed

+1901
-38
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@ import org.opensearch.alerting.action.GetRemoteIndexesAction
1515
import org.opensearch.alerting.action.SearchEmailAccountAction
1616
import org.opensearch.alerting.action.SearchEmailGroupAction
1717
import org.opensearch.alerting.actionv2.DeleteMonitorV2Action
18+
import org.opensearch.alerting.actionv2.GetAlertsV2Action
1819
import org.opensearch.alerting.actionv2.GetMonitorV2Action
1920
import org.opensearch.alerting.actionv2.IndexMonitorV2Action
2021
import org.opensearch.alerting.actionv2.SearchMonitorV2Action
2122
import org.opensearch.alerting.alerts.AlertIndices
2223
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
24+
import org.opensearch.alerting.alertsv2.AlertV2Indices
25+
import org.opensearch.alerting.alertsv2.AlertV2Indices.Companion.ALL_ALERT_V2_INDEX_PATTERN
26+
import org.opensearch.alerting.alertsv2.AlertV2Mover
2327
import org.opensearch.alerting.comments.CommentsIndices
2428
import org.opensearch.alerting.comments.CommentsIndices.Companion.ALL_COMMENTS_INDEX_PATTERN
2529
import org.opensearch.alerting.core.JobSweeper
@@ -29,6 +33,7 @@ import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportActio
2933
import org.opensearch.alerting.core.lock.LockService
3034
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
3135
import org.opensearch.alerting.core.schedule.JobScheduler
36+
import org.opensearch.alerting.core.settings.AlertingV2Settings
3237
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
3338
import org.opensearch.alerting.core.settings.ScheduledJobSettings
3439
import org.opensearch.alerting.modelv2.MonitorV2
@@ -57,6 +62,7 @@ import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
5762
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
5863
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
5964
import org.opensearch.alerting.resthandlerv2.RestDeleteMonitorV2Action
65+
import org.opensearch.alerting.resthandlerv2.RestGetAlertsV2Action
6066
import org.opensearch.alerting.resthandlerv2.RestGetMonitorV2Action
6167
import org.opensearch.alerting.resthandlerv2.RestIndexMonitorV2Action
6268
import org.opensearch.alerting.resthandlerv2.RestSearchMonitorV2Action
@@ -93,6 +99,7 @@ import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
9399
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
94100
import org.opensearch.alerting.transport.TransportSearchMonitorAction
95101
import org.opensearch.alerting.transportv2.TransportDeleteMonitorV2Action
102+
import org.opensearch.alerting.transportv2.TransportGetAlertsV2Action
96103
import org.opensearch.alerting.transportv2.TransportGetMonitorV2Action
97104
import org.opensearch.alerting.transportv2.TransportIndexMonitorV2Action
98105
import org.opensearch.alerting.transportv2.TransportSearchMonitorV2Action
@@ -194,8 +201,10 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
194201
lateinit var docLevelMonitorQueries: DocLevelMonitorQueries
195202
lateinit var threadPool: ThreadPool
196203
lateinit var alertIndices: AlertIndices
204+
lateinit var alertV2Indices: AlertV2Indices
197205
lateinit var clusterService: ClusterService
198206
lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator
207+
lateinit var alertV2Mover: AlertV2Mover
199208
var monitorTypeToMonitorRunners: MutableMap<String, RemoteMonitorRegistry> = mutableMapOf()
200209

201210
override fun getRestHandlers(
@@ -239,6 +248,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
239248
RestDeleteMonitorV2Action(),
240249
RestGetMonitorV2Action(),
241250
RestSearchMonitorV2Action(settings, clusterService),
251+
RestGetAlertsV2Action(),
242252
)
243253
}
244254

@@ -278,6 +288,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
278288
ActionPlugin.ActionHandler(GetMonitorV2Action.INSTANCE, TransportGetMonitorV2Action::class.java),
279289
ActionPlugin.ActionHandler(SearchMonitorV2Action.INSTANCE, TransportSearchMonitorV2Action::class.java),
280290
ActionPlugin.ActionHandler(DeleteMonitorV2Action.INSTANCE, TransportDeleteMonitorV2Action::class.java),
291+
ActionPlugin.ActionHandler(GetAlertsV2Action.INSTANCE, TransportGetAlertsV2Action::class.java)
281292
)
282293
}
283294

@@ -314,6 +325,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
314325
val settings = environment.settings()
315326
val lockService = LockService(client, clusterService)
316327
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
328+
alertV2Indices = AlertV2Indices(settings, client, threadPool, clusterService)
317329
val alertService = AlertService(client, xContentRegistry, alertIndices)
318330
val triggerService = TriggerService(scriptService)
319331
runner = MonitorRunnerService
@@ -325,6 +337,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
325337
.registerSettings(settings)
326338
.registerThreadPool(threadPool)
327339
.registerAlertIndices(alertIndices)
340+
.registerAlertV2Indices(alertV2Indices)
328341
.registerInputService(
329342
InputService(
330343
client,
@@ -351,6 +364,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
351364
scheduler = JobScheduler(threadPool, runner)
352365
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
353366
destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices)
367+
alertV2Mover = AlertV2Mover(environment.settings(), client, threadPool, clusterService, xContentRegistry)
354368
this.threadPool = threadPool
355369
this.clusterService = clusterService
356370

@@ -378,6 +392,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
378392
commentsIndices,
379393
docLevelMonitorQueries,
380394
destinationMigrationCoordinator,
395+
alertV2Mover,
381396
lockService,
382397
alertService,
383398
triggerService
@@ -475,7 +490,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
475490
AlertingSettings.ALERT_V2_QUERY_RESULTS_MAX_SIZE,
476491
AlertingSettings.ALERT_V2_PER_RESULT_TRIGGER_MAX_ALERTS,
477492
AlertingSettings.NOTIFICATION_SUBJECT_SOURCE_MAX_LENGTH,
478-
AlertingSettings.NOTIFICATION_MESSAGE_SOURCE_MAX_LENGTH
493+
AlertingSettings.NOTIFICATION_MESSAGE_SOURCE_MAX_LENGTH,
494+
AlertingV2Settings.ALERTING_V2_ENABLED
479495
)
480496
}
481497

@@ -494,6 +510,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
494510
SystemIndexDescriptor(ALL_ALERT_INDEX_PATTERN, "Alerting Plugin system index pattern"),
495511
SystemIndexDescriptor(SCHEDULED_JOBS_INDEX, "Alerting Plugin Configuration index"),
496512
SystemIndexDescriptor(ALL_COMMENTS_INDEX_PATTERN, "Alerting Comments system index pattern"),
513+
SystemIndexDescriptor(ALL_ALERT_V2_INDEX_PATTERN, "Alerting V2 Alerts index pattern")
497514
)
498515
}
499516

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package org.opensearch.alerting
77

88
import org.opensearch.action.bulk.BackoffPolicy
99
import org.opensearch.alerting.alerts.AlertIndices
10+
import org.opensearch.alerting.alertsv2.AlertV2Indices
1011
import org.opensearch.alerting.core.lock.LockService
1112
import org.opensearch.alerting.model.destination.DestinationContextFactory
1213
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
@@ -35,6 +36,7 @@ data class MonitorRunnerExecutionContext(
3536
var settings: Settings? = null,
3637
var threadPool: ThreadPool? = null,
3738
var alertIndices: AlertIndices? = null,
39+
var alertV2Indices: AlertV2Indices? = null,
3840
var inputService: InputService? = null,
3941
var triggerService: TriggerService? = null,
4042
var alertService: AlertService? = null,

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@ import org.opensearch.alerting.action.ExecuteWorkflowRequest
2323
import org.opensearch.alerting.action.ExecuteWorkflowResponse
2424
import org.opensearch.alerting.alerts.AlertIndices
2525
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
26+
import org.opensearch.alerting.alertsv2.AlertV2Indices
27+
import org.opensearch.alerting.alertsv2.AlertV2Mover.Companion.moveAlertV2s
2628
import org.opensearch.alerting.core.JobRunner
2729
import org.opensearch.alerting.core.ScheduledJobIndices
2830
import org.opensearch.alerting.core.lock.LockModel
2931
import org.opensearch.alerting.core.lock.LockService
3032
import org.opensearch.alerting.model.destination.DestinationContextFactory
33+
import org.opensearch.alerting.modelv2.MonitorV2
3134
import org.opensearch.alerting.opensearchapi.retry
3235
import org.opensearch.alerting.opensearchapi.suspendUntil
3336
import org.opensearch.alerting.remote.monitors.RemoteDocumentLevelMonitorRunner
@@ -137,6 +140,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
137140
return this
138141
}
139142

143+
fun registerAlertV2Indices(alertV2Indices: AlertV2Indices): MonitorRunnerService {
144+
this.monitorCtx.alertV2Indices = alertV2Indices
145+
return this
146+
}
147+
140148
fun registerInputService(inputService: InputService): MonitorRunnerService {
141149
this.monitorCtx.inputService = inputService
142150
return this
@@ -316,6 +324,18 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
316324
logger.error("Failed to move active alerts for monitor [${job.id}].", e)
317325
}
318326
}
327+
} else if (job is MonitorV2) {
328+
launch {
329+
try {
330+
monitorCtx.moveAlertsRetryPolicy!!.retry(logger) {
331+
if (monitorCtx.alertV2Indices!!.isAlertV2Initialized()) {
332+
moveAlertV2s(job.id, job, monitorCtx)
333+
}
334+
}
335+
} catch (e: Exception) {
336+
logger.error("Failed to move active alertV2s for monitorV2 [${job.id}].", e)
337+
}
338+
}
319339
} else {
320340
throw IllegalArgumentException("Invalid job type")
321341
}
@@ -339,6 +359,15 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
339359
} catch (e: Exception) {
340360
logger.error("Failed to move active alerts for monitor [$jobId].", e)
341361
}
362+
try {
363+
monitorCtx.moveAlertsRetryPolicy!!.retry(logger) {
364+
if (monitorCtx.alertV2Indices!!.isAlertV2Initialized()) {
365+
moveAlertV2s(jobId, null, monitorCtx)
366+
}
367+
}
368+
} catch (e: Exception) {
369+
logger.error("Failed to move active alertV2s for monitorV2 [$jobId].", e)
370+
}
342371
}
343372
}
344373

@@ -433,20 +462,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
433462
): MonitorRunResult<*> {
434463
// Updating the scheduled job index at the start of monitor execution runs for when there is an upgrade the the schema mapping
435464
// has not been updated.
436-
if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) {
437-
IndexUtils.updateIndexMapping(
438-
ScheduledJob.SCHEDULED_JOBS_INDEX,
439-
ScheduledJobIndices.scheduledJobMappings(), monitorCtx.clusterService!!.state(), monitorCtx.client!!.admin().indices(),
440-
object : ActionListener<AcknowledgedResponse> {
441-
override fun onResponse(response: AcknowledgedResponse) {
442-
}
443-
444-
override fun onFailure(t: Exception) {
445-
logger.error("Failed to update config index schema", t)
446-
}
447-
}
448-
)
449-
}
465+
updateAlertingConfigIndexSchema()
450466

451467
if (job is Workflow) {
452468
logger.info("Executing scheduled workflow - id: ${job.id}, periodStart: $periodStart, periodEnd: $periodEnd, dryrun: $dryrun")
@@ -582,4 +598,21 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
582598
.newInstance(template.params + mapOf("ctx" to ctx.asTemplateArg()))
583599
.execute()
584600
}
601+
602+
private fun updateAlertingConfigIndexSchema() {
603+
if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) {
604+
IndexUtils.updateIndexMapping(
605+
ScheduledJob.SCHEDULED_JOBS_INDEX,
606+
ScheduledJobIndices.scheduledJobMappings(), monitorCtx.clusterService!!.state(), monitorCtx.client!!.admin().indices(),
607+
object : ActionListener<AcknowledgedResponse> {
608+
override fun onResponse(response: AcknowledgedResponse) {
609+
}
610+
611+
override fun onFailure(t: Exception) {
612+
logger.error("Failed to update config index schema", t)
613+
}
614+
}
615+
)
616+
}
617+
}
585618
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.actionv2
7+
8+
import org.opensearch.action.ActionType
9+
10+
class GetAlertsV2Action private constructor() : ActionType<GetAlertsV2Response>(NAME, ::GetAlertsV2Response) {
11+
companion object {
12+
val INSTANCE = GetAlertsV2Action()
13+
const val NAME = "cluster:admin/opensearch/alerting/v2/alerts/get"
14+
}
15+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.actionv2
7+
8+
import org.opensearch.action.ActionRequest
9+
import org.opensearch.action.ActionRequestValidationException
10+
import org.opensearch.commons.alerting.model.Table
11+
import org.opensearch.core.common.io.stream.StreamInput
12+
import org.opensearch.core.common.io.stream.StreamOutput
13+
import java.io.IOException
14+
15+
class GetAlertsV2Request : ActionRequest {
16+
val table: Table
17+
val severityLevel: String
18+
val monitorV2Ids: List<String>?
19+
20+
constructor(
21+
table: Table,
22+
severityLevel: String,
23+
monitorV2Ids: List<String>? = null,
24+
) : super() {
25+
this.table = table
26+
this.severityLevel = severityLevel
27+
this.monitorV2Ids = monitorV2Ids
28+
}
29+
30+
@Throws(IOException::class)
31+
constructor(sin: StreamInput) : this(
32+
table = Table.readFrom(sin),
33+
severityLevel = sin.readString(),
34+
monitorV2Ids = sin.readOptionalStringList(),
35+
)
36+
37+
override fun validate(): ActionRequestValidationException? {
38+
return null
39+
}
40+
41+
@Throws(IOException::class)
42+
override fun writeTo(out: StreamOutput) {
43+
table.writeTo(out)
44+
out.writeString(severityLevel)
45+
out.writeOptionalStringCollection(monitorV2Ids)
46+
}
47+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.actionv2
7+
8+
import org.opensearch.alerting.modelv2.AlertV2
9+
import org.opensearch.commons.notifications.action.BaseResponse
10+
import org.opensearch.core.common.io.stream.StreamInput
11+
import org.opensearch.core.common.io.stream.StreamOutput
12+
import org.opensearch.core.xcontent.ToXContent
13+
import org.opensearch.core.xcontent.XContentBuilder
14+
import java.io.IOException
15+
import java.util.Collections
16+
17+
class GetAlertsV2Response : BaseResponse {
18+
val alertV2s: List<AlertV2>
19+
20+
// totalAlertV2s is not the same as the size of alertV2s because there can be 30 alerts from the request, but
21+
// the request only asked for 5 alerts, so totalAlertV2s will be 30, but alertV2s will only contain 5 alerts
22+
val totalAlertV2s: Int?
23+
24+
constructor(
25+
alertV2s: List<AlertV2>,
26+
totalAlertV2s: Int?
27+
) : super() {
28+
this.alertV2s = alertV2s
29+
this.totalAlertV2s = totalAlertV2s
30+
}
31+
32+
@Throws(IOException::class)
33+
constructor(sin: StreamInput) : this(
34+
alertV2s = Collections.unmodifiableList(sin.readList(::AlertV2)),
35+
totalAlertV2s = sin.readOptionalInt()
36+
)
37+
38+
@Throws(IOException::class)
39+
override fun writeTo(out: StreamOutput) {
40+
out.writeCollection(alertV2s)
41+
out.writeOptionalInt(totalAlertV2s)
42+
}
43+
44+
@Throws(IOException::class)
45+
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
46+
builder.startObject()
47+
.field("alerts_v2", alertV2s)
48+
.field("total_alerts_v2", totalAlertV2s)
49+
50+
return builder.endObject()
51+
}
52+
}

0 commit comments

Comments
 (0)