diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/modelv2/MonitorV2RunResult.kt b/core/src/main/kotlin/org/opensearch/alerting/core/modelv2/MonitorV2RunResult.kt new file mode 100644 index 000000000..0db8b9ce8 --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/modelv2/MonitorV2RunResult.kt @@ -0,0 +1,44 @@ +package org.opensearch.alerting.core.modelv2 + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import java.time.Instant + +interface MonitorV2RunResult : Writeable, ToXContent { + val monitorName: String + val error: Exception? + val periodStart: Instant + val periodEnd: Instant + val triggerResults: Map + + enum class MonitorV2RunResultType() { + PPL_MONITOR_RUN_RESULT; + } + + companion object { + const val MONITOR_V2_NAME_FIELD = "monitor_v2_name" + const val ERROR_FIELD = "error" + const val PERIOD_START_FIELD = "period_start" + const val PERIOD_END_FIELD = "period_end" + const val TRIGGER_RESULTS_FIELD = "trigger_results" + + fun readFrom(sin: StreamInput): MonitorV2RunResult { + val monitorRunResultType = sin.readEnum(MonitorV2RunResultType::class.java) + return when (monitorRunResultType) { + MonitorV2RunResultType.PPL_MONITOR_RUN_RESULT -> PPLMonitorRunResult(sin) + else -> throw IllegalStateException("Unexpected input [$monitorRunResultType] when reading MonitorV2RunResult") + } + } + + fun writeTo(out: StreamOutput, monitorV2RunResult: MonitorV2RunResult) { + when (monitorV2RunResult) { + is PPLMonitorRunResult -> { + out.writeEnum(MonitorV2RunResultType.PPL_MONITOR_RUN_RESULT) + monitorV2RunResult.writeTo(out) + } + } + } + } +} diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/modelv2/PPLMonitorRunResult.kt b/core/src/main/kotlin/org/opensearch/alerting/core/modelv2/PPLMonitorRunResult.kt new file mode 100644 index 000000000..135186abb --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/modelv2/PPLMonitorRunResult.kt @@ -0,0 +1,61 @@ +package org.opensearch.alerting.core.modelv2 + +import org.opensearch.alerting.core.modelv2.MonitorV2RunResult.Companion.ERROR_FIELD +import org.opensearch.alerting.core.modelv2.MonitorV2RunResult.Companion.MONITOR_V2_NAME_FIELD +import org.opensearch.alerting.core.modelv2.MonitorV2RunResult.Companion.PERIOD_END_FIELD +import org.opensearch.alerting.core.modelv2.MonitorV2RunResult.Companion.PERIOD_START_FIELD +import org.opensearch.alerting.core.modelv2.MonitorV2RunResult.Companion.TRIGGER_RESULTS_FIELD +import org.opensearch.alerting.core.util.nonOptionalTimeField +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException +import java.time.Instant + +data class PPLMonitorRunResult( + override val monitorName: String, + override val error: Exception?, + override val periodStart: Instant, + override val periodEnd: Instant, + override val triggerResults: Map, + val pplQueryResults: Map> // key: trigger id, value: query results +) : MonitorV2RunResult { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + sin.readString(), // monitorName + sin.readException(), // error + sin.readInstant(), // periodStart + sin.readInstant(), // periodEnd + sin.readMap() as Map, // triggerResults + sin.readMap() as Map> // pplQueryResults + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + builder.field(MONITOR_V2_NAME_FIELD, monitorName) + builder.nonOptionalTimeField(PERIOD_START_FIELD, periodStart) + builder.nonOptionalTimeField(PERIOD_END_FIELD, periodEnd) + builder.field(ERROR_FIELD, error?.message) + builder.field(TRIGGER_RESULTS_FIELD, triggerResults) + builder.field(PPL_QUERY_RESULTS_FIELD, pplQueryResults) + builder.endObject() + return builder + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(monitorName) + out.writeException(error) + out.writeInstant(periodStart) + out.writeInstant(periodEnd) + out.writeMap(triggerResults) + out.writeMap(pplQueryResults) + } + + companion object { + const val PPL_QUERY_RESULTS_FIELD = "ppl_query_results" + } +} diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/modelv2/PPLTriggerRunResult.kt b/core/src/main/kotlin/org/opensearch/alerting/core/modelv2/PPLTriggerRunResult.kt new file mode 100644 index 000000000..175275a97 --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/modelv2/PPLTriggerRunResult.kt @@ -0,0 +1,51 @@ +package org.opensearch.alerting.core.modelv2 + +import org.opensearch.alerting.core.modelv2.TriggerV2RunResult.Companion.ERROR_FIELD +import org.opensearch.alerting.core.modelv2.TriggerV2RunResult.Companion.NAME_FIELD +import org.opensearch.alerting.core.modelv2.TriggerV2RunResult.Companion.TRIGGERED_FIELD +import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult +import org.opensearch.commons.alerting.model.TriggerRunResult +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +data class PPLTriggerRunResult( + override var triggerName: String, + override var triggered: Boolean, + override var error: Exception?, +) : TriggerV2RunResult { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + triggered = sin.readBoolean(), + error = sin.readException() + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + builder.field(NAME_FIELD, triggerName) + builder.field(TRIGGERED_FIELD, triggered) + builder.field(ERROR_FIELD, error?.message) + builder.endObject() + return builder + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(triggerName) + out.writeBoolean(triggered) + out.writeException(error) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): TriggerRunResult { + return QueryLevelTriggerRunResult(sin) + } + } +} diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/modelv2/TriggerV2RunResult.kt b/core/src/main/kotlin/org/opensearch/alerting/core/modelv2/TriggerV2RunResult.kt new file mode 100644 index 000000000..37ea50c90 --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/modelv2/TriggerV2RunResult.kt @@ -0,0 +1,17 @@ +package org.opensearch.alerting.core.modelv2 + +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent + +interface TriggerV2RunResult : Writeable, ToXContent { + + val triggerName: String + val triggered: Boolean + val error: Exception? + + companion object { + const val NAME_FIELD = "name" + const val TRIGGERED_FIELD = "triggered" + const val ERROR_FIELD = "error" + } +} diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/ppl/PPLPluginInterface.kt b/core/src/main/kotlin/org/opensearch/alerting/core/ppl/PPLPluginInterface.kt new file mode 100644 index 000000000..477b417ce --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/ppl/PPLPluginInterface.kt @@ -0,0 +1,50 @@ +package org.opensearch.alerting.core.ppl + +import org.opensearch.commons.utils.recreateObject +import org.opensearch.core.action.ActionListener +import org.opensearch.core.action.ActionResponse +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.sql.plugin.transport.PPLQueryAction +import org.opensearch.sql.plugin.transport.TransportPPLQueryRequest +import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse +import org.opensearch.transport.client.node.NodeClient + +/** + * Various transport action plugin interfaces for the SQL/PPL plugin + */ +object PPLPluginInterface { + fun executeQuery( + client: NodeClient, + request: TransportPPLQueryRequest, + listener: ActionListener + ) { + client.execute( + PPLQueryAction.INSTANCE, + request, + wrapActionListener(listener) { response -> recreateObject(response) { TransportPPLQueryResponse(it) } } + ) + } + + /** + * Wrap action listener on concrete response class by a new created one on ActionResponse. + * This is required because the response may be loaded by different classloader across plugins. + * The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate + * the response object. + */ + @Suppress("UNCHECKED_CAST") + private fun wrapActionListener( + listener: ActionListener, + recreate: (Writeable) -> Response + ): ActionListener { + return object : ActionListener { + override fun onResponse(response: ActionResponse) { + val recreated = recreate(response) + listener.onResponse(recreated) + } + + override fun onFailure(exception: java.lang.Exception) { + listener.onFailure(exception) + } + } as ActionListener + } +}