From 4332937ed7bc4788717b51167a4819f562edf163 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Thu, 20 Mar 2025 20:04:17 -0700 Subject: [PATCH 1/7] construct dummy request for doc level monitor fanout request on serde failure Signed-off-by: Surya Sashank Nistala --- .../action/DocLevelMonitorFanOutRequest.kt | 100 +++++++++++++++--- .../DocLevelMonitorFanOutRequestTests.kt | 59 +++++++++++ 2 files changed, 147 insertions(+), 12 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt index fe5cfe29..46492a9a 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt @@ -7,17 +7,24 @@ package org.opensearch.commons.alerting.action import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException +import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.IndexExecutionContext +import org.opensearch.commons.alerting.model.IntervalSchedule import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.Monitor.Companion.NO_VERSION import org.opensearch.commons.alerting.model.MonitorMetadata import org.opensearch.commons.alerting.model.WorkflowRunContext +import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.index.shard.ShardId import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.ToXContentObject import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.index.seqno.SequenceNumbers import java.io.IOException +import java.time.Instant +import java.time.temporal.ChronoUnit class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { val monitor: Monitor @@ -29,6 +36,77 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { val concreteIndicesSeenSoFar: List val workflowRunContext: WorkflowRunContext? + companion object { + private fun safeReadMonitor(sin: StreamInput): Monitor = + try { + Monitor.readFrom(sin)!! + } catch (e: Exception) { + Monitor( + "failed_serde", NO_VERSION, "failed_serde", true, + IntervalSchedule(1, ChronoUnit.MINUTES), Instant.now(), Instant.now(), "", + null, NO_SCHEMA_VERSION, emptyList(), emptyList(), emptyMap(), + DataSources(), false, false, "failed" + ) + } + + private fun safeReadBoolean(sin: StreamInput): Boolean = + try { + sin.readBoolean() + } catch (e: Exception) { + false + } + + private fun safeReadMonitorMetadata(sin: StreamInput): MonitorMetadata = + try { + MonitorMetadata.readFrom(sin) + } catch (e: Exception) { + MonitorMetadata( + "failed_serde", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + "failed_serde", + emptyList(), + emptyMap(), + mutableMapOf() + ) + } + + private fun safeReadString(sin: StreamInput): String = + try { + sin.readString() + } catch (e: Exception) { + "" + } + + private fun safeReadShardIds(sin: StreamInput): List = + try { + sin.readList(::ShardId) + } catch (e: Exception) { + listOf(ShardId("failed_serde", "failed_serde", 999999)) + } + + private fun safeReadStringList(sin: StreamInput): List = + try { + sin.readStringList() + } catch (e: Exception) { + emptyList() + } + + private fun safeReadWorkflowRunContext(sin: StreamInput): WorkflowRunContext? = + try { + if (sin.readBoolean()) WorkflowRunContext(sin) else null + } catch (e: Exception) { + null + } + + private fun safeReadIndexExecutionContext(sin: StreamInput): IndexExecutionContext? = + try { + IndexExecutionContext(sin) + } catch (e: Exception) { + null + } + } + constructor( monitor: Monitor, dryRun: Boolean, @@ -52,16 +130,14 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { @Throws(IOException::class) constructor(sin: StreamInput) : this( - monitor = Monitor.readFrom(sin)!!, - dryRun = sin.readBoolean(), - monitorMetadata = MonitorMetadata.readFrom(sin), - executionId = sin.readString(), - shardIds = sin.readList(::ShardId), - concreteIndicesSeenSoFar = sin.readStringList(), - workflowRunContext = if (sin.readBoolean()) { - WorkflowRunContext(sin) - } else { null }, - indexExecutionContext = IndexExecutionContext(sin) + monitor = safeReadMonitor(sin), + dryRun = safeReadBoolean(sin), + monitorMetadata = safeReadMonitorMetadata(sin), + executionId = safeReadString(sin), + shardIds = safeReadShardIds(sin), + concreteIndicesSeenSoFar = safeReadStringList(sin), + workflowRunContext = safeReadWorkflowRunContext(sin), + indexExecutionContext = safeReadIndexExecutionContext(sin) ) @Throws(IOException::class) @@ -88,7 +164,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { @Throws(IOException::class) override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - builder.startObject() + return builder.startObject() .field("monitor", monitor) .field("dry_run", dryRun) .field("execution_id", executionId) @@ -96,6 +172,6 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { .field("shard_ids", shardIds) .field("concrete_indices", concreteIndicesSeenSoFar) .field("workflow_run_context", workflowRunContext) - return builder.endObject() + .endObject() } } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt index 1ef82f18..f74fe6e4 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt @@ -6,6 +6,8 @@ package org.opensearch.commons.alerting.action import org.junit.Assert.assertEquals +import org.junit.Assert.assertNull +import org.junit.Assert.assertTrue import org.junit.jupiter.api.Test import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.commons.alerting.model.ActionExecutionTime @@ -151,4 +153,61 @@ class DocLevelMonitorFanOutRequestTests { assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds) assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext) } + + @Test + fun `test serde failure returning dummy object instead of exception`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = Script("return true")) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) + ) + val monitorMetadata = MonitorMetadata( + "test", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + Monitor.NO_ID, + listOf(ActionExecutionTime("", Instant.now())), + mutableMapOf("index" to mutableMapOf("1" to "1")), + mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001") + ) + val indexExecutionContext = IndexExecutionContext( + listOf(docQuery), + mutableMapOf("index" to mutableMapOf("1" to "1")), + mutableMapOf("index" to mutableMapOf("1" to "1")), + "test-index", + "test-index", + listOf("test-index"), + listOf("test-index"), + listOf("test-field"), + listOf("1", "2") + ) + val workflowRunContext = WorkflowRunContext( + Workflow.NO_ID, + Workflow.NO_ID, + Monitor.NO_ID, + mutableMapOf("index" to listOf("1")), + true, + listOf("finding1") + ) + val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest( + monitor, + false, + monitorMetadata, + UUID.randomUUID().toString(), + indexExecutionContext, + listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)), + listOf("test-index"), + workflowRunContext + ) + val out = BytesStreamOutput() + monitor.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin) + assertNull(newDocLevelMonitorFanOutRequest.indexExecutionContext) + } } From 8fc053b9d954556385c2f555625d95679aecd1ee Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Mon, 24 Mar 2025 01:37:37 -0700 Subject: [PATCH 2/7] force consume stream in fan out request Signed-off-by: Surya Sashank Nistala --- .../action/DocLevelMonitorFanOutRequest.kt | 36 +++++++++++++++++-- .../DocLevelMonitorFanOutRequestTests.kt | 3 +- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt index 46492a9a..5f19fe2a 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt @@ -5,6 +5,7 @@ package org.opensearch.commons.alerting.action +import org.apache.logging.log4j.LogManager import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException import org.opensearch.commons.alerting.model.DataSources @@ -22,11 +23,13 @@ import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.ToXContentObject import org.opensearch.core.xcontent.XContentBuilder import org.opensearch.index.seqno.SequenceNumbers +import java.io.EOFException import java.io.IOException import java.time.Instant import java.time.temporal.ChronoUnit class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { + val monitor: Monitor val dryRun: Boolean val monitorMetadata: MonitorMetadata @@ -37,10 +40,12 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { val workflowRunContext: WorkflowRunContext? companion object { + val log = LogManager.getLogger(DocLevelMonitorFanOutRequest::class.java) private fun safeReadMonitor(sin: StreamInput): Monitor = try { Monitor.readFrom(sin)!! } catch (e: Exception) { + log.error("Error parsing monitor in Doc level monitor fanout request", e) Monitor( "failed_serde", NO_VERSION, "failed_serde", true, IntervalSchedule(1, ChronoUnit.MINUTES), Instant.now(), Instant.now(), "", @@ -53,6 +58,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { try { sin.readBoolean() } catch (e: Exception) { + log.error("Error parsing boolean in Doc level monitor fanout request", e) false } @@ -60,6 +66,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { try { MonitorMetadata.readFrom(sin) } catch (e: Exception) { + log.error("Error parsing monitor in Doc level monitor fanout request", e) MonitorMetadata( "failed_serde", SequenceNumbers.UNASSIGNED_SEQ_NO, @@ -75,6 +82,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { try { sin.readString() } catch (e: Exception) { + log.error("Error parsing string in Doc level monitor fanout request", e) "" } @@ -82,6 +90,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { try { sin.readList(::ShardId) } catch (e: Exception) { + log.error("Error parsing shardId list in Doc level monitor fanout request", e) listOf(ShardId("failed_serde", "failed_serde", 999999)) } @@ -89,6 +98,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { try { sin.readStringList() } catch (e: Exception) { + log.error("Error parsing string list in Doc level monitor fanout request", e) emptyList() } @@ -96,15 +106,35 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { try { if (sin.readBoolean()) WorkflowRunContext(sin) else null } catch (e: Exception) { + log.error("Error parsing workflow context in Doc level monitor fanout request", e) null } - private fun safeReadIndexExecutionContext(sin: StreamInput): IndexExecutionContext? = - try { - IndexExecutionContext(sin) + private fun safeReadIndexExecutionContext(sin: StreamInput): IndexExecutionContext? { + var indexExecutionContext: IndexExecutionContext? = null + return try { + indexExecutionContext = IndexExecutionContext(sin) + while (sin.read() != -1) { + // read and discard bytes until stream is entirely consumed + try { + sin.readByte() + } catch (_: EOFException) { + } + } + return indexExecutionContext + } catch (e: EOFException) { + indexExecutionContext } catch (e: Exception) { + log.error("Error parsing index execution context in Doc level monitor fanout request", e) + while (sin.read() != -1) { + try { // read and throw bytes until stream is entirely consumed + sin.readByte() + } catch (_: EOFException) { + } + } null } + } } constructor( diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt index f74fe6e4..3c05225c 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt @@ -7,7 +7,6 @@ package org.opensearch.commons.alerting.action import org.junit.Assert.assertEquals import org.junit.Assert.assertNull -import org.junit.Assert.assertTrue import org.junit.jupiter.api.Test import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.commons.alerting.model.ActionExecutionTime @@ -82,6 +81,7 @@ class DocLevelMonitorFanOutRequestTests { ) val out = BytesStreamOutput() docLevelMonitorFanOutRequest.writeTo(out) + monitor.writeTo(out) val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin) assertEquals(docLevelMonitorFanOutRequest.monitor, newDocLevelMonitorFanOutRequest.monitor) @@ -90,6 +90,7 @@ class DocLevelMonitorFanOutRequestTests { assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext) assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds) assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext) + assertEquals(sin.read(), -1) } @Test From d30dac58612bb726e0976c540b2fcc3766584936 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Mon, 24 Mar 2025 17:13:39 -0700 Subject: [PATCH 3/7] adds flag for capturing serialization failure in Doc level monitor fan out request Signed-off-by: Surya Sashank Nistala --- .../action/DocLevelMonitorFanOutRequest.kt | 18 ++++++++++++++++-- .../DocLevelMonitorFanOutRequestTests.kt | 8 +++++++- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt index 5f19fe2a..ad6c944a 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt @@ -38,13 +38,17 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { val shardIds: List val concreteIndicesSeenSoFar: List val workflowRunContext: WorkflowRunContext? + val hasSerializationFailed: Boolean companion object { + // flag flipped to true whenever a safeRead*() method fails to serialize a field correctly + private var serializationFailedFlag: Boolean = false val log = LogManager.getLogger(DocLevelMonitorFanOutRequest::class.java) private fun safeReadMonitor(sin: StreamInput): Monitor = try { Monitor.readFrom(sin)!! } catch (e: Exception) { + serializationFailedFlag = true log.error("Error parsing monitor in Doc level monitor fanout request", e) Monitor( "failed_serde", NO_VERSION, "failed_serde", true, @@ -58,6 +62,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { try { sin.readBoolean() } catch (e: Exception) { + serializationFailedFlag = true log.error("Error parsing boolean in Doc level monitor fanout request", e) false } @@ -66,6 +71,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { try { MonitorMetadata.readFrom(sin) } catch (e: Exception) { + serializationFailedFlag = true log.error("Error parsing monitor in Doc level monitor fanout request", e) MonitorMetadata( "failed_serde", @@ -82,6 +88,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { try { sin.readString() } catch (e: Exception) { + serializationFailedFlag = true log.error("Error parsing string in Doc level monitor fanout request", e) "" } @@ -90,6 +97,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { try { sin.readList(::ShardId) } catch (e: Exception) { + serializationFailedFlag = true log.error("Error parsing shardId list in Doc level monitor fanout request", e) listOf(ShardId("failed_serde", "failed_serde", 999999)) } @@ -98,6 +106,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { try { sin.readStringList() } catch (e: Exception) { + serializationFailedFlag = true log.error("Error parsing string list in Doc level monitor fanout request", e) emptyList() } @@ -106,6 +115,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { try { if (sin.readBoolean()) WorkflowRunContext(sin) else null } catch (e: Exception) { + serializationFailedFlag = true log.error("Error parsing workflow context in Doc level monitor fanout request", e) null } @@ -125,6 +135,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { } catch (e: EOFException) { indexExecutionContext } catch (e: Exception) { + serializationFailedFlag = true log.error("Error parsing index execution context in Doc level monitor fanout request", e) while (sin.read() != -1) { try { // read and throw bytes until stream is entirely consumed @@ -145,7 +156,8 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { indexExecutionContext: IndexExecutionContext?, shardIds: List, concreteIndicesSeenSoFar: List, - workflowRunContext: WorkflowRunContext? + workflowRunContext: WorkflowRunContext?, + hasSerializationFailed: Boolean? = null ) : super() { this.monitor = monitor this.dryRun = dryRun @@ -156,6 +168,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar this.workflowRunContext = workflowRunContext require(false == shardIds.isEmpty()) { } + this.hasSerializationFailed = hasSerializationFailed ?: false } @Throws(IOException::class) @@ -167,7 +180,8 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { shardIds = safeReadShardIds(sin), concreteIndicesSeenSoFar = safeReadStringList(sin), workflowRunContext = safeReadWorkflowRunContext(sin), - indexExecutionContext = safeReadIndexExecutionContext(sin) + indexExecutionContext = safeReadIndexExecutionContext(sin), + hasSerializationFailed = serializationFailedFlag ) @Throws(IOException::class) diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt index 3c05225c..2aadac2a 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt @@ -6,7 +6,9 @@ package org.opensearch.commons.alerting.action import org.junit.Assert.assertEquals +import org.junit.Assert.assertFalse import org.junit.Assert.assertNull +import org.junit.Assert.assertTrue import org.junit.jupiter.api.Test import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.commons.alerting.model.ActionExecutionTime @@ -81,7 +83,6 @@ class DocLevelMonitorFanOutRequestTests { ) val out = BytesStreamOutput() docLevelMonitorFanOutRequest.writeTo(out) - monitor.writeTo(out) val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin) assertEquals(docLevelMonitorFanOutRequest.monitor, newDocLevelMonitorFanOutRequest.monitor) @@ -91,6 +92,7 @@ class DocLevelMonitorFanOutRequestTests { assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds) assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext) assertEquals(sin.read(), -1) + assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed) } @Test @@ -153,6 +155,8 @@ class DocLevelMonitorFanOutRequestTests { assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext) assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds) assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext) + assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed) + assertEquals(sin.read(), -1) } @Test @@ -210,5 +214,7 @@ class DocLevelMonitorFanOutRequestTests { val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin) assertNull(newDocLevelMonitorFanOutRequest.indexExecutionContext) + assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed) + assertEquals(sin.read(), -1) } } From d87d83eb65a783237d522f69fe4e0002f674abaf Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Tue, 25 Mar 2025 00:13:06 -0700 Subject: [PATCH 4/7] initialize serde failed flag to false Signed-off-by: Surya Sashank Nistala --- .../action/DocLevelMonitorFanOutRequest.kt | 36 ++++++++-- .../DocLevelMonitorFanOutRequestTests.kt | 68 ++++++++++++++++++- 2 files changed, 96 insertions(+), 8 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt index ad6c944a..b2740d95 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt @@ -40,6 +40,9 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { val workflowRunContext: WorkflowRunContext? val hasSerializationFailed: Boolean + init { + serializationFailedFlag = false + } companion object { // flag flipped to true whenever a safeRead*() method fails to serialize a field correctly private var serializationFailedFlag: Boolean = false @@ -99,7 +102,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { } catch (e: Exception) { serializationFailedFlag = true log.error("Error parsing shardId list in Doc level monitor fanout request", e) - listOf(ShardId("failed_serde", "failed_serde", 999999)) + emptyList() } private fun safeReadStringList(sin: StreamInput): List = @@ -124,7 +127,8 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { var indexExecutionContext: IndexExecutionContext? = null return try { indexExecutionContext = IndexExecutionContext(sin) - while (sin.read() != -1) { + while (sin.read() != 0) { + serializationFailedFlag = true // read and discard bytes until stream is entirely consumed try { sin.readByte() @@ -137,7 +141,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { } catch (e: Exception) { serializationFailedFlag = true log.error("Error parsing index execution context in Doc level monitor fanout request", e) - while (sin.read() != -1) { + while (sin.read() != 0) { try { // read and throw bytes until stream is entirely consumed sin.readByte() } catch (_: EOFException) { @@ -148,7 +152,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { } } - constructor( + private constructor( monitor: Monitor, dryRun: Boolean, monitorMetadata: MonitorMetadata, @@ -157,7 +161,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { shardIds: List, concreteIndicesSeenSoFar: List, workflowRunContext: WorkflowRunContext?, - hasSerializationFailed: Boolean? = null + hasSerializationFailed: Boolean ) : super() { this.monitor = monitor this.dryRun = dryRun @@ -167,10 +171,30 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { this.shardIds = shardIds this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar this.workflowRunContext = workflowRunContext - require(false == shardIds.isEmpty()) { } this.hasSerializationFailed = hasSerializationFailed ?: false } + constructor( + monitor: Monitor, + dryRun: Boolean, + monitorMetadata: MonitorMetadata, + executionId: String, + indexExecutionContext: IndexExecutionContext?, + shardIds: List, + concreteIndicesSeenSoFar: List, + workflowRunContext: WorkflowRunContext? + ) : super() { + this.monitor = monitor + this.dryRun = dryRun + this.monitorMetadata = monitorMetadata + this.executionId = executionId + this.indexExecutionContext = indexExecutionContext + this.shardIds = shardIds + this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar + this.workflowRunContext = workflowRunContext + this.hasSerializationFailed = false + } + @Throws(IOException::class) constructor(sin: StreamInput) : this( monitor = safeReadMonitor(sin), diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt index 2aadac2a..c0842e52 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt @@ -91,7 +91,7 @@ class DocLevelMonitorFanOutRequestTests { assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext) assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds) assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext) - assertEquals(sin.read(), -1) + assertEquals(sin.read(), 0) assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed) } @@ -156,7 +156,7 @@ class DocLevelMonitorFanOutRequestTests { assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds) assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext) assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed) - assertEquals(sin.read(), -1) + assertEquals(sin.read().toString(), sin.read(), 0) } @Test @@ -217,4 +217,68 @@ class DocLevelMonitorFanOutRequestTests { assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed) assertEquals(sin.read(), -1) } + + @Test + fun `test doc level monitor fan out request as stream when there are additional bytes left to handle`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = Script("return true")) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) + ) + val monitorMetadata = MonitorMetadata( + "test", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + Monitor.NO_ID, + listOf(ActionExecutionTime("", Instant.now())), + mutableMapOf("index" to mutableMapOf("1" to "1")), + mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001") + ) + val indexExecutionContext = IndexExecutionContext( + listOf(docQuery), + mutableMapOf("index" to mutableMapOf("1" to "1")), + mutableMapOf("index" to mutableMapOf("1" to "1")), + "test-index", + "test-index", + listOf("test-index"), + listOf("test-index"), + listOf("test-field"), + listOf("1", "2") + ) + val workflowRunContext = WorkflowRunContext( + Workflow.NO_ID, + Workflow.NO_ID, + Monitor.NO_ID, + mutableMapOf("index" to listOf("1")), + true + ) + val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest( + monitor, + false, + monitorMetadata, + UUID.randomUUID().toString(), + indexExecutionContext, + listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)), + listOf("test-index"), + workflowRunContext + ) + val out = BytesStreamOutput() + docLevelMonitorFanOutRequest.writeTo(out) + out.writeByte(Byte.MIN_VALUE) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin) + assertEquals(docLevelMonitorFanOutRequest.monitor, newDocLevelMonitorFanOutRequest.monitor) + assertEquals(docLevelMonitorFanOutRequest.executionId, newDocLevelMonitorFanOutRequest.executionId) + assertEquals(docLevelMonitorFanOutRequest.monitorMetadata, newDocLevelMonitorFanOutRequest.monitorMetadata) + assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext) + assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds) + assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext) + assertEquals(sin.read(), 0) + assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed) + } } From aae19d4450eb5b25d32da80774aae114a12e590c Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Tue, 25 Mar 2025 16:11:19 -0700 Subject: [PATCH 5/7] change serde failure handling to expect failure only in monitor object for Doc Level monitor fanout request Signed-off-by: Surya Sashank Nistala --- .../action/DocLevelMonitorFanOutRequest.kt | 197 +++++------------- .../DocLevelMonitorFanOutRequestTests.kt | 68 +----- 2 files changed, 58 insertions(+), 207 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt index b2740d95..634fad67 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt @@ -40,138 +40,8 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { val workflowRunContext: WorkflowRunContext? val hasSerializationFailed: Boolean - init { - serializationFailedFlag = false - } companion object { - // flag flipped to true whenever a safeRead*() method fails to serialize a field correctly - private var serializationFailedFlag: Boolean = false val log = LogManager.getLogger(DocLevelMonitorFanOutRequest::class.java) - private fun safeReadMonitor(sin: StreamInput): Monitor = - try { - Monitor.readFrom(sin)!! - } catch (e: Exception) { - serializationFailedFlag = true - log.error("Error parsing monitor in Doc level monitor fanout request", e) - Monitor( - "failed_serde", NO_VERSION, "failed_serde", true, - IntervalSchedule(1, ChronoUnit.MINUTES), Instant.now(), Instant.now(), "", - null, NO_SCHEMA_VERSION, emptyList(), emptyList(), emptyMap(), - DataSources(), false, false, "failed" - ) - } - - private fun safeReadBoolean(sin: StreamInput): Boolean = - try { - sin.readBoolean() - } catch (e: Exception) { - serializationFailedFlag = true - log.error("Error parsing boolean in Doc level monitor fanout request", e) - false - } - - private fun safeReadMonitorMetadata(sin: StreamInput): MonitorMetadata = - try { - MonitorMetadata.readFrom(sin) - } catch (e: Exception) { - serializationFailedFlag = true - log.error("Error parsing monitor in Doc level monitor fanout request", e) - MonitorMetadata( - "failed_serde", - SequenceNumbers.UNASSIGNED_SEQ_NO, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM, - "failed_serde", - emptyList(), - emptyMap(), - mutableMapOf() - ) - } - - private fun safeReadString(sin: StreamInput): String = - try { - sin.readString() - } catch (e: Exception) { - serializationFailedFlag = true - log.error("Error parsing string in Doc level monitor fanout request", e) - "" - } - - private fun safeReadShardIds(sin: StreamInput): List = - try { - sin.readList(::ShardId) - } catch (e: Exception) { - serializationFailedFlag = true - log.error("Error parsing shardId list in Doc level monitor fanout request", e) - emptyList() - } - - private fun safeReadStringList(sin: StreamInput): List = - try { - sin.readStringList() - } catch (e: Exception) { - serializationFailedFlag = true - log.error("Error parsing string list in Doc level monitor fanout request", e) - emptyList() - } - - private fun safeReadWorkflowRunContext(sin: StreamInput): WorkflowRunContext? = - try { - if (sin.readBoolean()) WorkflowRunContext(sin) else null - } catch (e: Exception) { - serializationFailedFlag = true - log.error("Error parsing workflow context in Doc level monitor fanout request", e) - null - } - - private fun safeReadIndexExecutionContext(sin: StreamInput): IndexExecutionContext? { - var indexExecutionContext: IndexExecutionContext? = null - return try { - indexExecutionContext = IndexExecutionContext(sin) - while (sin.read() != 0) { - serializationFailedFlag = true - // read and discard bytes until stream is entirely consumed - try { - sin.readByte() - } catch (_: EOFException) { - } - } - return indexExecutionContext - } catch (e: EOFException) { - indexExecutionContext - } catch (e: Exception) { - serializationFailedFlag = true - log.error("Error parsing index execution context in Doc level monitor fanout request", e) - while (sin.read() != 0) { - try { // read and throw bytes until stream is entirely consumed - sin.readByte() - } catch (_: EOFException) { - } - } - null - } - } - } - - private constructor( - monitor: Monitor, - dryRun: Boolean, - monitorMetadata: MonitorMetadata, - executionId: String, - indexExecutionContext: IndexExecutionContext?, - shardIds: List, - concreteIndicesSeenSoFar: List, - workflowRunContext: WorkflowRunContext?, - hasSerializationFailed: Boolean - ) : super() { - this.monitor = monitor - this.dryRun = dryRun - this.monitorMetadata = monitorMetadata - this.executionId = executionId - this.indexExecutionContext = indexExecutionContext - this.shardIds = shardIds - this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar - this.workflowRunContext = workflowRunContext - this.hasSerializationFailed = hasSerializationFailed ?: false } constructor( @@ -196,16 +66,63 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { } @Throws(IOException::class) - constructor(sin: StreamInput) : this( - monitor = safeReadMonitor(sin), - dryRun = safeReadBoolean(sin), - monitorMetadata = safeReadMonitorMetadata(sin), - executionId = safeReadString(sin), - shardIds = safeReadShardIds(sin), - concreteIndicesSeenSoFar = safeReadStringList(sin), - workflowRunContext = safeReadWorkflowRunContext(sin), - indexExecutionContext = safeReadIndexExecutionContext(sin), - hasSerializationFailed = serializationFailedFlag + constructor(sin: StreamInput) : super() { + var monitorSerializationSucceeded = true + var parsedMonitor = getDummyMonitor() + try { + parsedMonitor = Monitor(sin) + } catch (e: Exception) { + log.error("Error parsing monitor in Doc level monitor fanout request", e) + monitorSerializationSucceeded = false + log.info("Force consuming stream in Doc level monitor fanout request") + while (sin.read() != 0) { + // read and discard bytes until stream is entirely consumed + try { + sin.readByte() + } catch (_: EOFException) { + } + } + } + if (monitorSerializationSucceeded) { + this.monitor = parsedMonitor + this.dryRun = sin.readBoolean() + this.monitorMetadata = MonitorMetadata.readFrom(sin) + this.executionId = sin.readString() + this.shardIds = sin.readList(::ShardId) + this.concreteIndicesSeenSoFar = sin.readStringList() + this.workflowRunContext = if (sin.readBoolean()) { + WorkflowRunContext(sin) + } else { + null + } + indexExecutionContext = IndexExecutionContext(sin) + this.hasSerializationFailed = false == monitorSerializationSucceeded + } else { + this.monitor = parsedMonitor + this.dryRun = false + this.monitorMetadata = MonitorMetadata( + "failed_serde", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + "failed_serde", + emptyList(), + emptyMap(), + mutableMapOf() + ) + this.executionId = "" + this.shardIds = emptyList() + this.concreteIndicesSeenSoFar = emptyList() + this.workflowRunContext = null + this.indexExecutionContext = null + this.hasSerializationFailed = false == monitorSerializationSucceeded + } + } + + private fun getDummyMonitor() = Monitor( + "failed_serde", NO_VERSION, "failed_serde", true, + IntervalSchedule(1, ChronoUnit.MINUTES), Instant.now(), Instant.now(), "", + null, NO_SCHEMA_VERSION, emptyList(), emptyList(), emptyMap(), + DataSources(), false, false, "failed" ) @Throws(IOException::class) diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt index c0842e52..75b9b364 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt @@ -7,7 +7,6 @@ package org.opensearch.commons.alerting.action import org.junit.Assert.assertEquals import org.junit.Assert.assertFalse -import org.junit.Assert.assertNull import org.junit.Assert.assertTrue import org.junit.jupiter.api.Test import org.opensearch.common.io.stream.BytesStreamOutput @@ -164,65 +163,6 @@ class DocLevelMonitorFanOutRequestTests { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery)) - val trigger = randomDocumentLevelTrigger(condition = Script("return true")) - val monitor = randomDocumentLevelMonitor( - inputs = listOf(docLevelInput), - triggers = listOf(trigger), - enabled = true, - schedule = IntervalSchedule(1, ChronoUnit.MINUTES) - ) - val monitorMetadata = MonitorMetadata( - "test", - SequenceNumbers.UNASSIGNED_SEQ_NO, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM, - Monitor.NO_ID, - listOf(ActionExecutionTime("", Instant.now())), - mutableMapOf("index" to mutableMapOf("1" to "1")), - mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001") - ) - val indexExecutionContext = IndexExecutionContext( - listOf(docQuery), - mutableMapOf("index" to mutableMapOf("1" to "1")), - mutableMapOf("index" to mutableMapOf("1" to "1")), - "test-index", - "test-index", - listOf("test-index"), - listOf("test-index"), - listOf("test-field"), - listOf("1", "2") - ) - val workflowRunContext = WorkflowRunContext( - Workflow.NO_ID, - Workflow.NO_ID, - Monitor.NO_ID, - mutableMapOf("index" to listOf("1")), - true, - listOf("finding1") - ) - val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest( - monitor, - false, - monitorMetadata, - UUID.randomUUID().toString(), - indexExecutionContext, - listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)), - listOf("test-index"), - workflowRunContext - ) - val out = BytesStreamOutput() - monitor.writeTo(out) - val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) - val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin) - assertNull(newDocLevelMonitorFanOutRequest.indexExecutionContext) - assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed) - assertEquals(sin.read(), -1) - } - - @Test - fun `test doc level monitor fan out request as stream when there are additional bytes left to handle`() { - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3") - val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery)) - val trigger = randomDocumentLevelTrigger(condition = Script("return true")) val monitor = randomDocumentLevelMonitor( inputs = listOf(docLevelInput), @@ -268,16 +208,10 @@ class DocLevelMonitorFanOutRequestTests { workflowRunContext ) val out = BytesStreamOutput() + out.writeString(UUID.randomUUID().toString()) docLevelMonitorFanOutRequest.writeTo(out) - out.writeByte(Byte.MIN_VALUE) val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin) - assertEquals(docLevelMonitorFanOutRequest.monitor, newDocLevelMonitorFanOutRequest.monitor) - assertEquals(docLevelMonitorFanOutRequest.executionId, newDocLevelMonitorFanOutRequest.executionId) - assertEquals(docLevelMonitorFanOutRequest.monitorMetadata, newDocLevelMonitorFanOutRequest.monitorMetadata) - assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext) - assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds) - assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext) assertEquals(sin.read(), 0) assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed) } From 2c3155ef0233b924c8c8333255dd7e381262d232 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 26 Mar 2025 12:22:15 -0700 Subject: [PATCH 6/7] add catch for all serde failures Signed-off-by: Surya Sashank Nistala --- .../action/DocLevelMonitorFanOutRequest.kt | 71 ++++++++++--------- 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt index 634fad67..96c54058 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt @@ -8,11 +8,11 @@ package org.opensearch.commons.alerting.action import org.apache.logging.log4j.LogManager import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException +import org.opensearch.commons.alerting.model.Alert.Companion.NO_VERSION import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.IndexExecutionContext import org.opensearch.commons.alerting.model.IntervalSchedule import org.opensearch.commons.alerting.model.Monitor -import org.opensearch.commons.alerting.model.Monitor.Companion.NO_VERSION import org.opensearch.commons.alerting.model.MonitorMetadata import org.opensearch.commons.alerting.model.WorkflowRunContext import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION @@ -69,8 +69,34 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { constructor(sin: StreamInput) : super() { var monitorSerializationSucceeded = true var parsedMonitor = getDummyMonitor() + var parsedDryRun = false + var parsedMonitorMetadata: MonitorMetadata = MonitorMetadata( + "failed_serde", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + "failed_serde", + emptyList(), + emptyMap(), + mutableMapOf() + ) + var parsedShardIds: List = emptyList() + var parsedConcreteIndicesSeenSoFar = mutableListOf() + var parsedExecutionId: String = "" + var parsedWorkflowContext: WorkflowRunContext? = null + var parsedIndexExecutionContext: IndexExecutionContext? = null try { parsedMonitor = Monitor(sin) + parsedDryRun = sin.readBoolean() + parsedMonitorMetadata = MonitorMetadata.readFrom(sin) + parsedExecutionId = sin.readString() + parsedShardIds = sin.readList(::ShardId) + parsedConcreteIndicesSeenSoFar = sin.readStringList() + parsedWorkflowContext = if (sin.readBoolean()) { + WorkflowRunContext(sin) + } else { + null + } + parsedIndexExecutionContext = IndexExecutionContext(sin) } catch (e: Exception) { log.error("Error parsing monitor in Doc level monitor fanout request", e) monitorSerializationSucceeded = false @@ -83,39 +109,16 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { } } } - if (monitorSerializationSucceeded) { - this.monitor = parsedMonitor - this.dryRun = sin.readBoolean() - this.monitorMetadata = MonitorMetadata.readFrom(sin) - this.executionId = sin.readString() - this.shardIds = sin.readList(::ShardId) - this.concreteIndicesSeenSoFar = sin.readStringList() - this.workflowRunContext = if (sin.readBoolean()) { - WorkflowRunContext(sin) - } else { - null - } - indexExecutionContext = IndexExecutionContext(sin) - this.hasSerializationFailed = false == monitorSerializationSucceeded - } else { - this.monitor = parsedMonitor - this.dryRun = false - this.monitorMetadata = MonitorMetadata( - "failed_serde", - SequenceNumbers.UNASSIGNED_SEQ_NO, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM, - "failed_serde", - emptyList(), - emptyMap(), - mutableMapOf() - ) - this.executionId = "" - this.shardIds = emptyList() - this.concreteIndicesSeenSoFar = emptyList() - this.workflowRunContext = null - this.indexExecutionContext = null - this.hasSerializationFailed = false == monitorSerializationSucceeded - } + + this.monitor = parsedMonitor + this.dryRun = parsedDryRun + this.monitorMetadata = parsedMonitorMetadata + this.executionId = parsedExecutionId + this.shardIds = parsedShardIds + this.concreteIndicesSeenSoFar = parsedConcreteIndicesSeenSoFar + this.workflowRunContext = parsedWorkflowContext + this.indexExecutionContext = parsedIndexExecutionContext + this.hasSerializationFailed = false == monitorSerializationSucceeded } private fun getDummyMonitor() = Monitor( From 06f8d688639f9a1ed5514ae6202d83b8b27d1eee Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 26 Mar 2025 14:35:33 -0700 Subject: [PATCH 7/7] add version checks in serde flows for Fan out request changes since 2.15 Signed-off-by: Surya Sashank Nistala --- .../alerting/model/DocLevelMonitorInput.kt | 7 +++++-- .../alerting/model/IndexExecutionContext.kt | 7 +++++-- .../commons/alerting/model/Monitor.kt | 21 +++++++++++++++---- .../alerting/model/WorkflowRunContext.kt | 17 ++++++++------- 4 files changed, 37 insertions(+), 15 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/DocLevelMonitorInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/DocLevelMonitorInput.kt index fd67007a..88483f27 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/DocLevelMonitorInput.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/DocLevelMonitorInput.kt @@ -1,5 +1,6 @@ package org.opensearch.commons.alerting.model +import org.opensearch.Version import org.opensearch.common.CheckedFunction import org.opensearch.core.ParseField import org.opensearch.core.common.io.stream.StreamInput @@ -23,7 +24,7 @@ data class DocLevelMonitorInput( sin.readString(), // description sin.readStringList(), // indices sin.readList(::DocLevelQuery), // docLevelQueries - sin.readOptionalBoolean() // fanoutEnabled + if (sin.version.onOrAfter(Version.V_2_15_0)) sin.readOptionalBoolean() else true // fanoutEnabled ) override fun asTemplateArg(): Map { @@ -43,7 +44,9 @@ data class DocLevelMonitorInput( out.writeString(description) out.writeStringCollection(indices) out.writeCollection(queries) - out.writeOptionalBoolean(fanoutEnabled) + if (out.version.onOrAfter(Version.V_2_15_0)) { + out.writeOptionalBoolean(fanoutEnabled) + } } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt index 4ecf1e67..6b104d13 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt @@ -5,6 +5,7 @@ package org.opensearch.commons.alerting.model +import org.opensearch.Version import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.common.io.stream.Writeable @@ -36,7 +37,7 @@ data class IndexExecutionContext( concreteIndexNames = sin.readStringList(), conflictingFields = sin.readStringList(), docIds = sin.readOptionalStringList(), - findingIds = sin.readOptionalStringList() + findingIds = if (sin.version.onOrAfter(Version.V_2_15_0)) sin.readOptionalStringList() else emptyList() ) override fun writeTo(out: StreamOutput?) { @@ -49,7 +50,9 @@ data class IndexExecutionContext( out.writeStringCollection(concreteIndexNames) out.writeStringCollection(conflictingFields) out.writeOptionalStringCollection(docIds) - out.writeOptionalStringCollection(findingIds) + if (out.version.onOrAfter(Version.V_2_15_0)) { + out.writeOptionalStringCollection(findingIds) + } } override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt index a0a5ed5b..18fdde5c 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt @@ -1,5 +1,6 @@ package org.opensearch.commons.alerting.model +import org.opensearch.Version import org.opensearch.common.CheckedFunction import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger import org.opensearch.commons.alerting.util.IndexUtils.Companion.MONITOR_MAX_INPUTS @@ -112,8 +113,16 @@ data class Monitor( } else { DataSources() }, - deleteQueryIndexInEveryRun = sin.readOptionalBoolean(), - shouldCreateSingleAlertForFindings = sin.readOptionalBoolean(), + deleteQueryIndexInEveryRun = if (sin.version.onOrAfter(Version.V_2_15_0)) { + sin.readOptionalBoolean() + } else { + false + }, + shouldCreateSingleAlertForFindings = if (sin.version.onOrAfter(Version.V_2_15_0)) { + sin.readOptionalBoolean() + } else { + false + }, owner = sin.readOptionalString() ) @@ -226,8 +235,12 @@ data class Monitor( out.writeMap(uiMetadata) out.writeBoolean(dataSources != null) // for backward compatibility with pre-existing monitors which don't have datasources field dataSources.writeTo(out) - out.writeOptionalBoolean(deleteQueryIndexInEveryRun) - out.writeOptionalBoolean(shouldCreateSingleAlertForFindings) + if (out.version.onOrAfter(Version.V_2_15_0)) { + out.writeOptionalBoolean(deleteQueryIndexInEveryRun) + } + if (out.version.onOrAfter(Version.V_2_15_0)) { + out.writeOptionalBoolean(shouldCreateSingleAlertForFindings) + } out.writeOptionalString(owner) } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt index 5d3cd7c1..a83b8815 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt @@ -5,6 +5,7 @@ package org.opensearch.commons.alerting.model +import org.opensearch.Version import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.common.io.stream.Writeable @@ -28,12 +29,12 @@ data class WorkflowRunContext( } constructor(sin: StreamInput) : this( - sin.readString(), - sin.readString(), - sin.readOptionalString(), - sin.readMap() as Map>, - sin.readBoolean(), - sin.readOptionalStringList() + workflowId = sin.readString(), + workflowMetadataId = sin.readString(), + chainedMonitorId = sin.readOptionalString(), + matchingDocIdsPerIndex = sin.readMap() as Map>, + auditDelegateMonitorAlerts = sin.readBoolean(), + findingIds = if (sin.version.onOrAfter(Version.V_2_15_0)) sin.readOptionalStringList() else emptyList() ) override fun writeTo(out: StreamOutput) { @@ -42,7 +43,9 @@ data class WorkflowRunContext( out.writeOptionalString(chainedMonitorId) out.writeMap(matchingDocIdsPerIndex) out.writeBoolean(auditDelegateMonitorAlerts) - out.writeOptionalStringCollection(findingIds) + if (out.version.onOrAfter(Version.V_2_15_0)) { + out.writeOptionalStringCollection(findingIds) + } } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder {