Skip to content

Commit ad9b57e

Browse files
committed
adding PPL Monitor query validations before creating PPL Monitors
1 parent 45f9250 commit ad9b57e

File tree

5 files changed

+167
-85
lines changed

5 files changed

+167
-85
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/PPLMonitorRunner.kt

Lines changed: 78 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ object PPLMonitorRunner : MonitorV2Runner {
5050

5151
private const val PPL_SQL_QUERY_FIELD = "query" // name of PPL query field when passing into PPL/SQL Execute API call
5252

53+
private const val TIMESTAMP_FIELD = "timestamp" // TODO: this should be deleted once PPL plugin side time keywords are introduced
54+
5355
override suspend fun runMonitorV2(
5456
monitorV2: MonitorV2,
5557
monitorCtx: MonitorRunnerExecutionContext, // MonitorV2 reads from same context as Monitor
@@ -141,7 +143,6 @@ object PPLMonitorRunner : MonitorV2Runner {
141143
appendCustomCondition(timeFilteredQuery, pplTrigger.customCondition!!)
142144
}
143145

144-
// TODO: does this handle pagination? does it need to?
145146
// execute the PPL query
146147
val queryResponseJson = executePplQuery(queryToExecute, nodeClient)
147148
logger.info("query execution results for trigger ${pplTrigger.name}: $queryResponseJson")
@@ -229,7 +230,7 @@ object PPLMonitorRunner : MonitorV2Runner {
229230
}
230231
}
231232
} catch (e: Exception) {
232-
logger.error("failed to run PPL Trigger ${pplTrigger.name} for PPL Monitor ${pplMonitor.name}", e)
233+
logger.error("failed to run PPL Trigger ${pplTrigger.name} from PPL Monitor ${pplMonitor.name}", e)
233234

234235
// generate an alert with an error message
235236
monitorCtx.retryPolicy?.let {
@@ -283,11 +284,11 @@ object PPLMonitorRunner : MonitorV2Runner {
283284
// inject time filter into PPL query to only query for data within the (periodStart, periodEnd) interval
284285
// TODO: if query contains "_time", "span", "earliest", "latest", skip adding filter
285286
// pending https://github.com/opensearch-project/sql/issues/3969
286-
// for now assume "_time" field is always present in customer data
287+
// for now assume TIMESTAMP_FIELD field is always present in customer data
287288

288289
// if the raw query contained any time check whatsoever, skip adding a time filter internally
289290
// and return query as is, customer's in-query time checks instantly and automatically overrides
290-
if (query.contains("_time")) { // TODO: replace with PPL time keyword checks after that's GA
291+
if (query.contains(TIMESTAMP_FIELD)) { // TODO: replace with PPL time keyword checks after that's GA
291292
return query
292293
}
293294

@@ -302,8 +303,9 @@ object PPLMonitorRunner : MonitorV2Runner {
302303
val periodStartPplTimestamp = formatter.format(updatedPeriodStart)
303304
val periodEndPplTimeStamp = formatter.format(periodEnd)
304305

305-
val timeFilterReplace = "| where _time > TIMESTAMP('$periodStartPplTimestamp') and _time < TIMESTAMP('$periodEndPplTimeStamp') |"
306-
val timeFilterAppend = "| where _time > TIMESTAMP('$periodStartPplTimestamp') and _time < TIMESTAMP('$periodEndPplTimeStamp')"
306+
val timeFilterAppend = "| where $TIMESTAMP_FIELD > TIMESTAMP('$periodStartPplTimestamp') and " +
307+
"$TIMESTAMP_FIELD < TIMESTAMP('$periodEndPplTimeStamp')"
308+
val timeFilterReplace = "$timeFilterAppend |"
307309

308310
val timeFilteredQuery: String = if (query.contains("|")) {
309311
// if Monitor query contains piped statements, inject the time filter
@@ -319,33 +321,6 @@ object PPLMonitorRunner : MonitorV2Runner {
319321
return timeFilteredQuery
320322
}
321323

322-
// appends user-defined custom trigger condition to PPL query, only for custom condition Triggers
323-
private fun appendCustomCondition(query: String, customCondition: String): String {
324-
return "$query | $customCondition"
325-
}
326-
327-
// returns PPL query response as parsable JSONObject
328-
private suspend fun executePplQuery(query: String, client: NodeClient): JSONObject {
329-
// call PPL plugin to execute time filtered query
330-
val transportPplQueryRequest = TransportPPLQueryRequest(
331-
query,
332-
JSONObject(mapOf(PPL_SQL_QUERY_FIELD to query)),
333-
null // null path falls back to a default path internal to SQL/PPL Plugin
334-
)
335-
336-
val transportPplQueryResponse = PPLPluginInterface.suspendUntil {
337-
this.executeQuery(
338-
client,
339-
transportPplQueryRequest,
340-
it
341-
)
342-
}
343-
344-
val queryResponseJson = JSONObject(transportPplQueryResponse.result)
345-
346-
return queryResponseJson
347-
}
348-
349324
private fun evaluateNumResultsTrigger(numResults: Long, numResultsCondition: NumResultsCondition, numResultsValue: Long): Boolean {
350325
return when (numResultsCondition) {
351326
NumResultsCondition.GREATER_THAN -> numResults > numResultsValue
@@ -402,33 +377,8 @@ object PPLMonitorRunner : MonitorV2Runner {
402377
// find the name of the eval result variable defined in custom condition
403378
val evalResultVarName = findEvalResultVar(pplTrigger.customCondition!!)
404379

405-
// find the eval statement result variable in the PPL query response schema
406-
val schemaList = customConditionQueryResponse.getJSONArray("schema")
407-
var evalResultVarIdx = -1
408-
for (i in 0 until schemaList.length()) {
409-
val schemaObj = schemaList.getJSONObject(i)
410-
val columnName = schemaObj.getString("name")
411-
412-
if (columnName == evalResultVarName) {
413-
if (schemaObj.getString("type") != "boolean") {
414-
throw IllegalStateException(
415-
"parsing results of PPL query with custom condition failed," +
416-
"eval statement variable was not type boolean, but instead type: ${schemaObj.getString("type")}"
417-
)
418-
}
419-
420-
evalResultVarIdx = i
421-
break
422-
}
423-
}
424-
425-
// eval statement result variable should always be found
426-
if (evalResultVarIdx == -1) {
427-
throw IllegalStateException(
428-
"expected to find eval statement results variable \"$evalResultVarName\" in results " +
429-
"of PPL query with custom condition, but did not."
430-
)
431-
}
380+
// find the index eval statement result variable in the PPL query response schema
381+
val evalResultVarIdx = findEvalResultVarIdxInSchema(customConditionQueryResponse, evalResultVarName)
432382

433383
val dataRowList = customConditionQueryResponse.getJSONArray("datarows")
434384
for (i in 0 until dataRowList.length()) {
@@ -448,20 +398,6 @@ object PPLMonitorRunner : MonitorV2Runner {
448398
return relevantQueryResultRows
449399
}
450400

451-
// TODO: is there maybe some PPL plugin util function we can use to replace this?
452-
// searches a given custom condition eval statement for the name of the result
453-
// variable and returns it
454-
private fun findEvalResultVar(customCondition: String): String {
455-
// the PPL keyword "eval", followed by a whitespace must be present, otherwise a syntax error from PPL plugin would've
456-
// been thrown when executing the query (without the whitespace, the query would've had something like "evalresult",
457-
// which is invalid PPL
458-
val startOfEvalStatement = "eval "
459-
460-
val startIdx = customCondition.indexOf(startOfEvalStatement) + startOfEvalStatement.length
461-
val endIdx = startIdx + customCondition.substring(startIdx).indexOfFirst { it == ' ' || it == '=' }
462-
return customCondition.substring(startIdx, endIdx)
463-
}
464-
465401
// prepares the query results to be passed into alerts and notifications based on trigger mode
466402
// if result set, alert and notification simply stores all query results
467403
// if per result, each alert and notification stores a single row of the query results
@@ -480,7 +416,6 @@ object PPLMonitorRunner : MonitorV2Runner {
480416
individualRow.put("datarows", JSONArray(relevantQueryResultRows.getJSONArray("datarows").getJSONArray(i).toList()))
481417
individualRows.add(individualRow)
482418
}
483-
484419
return individualRows
485420
}
486421

@@ -637,4 +572,72 @@ object PPLMonitorRunner : MonitorV2Runner {
637572
// }
638573
}
639574
}
575+
576+
/* public util functions */
577+
578+
// appends user-defined custom trigger condition to PPL query, only for custom condition Triggers
579+
fun appendCustomCondition(query: String, customCondition: String): String {
580+
return "$query | $customCondition"
581+
}
582+
583+
// returns PPL query response as parsable JSONObject
584+
suspend fun executePplQuery(query: String, client: NodeClient): JSONObject {
585+
// call PPL plugin to execute time filtered query
586+
val transportPplQueryRequest = TransportPPLQueryRequest(
587+
query,
588+
JSONObject(mapOf(PPL_SQL_QUERY_FIELD to query)),
589+
null // null path falls back to a default path internal to SQL/PPL Plugin
590+
)
591+
592+
val transportPplQueryResponse = PPLPluginInterface.suspendUntil {
593+
this.executeQuery(
594+
client,
595+
transportPplQueryRequest,
596+
it
597+
)
598+
}
599+
600+
val queryResponseJson = JSONObject(transportPplQueryResponse.result)
601+
602+
return queryResponseJson
603+
}
604+
605+
// TODO: is there maybe some PPL plugin util function we can use to replace this?
606+
// searches a given custom condition eval statement for the name of
607+
// the eval result variable and returns it
608+
fun findEvalResultVar(customCondition: String): String {
609+
// the PPL keyword "eval", followed by a whitespace must be present, otherwise a syntax error from PPL plugin would've
610+
// been thrown when executing the query (without the whitespace, the query would've had something like "evalresult",
611+
// which is invalid PPL
612+
val startOfEvalStatement = "eval "
613+
614+
val startIdx = customCondition.indexOf(startOfEvalStatement) + startOfEvalStatement.length
615+
val endIdx = startIdx + customCondition.substring(startIdx).indexOfFirst { it == ' ' || it == '=' }
616+
return customCondition.substring(startIdx, endIdx)
617+
}
618+
619+
fun findEvalResultVarIdxInSchema(customConditionQueryResponse: JSONObject, evalResultVarName: String): Int {
620+
// find the index eval statement result variable in the PPL query response schema
621+
val schemaList = customConditionQueryResponse.getJSONArray("schema")
622+
var evalResultVarIdx = -1
623+
for (i in 0 until schemaList.length()) {
624+
val schemaObj = schemaList.getJSONObject(i)
625+
val columnName = schemaObj.getString("name")
626+
627+
if (columnName == evalResultVarName) {
628+
evalResultVarIdx = i
629+
break
630+
}
631+
}
632+
633+
// eval statement result variable should always be found
634+
if (evalResultVarIdx == -1) {
635+
throw IllegalStateException(
636+
"expected to find eval statement results variable \"$evalResultVarName\" in results " +
637+
"of PPL query with custom condition, but did not."
638+
)
639+
}
640+
641+
return evalResultVarIdx
642+
}
640643
}

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorV2Action.kt

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,17 @@ import org.opensearch.action.search.SearchResponse
2121
import org.opensearch.action.support.ActionFilters
2222
import org.opensearch.action.support.HandledTransportAction
2323
import org.opensearch.action.support.clustermanager.AcknowledgedResponse
24+
import org.opensearch.alerting.PPLMonitorRunner.appendCustomCondition
25+
import org.opensearch.alerting.PPLMonitorRunner.executePplQuery
26+
import org.opensearch.alerting.PPLMonitorRunner.findEvalResultVar
27+
import org.opensearch.alerting.PPLMonitorRunner.findEvalResultVarIdxInSchema
2428
import org.opensearch.alerting.actionv2.IndexMonitorV2Action
2529
import org.opensearch.alerting.actionv2.IndexMonitorV2Request
2630
import org.opensearch.alerting.actionv2.IndexMonitorV2Response
2731
import org.opensearch.alerting.core.ScheduledJobIndices
2832
import org.opensearch.alerting.core.modelv2.MonitorV2
2933
import org.opensearch.alerting.core.modelv2.PPLMonitor
34+
import org.opensearch.alerting.core.modelv2.PPLTrigger.ConditionType
3035
import org.opensearch.alerting.opensearchapi.suspendUntil
3136
import org.opensearch.alerting.settings.AlertingSettings
3237
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERTING_MAX_MONITORS
@@ -43,6 +48,7 @@ import org.opensearch.common.xcontent.XContentType
4348
import org.opensearch.commons.alerting.model.Monitor
4449
import org.opensearch.commons.alerting.model.ScheduledJob
4550
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
51+
import org.opensearch.commons.alerting.model.userErrorMessage
4652
import org.opensearch.commons.alerting.util.AlertingException
4753
import org.opensearch.core.action.ActionListener
4854
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
@@ -55,6 +61,7 @@ import org.opensearch.search.builder.SearchSourceBuilder
5561
import org.opensearch.tasks.Task
5662
import org.opensearch.transport.TransportService
5763
import org.opensearch.transport.client.Client
64+
import org.opensearch.transport.client.node.NodeClient
5865

5966
private val log = LogManager.getLogger(TransportIndexMonitorV2Action::class.java)
6067
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
@@ -81,6 +88,86 @@ class TransportIndexMonitorV2Action @Inject constructor(
8188
@Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)
8289

8390
override fun doExecute(task: Task, indexMonitorRequest: IndexMonitorV2Request, actionListener: ActionListener<IndexMonitorV2Response>) {
91+
// validate the MonitorV2 based on its type
92+
when (indexMonitorRequest.monitorV2) {
93+
is PPLMonitor -> validateMonitorPplQuery(
94+
indexMonitorRequest.monitorV2 as PPLMonitor,
95+
object : ActionListener<Unit> { // validationListener
96+
override fun onResponse(response: Unit) {
97+
checkScheduledJobIndex(indexMonitorRequest, actionListener)
98+
}
99+
100+
override fun onFailure(e: Exception) {
101+
actionListener.onFailure(e)
102+
}
103+
}
104+
)
105+
else -> actionListener.onFailure(
106+
AlertingException.wrap(
107+
IllegalStateException(
108+
"unexpected MonitorV2 type: ${indexMonitorRequest.monitorV2.javaClass.name}"
109+
)
110+
)
111+
)
112+
}
113+
}
114+
115+
private fun validateMonitorPplQuery(pplMonitor: PPLMonitor, validationListener: ActionListener<Unit>) {
116+
scope.launch {
117+
try {
118+
val nodeClient = client as NodeClient
119+
120+
// first attempt to run the base query
121+
// if there are any PPL syntax errors, this will throw an exception
122+
executePplQuery(pplMonitor.query, nodeClient)
123+
124+
// now scan all the triggers with custom conditions, and ensure each query constructed
125+
// from the base query + custom condition is valid
126+
val allCustomTriggersValid = true
127+
for (pplTrigger in pplMonitor.triggers) {
128+
if (pplTrigger.conditionType == ConditionType.NUMBER_OF_RESULTS) {
129+
continue
130+
}
131+
132+
val evalResultVar = findEvalResultVar(pplTrigger.customCondition!!)
133+
134+
val queryWithCustomCondition = appendCustomCondition(pplMonitor.query, pplTrigger.customCondition!!)
135+
136+
val executePplQueryResponse = executePplQuery(queryWithCustomCondition, nodeClient)
137+
138+
val evalResultVarIdx = findEvalResultVarIdxInSchema(executePplQueryResponse, evalResultVar)
139+
140+
val resultVarType = executePplQueryResponse
141+
.getJSONArray("schema")
142+
.getJSONObject(evalResultVarIdx)
143+
.getString("type")
144+
145+
// custom conditions must evaluate to a boolean result, otherwise it's invalid
146+
if (resultVarType != "boolean") {
147+
validationListener.onFailure(
148+
AlertingException.wrap(
149+
IllegalArgumentException(
150+
"Custom condition in trigger ${pplTrigger.name} is invalid because it does not " +
151+
"evaluate to a boolean, but instead to type: $resultVarType"
152+
)
153+
)
154+
)
155+
return@launch
156+
}
157+
}
158+
159+
validationListener.onResponse(Unit)
160+
} catch (e: Exception) {
161+
validationListener.onFailure(
162+
AlertingException.wrap(
163+
IllegalArgumentException("Invalid PPL Query in PPL Monitor: ${e.userErrorMessage()}")
164+
)
165+
)
166+
}
167+
}
168+
}
169+
170+
private fun checkScheduledJobIndex(indexMonitorRequest: IndexMonitorV2Request, actionListener: ActionListener<IndexMonitorV2Response>) {
84171
/* check to see if alerting-config index (scheduled job index) is created and updated before indexing MonitorV2 into it */
85172
if (!scheduledJobIndices.scheduledJobIndexExists()) { // if alerting-config index doesn't exist, send request to create it
86173
scheduledJobIndices.initScheduledJobIndex(object : ActionListener<CreateIndexResponse> {

core/src/main/kotlin/org/opensearch/alerting/core/modelv2/MonitorV2.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ interface MonitorV2 : ScheduledJob {
4848

4949
// field names
5050
const val NAME_FIELD = "name"
51-
const val MONITOR_TYPE_FIELD = "monitor_type"
5251
const val ENABLED_FIELD = "enabled"
5352
const val SCHEDULE_FIELD = "schedule"
5453
const val LAST_UPDATE_TIME_FIELD = "last_update_time"

core/src/main/kotlin/org/opensearch/alerting/core/modelv2/PPLMonitor.kt

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,6 @@ data class PPLMonitor(
209209
@Throws(IOException::class)
210210
fun parse(xcp: XContentParser, id: String = MonitorV2.NO_ID, version: Long = MonitorV2.NO_VERSION): PPLMonitor {
211211
var name: String? = null
212-
var monitorType: String = PPL_MONITOR_TYPE
213212
var enabled = true
214213
var schedule: Schedule? = null
215214
var lookBackWindow: TimeValue? = null
@@ -228,7 +227,6 @@ data class PPLMonitor(
228227

229228
when (fieldName) {
230229
MonitorV2.NAME_FIELD -> name = xcp.text()
231-
MonitorV2.MONITOR_TYPE_FIELD -> monitorType = xcp.text()
232230
MonitorV2.ENABLED_FIELD -> enabled = xcp.booleanValue()
233231
MonitorV2.SCHEDULE_FIELD -> schedule = Schedule.parse(xcp)
234232
MonitorV2.LOOK_BACK_WINDOW_FIELD -> {
@@ -263,17 +261,12 @@ data class PPLMonitor(
263261
queryLanguage = enumMatchResult
264262
}
265263
QUERY_FIELD -> query = xcp.text()
266-
else -> throw IllegalArgumentException("Unexpected field \"$fieldName\" when parsing PPL Monitor")
264+
else -> throw IllegalArgumentException("Unexpected field when parsing PPL Monitor: $fieldName")
267265
}
268266
}
269267

270268
/* validations */
271269

272-
// ensure MonitorV2 XContent being parsed by PPLMonitor class is PPL Monitor type
273-
if (monitorType != PPL_MONITOR_TYPE) {
274-
throw IllegalArgumentException("Invalid monitor type: $monitorType")
275-
}
276-
277270
// ensure there's at least 1 trigger
278271
if (triggers.isEmpty()) {
279272
throw IllegalArgumentException("Monitor must include at least 1 trigger")

core/src/main/kotlin/org/opensearch/alerting/core/modelv2/PPLTrigger.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ data class PPLTrigger(
313313
actions.add(Action.parse(xcp))
314314
}
315315
}
316-
else -> throw IllegalArgumentException("Unexpected field $fieldName when parsing PPL Trigger")
316+
else -> throw IllegalArgumentException("Unexpected field when parsing PPL Trigger: $fieldName")
317317
}
318318
}
319319

0 commit comments

Comments
 (0)