Skip to content

Commit a990ded

Browse files
eirsep and engechas authored
Doc level monitor seq_no calculation bug fix (#1886)
* Fix MGet bug, randomize fan out distribution Signed-off-by: Chase Engelbrecht <[email protected]> * Fix ktlint Signed-off-by: Chase Engelbrecht <[email protected]> * changes last run context initialization to first preserve existing shard seq_nos and only initialize new shards Signed-off-by: Surya Sashank Nistala <[email protected]> * standardize seq_no as long in last run context maps Signed-off-by: Surya Sashank Nistala <[email protected]> --------- Signed-off-by: Chase Engelbrecht <[email protected]> Signed-off-by: Surya Sashank Nistala <[email protected]> Co-authored-by: Chase Engelbrecht <[email protected]>
1 parent 2634e04 commit a990ded

File tree

6 files changed

+87
-81
lines changed

6 files changed

+87
-81
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import org.opensearch.core.common.breaker.CircuitBreakingException
3333
import org.opensearch.core.common.io.stream.Writeable
3434
import org.opensearch.core.rest.RestStatus
3535
import org.opensearch.index.IndexNotFoundException
36-
import org.opensearch.index.seqno.SequenceNumbers
3736
import org.opensearch.node.NodeClosedException
3837
import org.opensearch.transport.ActionNotFoundTransportException
3938
import org.opensearch.transport.ConnectTransportException
@@ -188,12 +187,12 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
188187
)
189188
MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently)
190189
}
191-
190+
val shardCount: Int = getShardsCount(monitorCtx.clusterService!!, concreteIndexName)
192191
// Prepare updatedLastRunContext for each index
193192
val indexUpdatedRunContext = initializeNewLastRunContext(
194193
indexLastRunContext.toMutableMap(),
195-
monitorCtx,
196194
concreteIndexName,
195+
shardCount
197196
) as MutableMap<String, Any>
198197
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
199198
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
@@ -217,8 +216,15 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
217216
// update lastRunContext if its a temp monitor as we only want to view the last bit of data then
218217
// TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
219218
if (isTempMonitor) {
220-
indexLastRunContext[shard] =
221-
max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
219+
indexLastRunContext[shard] = if (indexLastRunContext.containsKey(shard)) {
220+
if (indexLastRunContext[shard] is Long) {
221+
max(-1L, indexUpdatedRunContext[shard] as Long - 10L)
222+
} else if (indexLastRunContext[shard] is Int) {
223+
max(-1L, (indexUpdatedRunContext[shard] as Int).toLong() - 10L)
224+
} else -1L
225+
} else {
226+
-1L
227+
}
222228
}
223229
}
224230
val indexExecutionContext = IndexExecutionContext(
@@ -421,7 +427,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
421427
return monitorResult.copy(triggerResults = triggerResults, inputResults = inputRunResults)
422428
} catch (e: Exception) {
423429
val errorMessage = ExceptionsHelper.detailedMessage(e)
424-
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, executionId, workflowRunContext)
430+
if (false == dryrun) {
431+
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, executionId, workflowRunContext)
432+
}
425433
logger.error("Failed running Document-level-monitor ${monitor.name}", e)
426434
val alertingException = AlertingException(
427435
errorMessage,
@@ -455,7 +463,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
455463
if (fanOutResponse.lastRunContexts.contains("index") && fanOutResponse.lastRunContexts["index"] == indexName) {
456464
fanOutResponse.lastRunContexts.keys.forEach {
457465

458-
val seq_no = fanOutResponse.lastRunContexts[it].toString().toIntOrNull()
466+
val seq_no = fanOutResponse.lastRunContexts[it].toString().toLongOrNull()
459467
if (
460468
it != "shards_count" &&
461469
it != "index" &&
@@ -560,20 +568,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
560568
return InputRunResults(listOf(inputRunResults), if (!errors.isEmpty()) AlertingException.merge(*errors.toTypedArray()) else null)
561569
}
562570

563-
private fun initializeNewLastRunContext(
564-
lastRunContext: Map<String, Any>,
565-
monitorCtx: MonitorRunnerExecutionContext,
566-
index: String,
567-
): Map<String, Any> {
568-
val count: Int = getShardsCount(monitorCtx.clusterService!!, index)
569-
val updatedLastRunContext = lastRunContext.toMutableMap()
570-
for (i: Int in 0 until count) {
571-
val shard = i.toString()
572-
updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO.toString()
573-
}
574-
return updatedLastRunContext
575-
}
576-
577571
private fun validate(monitor: Monitor) {
578572
if (monitor.inputs.size > 1) {
579573
throw IOException("Only one input is supported with document-level-monitor.")

alerting/src/main/kotlin/org/opensearch/alerting/MonitorFanOutUtils.kt

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager
88
import org.apache.logging.log4j.Logger
99
import org.opensearch.core.index.Index
1010
import org.opensearch.core.index.shard.ShardId
11+
import org.opensearch.index.seqno.SequenceNumbers
1112

1213
private val logger: Logger = LogManager.getLogger("FanOutEligibility")
1314

@@ -42,3 +43,37 @@ fun distributeShards(
4243

4344
return nodeShardAssignments
4445
}
46+
47+
/**
48+
* Initializes the last run context for a given index.
49+
*
50+
* This method prepares the context structure to be updated later by fan-out operations.
51+
* It preserves existing sequence numbers and initializes new shards with UNASSIGNED_SEQ_NO.
52+
*
53+
* @param lastRunContext The previous run context from monitor metadata; it is copied, not modified
54+
* @param index The name of the index for which the context is being initialized; stored under the "index" key
55+
* @param shardCount The number of shards to cover; keys "0" until shardCount are ensured and the count is stored under "shards_count"
56+
* @return A new map containing the initialized last run context
57+
*/
58+
fun initializeNewLastRunContext(
59+
lastRunContext: Map<String, Any>,
60+
index: String,
61+
shardCount: Int,
62+
): Map<String, Any> {
63+
val updatedLastRunContext = lastRunContext.toMutableMap()
64+
65+
// Only initialize shards that don't already have a sequence number
66+
for (i: Int in 0 until shardCount) {
67+
val shard = i.toString()
68+
// Preserve existing sequence numbers instead of resetting to UNASSIGNED
69+
if (!updatedLastRunContext.containsKey(shard)) {
70+
updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO
71+
}
72+
}
73+
74+
// Metadata fields: always overwritten with the supplied shard count and index name
75+
updatedLastRunContext["shards_count"] = shardCount
76+
updatedLastRunContext["index"] = index
77+
78+
return updatedLastRunContext
79+
}

alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteDocumentLevelMonitorRunner.kt

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import org.opensearch.Version
1111
import org.opensearch.alerting.MonitorMetadataService
1212
import org.opensearch.alerting.MonitorRunner
1313
import org.opensearch.alerting.MonitorRunnerExecutionContext
14+
import org.opensearch.alerting.initializeNewLastRunContext
1415
import org.opensearch.alerting.util.IndexUtils
1516
import org.opensearch.cluster.metadata.IndexMetadata
1617
import org.opensearch.cluster.node.DiscoveryNode
@@ -27,7 +28,6 @@ import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonit
2728
import org.opensearch.commons.alerting.util.AlertingException
2829
import org.opensearch.core.index.shard.ShardId
2930
import org.opensearch.core.rest.RestStatus
30-
import org.opensearch.index.seqno.SequenceNumbers
3131
import org.opensearch.transport.TransportService
3232
import java.io.IOException
3333
import java.time.Instant
@@ -121,11 +121,11 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
121121
)
122122
MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently)
123123
}
124-
124+
val shardCount: Int = getShardsCount(monitorCtx.clusterService!!, concreteIndexName)
125125
val indexUpdatedRunContext = initializeNewLastRunContext(
126126
indexLastRunContext.toMutableMap(),
127-
monitorCtx,
128-
concreteIndexName
127+
concreteIndexName,
128+
shardCount
129129
) as MutableMap<String, Any>
130130
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
131131
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
@@ -269,12 +269,12 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
269269
if (fanOutResponse.lastRunContexts.contains(indexName)) {
270270
(fanOutResponse.lastRunContexts[indexName] as Map<String, Any>).forEach {
271271

272-
val seq_no = it.value.toString().toIntOrNull()
272+
val seq_no = it.value.toString().toLongOrNull()
273273
if (
274274
it.key != "shards_count" &&
275275
it.key != "index" &&
276276
seq_no != null &&
277-
seq_no >= 0
277+
seq_no >= 0L
278278
) {
279279
indexLastRunContext[it.key] = seq_no
280280
}
@@ -370,18 +370,4 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
370370
val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L
371371
return indexCreationDate > lastExecutionTime.toEpochMilli()
372372
}
373-
374-
private fun initializeNewLastRunContext(
375-
lastRunContext: Map<String, Any>,
376-
monitorCtx: MonitorRunnerExecutionContext,
377-
index: String,
378-
): Map<String, Any> {
379-
val count: Int = getShardsCount(monitorCtx.clusterService!!, index)
380-
val updatedLastRunContext = lastRunContext.toMutableMap()
381-
for (i: Int in 0 until count) {
382-
val shard = i.toString()
383-
updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO
384-
}
385-
return updatedLastRunContext
386-
}
387373
}

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ class TransportDocLevelMonitorFanOutAction
141141
val alertService: AlertService,
142142
val scriptService: ScriptService,
143143
val settings: Settings,
144-
val xContentRegistry: NamedXContentRegistry
144+
val xContentRegistry: NamedXContentRegistry,
145145
) : HandledTransportAction<DocLevelMonitorFanOutRequest, DocLevelMonitorFanOutResponse>(
146146
DocLevelMonitorFanOutAction.NAME, transportService, actionFilters, ::DocLevelMonitorFanOutRequest
147147
),
@@ -773,7 +773,7 @@ class TransportDocLevelMonitorFanOutAction
773773
fieldsToBeQueried: List<String>,
774774
shardList: List<Int>,
775775
transformedDocs: MutableList<Pair<String, TransformedDocDto>>,
776-
updateLastRunContext: (String, String) -> Unit
776+
updateLastRunContext: (String, Long) -> Unit
777777
) {
778778
for (shardId in shardList) {
779779
val shard = shardId.toString()
@@ -799,7 +799,7 @@ class TransportDocLevelMonitorFanOutAction
799799

800800
if (maxSeqNo == null || maxSeqNo <= from) {
801801
// No new documents to process
802-
updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString())
802+
updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED))
803803
continue
804804
}
805805
// Process documents in chunks between prevSeqNo and maxSeqNo
@@ -873,7 +873,7 @@ class TransportDocLevelMonitorFanOutAction
873873
// Move to next chunk - use the last document's sequence number
874874
currentSeqNo = hits.hits.last().seqNo
875875
// update last seen sequence number after every set of seen docs
876-
updateLastRunContext(shard, currentSeqNo.toString())
876+
updateLastRunContext(shard, currentSeqNo)
877877
}
878878
} catch (e: Exception) {
879879
log.error(
Lines changed: 18 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package org.opensearch.alerting
22

3-
import org.junit.Assert
3+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope
44
import org.opensearch.alerting.settings.AlertingSettings
55
import org.opensearch.common.unit.TimeValue
66
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
@@ -11,14 +11,12 @@ import java.time.ZonedDateTime
1111
import java.time.format.DateTimeFormatter
1212
import java.time.temporal.ChronoUnit.MILLIS
1313

14+
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
1415
class DocLeveFanOutIT : AlertingRestTestCase() {
1516

1617
fun `test execution reaches endtime before completing execution`() {
17-
var updateSettings =
18-
adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_EXECUTION_MAX_DURATION.key, TimeValue.timeValueNanos(1))
1918
val updateSettings1 = adminClient().updateSettings(AlertingSettings.FINDING_HISTORY_ENABLED.key, false)
2019
logger.info(updateSettings1)
21-
logger.info(updateSettings)
2220
val testIndex = createTestIndex()
2321
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
2422
val testDoc = """{
@@ -41,45 +39,33 @@ class DocLeveFanOutIT : AlertingRestTestCase() {
4139
}
4240

4341
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
44-
val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
45-
val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
46-
val trigger3 = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
47-
val trigger4 = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
48-
val trigger5 = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
49-
val trigger6 = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
5042

5143
val monitor = createMonitor(
5244
randomDocumentLevelMonitor(
5345
inputs = listOf(docLevelInput),
54-
triggers = listOf(trigger, trigger1, trigger2, trigger3, trigger4, trigger5, trigger6)
46+
triggers = listOf(trigger)
5547
)
5648
)
5749
assertNotNull(monitor.id)
58-
50+
executeMonitor(monitor.id)
5951
indexDoc(testIndex, "1", testDoc)
60-
indexDoc(testIndex, "5", testDoc)
52+
indexDoc(testIndex, "2", testDoc)
6153

6254
var response = executeMonitor(monitor.id)
6355

6456
var output = entityAsMap(response)
65-
66-
assertEquals(monitor.name, output["monitor_name"])
67-
@Suppress("UNCHECKED_CAST")
68-
var inputResults = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
69-
Assert.assertTrue(inputResults.isEmpty())
70-
71-
updateSettings =
72-
adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_EXECUTION_MAX_DURATION.key, TimeValue.timeValueMinutes(4))
73-
logger.info(updateSettings)
74-
75-
response = executeMonitor(monitor.id)
76-
output = entityAsMap(response)
77-
assertEquals(monitor.name, output["monitor_name"])
78-
inputResults = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
79-
@Suppress("UNCHECKED_CAST")
80-
val matchingDocsToQuery = inputResults[docQuery.id] as List<String>
81-
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
82-
assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex"))
83-
assertTrue("Incorrect search result", matchingDocsToQuery.contains("5|$testIndex"))
57+
val findings1 = searchFindings(monitor)
58+
val findingsSize1 = findings1.size
59+
assertEquals(findingsSize1, 2)
60+
adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_EXECUTION_MAX_DURATION.key, TimeValue.timeValueNanos(1))
61+
executeMonitor(monitor.id)
62+
Thread.sleep(1000)
63+
adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_EXECUTION_MAX_DURATION.key, TimeValue.timeValueMinutes(4))
64+
indexDoc(testIndex, "3", testDoc)
65+
indexDoc(testIndex, "4", testDoc)
66+
executeMonitor(monitor.id)
67+
val findings = searchFindings(monitor)
68+
val findingsSize = findings.size
69+
assertEquals(findingsSize, 4)
8470
}
8571
}

alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,15 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
190190
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
191191

192192
val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
193-
val monitor = randomDocumentLevelMonitor(
194-
inputs = listOf(docLevelInput),
195-
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
193+
val monitor = createMonitor(
194+
randomDocumentLevelMonitor(
195+
inputs = listOf(docLevelInput),
196+
triggers = listOf(
197+
randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))
198+
)
199+
)
196200
)
201+
assertNotNull(monitor.id)
197202

198203
indexDoc(index, "1", testDoc)
199204
indexDoc(index, "2", testDoc)
@@ -207,7 +212,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
207212
indexDoc(index, "51", testDoc)
208213

209214
deleteDoc(index, "51")
210-
val response = executeMonitor(monitor, params = mapOf("dryrun" to "false"))
215+
val response = executeMonitor(monitor.id)
211216

212217
val output = entityAsMap(response)
213218
assertEquals(monitor.name, output["monitor_name"])

0 commit comments

Comments
 (0)