Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.opensearch.alerting.action.GetRemoteIndexesAction
import org.opensearch.alerting.action.SearchEmailAccountAction
import org.opensearch.alerting.action.SearchEmailGroupAction
import org.opensearch.alerting.actionv2.DeleteMonitorV2Action
import org.opensearch.alerting.actionv2.ExecuteMonitorV2Action
import org.opensearch.alerting.actionv2.GetAlertsV2Action
import org.opensearch.alerting.actionv2.GetMonitorV2Action
import org.opensearch.alerting.actionv2.IndexMonitorV2Action
Expand Down Expand Up @@ -62,6 +63,7 @@ import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
import org.opensearch.alerting.resthandlerv2.RestDeleteMonitorV2Action
import org.opensearch.alerting.resthandlerv2.RestExecuteMonitorV2Action
import org.opensearch.alerting.resthandlerv2.RestGetAlertsV2Action
import org.opensearch.alerting.resthandlerv2.RestGetMonitorV2Action
import org.opensearch.alerting.resthandlerv2.RestIndexMonitorV2Action
Expand Down Expand Up @@ -99,6 +101,7 @@ import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
import org.opensearch.alerting.transportv2.TransportDeleteMonitorV2Action
import org.opensearch.alerting.transportv2.TransportExecuteMonitorV2Action
import org.opensearch.alerting.transportv2.TransportGetAlertsV2Action
import org.opensearch.alerting.transportv2.TransportGetMonitorV2Action
import org.opensearch.alerting.transportv2.TransportIndexMonitorV2Action
Expand Down Expand Up @@ -245,10 +248,11 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R

// Alerting V2
RestIndexMonitorV2Action(),
RestExecuteMonitorV2Action(),
RestDeleteMonitorV2Action(),
RestGetMonitorV2Action(),
RestSearchMonitorV2Action(settings, clusterService),
RestGetAlertsV2Action(),
RestGetAlertsV2Action()
)
}

Expand Down Expand Up @@ -288,6 +292,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(GetMonitorV2Action.INSTANCE, TransportGetMonitorV2Action::class.java),
ActionPlugin.ActionHandler(SearchMonitorV2Action.INSTANCE, TransportSearchMonitorV2Action::class.java),
ActionPlugin.ActionHandler(DeleteMonitorV2Action.INSTANCE, TransportDeleteMonitorV2Action::class.java),
ActionPlugin.ActionHandler(ExecuteMonitorV2Action.INSTANCE, TransportExecuteMonitorV2Action::class.java),
ActionPlugin.ActionHandler(GetAlertsV2Action.INSTANCE, TransportGetAlertsV2Action::class.java)
)
}
Expand Down Expand Up @@ -481,6 +486,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.ALERT_V2_HISTORY_INDEX_MAX_AGE,
AlertingSettings.ALERT_V2_HISTORY_MAX_DOCS,
AlertingSettings.ALERT_V2_HISTORY_RETENTION_PERIOD,
AlertingSettings.ALERT_V2_MONITOR_EXECUTION_MAX_DURATION,
AlertingSettings.ALERTING_V2_MAX_MONITORS,
AlertingSettings.ALERTING_V2_MAX_THROTTLE_DURATION,
AlertingSettings.ALERTING_V2_MAX_EXPIRE_DURATION,
Expand Down
124 changes: 120 additions & 4 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingV2Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,38 @@ package org.opensearch.alerting

import org.apache.lucene.search.TotalHits
import org.apache.lucene.search.TotalHits.Relation
import org.opensearch.OpenSearchSecurityException
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.search.ShardSearchFailure
import org.opensearch.alerting.AlertingPlugin.Companion.MONITOR_BASE_URI
import org.opensearch.alerting.AlertingPlugin.Companion.MONITOR_V2_BASE_URI
import org.opensearch.alerting.action.GetDestinationsAction
import org.opensearch.alerting.action.GetDestinationsRequest
import org.opensearch.alerting.action.GetDestinationsResponse
import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.modelv2.MonitorV2
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.util.destinationmigration.NotificationActionConfigs
import org.opensearch.alerting.util.destinationmigration.NotificationApiUtils.Companion.getNotificationConfigInfo
import org.opensearch.alerting.util.destinationmigration.getTitle
import org.opensearch.alerting.util.destinationmigration.publishLegacyNotification
import org.opensearch.alerting.util.destinationmigration.sendNotification
import org.opensearch.alerting.util.isAllowed
import org.opensearch.alerting.util.isTestAction
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.Table
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.commons.notifications.model.NotificationConfigInfo
import org.opensearch.index.IndexNotFoundException
import org.opensearch.search.SearchHits
import org.opensearch.search.aggregations.InternalAggregations
import org.opensearch.search.internal.InternalSearchResponse
import org.opensearch.search.profile.SearchProfileShardResults
import org.opensearch.search.suggest.Suggest
import org.opensearch.transport.RemoteTransportException
import org.opensearch.transport.client.node.NodeClient
import java.util.Collections

object AlertingV2Utils {
Expand All @@ -31,8 +48,8 @@ object AlertingV2Utils {
if (scheduledJob is MonitorV2) {
return IllegalStateException(
"The ID given corresponds to an Alerting V2 Monitor, but a V1 Monitor was expected. " +
"If you wish to operate on a V1 Monitor (e.g. Per Query, Per Document, etc), please use " +
"the Alerting V1 APIs with endpoint prefix: $MONITOR_BASE_URI."
"If you wish to operate on a V2 Monitor (e.g. PPL Monitor), please use " +
"the Alerting V2 APIs with endpoint prefix: $MONITOR_V2_BASE_URI."
)
} else if (scheduledJob !is Monitor && scheduledJob !is Workflow) {
return IllegalStateException(
Expand All @@ -49,8 +66,8 @@ object AlertingV2Utils {
if (scheduledJob is Monitor || scheduledJob is Workflow) {
return IllegalStateException(
"The ID given corresponds to an Alerting V1 Monitor, but a V2 Monitor was expected. " +
"If you wish to operate on a V2 Monitor (e.g. PPL Monitor), please use " +
"the Alerting V2 APIs with endpoint prefix: $MONITOR_V2_BASE_URI."
"If you wish to operate on a V1 Monitor (e.g. Per Query, Per Document, etc), please use " +
"the Alerting V1 APIs with endpoint prefix: $MONITOR_BASE_URI."
)
} else if (scheduledJob !is MonitorV2) {
return IllegalStateException(
Expand Down Expand Up @@ -100,4 +117,103 @@ object AlertingV2Utils {
SearchResponse.Clusters.EMPTY
)
}

suspend fun getConfigAndSendNotification(
action: Action,
monitorCtx: MonitorRunnerExecutionContext,
subject: String?,
message: String
): String {
val config = getConfigForNotificationAction(action, monitorCtx)
if (config.destination == null && config.channel == null) {
throw IllegalStateException("Unable to find a Notification Channel or Destination config with id [${action.destinationId}]")
}

// Adding a check on TEST_ACTION Destination type here to avoid supporting it as a LegacyBaseMessage type
// just for Alerting integration tests
if (config.destination?.isTestAction() == true) {
return "test action"
}

if (config.destination?.isAllowed(monitorCtx.allowList) == false) {
throw IllegalStateException(
"Monitor contains a Destination type that is not allowed: ${config.destination.type}"
)
}

var actionResponseContent = ""
actionResponseContent = config.channel
?.sendNotification(
monitorCtx.client!!,
config.channel.getTitle(subject),
message
) ?: actionResponseContent

actionResponseContent = config.destination
?.buildLegacyBaseMessage(subject, message, monitorCtx.destinationContextFactory!!.getDestinationContext(config.destination))
?.publishLegacyNotification(monitorCtx.client!!)
?: actionResponseContent

return actionResponseContent
}

/**
* The "destination" ID referenced in a Monitor Action could either be a Notification config or a Destination config
* depending on whether the background migration process has already migrated it from a Destination to a Notification config.
*
* To cover both of these cases, the Notification config will take precedence and if it is not found, the Destination will be retrieved.
*/
private suspend fun getConfigForNotificationAction(
action: Action,
monitorCtx: MonitorRunnerExecutionContext
): NotificationActionConfigs {
var destination: Destination? = null
var notificationPermissionException: Exception? = null

var channel: NotificationConfigInfo? = null
try {
channel = getNotificationConfigInfo(monitorCtx.client as NodeClient, action.destinationId)
} catch (e: OpenSearchSecurityException) {
notificationPermissionException = e
}

// If the channel was not found, try to retrieve the Destination
if (channel == null) {
destination = try {
val table = Table(
"asc",
"destination.name.keyword",
null,
1,
0,
null
)
val getDestinationsRequest = GetDestinationsRequest(
action.destinationId,
0L,
null,
table,
"ALL"
)

val getDestinationsResponse: GetDestinationsResponse = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(GetDestinationsAction.INSTANCE, getDestinationsRequest, it)
}
getDestinationsResponse.destinations.firstOrNull()
} catch (e: IllegalStateException) {
// Catching the exception thrown when the Destination was not found so the NotificationActionConfigs object can be returned
null
} catch (e: OpenSearchSecurityException) {
if (notificationPermissionException != null)
throw notificationPermissionException
else
throw e
}

if (destination == null && notificationPermissionException != null)
throw notificationPermissionException
}

return NotificationActionConfigs(destination, channel)
}
}
116 changes: 1 addition & 115 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,19 @@

package org.opensearch.alerting

import org.opensearch.OpenSearchSecurityException
import org.opensearch.alerting.action.GetDestinationsAction
import org.opensearch.alerting.action.GetDestinationsRequest
import org.opensearch.alerting.action.GetDestinationsResponse
import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.AlertingV2Utils.getConfigAndSendNotification
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.util.destinationmigration.NotificationActionConfigs
import org.opensearch.alerting.util.destinationmigration.NotificationApiUtils.Companion.getNotificationConfigInfo
import org.opensearch.alerting.util.destinationmigration.getTitle
import org.opensearch.alerting.util.destinationmigration.publishLegacyNotification
import org.opensearch.alerting.util.destinationmigration.sendNotification
import org.opensearch.alerting.util.isAllowed
import org.opensearch.alerting.util.isTestAction
import org.opensearch.alerting.util.use
import org.opensearch.commons.alerting.model.ActionRunResult
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorRunResult
import org.opensearch.commons.alerting.model.Table
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.commons.notifications.model.NotificationConfigInfo
import org.opensearch.core.common.Strings
import org.opensearch.transport.TransportService
import org.opensearch.transport.client.node.NodeClient
import java.time.Instant

abstract class MonitorRunner {
Expand Down Expand Up @@ -93,103 +78,4 @@ abstract class MonitorRunner {
ActionRunResult(action.id, action.name, mapOf(), false, MonitorRunnerService.currentTime(), e)
}
}

protected suspend fun getConfigAndSendNotification(
action: Action,
monitorCtx: MonitorRunnerExecutionContext,
subject: String?,
message: String
): String {
val config = getConfigForNotificationAction(action, monitorCtx)
if (config.destination == null && config.channel == null) {
throw IllegalStateException("Unable to find a Notification Channel or Destination config with id [${action.destinationId}]")
}

// Adding a check on TEST_ACTION Destination type here to avoid supporting it as a LegacyBaseMessage type
// just for Alerting integration tests
if (config.destination?.isTestAction() == true) {
return "test action"
}

if (config.destination?.isAllowed(monitorCtx.allowList) == false) {
throw IllegalStateException(
"Monitor contains a Destination type that is not allowed: ${config.destination.type}"
)
}

var actionResponseContent = ""
actionResponseContent = config.channel
?.sendNotification(
monitorCtx.client!!,
config.channel.getTitle(subject),
message
) ?: actionResponseContent

actionResponseContent = config.destination
?.buildLegacyBaseMessage(subject, message, monitorCtx.destinationContextFactory!!.getDestinationContext(config.destination))
?.publishLegacyNotification(monitorCtx.client!!)
?: actionResponseContent

return actionResponseContent
}

/**
* The "destination" ID referenced in a Monitor Action could either be a Notification config or a Destination config
* depending on whether the background migration process has already migrated it from a Destination to a Notification config.
*
* To cover both of these cases, the Notification config will take precedence and if it is not found, the Destination will be retrieved.
*/
private suspend fun getConfigForNotificationAction(
action: Action,
monitorCtx: MonitorRunnerExecutionContext
): NotificationActionConfigs {
var destination: Destination? = null
var notificationPermissionException: Exception? = null

var channel: NotificationConfigInfo? = null
try {
channel = getNotificationConfigInfo(monitorCtx.client as NodeClient, action.destinationId)
} catch (e: OpenSearchSecurityException) {
notificationPermissionException = e
}

// If the channel was not found, try to retrieve the Destination
if (channel == null) {
destination = try {
val table = Table(
"asc",
"destination.name.keyword",
null,
1,
0,
null
)
val getDestinationsRequest = GetDestinationsRequest(
action.destinationId,
0L,
null,
table,
"ALL"
)

val getDestinationsResponse: GetDestinationsResponse = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(GetDestinationsAction.INSTANCE, getDestinationsRequest, it)
}
getDestinationsResponse.destinations.firstOrNull()
} catch (e: IllegalStateException) {
// Catching the exception thrown when the Destination was not found so the NotificationActionConfigs object can be returned
null
} catch (e: OpenSearchSecurityException) {
if (notificationPermissionException != null)
throw notificationPermissionException
else
throw e
}

if (destination == null && notificationPermissionException != null)
throw notificationPermissionException
}

return NotificationActionConfigs(destination, channel)
}
}
Loading
Loading