diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/AlertV2.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/AlertV2.kt new file mode 100644 index 00000000..5e0030ad --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/AlertV2.kt @@ -0,0 +1,202 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.lucene.uid.Versions +import org.opensearch.commons.alerting.model.Alert.Companion.ALERT_ID_FIELD +import org.opensearch.commons.alerting.model.Alert.Companion.ALERT_VERSION_FIELD +import org.opensearch.commons.alerting.model.Alert.Companion.ERROR_MESSAGE_FIELD +import org.opensearch.commons.alerting.model.Alert.Companion.EXECUTION_ID_FIELD +import org.opensearch.commons.alerting.model.Alert.Companion.MONITOR_ID_FIELD +import org.opensearch.commons.alerting.model.Alert.Companion.MONITOR_NAME_FIELD +import org.opensearch.commons.alerting.model.Alert.Companion.MONITOR_VERSION_FIELD +import org.opensearch.commons.alerting.model.Alert.Companion.NO_ID +import org.opensearch.commons.alerting.model.Alert.Companion.NO_VERSION +import org.opensearch.commons.alerting.model.Alert.Companion.SCHEMA_VERSION_FIELD +import org.opensearch.commons.alerting.model.Alert.Companion.SEVERITY_FIELD +import org.opensearch.commons.alerting.model.Alert.Companion.TRIGGER_ID_FIELD +import org.opensearch.commons.alerting.model.Alert.Companion.TRIGGER_NAME_FIELD +import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION +import org.opensearch.commons.alerting.util.instant +import org.opensearch.commons.alerting.util.nonOptionalTimeField +import org.opensearch.commons.alerting.util.optionalTimeField +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken +import java.io.IOException +import java.time.Instant + +data class AlertV2( + val id: String = NO_ID, + val version: Long = NO_VERSION, + val schemaVersion: Int = NO_SCHEMA_VERSION, + val monitorId: String, + val monitorName: String, + val monitorVersion: Long, +// val monitorUser: User?, + val triggerId: String, + val triggerName: String, + val queryResults: Map, + val triggeredTime: Instant, + val expirationTime: Instant?, + val errorMessage: String? = null, + val severity: String, + val executionId: String? = null +) : Writeable, ToXContent { + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + id = sin.readString(), + version = sin.readLong(), + schemaVersion = sin.readInt(), + monitorId = sin.readString(), + monitorName = sin.readString(), + monitorVersion = sin.readLong(), +// monitorUser = if (sin.readBoolean()) { +// User(sin) +// } else { +// null +// }, + triggerId = sin.readString(), + triggerName = sin.readString(), + queryResults = sin.readMap()!!.toMap(), + triggeredTime = sin.readInstant(), + expirationTime = sin.readOptionalInstant(), + errorMessage = sin.readOptionalString(), + severity = sin.readString(), + executionId = sin.readOptionalString() + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeInt(schemaVersion) + out.writeString(monitorId) + out.writeString(monitorName) + out.writeLong(monitorVersion) +// out.writeBoolean(monitorUser != null) +// monitorUser?.writeTo(out) + out.writeString(triggerId) + out.writeString(triggerName) + out.writeMap(queryResults) + out.writeInstant(triggeredTime) + out.writeOptionalInstant(expirationTime) + out.writeOptionalString(errorMessage) + out.writeString(severity) + out.writeOptionalString(executionId) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(ALERT_ID_FIELD, id) + .field(ALERT_VERSION_FIELD, version) + .field(MONITOR_ID_FIELD, monitorId) + .field(SCHEMA_VERSION_FIELD, schemaVersion) + .field(MONITOR_VERSION_FIELD, monitorVersion) + .field(MONITOR_NAME_FIELD, monitorName) + .field(EXECUTION_ID_FIELD, executionId) + .field(TRIGGER_ID_FIELD, triggerId) + .field(TRIGGER_NAME_FIELD, triggerName) + .field(QUERY_RESULTS_FIELD, queryResults) + .field(ERROR_MESSAGE_FIELD, errorMessage) + .field(SEVERITY_FIELD, severity) + .nonOptionalTimeField(TRIGGERED_TIME_FIELD, triggeredTime) + .optionalTimeField(EXPIRATION_TIME_FIELD, expirationTime) + .endObject() + +// if (!secure) { +// builder.optionalUserField(MONITOR_USER_FIELD, monitorUser) +// } + + return builder + } + + fun asTemplateArg(): Map { + return mapOf( + ALERT_ID_FIELD to id, + ALERT_VERSION_FIELD to version, + ERROR_MESSAGE_FIELD to errorMessage, + EXECUTION_ID_FIELD to executionId, + EXPIRATION_TIME_FIELD to expirationTime?.toEpochMilli(), + SEVERITY_FIELD to severity + ) + } + + companion object { + const val TRIGGERED_TIME_FIELD = "triggered_time" + const val EXPIRATION_TIME_FIELD = "expiration_time" + const val QUERY_RESULTS_FIELD = "query_results" + + @JvmStatic + @JvmOverloads + @Throws(IOException::class) + fun parse(xcp: XContentParser, id: String = NO_ID, version: Long = NO_VERSION): AlertV2 { + var schemaVersion = NO_SCHEMA_VERSION + lateinit var monitorId: String + lateinit var monitorName: String + var monitorVersion: Long = Versions.NOT_FOUND +// var monitorUser: User? = null + lateinit var triggerId: String + lateinit var triggerName: String + var queryResults: Map = mapOf() + lateinit var severity: String + var triggeredTime: Instant? = null + var expirationTime: Instant? = null + var errorMessage: String? = null + var executionId: String? = null + + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + MONITOR_ID_FIELD -> monitorId = xcp.text() + SCHEMA_VERSION_FIELD -> schemaVersion = xcp.intValue() + MONITOR_NAME_FIELD -> monitorName = xcp.text() + MONITOR_VERSION_FIELD -> monitorVersion = xcp.longValue() +// MONITOR_USER_FIELD -> +// monitorUser = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { +// null +// } else { +// User.parse(xcp) +// } + TRIGGER_ID_FIELD -> triggerId = xcp.text() + TRIGGER_NAME_FIELD -> triggerName = xcp.text() + QUERY_RESULTS_FIELD -> queryResults = xcp.map() + TRIGGERED_TIME_FIELD -> triggeredTime = xcp.instant() + EXPIRATION_TIME_FIELD -> expirationTime = xcp.instant() + ERROR_MESSAGE_FIELD -> errorMessage = xcp.textOrNull() + EXECUTION_ID_FIELD -> executionId = xcp.textOrNull() + SEVERITY_FIELD -> severity = xcp.text() + } + } + + return AlertV2( + id = id, + version = version, + schemaVersion = schemaVersion, + monitorId = requireNotNull(monitorId), + monitorName = requireNotNull(monitorName), + monitorVersion = monitorVersion, +// monitorUser = monitorUser, + triggerId = requireNotNull(triggerId), + triggerName = requireNotNull(triggerName), + queryResults = requireNotNull(queryResults), + triggeredTime = requireNotNull(triggeredTime), + expirationTime = expirationTime, + errorMessage = errorMessage, + severity = severity, + executionId = executionId + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): AlertV2 { + return AlertV2(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorV2.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorV2.kt new file mode 100644 index 00000000..489e1624 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorV2.kt @@ -0,0 +1,102 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.CheckedFunction +import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.alerting.model.PPLMonitor.Companion.PPL_MONITOR_TYPE +import org.opensearch.core.ParseField +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import java.io.IOException +import java.time.Instant + +interface MonitorV2 : ScheduledJob { + override val id: String + override val version: Long + override val name: String + override val enabled: Boolean + override val schedule: Schedule + override val lastUpdateTime: Instant // required for scheduled job maintenance + override val enabledTime: Instant? // required for scheduled job maintenance + val triggers: List + val schemaVersion: Int // for updating monitors + val lookBackWindow: TimeValue? // how far back to look when querying data during monitor execution + + fun asTemplateArg(): Map + + enum class MonitorV2Type(val value: String) { + PPL_MONITOR(PPL_MONITOR_TYPE); + + override fun toString(): String { + return value + } + + companion object { + fun enumFromString(value: String): MonitorV2Type? { + return MonitorV2Type.entries.find { it.value == value } + } + } + } + + companion object { + // scheduled job field names + const val MONITOR_V2_TYPE = "monitor_v2" // scheduled job type is MonitorV2 + + // field names + const val NAME_FIELD = "name" + const val MONITOR_TYPE_FIELD = "monitor_type" + const val ENABLED_FIELD = "enabled" + const val SCHEDULE_FIELD = "schedule" + const val LAST_UPDATE_TIME_FIELD = "last_update_time" + const val ENABLED_TIME_FIELD = "enabled_time" + const val TRIGGERS_FIELD = "triggers" + const val LOOK_BACK_WINDOW_FIELD = "look_back_window" + + // default values + const val NO_ID = "" + const val NO_VERSION = 1L + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + ScheduledJob::class.java, + ParseField(MONITOR_V2_TYPE), + CheckedFunction { parse(it) } + ) + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): MonitorV2 { + /* parse outer object for monitorV2 type, then delegate to correct monitorV2 parser */ + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) // outer monitor object start + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp) // monitor type field name + val monitorTypeText = xcp.currentName() + val monitorType = MonitorV2Type.enumFromString(monitorTypeText) + ?: throw IllegalStateException("when parsing MonitorV2, received invalid monitor type: $monitorTypeText") + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) // inner monitor object start + + return when (monitorType) { + MonitorV2Type.PPL_MONITOR -> PPLMonitor.parse(xcp) + } + } + + fun readFrom(sin: StreamInput): MonitorV2 { + return when (val monitorType = sin.readEnum(MonitorV2Type::class.java)) { + MonitorV2Type.PPL_MONITOR -> PPLMonitor(sin) + else -> throw IllegalStateException("Unexpected input \"$monitorType\" when reading MonitorV2") + } + } + + fun writeTo(out: StreamOutput, monitorV2: MonitorV2) { + when (monitorV2) { + is PPLMonitor -> { + out.writeEnum(MonitorV2.MonitorV2Type.PPL_MONITOR) + monitorV2.writeTo(out) + } + } + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLMonitor.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLMonitor.kt new file mode 100644 index 00000000..09a32009 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLMonitor.kt @@ -0,0 +1,328 @@ +package org.opensearch.commons.alerting.model + +import org.apache.logging.log4j.LogManager +import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.alerting.model.Monitor.Companion.SCHEMA_VERSION_FIELD +import org.opensearch.commons.alerting.model.MonitorV2.Companion.ENABLED_FIELD +import org.opensearch.commons.alerting.model.MonitorV2.Companion.ENABLED_TIME_FIELD +import org.opensearch.commons.alerting.model.MonitorV2.Companion.LAST_UPDATE_TIME_FIELD +import org.opensearch.commons.alerting.model.MonitorV2.Companion.LOOK_BACK_WINDOW_FIELD +import org.opensearch.commons.alerting.model.MonitorV2.Companion.MONITOR_TYPE_FIELD +import org.opensearch.commons.alerting.model.MonitorV2.Companion.MONITOR_V2_TYPE +import org.opensearch.commons.alerting.model.MonitorV2.Companion.NAME_FIELD +import org.opensearch.commons.alerting.model.MonitorV2.Companion.NO_ID +import org.opensearch.commons.alerting.model.MonitorV2.Companion.NO_VERSION +import org.opensearch.commons.alerting.model.MonitorV2.Companion.SCHEDULE_FIELD +import org.opensearch.commons.alerting.model.MonitorV2.Companion.TRIGGERS_FIELD +import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION +import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID +import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION +import org.opensearch.commons.alerting.util.instant +import org.opensearch.commons.alerting.util.nonOptionalTimeField +import org.opensearch.commons.alerting.util.optionalTimeField +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import java.io.IOException +import java.time.Instant + +private val logger = LogManager.getLogger(PPLMonitor::class.java) + +// TODO: probably change this to be called PPLSQLMonitor. A PPL Monitor and SQL Monitor +// would have the exact same functionality, except the choice of language +// when calling PPL/SQL plugin's execute API would be different. +// we dont need 2 different monitor types for that, just a simple if check +// for query language at monitor execution time +data class PPLMonitor( + override val id: String = NO_ID, + override val version: Long = NO_VERSION, + override val name: String, + override val enabled: Boolean, + override val schedule: Schedule, + override val lookBackWindow: TimeValue? = null, + override val lastUpdateTime: Instant, + override val enabledTime: Instant?, + override val triggers: List, + override val schemaVersion: Int = NO_SCHEMA_VERSION, + val queryLanguage: QueryLanguage = QueryLanguage.PPL, // default to PPL, SQL not currently supported + val query: String +) : MonitorV2 { + + // specify scheduled job type + override val type = MONITOR_V2_TYPE + + override fun fromDocument(id: String, version: Long): PPLMonitor = copy(id = id, version = version) + + init { + // SQL monitors are not yet supported + if (this.queryLanguage == QueryLanguage.SQL) { + throw IllegalStateException("Monitors with SQL queries are not supported") + } + + // for checking trigger ID uniqueness + val triggerIds = mutableSetOf() + triggers.forEach { trigger -> + require(triggerIds.add(trigger.id)) { "Duplicate trigger id: ${trigger.id}. Trigger ids must be unique." } + } + + if (enabled) { + requireNotNull(enabledTime) + } else { + require(enabledTime == null) + } + + triggers.forEach { trigger -> + require(trigger is PPLTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$PPL_MONITOR_TYPE]" } + } + + // TODO: create setting for max triggers and check for max triggers here + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + id = sin.readString(), + version = sin.readLong(), + name = sin.readString(), + enabled = sin.readBoolean(), + schedule = Schedule.readFrom(sin), + lookBackWindow = TimeValue.parseTimeValue(sin.readString(), PLACEHOLDER_LOOK_BACK_WINDOW_SETTING_NAME), + lastUpdateTime = sin.readInstant(), + enabledTime = sin.readOptionalInstant(), + triggers = sin.readList(PPLTrigger::readFrom), + schemaVersion = sin.readInt(), + queryLanguage = sin.readEnum(QueryLanguage::class.java), + query = sin.readString() + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() // overall start object + + // if this is being written as ScheduledJob, add extra object layer and add ScheduledJob + // related metadata, default to false + if (params.paramAsBoolean("with_type", false)) { + builder.startObject(MONITOR_V2_TYPE) + } + + // wrap PPLMonitor in outer object named after its monitor type + // required for MonitorV2 XContentParser to first encounter this, + // read in monitor type, then delegate to correct parse() function + builder.startObject(PPL_MONITOR_TYPE) // monitor type start object + + builder.field(NAME_FIELD, name) + builder.field(SCHEDULE_FIELD, schedule) + builder.field(LOOK_BACK_WINDOW_FIELD, lookBackWindow?.toHumanReadableString(0)) + builder.field(ENABLED_FIELD, enabled) + builder.nonOptionalTimeField(LAST_UPDATE_TIME_FIELD, lastUpdateTime) + builder.optionalTimeField(ENABLED_TIME_FIELD, enabledTime) + builder.field(TRIGGERS_FIELD, triggers.toTypedArray()) + builder.field(SCHEMA_VERSION_FIELD, schemaVersion) + builder.field(QUERY_LANGUAGE_FIELD, queryLanguage.value) + builder.field(QUERY_FIELD, query) + + builder.endObject() // monitor type end object + + // if ScheduledJob metadata was added, end the extra object layer that was created + if (params.paramAsBoolean("with_type", false)) { + builder.endObject() + } + + builder.endObject() // overall end object + + return builder + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeString(name) + out.writeBoolean(enabled) + if (schedule is CronSchedule) { + out.writeEnum(Schedule.TYPE.CRON) + } else { + out.writeEnum(Schedule.TYPE.INTERVAL) + } + + out.writeBoolean(lookBackWindow != null) + lookBackWindow?.let { out.writeString(lookBackWindow.toHumanReadableString(0)) } + + out.writeInstant(lastUpdateTime) + out.writeOptionalInstant(enabledTime) + out.writeVInt(triggers.size) + triggers.forEach { it.writeTo(out) } + out.writeInt(schemaVersion) + out.writeEnum(queryLanguage) + out.writeString(query) + } + + override fun asTemplateArg(): Map { + return mapOf( + _ID to id, + _VERSION to version, + NAME_FIELD to name, + ENABLED_FIELD to enabled, + SCHEDULE_FIELD to schedule, + LOOK_BACK_WINDOW_FIELD to lookBackWindow?.toHumanReadableString(0), + LAST_UPDATE_TIME_FIELD to lastUpdateTime.toEpochMilli(), + ENABLED_TIME_FIELD to enabledTime?.toEpochMilli(), + TRIGGERS_FIELD to triggers, + QUERY_LANGUAGE_FIELD to queryLanguage.value, + QUERY_FIELD to query + ) + } + + enum class QueryLanguage(val value: String) { + PPL(PPL_QUERY_LANGUAGE), + SQL(SQL_QUERY_LANGUAGE); + + companion object { + fun enumFromString(value: String): QueryLanguage? = QueryLanguage.entries.firstOrNull { it.value == value } + } + } + + companion object { + // monitor type name + const val PPL_MONITOR_TYPE = "ppl_monitor" // TODO: eventually change to SQL_PPL_MONITOR_TYPE + + // query languages + const val PPL_QUERY_LANGUAGE = "ppl" + const val SQL_QUERY_LANGUAGE = "sql" + + // field names + const val QUERY_LANGUAGE_FIELD = "query_language" + const val QUERY_FIELD = "query" + + // mock setting name used when parsing TimeValue + // TimeValue class is usually reserved for declaring settings, but we're using it + // outside that use case here, which is why we need these placeholders + private const val PLACEHOLDER_LOOK_BACK_WINDOW_SETTING_NAME = "ppl_monitor_look_back_window" + + @JvmStatic + @JvmOverloads + @Throws(IOException::class) + fun parse(xcp: XContentParser, id: String = NO_ID, version: Long = NO_VERSION): PPLMonitor { + var name: String? = null + var monitorType: String = PPL_MONITOR_TYPE + var enabled = true + var schedule: Schedule? = null + var lookBackWindow: TimeValue? = null + var lastUpdateTime: Instant? = null + var enabledTime: Instant? = null + val triggers: MutableList = mutableListOf() + var schemaVersion = NO_SCHEMA_VERSION + var queryLanguage: QueryLanguage = QueryLanguage.PPL // default to PPL + var query: String? = null + + /* parse */ + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + NAME_FIELD -> name = xcp.text() + MONITOR_TYPE_FIELD -> monitorType = xcp.text() + ENABLED_FIELD -> enabled = xcp.booleanValue() + SCHEDULE_FIELD -> schedule = Schedule.parse(xcp) + LOOK_BACK_WINDOW_FIELD -> { + lookBackWindow = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + null + } else { + val input = xcp.text() + TimeValue.parseTimeValue(input, PLACEHOLDER_LOOK_BACK_WINDOW_SETTING_NAME) // throws IllegalArgumentException if there's parsing error + } + } + LAST_UPDATE_TIME_FIELD -> lastUpdateTime = xcp.instant() + ENABLED_TIME_FIELD -> enabledTime = xcp.instant() + TRIGGERS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + triggers.add(PPLTrigger.parseInner(xcp)) + } + } + SCHEMA_VERSION_FIELD -> schemaVersion = xcp.intValue() + QUERY_LANGUAGE_FIELD -> { + val input = xcp.text() + val enumMatchResult = QueryLanguage.enumFromString(input) + ?: throw IllegalArgumentException("Invalid value for $QUERY_LANGUAGE_FIELD: $input. Supported values are ${QueryLanguage.entries.map { it.value }}") + queryLanguage = enumMatchResult + } + QUERY_FIELD -> query = xcp.text() + else -> throw IllegalArgumentException("Unexpected field \"$fieldName\" when parsing PPL Monitor") + } + } + + /* validations */ + + // ensure MonitorV2 XContent being parsed by PPLMonitor class is PPL Monitor type + if (monitorType != PPL_MONITOR_TYPE) { + throw IllegalArgumentException("Invalid monitor type: $monitorType") + } + + // ensure there's at least 1 trigger + if (triggers.isEmpty()) { + throw IllegalArgumentException("Monitor must include at least 1 trigger") + } + + // ensure the trigger suppress durations are valid + triggers.forEach { trigger -> + trigger.suppressDuration?.let { suppressDuration -> + // TODO: these max and min values are completely arbitrary, make them settings + val minValue = TimeValue.timeValueMinutes(1) + val maxValue = TimeValue.timeValueDays(5) + + require(suppressDuration <= maxValue) { "Suppress duration must be at most $maxValue but was $suppressDuration" } + + require(suppressDuration >= minValue) { "Suppress duration must be at least $minValue but was $suppressDuration" } + } + } + + // if enabled, set time of MonitorV2 creation/update is set as enable time + if (enabled && enabledTime == null) { + enabledTime = Instant.now() + } else if (!enabled) { + enabledTime = null + } + + lastUpdateTime = lastUpdateTime ?: Instant.now() + + // check for required fields + requireNotNull(name) { "Monitor name is null" } + requireNotNull(schedule) { "Schedule is null" } + requireNotNull(queryLanguage) { "Query language is null" } + requireNotNull(query) { "Query is null" } + requireNotNull(lastUpdateTime) { "Last update time is null" } + + if (schedule is IntervalSchedule && lookBackWindow != null) { + throw IllegalArgumentException("Look back windows only supported for CRON schedules") + } + + if (queryLanguage == QueryLanguage.SQL) { + throw IllegalArgumentException("SQL queries are not supported. Please use a PPL query.") + } + + /* return PPLMonitor */ + return PPLMonitor( + id, + version, + name, + enabled, + schedule, + lookBackWindow, + lastUpdateTime, + enabledTime, + triggers, + schemaVersion, + queryLanguage, + query + ) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt new file mode 100644 index 00000000..0e5d5d41 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt @@ -0,0 +1,313 @@ +package org.opensearch.commons.alerting.model + +import org.apache.logging.log4j.LogManager +import org.opensearch.common.CheckedFunction +import org.opensearch.common.UUIDs +import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.alerting.model.TriggerV2.Companion.ACTIONS_FIELD +import org.opensearch.commons.alerting.model.TriggerV2.Companion.EXPIRE_FIELD +import org.opensearch.commons.alerting.model.TriggerV2.Companion.ID_FIELD +import org.opensearch.commons.alerting.model.TriggerV2.Companion.LAST_TRIGGERED_FIELD +import org.opensearch.commons.alerting.model.TriggerV2.Companion.NAME_FIELD +import org.opensearch.commons.alerting.model.TriggerV2.Companion.SEVERITY_FIELD +import org.opensearch.commons.alerting.model.TriggerV2.Companion.SUPPRESS_FIELD +import org.opensearch.commons.alerting.model.TriggerV2.Severity +import org.opensearch.commons.alerting.model.action.Action +import org.opensearch.commons.alerting.util.instant +import org.opensearch.commons.alerting.util.optionalTimeField +import org.opensearch.core.ParseField +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import java.io.IOException +import java.time.Instant + +private val logger = LogManager.getLogger(PPLTrigger::class.java) + +data class PPLTrigger( + override val id: String = UUIDs.base64UUID(), + override val name: String, + override val severity: Severity, + override val suppressDuration: TimeValue?, + override val expireDuration: TimeValue?, + override var lastTriggeredTime: Instant?, + override val actions: List, + val mode: TriggerMode, // result_set or per_result + val conditionType: ConditionType, + val numResultsCondition: NumResultsCondition?, + val numResultsValue: Long?, + val customCondition: String? +) : TriggerV2 { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readString(), // name + sin.readEnum(Severity::class.java), // severity + // parseTimeValue() is typically used to parse OpenSearch settings + // the second param is supposed to accept a setting name, but here we're passing in our own name + TimeValue.parseTimeValue(sin.readString(), PLACEHOLDER_SUPPRESS_SETTING_NAME), // suppressDuration + TimeValue.parseTimeValue(sin.readString(), PLACEHOLDER_EXPIRE_SETTING_NAME), // expireDuration + sin.readOptionalInstant(), // lastTriggeredTime + sin.readList(::Action), // actions + sin.readEnum(TriggerMode::class.java), // trigger mode + sin.readEnum(ConditionType::class.java), // condition type + if (sin.readBoolean()) sin.readEnum(NumResultsCondition::class.java) else null, // num results condition + sin.readOptionalLong(), // num results value + sin.readOptionalString() // custom condition + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeString(name) + out.writeEnum(severity) + + out.writeBoolean(suppressDuration != null) + suppressDuration?.let { out.writeString(suppressDuration.toHumanReadableString(0)) } + + out.writeBoolean(expireDuration != null) + expireDuration?.let { out.writeString(expireDuration.toHumanReadableString(0)) } + + out.writeOptionalInstant(lastTriggeredTime) + out.writeCollection(actions) + out.writeEnum(mode) + out.writeEnum(conditionType) + + out.writeBoolean(numResultsCondition != null) + numResultsCondition?.let { out.writeEnum(numResultsCondition) } + + out.writeOptionalLong(numResultsValue) + out.writeOptionalString(customCondition) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + builder.startObject() + builder.field(ID_FIELD, id) + builder.field(NAME_FIELD, name) + builder.field(SEVERITY_FIELD, severity.value) + builder.field(SUPPRESS_FIELD, suppressDuration?.toHumanReadableString(0)) + builder.field(EXPIRE_FIELD, expireDuration?.toHumanReadableString(0)) + builder.optionalTimeField(LAST_TRIGGERED_FIELD, lastTriggeredTime) + builder.field(ACTIONS_FIELD, actions.toTypedArray()) + builder.field(MODE_FIELD, mode.value) + builder.field(CONDITION_TYPE_FIELD, conditionType.value) + numResultsCondition?.let { builder.field(NUM_RESULTS_CONDITION_FIELD, numResultsCondition.value) } + numResultsValue?.let { builder.field(NUM_RESULTS_VALUE_FIELD, numResultsValue) } + customCondition?.let { builder.field(CUSTOM_CONDITION_FIELD, customCondition) } + builder.endObject() + return builder + } + + fun asTemplateArg(): Map { + return mapOf( + ID_FIELD to id, + NAME_FIELD to name, + SEVERITY_FIELD to severity.value, + SUPPRESS_FIELD to suppressDuration?.toHumanReadableString(0), + EXPIRE_FIELD to expireDuration?.toHumanReadableString(0), + ACTIONS_FIELD to actions.map { it.asTemplateArg() }, + MODE_FIELD to mode.value, + CONDITION_TYPE_FIELD to conditionType.value, + NUM_RESULTS_CONDITION_FIELD to numResultsCondition?.value, + NUM_RESULTS_VALUE_FIELD to numResultsValue, + CUSTOM_CONDITION_FIELD to customCondition + ) + } + + enum class TriggerMode(val value: String) { + RESULT_SET("result_set"), + PER_RESULT("per_result"); + + companion object { + fun enumFromString(value: String): TriggerMode? = entries.firstOrNull { it.value == value } + } + } + + enum class ConditionType(val value: String) { + NUMBER_OF_RESULTS("number_of_results"), + CUSTOM("custom"); + + companion object { + fun enumFromString(value: String): ConditionType? = entries.firstOrNull { it.value == value } + } + } + + enum class NumResultsCondition(val value: String) { + GREATER_THAN(">"), + GREATER_THAN_EQUAL(">="), + LESS_THAN("<"), + LESS_THAN_EQUAL("<="), + EQUAL("=="), + NOT_EQUAL("!="); + + companion object { + fun enumFromString(value: String): NumResultsCondition? = entries.firstOrNull { it.value == value } + } + } + + companion object { + // trigger wrapper object field name + const val PPL_TRIGGER_FIELD = "ppl_trigger" + + // field names + const val MODE_FIELD = "mode" + const val CONDITION_TYPE_FIELD = "type" + const val NUM_RESULTS_CONDITION_FIELD = "num_results_condition" + const val NUM_RESULTS_VALUE_FIELD = "num_results_value" + const val CUSTOM_CONDITION_FIELD = "custom_condition" + + // mock setting name used when parsing TimeValue + // TimeValue class is usually reserved for declaring settings, but we're using it + // outside that use case here, which is why we need these placeholders + private const val PLACEHOLDER_SUPPRESS_SETTING_NAME = "ppl_trigger_suppress_duration" + private const val PLACEHOLDER_EXPIRE_SETTING_NAME = "ppl_trigger_expire_duration" + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + TriggerV2::class.java, + ParseField(PPL_TRIGGER_FIELD), + CheckedFunction { parseInner(it) } + ) + + @JvmStatic + @Throws(IOException::class) + fun parseInner(xcp: XContentParser): PPLTrigger { + var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified + var name: String? = null + var severity: Severity? = null + var suppressDuration: TimeValue? = null + var expireDuration: TimeValue? = null + var lastTriggeredTime: Instant? = null + val actions: MutableList = mutableListOf() + var mode: TriggerMode? = null + var conditionType: ConditionType? = null + var numResultsCondition: NumResultsCondition? = null + var numResultsValue: Long? = null + var customCondition: String? = null + + /* parse */ + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) // outer trigger object start + + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + ID_FIELD -> id = xcp.text() + NAME_FIELD -> name = xcp.text() + SEVERITY_FIELD -> { + val input = xcp.text() + val enumMatchResult = Severity.enumFromString(input) + ?: throw IllegalArgumentException("Invalid value for $SEVERITY_FIELD: $input. Supported values are ${Severity.entries.map { it.value }}") + severity = enumMatchResult + } + MODE_FIELD -> { + val input = xcp.text() + val enumMatchResult = TriggerMode.enumFromString(input) + ?: throw IllegalArgumentException("Invalid value for $MODE_FIELD: $input. Supported values are ${TriggerMode.entries.map { it.value }}") + mode = enumMatchResult + } + CONDITION_TYPE_FIELD -> { + val input = xcp.text() + val enumMatchResult = ConditionType.enumFromString(input) + ?: throw IllegalArgumentException("Invalid value for $CONDITION_TYPE_FIELD: $input. Supported values are ${ConditionType.entries.map { it.value }}") + conditionType = enumMatchResult + } + NUM_RESULTS_CONDITION_FIELD -> { + numResultsCondition = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + null + } else { + val input = xcp.text() + val enumMatchResult = NumResultsCondition.enumFromString(input) + ?: throw IllegalArgumentException("Invalid value for $NUM_RESULTS_CONDITION_FIELD: $input. Supported values are ${NumResultsCondition.entries.map { it.value }}") + enumMatchResult + } + } + NUM_RESULTS_VALUE_FIELD -> { + numResultsValue = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + null + } else { + xcp.longValue() + } + } + CUSTOM_CONDITION_FIELD -> { + customCondition = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + null + } else { + xcp.text() + } + } + SUPPRESS_FIELD -> { + suppressDuration = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + null + } else { + val input = xcp.text() + TimeValue.parseTimeValue(input, PLACEHOLDER_SUPPRESS_SETTING_NAME) // throws IllegalArgumentException if there's parsing error + } + } + EXPIRE_FIELD -> { + expireDuration = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + null + } else { + val input = xcp.text() + TimeValue.parseTimeValue(input, PLACEHOLDER_EXPIRE_SETTING_NAME) // throws IllegalArgumentException if there's parsing error + } + } + LAST_TRIGGERED_FIELD -> lastTriggeredTime = xcp.instant() + ACTIONS_FIELD -> { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + actions.add(Action.parse(xcp)) + } + } + else -> throw IllegalArgumentException("Unexpected field $fieldName when parsing PPL Trigger") + } + } + + /* validations */ + requireNotNull(name) { "Trigger name must be included" } + requireNotNull(severity) { "Trigger severity must be included" } + requireNotNull(mode) { "Trigger mode must be included" } + requireNotNull(conditionType) { "Trigger condition type must be included" } + + when (conditionType) { + ConditionType.NUMBER_OF_RESULTS -> { + requireNotNull(numResultsCondition) { "if trigger condition is of type ${ConditionType.NUMBER_OF_RESULTS.value}, $NUM_RESULTS_CONDITION_FIELD must be included" } + requireNotNull(numResultsValue) { "if trigger condition is of type ${ConditionType.NUMBER_OF_RESULTS.value}, $NUM_RESULTS_VALUE_FIELD must be included" } + require(customCondition == null) { "if trigger condition is of type ${ConditionType.NUMBER_OF_RESULTS.value}, $CUSTOM_CONDITION_FIELD must not be included" } + } + ConditionType.CUSTOM -> { + requireNotNull(customCondition) { "if trigger condition is of type ${ConditionType.CUSTOM.value}, $CUSTOM_CONDITION_FIELD must be included" } + require(numResultsCondition == null) { "if trigger condition is of type ${ConditionType.CUSTOM.value}, $NUM_RESULTS_CONDITION_FIELD must not be included" } + require(numResultsValue == null) { "if trigger condition is of type ${ConditionType.CUSTOM.value}, $NUM_RESULTS_VALUE_FIELD must not be included" } + } + } + + // 3. prepare and return PPLTrigger object + return PPLTrigger( + id, + name, + severity, + suppressDuration, + expireDuration, + lastTriggeredTime, + actions, + mode, + conditionType, + numResultsCondition, + numResultsValue, + customCondition + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): PPLTrigger { + return PPLTrigger(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/TriggerV2.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/TriggerV2.kt new file mode 100644 index 00000000..a6cb9e27 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/TriggerV2.kt @@ -0,0 +1,51 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.alerting.model.PPLTrigger.Companion.PPL_TRIGGER_FIELD +import org.opensearch.commons.alerting.model.action.Action +import org.opensearch.commons.notifications.model.BaseModel +import java.time.Instant + +interface TriggerV2 : BaseModel { + + val id: String + val name: String + val severity: Severity + val suppressDuration: TimeValue? + val expireDuration: TimeValue? + var lastTriggeredTime: Instant? + val actions: List + + enum class TriggerV2Type(val value: String) { + PPL_TRIGGER(PPL_TRIGGER_FIELD); + + override fun toString(): String { + return value + } + } + + enum class Severity(val value: String) { + INFO("info"), + ERROR("error"), + LOW("low"), + MEDIUM("medium"), + HIGH("high"), + CRITICAL("critical"); + + companion object { + fun enumFromString(value: String): Severity? { + return entries.find { it.value == value } + } + } + } + + companion object { + const val ID_FIELD = "id" + const val NAME_FIELD = "name" + const val SEVERITY_FIELD = "severity" + const val SUPPRESS_FIELD = "suppress" + const val LAST_TRIGGERED_FIELD = "last_triggered_time" + const val EXPIRE_FIELD = "expires" + const val ACTIONS_FIELD = "actions" + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt index 887e8430..0ea1d3cc 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt @@ -66,6 +66,10 @@ fun XContentBuilder.optionalUsernameField(name: String, user: User?): XContentBu return this.field(name, user.name) } +fun XContentBuilder.nonOptionalTimeField(name: String, instant: Instant): XContentBuilder { + return this.timeField(name, "${name}_in_millis", instant.toEpochMilli()) +} + fun XContentBuilder.optionalTimeField(name: String, instant: Instant?): XContentBuilder { if (instant == null) { return nullField(name)