Skip to content

Commit 9130713

Browse files
committed
initial alert v2 history implementation
1 parent ad9b57e commit 9130713

File tree

9 files changed

+723
-143
lines changed

9 files changed

+723
-143
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertV2Expirer.kt

Lines changed: 0 additions & 112 deletions
This file was deleted.

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ import org.opensearch.threadpool.ThreadPool
135135
import org.opensearch.transport.client.Client
136136
import org.opensearch.watcher.ResourceWatcherService
137137
import java.util.function.Supplier
138+
import org.opensearch.alerting.alertsv2.AlertV2Indices
139+
import org.opensearch.alerting.alertsv2.AlertV2Mover
138140

139141
/**
140142
* Entry point of the OpenSearch alerting plugin
@@ -178,9 +180,10 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
178180
lateinit var docLevelMonitorQueries: DocLevelMonitorQueries
179181
lateinit var threadPool: ThreadPool
180182
lateinit var alertIndices: AlertIndices
183+
lateinit var alertV2Indices: AlertV2Indices
181184
lateinit var clusterService: ClusterService
182185
lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator
183-
lateinit var alertV2Expirer: AlertV2Expirer
186+
lateinit var alertV2Mover: AlertV2Mover
184187
var monitorTypeToMonitorRunners: MutableMap<String, RemoteMonitorRegistry> = mutableMapOf()
185188

186189
override fun getRestHandlers(
@@ -314,6 +317,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
314317
.registerSettings(settings)
315318
.registerThreadPool(threadPool)
316319
.registerAlertIndices(alertIndices)
320+
.registerAlertV2Indices(alertV2Indices)
317321
.registerInputService(
318322
InputService(
319323
client,
@@ -340,7 +344,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
340344
scheduler = JobScheduler(threadPool, runner)
341345
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
342346
destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices)
343-
alertV2Expirer = AlertV2Expirer(client, threadPool, clusterService)
347+
alertV2Mover = AlertV2Mover(environment.settings(), client, threadPool, clusterService)
344348
this.threadPool = threadPool
345349
this.clusterService = clusterService
346350

@@ -368,7 +372,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
368372
commentsIndices,
369373
docLevelMonitorQueries,
370374
destinationMigrationCoordinator,
371-
alertV2Expirer,
375+
alertV2Mover,
372376
lockService,
373377
alertService,
374378
triggerService

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package org.opensearch.alerting
77

88
import org.opensearch.action.bulk.BackoffPolicy
99
import org.opensearch.alerting.alerts.AlertIndices
10+
import org.opensearch.alerting.alertsv2.AlertV2Indices
1011
import org.opensearch.alerting.core.lock.LockService
1112
import org.opensearch.alerting.model.destination.DestinationContextFactory
1213
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
@@ -35,6 +36,7 @@ data class MonitorRunnerExecutionContext(
3536
var settings: Settings? = null,
3637
var threadPool: ThreadPool? = null,
3738
var alertIndices: AlertIndices? = null,
39+
var alertV2Indices: AlertV2Indices? = null,
3840
var inputService: InputService? = null,
3941
var triggerService: TriggerService? = null,
4042
var alertService: AlertService? = null,

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ import java.time.Instant
9494
import java.time.LocalDateTime
9595
import java.time.ZoneOffset
9696
import java.util.UUID
97+
import org.opensearch.alerting.alertsv2.AlertV2Indices
9798
import kotlin.coroutines.CoroutineContext
9899

99100
object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
@@ -145,6 +146,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
145146
return this
146147
}
147148

149+
fun registerAlertV2Indices(alertV2Indices: AlertV2Indices): MonitorRunnerService {
150+
this.monitorCtx.alertV2Indices = alertV2Indices
151+
return this
152+
}
153+
148154
fun registerInputService(inputService: InputService): MonitorRunnerService {
149155
this.monitorCtx.inputService = inputService
150156
return this

alerting/src/main/kotlin/org/opensearch/alerting/PPLMonitorRunner.kt

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import java.time.Instant
4444
import java.time.ZoneOffset.UTC
4545
import java.time.format.DateTimeFormatter
4646
import java.time.temporal.ChronoUnit
47+
import org.opensearch.alerting.alertsv2.AlertV2Indices
4748

4849
object PPLMonitorRunner : MonitorV2Runner {
4950
private val logger = LogManager.getLogger(javaClass)
@@ -86,11 +87,10 @@ object PPLMonitorRunner : MonitorV2Runner {
8687
// use threadpool time for cross node consistency
8788
val timeOfCurrentExecution = Instant.ofEpochMilli(MonitorRunnerService.monitorCtx.threadPool!!.absoluteTimeInMillis())
8889

89-
// TODO: should alerting v1 and v2 alerts index be separate?
90+
// ensure the dedicated alert V2 index and its initial history index exist
9091
try {
91-
// TODO: write generated V2 alerts to existing alerts v1 index for now, revisit this decision
92-
monitorCtx.alertIndices!!.createOrUpdateAlertIndex()
93-
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex()
92+
monitorCtx.alertV2Indices!!.createOrUpdateAlertV2Index()
93+
monitorCtx.alertV2Indices!!.createOrUpdateInitialAlertV2HistoryIndex()
9494
} catch (e: Exception) {
9595
val id = if (pplMonitor.id.trim().isEmpty()) "_na_" else pplMonitor.id
9696
logger.error("Error loading alerts for monitorV2: $id", e)
@@ -121,21 +121,6 @@ object PPLMonitorRunner : MonitorV2Runner {
121121
}
122122
logger.info("suppression check passed, executing trigger ${pplTrigger.name} from monitor ${pplMonitor.name}")
123123

124-
// internal fun isActionActionable(action: Action, alert: Alert?): Boolean {
125-
// if (alert != null && alert.state == Alert.State.AUDIT)
126-
// return false
127-
// if (alert == null || action.throttle == null) {
128-
// return true
129-
// }
130-
// if (action.throttleEnabled) {
131-
// val result = alert.actionExecutionResults.firstOrNull { r -> r.actionId == action.id }
132-
// val lastExecutionTime: Instant? = result?.lastExecutionTime
133-
// val throttledTimeBound = currentTime().minus(action.throttle!!.value.toLong(), action.throttle!!.unit)
134-
// return (lastExecutionTime == null || lastExecutionTime.isBefore(throttledTimeBound))
135-
// }
136-
// return true
137-
// }
138-
139124
// if trigger uses custom condition, append the custom condition to query, otherwise simply proceed
140125
val queryToExecute = if (pplTrigger.conditionType == ConditionType.NUMBER_OF_RESULTS) { // number of results trigger
141126
timeFilteredQuery
@@ -147,15 +132,15 @@ object PPLMonitorRunner : MonitorV2Runner {
147132
val queryResponseJson = executePplQuery(queryToExecute, nodeClient)
148133
logger.info("query execution results for trigger ${pplTrigger.name}: $queryResponseJson")
149134

150-
// retrieve only the relevant query response rows.
135+
// retrieve deep copies of only the relevant query response rows.
151136
// for num_results triggers, that's the entire response
152137
// for custom triggers, that's only rows that evaluated to true
153138
val relevantQueryResultRows = if (pplTrigger.conditionType == ConditionType.NUMBER_OF_RESULTS) {
154139
// number of results trigger
155140
getQueryResponseWithoutSize(queryResponseJson)
156141
} else {
157142
// custom condition trigger
158-
evaluateCustomConditionTrigger(queryResponseJson, pplTrigger)
143+
collectCustomConditionResults(queryResponseJson, pplTrigger)
159144
}
160145

161146
// retrieve the number of results
@@ -194,10 +179,6 @@ object PPLMonitorRunner : MonitorV2Runner {
194179
timeOfCurrentExecution
195180
)
196181

197-
// collect the generated alerts to be written to alerts index
198-
// if the trigger is on result_set mode
199-
// generatedAlerts.addAll(thisTriggersGeneratedAlerts)
200-
201182
// update the trigger's last execution time for future suppression checks
202183
pplTrigger.lastTriggeredTime = timeOfCurrentExecution
203184

@@ -354,7 +335,7 @@ object PPLMonitorRunner : MonitorV2Runner {
354335
return queryResponseDeepCopy
355336
}
356337

357-
private fun evaluateCustomConditionTrigger(customConditionQueryResponse: JSONObject, pplTrigger: PPLTrigger): JSONObject {
338+
private fun collectCustomConditionResults(customConditionQueryResponse: JSONObject, pplTrigger: PPLTrigger): JSONObject {
358339
// a PPL query with custom condition returning 0 results should imply a valid but not useful query.
359340
// do not trigger alert, but warn that query likely is not functioning as user intended
360341
if (customConditionQueryResponse.getLong("total") == 0L) {
@@ -445,7 +426,10 @@ object PPLMonitorRunner : MonitorV2Runner {
445426
alertV2s.add(alertV2)
446427
}
447428

448-
return alertV2s.toList() // return as immutable list
429+
// TODO: this cap on the number of alerts returned is hard-coded right now, make it a configurable setting
430+
val alertsLimit = 10
431+
432+
return alertV2s.take(alertsLimit).toList() // return as immutable list
449433
}
450434

451435
private fun generateErrorAlert(
@@ -488,7 +472,7 @@ object PPLMonitorRunner : MonitorV2Runner {
488472

489473
var requestsToRetry = alerts.flatMap { alert ->
490474
listOf<DocWriteRequest<*>>(
491-
IndexRequest(AlertIndices.ALERT_INDEX)
475+
IndexRequest(AlertV2Indices.ALERT_V2_INDEX)
492476
.routing(pplMonitor.id) // set routing ID to PPL Monitor ID
493477
.source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
494478
.id(if (alert.id != Alert.NO_ID) alert.id else null)

0 commit comments

Comments
 (0)