Skip to content

Commit 6a054f6

Browse files
committed
gave AlertV2s their own index and history indices and rewired Get Alerts V2 to use those
1 parent 99bc75b commit 6a054f6

File tree

13 files changed

+349
-73
lines changed

13 files changed

+349
-73
lines changed

alerting/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ dependencies {
169169

170170
// SQL/PPL plugin dependencies are included in alerting-core
171171
api project(":alerting-core")
172+
implementation 'org.json:json:20240303'
172173

173174
implementation "com.github.seancfoley:ipaddress:5.4.1"
174175
implementation project(path: ":alerting-spi", configuration: 'shadow')

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@ import org.opensearch.alerting.action.SearchEmailAccountAction
1616
import org.opensearch.alerting.action.SearchEmailGroupAction
1717
import org.opensearch.alerting.actionv2.DeleteMonitorV2Action
1818
import org.opensearch.alerting.actionv2.ExecuteMonitorV2Action
19+
import org.opensearch.alerting.actionv2.GetAlertsV2Action
1920
import org.opensearch.alerting.actionv2.GetMonitorV2Action
2021
import org.opensearch.alerting.actionv2.IndexMonitorV2Action
2122
import org.opensearch.alerting.actionv2.SearchMonitorV2Action
2223
import org.opensearch.alerting.alerts.AlertIndices
2324
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
25+
import org.opensearch.alerting.alertsv2.AlertV2Indices
26+
import org.opensearch.alerting.alertsv2.AlertV2Mover
2427
import org.opensearch.alerting.comments.CommentsIndices
2528
import org.opensearch.alerting.comments.CommentsIndices.Companion.ALL_COMMENTS_INDEX_PATTERN
2629
import org.opensearch.alerting.core.JobSweeper
@@ -62,6 +65,7 @@ import org.opensearch.alerting.transport.TransportExecuteMonitorAction
6265
import org.opensearch.alerting.transport.TransportExecuteMonitorV2Action
6366
import org.opensearch.alerting.transport.TransportExecuteWorkflowAction
6467
import org.opensearch.alerting.transport.TransportGetAlertsAction
68+
import org.opensearch.alerting.transport.TransportGetAlertsV2Action
6569
import org.opensearch.alerting.transport.TransportGetDestinationsAction
6670
import org.opensearch.alerting.transport.TransportGetEmailAccountAction
6771
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
@@ -135,8 +139,6 @@ import org.opensearch.threadpool.ThreadPool
135139
import org.opensearch.transport.client.Client
136140
import org.opensearch.watcher.ResourceWatcherService
137141
import java.util.function.Supplier
138-
import org.opensearch.alerting.alertsv2.AlertV2Indices
139-
import org.opensearch.alerting.alertsv2.AlertV2Mover
140142

141143
/**
142144
* Entry point of the OpenDistro for Elasticsearch alerting plugin
@@ -270,6 +272,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
270272
ActionPlugin.ActionHandler(SearchMonitorV2Action.INSTANCE, TransportSearchMonitorV2Action::class.java),
271273
ActionPlugin.ActionHandler(DeleteMonitorV2Action.INSTANCE, TransportDeleteMonitorV2Action::class.java),
272274
ActionPlugin.ActionHandler(ExecuteMonitorV2Action.INSTANCE, TransportExecuteMonitorV2Action::class.java),
275+
ActionPlugin.ActionHandler(GetAlertsV2Action.INSTANCE, TransportGetAlertsV2Action::class.java)
273276
)
274277
}
275278

@@ -306,6 +309,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
306309
val settings = environment.settings()
307310
val lockService = LockService(client, clusterService)
308311
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
312+
alertV2Indices = AlertV2Indices(settings, client, threadPool, clusterService)
309313
val alertService = AlertService(client, xContentRegistry, alertIndices)
310314
val triggerService = TriggerService(scriptService)
311315
runner = MonitorRunnerService
@@ -455,7 +459,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
455459
AlertingSettings.COMMENTS_HISTORY_RETENTION_PERIOD,
456460
AlertingSettings.COMMENTS_MAX_CONTENT_SIZE,
457461
AlertingSettings.MAX_COMMENTS_PER_ALERT,
458-
AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION
462+
AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION,
463+
AlertingSettings.ALERT_V2_HISTORY_ENABLED,
464+
AlertingSettings.ALERT_V2_HISTORY_ROLLOVER_PERIOD,
465+
AlertingSettings.ALERT_V2_HISTORY_INDEX_MAX_AGE,
466+
AlertingSettings.ALERT_V2_HISTORY_MAX_DOCS,
467+
AlertingSettings.ALERT_V2_HISTORY_RETENTION_PERIOD
459468
)
460469
}
461470

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import org.opensearch.alerting.actionv2.ExecuteMonitorV2Request
2626
import org.opensearch.alerting.actionv2.ExecuteMonitorV2Response
2727
import org.opensearch.alerting.alerts.AlertIndices
2828
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
29+
import org.opensearch.alerting.alertsv2.AlertV2Indices
2930
import org.opensearch.alerting.core.JobRunner
3031
import org.opensearch.alerting.core.ScheduledJobIndices
3132
import org.opensearch.alerting.core.lock.LockModel
@@ -94,7 +95,6 @@ import java.time.Instant
9495
import java.time.LocalDateTime
9596
import java.time.ZoneOffset
9697
import java.util.UUID
97-
import org.opensearch.alerting.alertsv2.AlertV2Indices
9898
import kotlin.coroutines.CoroutineContext
9999

100100
object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleComponent() {

alerting/src/main/kotlin/org/opensearch/alerting/PPLMonitorRunner.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import org.opensearch.action.bulk.BulkResponse
1111
import org.opensearch.action.index.IndexRequest
1212
import org.opensearch.action.support.WriteRequest
1313
import org.opensearch.alerting.QueryLevelMonitorRunner.getConfigAndSendNotification
14-
import org.opensearch.alerting.alerts.AlertIndices
14+
import org.opensearch.alerting.alertsv2.AlertV2Indices
1515
import org.opensearch.alerting.core.modelv2.AlertV2
1616
import org.opensearch.alerting.core.modelv2.MonitorV2
1717
import org.opensearch.alerting.core.modelv2.MonitorV2RunResult
@@ -44,7 +44,6 @@ import java.time.Instant
4444
import java.time.ZoneOffset.UTC
4545
import java.time.format.DateTimeFormatter
4646
import java.time.temporal.ChronoUnit
47-
import org.opensearch.alerting.alertsv2.AlertV2Indices
4847

4948
object PPLMonitorRunner : MonitorV2Runner {
5049
private val logger = LogManager.getLogger(javaClass)
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.opensearch.alerting.actionv2
2+
3+
import org.opensearch.action.ActionType
4+
5+
class GetAlertsV2Action private constructor() : ActionType<GetAlertsV2Response>(NAME, ::GetAlertsV2Response) {
6+
companion object {
7+
val INSTANCE = GetAlertsV2Action()
8+
const val NAME = "cluster:admin/opensearch/alerting/v2/alerts/get"
9+
}
10+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package org.opensearch.alerting.actionv2
2+
3+
import org.opensearch.action.ActionRequest
4+
import org.opensearch.action.ActionRequestValidationException
5+
import org.opensearch.commons.alerting.model.Table
6+
import org.opensearch.core.common.io.stream.StreamInput
7+
import org.opensearch.core.common.io.stream.StreamOutput
8+
import org.opensearch.index.query.BoolQueryBuilder
9+
import java.io.IOException
10+
11+
class GetAlertsV2Request : ActionRequest {
12+
val table: Table
13+
val severityLevel: String
14+
val monitorV2Id: String?
15+
val monitorV2Ids: List<String>?
16+
val alertV2Ids: List<String>?
17+
val boolQueryBuilder: BoolQueryBuilder?
18+
19+
constructor(
20+
table: Table,
21+
severityLevel: String,
22+
monitorV2Id: String?,
23+
monitorV2Ids: List<String>? = null,
24+
alertV2Ids: List<String>? = null,
25+
boolQueryBuilder: BoolQueryBuilder? = null
26+
) : super() {
27+
this.table = table
28+
this.severityLevel = severityLevel
29+
this.monitorV2Id = monitorV2Id
30+
this.monitorV2Ids = monitorV2Ids
31+
this.alertV2Ids = alertV2Ids
32+
this.boolQueryBuilder = boolQueryBuilder
33+
}
34+
35+
@Throws(IOException::class)
36+
constructor(sin: StreamInput) : this(
37+
table = Table.readFrom(sin),
38+
severityLevel = sin.readString(),
39+
monitorV2Id = sin.readOptionalString(),
40+
monitorV2Ids = sin.readOptionalStringList(),
41+
alertV2Ids = sin.readOptionalStringList(),
42+
boolQueryBuilder = if (sin.readOptionalBoolean() == true) BoolQueryBuilder(sin) else null
43+
)
44+
45+
override fun validate(): ActionRequestValidationException? {
46+
return null
47+
}
48+
49+
@Throws(IOException::class)
50+
override fun writeTo(out: StreamOutput) {
51+
table.writeTo(out)
52+
out.writeString(severityLevel)
53+
out.writeOptionalString(monitorV2Id)
54+
out.writeOptionalStringCollection(monitorV2Ids)
55+
out.writeOptionalStringCollection(alertV2Ids)
56+
if (boolQueryBuilder != null) {
57+
out.writeOptionalBoolean(true)
58+
boolQueryBuilder.writeTo(out)
59+
} else {
60+
out.writeOptionalBoolean(false)
61+
}
62+
}
63+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package org.opensearch.alerting.actionv2
2+
3+
import org.opensearch.alerting.core.modelv2.AlertV2
4+
import org.opensearch.commons.notifications.action.BaseResponse
5+
import org.opensearch.core.common.io.stream.StreamInput
6+
import org.opensearch.core.common.io.stream.StreamOutput
7+
import org.opensearch.core.xcontent.ToXContent
8+
import org.opensearch.core.xcontent.XContentBuilder
9+
import java.io.IOException
10+
import java.util.Collections
11+
12+
class GetAlertsV2Response : BaseResponse {
13+
val alertV2s: List<AlertV2>
14+
15+
// totalAlertV2s is not the same as the size of alerts because there can be 30 alerts from the request, but
16+
// the request only asked for 5 alerts, so totalAlertV2s will be 30, but alerts will only contain 5 alerts
17+
val totalAlertV2s: Int?
18+
19+
constructor(
20+
alertV2s: List<AlertV2>,
21+
totalAlertV2s: Int?
22+
) : super() {
23+
this.alertV2s = alertV2s
24+
this.totalAlertV2s = totalAlertV2s
25+
}
26+
27+
@Throws(IOException::class)
28+
constructor(sin: StreamInput) : this(
29+
alertV2s = Collections.unmodifiableList(sin.readList(::AlertV2)),
30+
totalAlertV2s = sin.readOptionalInt()
31+
)
32+
33+
@Throws(IOException::class)
34+
override fun writeTo(out: StreamOutput) {
35+
out.writeCollection(alertV2s)
36+
out.writeOptionalInt(totalAlertV2s)
37+
}
38+
39+
@Throws(IOException::class)
40+
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
41+
builder.startObject()
42+
.field("alertV2s", alertV2s)
43+
.field("totalAlertV2s", totalAlertV2s)
44+
45+
return builder.endObject()
46+
}
47+
}

alerting/src/main/kotlin/org/opensearch/alerting/alertsv2/AlertV2Indices.kt

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.opensearch.alerting.alertsv2
22

3-
import java.time.Instant
43
import kotlinx.coroutines.CoroutineScope
54
import kotlinx.coroutines.Dispatchers
65
import kotlinx.coroutines.launch
@@ -20,7 +19,6 @@ import org.opensearch.action.admin.indices.rollover.RolloverRequest
2019
import org.opensearch.action.admin.indices.rollover.RolloverResponse
2120
import org.opensearch.action.support.IndicesOptions
2221
import org.opensearch.action.support.clustermanager.AcknowledgedResponse
23-
import org.opensearch.alerting.alerts.AlertIndices
2422
import org.opensearch.alerting.opensearchapi.suspendUntil
2523
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_V2_HISTORY_ENABLED
2624
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_V2_HISTORY_INDEX_MAX_AGE
@@ -41,6 +39,7 @@ import org.opensearch.core.action.ActionListener
4139
import org.opensearch.threadpool.Scheduler.Cancellable
4240
import org.opensearch.threadpool.ThreadPool
4341
import org.opensearch.transport.client.Client
42+
import java.time.Instant
4443

4544
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
4645
private val logger = LogManager.getLogger(AlertV2Indices::class.java)
@@ -86,25 +85,20 @@ class AlertV2Indices(
8685
const val ALL_ALERT_V2_INDEX_PATTERN = ".opensearch-alerting-v2-alert*"
8786

8887
@JvmStatic
89-
fun alertV2Mapping() = // TODO: create alert_v2 mappings
88+
fun alertV2Mapping() =
9089
AlertV2Indices::class.java.getResource("alert_v2_mapping.json").readText()
9190
}
9291

9392
@Volatile private var alertV2HistoryEnabled = ALERT_V2_HISTORY_ENABLED.get(settings)
9493

95-
9694
@Volatile private var alertV2HistoryMaxDocs = ALERT_V2_HISTORY_MAX_DOCS.get(settings)
9795

98-
9996
@Volatile private var alertV2HistoryMaxAge = ALERT_V2_HISTORY_INDEX_MAX_AGE.get(settings)
10097

101-
10298
@Volatile private var alertV2HistoryRolloverPeriod = ALERT_V2_HISTORY_ROLLOVER_PERIOD.get(settings)
10399

104-
105100
@Volatile private var alertV2HistoryRetentionPeriod = ALERT_V2_HISTORY_RETENTION_PERIOD.get(settings)
106101

107-
108102
@Volatile private var requestTimeout = REQUEST_TIMEOUT.get(settings)
109103

110104
@Volatile private var isClusterManager = false
@@ -350,8 +344,12 @@ class AlertV2Indices(
350344
val indicesToDelete = mutableListOf<String>()
351345
for (entry in clusterStateResponse.state.metadata.indices) {
352346
val indexMetaData = entry.value
353-
getHistoryIndexToDelete(indexMetaData, alertV2HistoryRetentionPeriod.millis, ALERT_V2_HISTORY_WRITE_INDEX, alertV2HistoryEnabled)
354-
?.let { indicesToDelete.add(it) }
347+
getHistoryIndexToDelete(
348+
indexMetaData,
349+
alertV2HistoryRetentionPeriod.millis,
350+
ALERT_V2_HISTORY_WRITE_INDEX,
351+
alertV2HistoryEnabled
352+
)?.let { indicesToDelete.add(it) }
355353
}
356354
return indicesToDelete
357355
}

alerting/src/main/kotlin/org/opensearch/alerting/alertsv2/AlertV2Mover.kt

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package org.opensearch.alerting.alertsv2
22

3-
import java.time.Instant
4-
import java.util.concurrent.TimeUnit
53
import kotlinx.coroutines.CoroutineScope
64
import kotlinx.coroutines.Dispatchers
75
import kotlinx.coroutines.launch
@@ -12,7 +10,6 @@ import org.opensearch.action.delete.DeleteRequest
1210
import org.opensearch.action.index.IndexRequest
1311
import org.opensearch.action.search.SearchRequest
1412
import org.opensearch.action.search.SearchResponse
15-
import org.opensearch.alerting.alerts.AlertIndices
1613
import org.opensearch.alerting.core.modelv2.AlertV2
1714
import org.opensearch.alerting.opensearchapi.suspendUntil
1815
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_V2_HISTORY_ENABLED
@@ -25,9 +22,6 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler
2522
import org.opensearch.common.xcontent.XContentFactory
2623
import org.opensearch.common.xcontent.XContentHelper
2724
import org.opensearch.common.xcontent.XContentType
28-
import org.opensearch.commons.alerting.model.Alert
29-
import org.opensearch.commons.alerting.model.ScheduledJob
30-
import org.opensearch.core.action.ActionListener
3125
import org.opensearch.core.common.bytes.BytesReference
3226
import org.opensearch.core.rest.RestStatus
3327
import org.opensearch.core.xcontent.NamedXContentRegistry
@@ -36,14 +30,12 @@ import org.opensearch.core.xcontent.XContentParser
3630
import org.opensearch.core.xcontent.XContentParserUtils
3731
import org.opensearch.index.VersionType
3832
import org.opensearch.index.query.QueryBuilders
39-
import org.opensearch.index.query.RangeQueryBuilder
40-
import org.opensearch.index.reindex.BulkByScrollResponse
41-
import org.opensearch.index.reindex.DeleteByQueryAction
42-
import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
4333
import org.opensearch.search.builder.SearchSourceBuilder
4434
import org.opensearch.threadpool.Scheduler
4535
import org.opensearch.threadpool.ThreadPool
4636
import org.opensearch.transport.client.Client
37+
import java.time.Instant
38+
import java.util.concurrent.TimeUnit
4739

4840
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
4941
private val logger = LogManager.getLogger(AlertV2Mover::class.java)
@@ -139,9 +131,9 @@ class AlertV2Mover(
139131
.version(true)
140132

141133
val activeAlertsRequest = SearchRequest(AlertV2Indices.ALERT_V2_INDEX)
142-
// .routing(monitorId)
143134
.source(expiredAlertsSearchQuery)
144-
return client.suspendUntil { search(activeAlertsRequest, it) }
135+
val searchResponse: SearchResponse = client.suspendUntil { search(activeAlertsRequest, it) }
136+
return searchResponse
145137
}
146138

147139
private suspend fun copyExpiredAlerts(expiredAlertsSearchResponse: SearchResponse): BulkResponse? {
@@ -152,7 +144,6 @@ class AlertV2Mover(
152144

153145
val indexRequests = expiredAlertsSearchResponse.hits.map { hit ->
154146
IndexRequest(AlertV2Indices.ALERT_V2_HISTORY_WRITE_INDEX)
155-
// .routing(monitorId)
156147
.source(
157148
AlertV2.parse(alertV2ContentParser(hit.sourceRef), hit.id, hit.version)
158149
.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)
@@ -171,28 +162,29 @@ class AlertV2Mover(
171162
private suspend fun deleteExpiredAlerts(expiredAlertsSearchResponse: SearchResponse): BulkResponse {
172163
val deleteRequests = expiredAlertsSearchResponse.hits.map {
173164
DeleteRequest(AlertV2Indices.ALERT_V2_INDEX, it.id)
174-
// .routing(monitorId)
175165
.version(it.version)
176166
.versionType(VersionType.EXTERNAL_GTE)
177167
}
178168

179-
val deleteResponse: BulkResponse = client.suspendUntil { bulk(BulkRequest().add(deleteRequests), it) }
169+
val deleteRequest = BulkRequest().add(deleteRequests)
170+
val deleteResponse: BulkResponse = client.suspendUntil { bulk(deleteRequest, it) }
171+
180172
return deleteResponse
181173
}
182174

183175
private suspend fun deleteExpiredAlertsThatWereCopied(copyResponse: BulkResponse?): BulkResponse? {
184-
// if there were no expired alerts, skip deleting anything
176+
// if there were no expired alerts to copy, skip deleting anything
185177
if (copyResponse == null) {
186178
return null
187179
}
188180

189181
val deleteRequests = copyResponse.items.filterNot { it.isFailed }.map {
190182
DeleteRequest(AlertV2Indices.ALERT_V2_INDEX, it.id)
191-
// .routing(monitorId)
192183
.version(it.version)
193184
.versionType(VersionType.EXTERNAL_GTE)
194185
}
195186
val deleteResponse: BulkResponse = client.suspendUntil { bulk(BulkRequest().add(deleteRequests), it) }
187+
196188
return deleteResponse
197189
}
198190

@@ -220,7 +212,6 @@ class AlertV2Mover(
220212
return xcp
221213
}
222214

223-
224215
private fun areAlertV2IndicesPresent(): Boolean {
225216
return alertV2IndexInitialized && alertV2HistoryIndexInitialized
226217
}

0 commit comments

Comments
 (0)