Skip to content

Commit 74aed08

Browse files
PPL Alerting: Execute Monitor and Monitor Stats (#1960)
* PPL Alerting initial commit Signed-off-by: Dennis Toepker <[email protected]> * removing unused get alerts params and adding serde tests for alertsv2 request and response Signed-off-by: Dennis Toepker <[email protected]> * adding get alerts rbac IT Signed-off-by: Dennis Toepker <[email protected]> * misc changes Signed-off-by: Dennis Toepker <[email protected]> * removing actioned upon TODO Signed-off-by: Dennis Toepker <[email protected]> * adding execute monitor request/response serde tests that are currently failing Signed-off-by: Dennis Toepker <[email protected]> * adding request response serde tests and passing in explicit readers and writers for PPLSQLMonitorRunResult serde Signed-off-by: Dennis Toepker <[email protected]> * finishing out request/response serde tests Signed-off-by: Dennis Toepker <[email protected]> * minor changes in tests Signed-off-by: Dennis Toepker <[email protected]> * adding debug logs for playground Signed-off-by: Dennis Toepker <[email protected]> * adding expires null check in alert expiration Signed-off-by: Dennis Toepker <[email protected]> * removing null check as expires duration should never be null Signed-off-by: Dennis Toepker <[email protected]> * cleanup Signed-off-by: Dennis Toepker <[email protected]> * updating timestamp field check to include any date type that includes the string date Signed-off-by: Dennis Toepker <[email protected]> * addressing PR comments Signed-off-by: Dennis Toepker <[email protected]> * misc logs changes Signed-off-by: Dennis Toepker <[email protected]> * changing default max query results to 1000 and replacing get alerts filters with constants Signed-off-by: Dennis Toepker <[email protected]> * putting query index parsing logic in try catch Signed-off-by: Dennis Toepker <[email protected]> * adding backtick trimming when reading index names in ppl query Signed-off-by: Dennis Toepker <[email protected]> * adding time measurements and logs Signed-off-by: Dennis Toepker <[email protected]> * refactoring PPL query execution call to use transport service with timeout instead of client Signed-off-by: Dennis Toepker <[email protected]> * adding monitor run timeout test Signed-off-by: Dennis Toepker <[email protected]> * removing return in timeout catch block to allow for monitors to get updated with last triggered times Signed-off-by: Dennis Toepker <[email protected]> * addressing pr comments Signed-off-by: Dennis Toepker <[email protected]> * addressing pr comments Signed-off-by: Dennis Toepker <[email protected]> * update monitor now ignores seqNo and primaryTerm, and eliminating monitor stats v2 api in favor of version parameter in existing monitor stats api Signed-off-by: Dennis Toepker <[email protected]> * adding experimental tags Signed-off-by: Dennis Toepker <[email protected]> --------- Signed-off-by: Dennis Toepker <[email protected]> Co-authored-by: Dennis Toepker <[email protected]>
1 parent d9614ba commit 74aed08

File tree

65 files changed

+3513
-393
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+3513
-393
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import org.opensearch.alerting.action.GetRemoteIndexesAction
1515
import org.opensearch.alerting.action.SearchEmailAccountAction
1616
import org.opensearch.alerting.action.SearchEmailGroupAction
1717
import org.opensearch.alerting.actionv2.DeleteMonitorV2Action
18+
import org.opensearch.alerting.actionv2.ExecuteMonitorV2Action
1819
import org.opensearch.alerting.actionv2.GetAlertsV2Action
1920
import org.opensearch.alerting.actionv2.GetMonitorV2Action
2021
import org.opensearch.alerting.actionv2.IndexMonitorV2Action
@@ -62,6 +63,7 @@ import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
6263
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
6364
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
6465
import org.opensearch.alerting.resthandlerv2.RestDeleteMonitorV2Action
66+
import org.opensearch.alerting.resthandlerv2.RestExecuteMonitorV2Action
6567
import org.opensearch.alerting.resthandlerv2.RestGetAlertsV2Action
6668
import org.opensearch.alerting.resthandlerv2.RestGetMonitorV2Action
6769
import org.opensearch.alerting.resthandlerv2.RestIndexMonitorV2Action
@@ -99,6 +101,7 @@ import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
99101
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
100102
import org.opensearch.alerting.transport.TransportSearchMonitorAction
101103
import org.opensearch.alerting.transportv2.TransportDeleteMonitorV2Action
104+
import org.opensearch.alerting.transportv2.TransportExecuteMonitorV2Action
102105
import org.opensearch.alerting.transportv2.TransportGetAlertsV2Action
103106
import org.opensearch.alerting.transportv2.TransportGetMonitorV2Action
104107
import org.opensearch.alerting.transportv2.TransportIndexMonitorV2Action
@@ -245,10 +248,11 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
245248

246249
// Alerting V2
247250
RestIndexMonitorV2Action(),
251+
RestExecuteMonitorV2Action(),
248252
RestDeleteMonitorV2Action(),
249253
RestGetMonitorV2Action(),
250254
RestSearchMonitorV2Action(settings, clusterService),
251-
RestGetAlertsV2Action(),
255+
RestGetAlertsV2Action()
252256
)
253257
}
254258

@@ -288,6 +292,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
288292
ActionPlugin.ActionHandler(GetMonitorV2Action.INSTANCE, TransportGetMonitorV2Action::class.java),
289293
ActionPlugin.ActionHandler(SearchMonitorV2Action.INSTANCE, TransportSearchMonitorV2Action::class.java),
290294
ActionPlugin.ActionHandler(DeleteMonitorV2Action.INSTANCE, TransportDeleteMonitorV2Action::class.java),
295+
ActionPlugin.ActionHandler(ExecuteMonitorV2Action.INSTANCE, TransportExecuteMonitorV2Action::class.java),
291296
ActionPlugin.ActionHandler(GetAlertsV2Action.INSTANCE, TransportGetAlertsV2Action::class.java)
292297
)
293298
}
@@ -481,6 +486,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
481486
AlertingSettings.ALERT_V2_HISTORY_INDEX_MAX_AGE,
482487
AlertingSettings.ALERT_V2_HISTORY_MAX_DOCS,
483488
AlertingSettings.ALERT_V2_HISTORY_RETENTION_PERIOD,
489+
AlertingSettings.ALERT_V2_MONITOR_EXECUTION_MAX_DURATION,
484490
AlertingSettings.ALERTING_V2_MAX_MONITORS,
485491
AlertingSettings.ALERTING_V2_MAX_THROTTLE_DURATION,
486492
AlertingSettings.ALERTING_V2_MAX_EXPIRE_DURATION,

alerting/src/main/kotlin/org/opensearch/alerting/AlertingV2Utils.kt

Lines changed: 120 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,38 @@ package org.opensearch.alerting
77

88
import org.apache.lucene.search.TotalHits
99
import org.apache.lucene.search.TotalHits.Relation
10+
import org.opensearch.OpenSearchSecurityException
1011
import org.opensearch.action.search.SearchResponse
1112
import org.opensearch.action.search.ShardSearchFailure
1213
import org.opensearch.alerting.AlertingPlugin.Companion.MONITOR_BASE_URI
1314
import org.opensearch.alerting.AlertingPlugin.Companion.MONITOR_V2_BASE_URI
15+
import org.opensearch.alerting.action.GetDestinationsAction
16+
import org.opensearch.alerting.action.GetDestinationsRequest
17+
import org.opensearch.alerting.action.GetDestinationsResponse
18+
import org.opensearch.alerting.model.destination.Destination
1419
import org.opensearch.alerting.modelv2.MonitorV2
20+
import org.opensearch.alerting.opensearchapi.suspendUntil
21+
import org.opensearch.alerting.util.destinationmigration.NotificationActionConfigs
22+
import org.opensearch.alerting.util.destinationmigration.NotificationApiUtils.Companion.getNotificationConfigInfo
23+
import org.opensearch.alerting.util.destinationmigration.getTitle
24+
import org.opensearch.alerting.util.destinationmigration.publishLegacyNotification
25+
import org.opensearch.alerting.util.destinationmigration.sendNotification
26+
import org.opensearch.alerting.util.isAllowed
27+
import org.opensearch.alerting.util.isTestAction
1528
import org.opensearch.commons.alerting.model.Monitor
1629
import org.opensearch.commons.alerting.model.ScheduledJob
30+
import org.opensearch.commons.alerting.model.Table
1731
import org.opensearch.commons.alerting.model.Workflow
32+
import org.opensearch.commons.alerting.model.action.Action
33+
import org.opensearch.commons.notifications.model.NotificationConfigInfo
1834
import org.opensearch.index.IndexNotFoundException
1935
import org.opensearch.search.SearchHits
2036
import org.opensearch.search.aggregations.InternalAggregations
2137
import org.opensearch.search.internal.InternalSearchResponse
2238
import org.opensearch.search.profile.SearchProfileShardResults
2339
import org.opensearch.search.suggest.Suggest
2440
import org.opensearch.transport.RemoteTransportException
41+
import org.opensearch.transport.client.node.NodeClient
2542
import java.util.Collections
2643

2744
object AlertingV2Utils {
@@ -31,8 +48,8 @@ object AlertingV2Utils {
3148
if (scheduledJob is MonitorV2) {
3249
return IllegalStateException(
3350
"The ID given corresponds to an Alerting V2 Monitor, but a V1 Monitor was expected. " +
34-
"If you wish to operate on a V1 Monitor (e.g. Per Query, Per Document, etc), please use " +
35-
"the Alerting V1 APIs with endpoint prefix: $MONITOR_BASE_URI."
51+
"If you wish to operate on a V2 Monitor (e.g. PPL Monitor), please use " +
52+
"the Alerting V2 APIs with endpoint prefix: $MONITOR_V2_BASE_URI."
3653
)
3754
} else if (scheduledJob !is Monitor && scheduledJob !is Workflow) {
3855
return IllegalStateException(
@@ -49,8 +66,8 @@ object AlertingV2Utils {
4966
if (scheduledJob is Monitor || scheduledJob is Workflow) {
5067
return IllegalStateException(
5168
"The ID given corresponds to an Alerting V1 Monitor, but a V2 Monitor was expected. " +
52-
"If you wish to operate on a V2 Monitor (e.g. PPL Monitor), please use " +
53-
"the Alerting V2 APIs with endpoint prefix: $MONITOR_V2_BASE_URI."
69+
"If you wish to operate on a V1 Monitor (e.g. Per Query, Per Document, etc), please use " +
70+
"the Alerting V1 APIs with endpoint prefix: $MONITOR_BASE_URI."
5471
)
5572
} else if (scheduledJob !is MonitorV2) {
5673
return IllegalStateException(
@@ -100,4 +117,103 @@ object AlertingV2Utils {
100117
SearchResponse.Clusters.EMPTY
101118
)
102119
}
120+
121+
suspend fun getConfigAndSendNotification(
122+
action: Action,
123+
monitorCtx: MonitorRunnerExecutionContext,
124+
subject: String?,
125+
message: String
126+
): String {
127+
val config = getConfigForNotificationAction(action, monitorCtx)
128+
if (config.destination == null && config.channel == null) {
129+
throw IllegalStateException("Unable to find a Notification Channel or Destination config with id [${action.destinationId}]")
130+
}
131+
132+
// Adding a check on TEST_ACTION Destination type here to avoid supporting it as a LegacyBaseMessage type
133+
// just for Alerting integration tests
134+
if (config.destination?.isTestAction() == true) {
135+
return "test action"
136+
}
137+
138+
if (config.destination?.isAllowed(monitorCtx.allowList) == false) {
139+
throw IllegalStateException(
140+
"Monitor contains a Destination type that is not allowed: ${config.destination.type}"
141+
)
142+
}
143+
144+
var actionResponseContent = ""
145+
actionResponseContent = config.channel
146+
?.sendNotification(
147+
monitorCtx.client!!,
148+
config.channel.getTitle(subject),
149+
message
150+
) ?: actionResponseContent
151+
152+
actionResponseContent = config.destination
153+
?.buildLegacyBaseMessage(subject, message, monitorCtx.destinationContextFactory!!.getDestinationContext(config.destination))
154+
?.publishLegacyNotification(monitorCtx.client!!)
155+
?: actionResponseContent
156+
157+
return actionResponseContent
158+
}
159+
160+
/**
161+
* The "destination" ID referenced in a Monitor Action could either be a Notification config or a Destination config
162+
* depending on whether the background migration process has already migrated it from a Destination to a Notification config.
163+
*
164+
* To cover both of these cases, the Notification config will take precedence and if it is not found, the Destination will be retrieved.
165+
*/
166+
private suspend fun getConfigForNotificationAction(
167+
action: Action,
168+
monitorCtx: MonitorRunnerExecutionContext
169+
): NotificationActionConfigs {
170+
var destination: Destination? = null
171+
var notificationPermissionException: Exception? = null
172+
173+
var channel: NotificationConfigInfo? = null
174+
try {
175+
channel = getNotificationConfigInfo(monitorCtx.client as NodeClient, action.destinationId)
176+
} catch (e: OpenSearchSecurityException) {
177+
notificationPermissionException = e
178+
}
179+
180+
// If the channel was not found, try to retrieve the Destination
181+
if (channel == null) {
182+
destination = try {
183+
val table = Table(
184+
"asc",
185+
"destination.name.keyword",
186+
null,
187+
1,
188+
0,
189+
null
190+
)
191+
val getDestinationsRequest = GetDestinationsRequest(
192+
action.destinationId,
193+
0L,
194+
null,
195+
table,
196+
"ALL"
197+
)
198+
199+
val getDestinationsResponse: GetDestinationsResponse = monitorCtx.client!!.suspendUntil {
200+
monitorCtx.client!!.execute(GetDestinationsAction.INSTANCE, getDestinationsRequest, it)
201+
}
202+
getDestinationsResponse.destinations.firstOrNull()
203+
} catch (e: IllegalStateException) {
204+
// Catching the exception thrown when the Destination was not found so the NotificationActionConfigs object can be returned
205+
null
206+
} catch (e: OpenSearchSecurityException) {
207+
if (notificationPermissionException != null)
208+
throw notificationPermissionException
209+
else
210+
throw e
211+
}
212+
213+
if (destination == null && notificationPermissionException != null)
214+
throw notificationPermissionException
215+
}
216+
217+
return NotificationActionConfigs(destination, channel)
218+
}
103219
}

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt

Lines changed: 1 addition & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -5,34 +5,19 @@
55

66
package org.opensearch.alerting
77

8-
import org.opensearch.OpenSearchSecurityException
9-
import org.opensearch.alerting.action.GetDestinationsAction
10-
import org.opensearch.alerting.action.GetDestinationsRequest
11-
import org.opensearch.alerting.action.GetDestinationsResponse
12-
import org.opensearch.alerting.model.destination.Destination
8+
import org.opensearch.alerting.AlertingV2Utils.getConfigAndSendNotification
139
import org.opensearch.alerting.opensearchapi.InjectorContextElement
14-
import org.opensearch.alerting.opensearchapi.suspendUntil
1510
import org.opensearch.alerting.opensearchapi.withClosableContext
1611
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
1712
import org.opensearch.alerting.script.TriggerExecutionContext
18-
import org.opensearch.alerting.util.destinationmigration.NotificationActionConfigs
19-
import org.opensearch.alerting.util.destinationmigration.NotificationApiUtils.Companion.getNotificationConfigInfo
20-
import org.opensearch.alerting.util.destinationmigration.getTitle
21-
import org.opensearch.alerting.util.destinationmigration.publishLegacyNotification
22-
import org.opensearch.alerting.util.destinationmigration.sendNotification
23-
import org.opensearch.alerting.util.isAllowed
24-
import org.opensearch.alerting.util.isTestAction
2513
import org.opensearch.alerting.util.use
2614
import org.opensearch.commons.alerting.model.ActionRunResult
2715
import org.opensearch.commons.alerting.model.Monitor
2816
import org.opensearch.commons.alerting.model.MonitorRunResult
29-
import org.opensearch.commons.alerting.model.Table
3017
import org.opensearch.commons.alerting.model.WorkflowRunContext
3118
import org.opensearch.commons.alerting.model.action.Action
32-
import org.opensearch.commons.notifications.model.NotificationConfigInfo
3319
import org.opensearch.core.common.Strings
3420
import org.opensearch.transport.TransportService
35-
import org.opensearch.transport.client.node.NodeClient
3621
import java.time.Instant
3722

3823
abstract class MonitorRunner {
@@ -93,103 +78,4 @@ abstract class MonitorRunner {
9378
ActionRunResult(action.id, action.name, mapOf(), false, MonitorRunnerService.currentTime(), e)
9479
}
9580
}
96-
97-
protected suspend fun getConfigAndSendNotification(
98-
action: Action,
99-
monitorCtx: MonitorRunnerExecutionContext,
100-
subject: String?,
101-
message: String
102-
): String {
103-
val config = getConfigForNotificationAction(action, monitorCtx)
104-
if (config.destination == null && config.channel == null) {
105-
throw IllegalStateException("Unable to find a Notification Channel or Destination config with id [${action.destinationId}]")
106-
}
107-
108-
// Adding a check on TEST_ACTION Destination type here to avoid supporting it as a LegacyBaseMessage type
109-
// just for Alerting integration tests
110-
if (config.destination?.isTestAction() == true) {
111-
return "test action"
112-
}
113-
114-
if (config.destination?.isAllowed(monitorCtx.allowList) == false) {
115-
throw IllegalStateException(
116-
"Monitor contains a Destination type that is not allowed: ${config.destination.type}"
117-
)
118-
}
119-
120-
var actionResponseContent = ""
121-
actionResponseContent = config.channel
122-
?.sendNotification(
123-
monitorCtx.client!!,
124-
config.channel.getTitle(subject),
125-
message
126-
) ?: actionResponseContent
127-
128-
actionResponseContent = config.destination
129-
?.buildLegacyBaseMessage(subject, message, monitorCtx.destinationContextFactory!!.getDestinationContext(config.destination))
130-
?.publishLegacyNotification(monitorCtx.client!!)
131-
?: actionResponseContent
132-
133-
return actionResponseContent
134-
}
135-
136-
/**
137-
* The "destination" ID referenced in a Monitor Action could either be a Notification config or a Destination config
138-
* depending on whether the background migration process has already migrated it from a Destination to a Notification config.
139-
*
140-
* To cover both of these cases, the Notification config will take precedence and if it is not found, the Destination will be retrieved.
141-
*/
142-
private suspend fun getConfigForNotificationAction(
143-
action: Action,
144-
monitorCtx: MonitorRunnerExecutionContext
145-
): NotificationActionConfigs {
146-
var destination: Destination? = null
147-
var notificationPermissionException: Exception? = null
148-
149-
var channel: NotificationConfigInfo? = null
150-
try {
151-
channel = getNotificationConfigInfo(monitorCtx.client as NodeClient, action.destinationId)
152-
} catch (e: OpenSearchSecurityException) {
153-
notificationPermissionException = e
154-
}
155-
156-
// If the channel was not found, try to retrieve the Destination
157-
if (channel == null) {
158-
destination = try {
159-
val table = Table(
160-
"asc",
161-
"destination.name.keyword",
162-
null,
163-
1,
164-
0,
165-
null
166-
)
167-
val getDestinationsRequest = GetDestinationsRequest(
168-
action.destinationId,
169-
0L,
170-
null,
171-
table,
172-
"ALL"
173-
)
174-
175-
val getDestinationsResponse: GetDestinationsResponse = monitorCtx.client!!.suspendUntil {
176-
monitorCtx.client!!.execute(GetDestinationsAction.INSTANCE, getDestinationsRequest, it)
177-
}
178-
getDestinationsResponse.destinations.firstOrNull()
179-
} catch (e: IllegalStateException) {
180-
// Catching the exception thrown when the Destination was not found so the NotificationActionConfigs object can be returned
181-
null
182-
} catch (e: OpenSearchSecurityException) {
183-
if (notificationPermissionException != null)
184-
throw notificationPermissionException
185-
else
186-
throw e
187-
}
188-
189-
if (destination == null && notificationPermissionException != null)
190-
throw notificationPermissionException
191-
}
192-
193-
return NotificationActionConfigs(destination, channel)
194-
}
19581
}

0 commit comments

Comments
 (0)