Skip to content

Commit c18d7c7

Browse files
committed
change serde failure handling to expect failure only in monitor object for Doc Level monitor fanout request
Signed-off-by: Surya Sashank Nistala <[email protected]>
1 parent d87d83e commit c18d7c7

File tree

2 files changed

+58
-112
lines changed

2 files changed

+58
-112
lines changed

src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt

Lines changed: 57 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,13 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
4040
val workflowRunContext: WorkflowRunContext?
4141
val hasSerializationFailed: Boolean
4242

43-
init {
44-
serializationFailedFlag = false
45-
}
4643
companion object {
4744
// flag flipped to true whenever a safeRead*() method fails to serialize a field correctly
48-
private var serializationFailedFlag: Boolean = false
4945
val log = LogManager.getLogger(DocLevelMonitorFanOutRequest::class.java)
5046
private fun safeReadMonitor(sin: StreamInput): Monitor =
5147
try {
5248
Monitor.readFrom(sin)!!
5349
} catch (e: Exception) {
54-
serializationFailedFlag = true
5550
log.error("Error parsing monitor in Doc level monitor fanout request", e)
5651
Monitor(
5752
"failed_serde", NO_VERSION, "failed_serde", true,
@@ -65,7 +60,6 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
6560
try {
6661
sin.readBoolean()
6762
} catch (e: Exception) {
68-
serializationFailedFlag = true
6963
log.error("Error parsing boolean in Doc level monitor fanout request", e)
7064
false
7165
}
@@ -74,7 +68,6 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
7468
try {
7569
MonitorMetadata.readFrom(sin)
7670
} catch (e: Exception) {
77-
serializationFailedFlag = true
7871
log.error("Error parsing monitor in Doc level monitor fanout request", e)
7972
MonitorMetadata(
8073
"failed_serde",
@@ -91,7 +84,6 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
9184
try {
9285
sin.readString()
9386
} catch (e: Exception) {
94-
serializationFailedFlag = true
9587
log.error("Error parsing string in Doc level monitor fanout request", e)
9688
""
9789
}
@@ -100,7 +92,6 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
10092
try {
10193
sin.readList(::ShardId)
10294
} catch (e: Exception) {
103-
serializationFailedFlag = true
10495
log.error("Error parsing shardId list in Doc level monitor fanout request", e)
10596
emptyList()
10697
}
@@ -109,7 +100,6 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
109100
try {
110101
sin.readStringList()
111102
} catch (e: Exception) {
112-
serializationFailedFlag = true
113103
log.error("Error parsing string list in Doc level monitor fanout request", e)
114104
emptyList()
115105
}
@@ -118,7 +108,6 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
118108
try {
119109
if (sin.readBoolean()) WorkflowRunContext(sin) else null
120110
} catch (e: Exception) {
121-
serializationFailedFlag = true
122111
log.error("Error parsing workflow context in Doc level monitor fanout request", e)
123112
null
124113
}
@@ -128,7 +117,6 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
128117
return try {
129118
indexExecutionContext = IndexExecutionContext(sin)
130119
while (sin.read() != 0) {
131-
serializationFailedFlag = true
132120
// read and discard bytes until stream is entirely consumed
133121
try {
134122
sin.readByte()
@@ -139,7 +127,6 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
139127
} catch (e: EOFException) {
140128
indexExecutionContext
141129
} catch (e: Exception) {
142-
serializationFailedFlag = true
143130
log.error("Error parsing index execution context in Doc level monitor fanout request", e)
144131
while (sin.read() != 0) {
145132
try { // read and throw bytes until stream is entirely consumed
@@ -152,28 +139,6 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
152139
}
153140
}
154141

155-
private constructor(
156-
monitor: Monitor,
157-
dryRun: Boolean,
158-
monitorMetadata: MonitorMetadata,
159-
executionId: String,
160-
indexExecutionContext: IndexExecutionContext?,
161-
shardIds: List<ShardId>,
162-
concreteIndicesSeenSoFar: List<String>,
163-
workflowRunContext: WorkflowRunContext?,
164-
hasSerializationFailed: Boolean
165-
) : super() {
166-
this.monitor = monitor
167-
this.dryRun = dryRun
168-
this.monitorMetadata = monitorMetadata
169-
this.executionId = executionId
170-
this.indexExecutionContext = indexExecutionContext
171-
this.shardIds = shardIds
172-
this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar
173-
this.workflowRunContext = workflowRunContext
174-
this.hasSerializationFailed = hasSerializationFailed ?: false
175-
}
176-
177142
constructor(
178143
monitor: Monitor,
179144
dryRun: Boolean,
@@ -196,16 +161,63 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
196161
}
197162

198163
@Throws(IOException::class)
199-
constructor(sin: StreamInput) : this(
200-
monitor = safeReadMonitor(sin),
201-
dryRun = safeReadBoolean(sin),
202-
monitorMetadata = safeReadMonitorMetadata(sin),
203-
executionId = safeReadString(sin),
204-
shardIds = safeReadShardIds(sin),
205-
concreteIndicesSeenSoFar = safeReadStringList(sin),
206-
workflowRunContext = safeReadWorkflowRunContext(sin),
207-
indexExecutionContext = safeReadIndexExecutionContext(sin),
208-
hasSerializationFailed = serializationFailedFlag
164+
constructor(sin: StreamInput) : super() {
165+
var monitorSerializationSucceded = true
166+
var parsedMonitor = getDummyMonitor()
167+
try {
168+
parsedMonitor = Monitor(sin)
169+
} catch (e: Exception) {
170+
log.error("Error parsing monitor in Doc level monitor fanout request", e)
171+
monitorSerializationSucceded = false
172+
log.info("Force consuming stream in Doc level monitor fanout request")
173+
while (sin.read() != 0) {
174+
// read and discard bytes until stream is entirely consumed
175+
try {
176+
sin.readByte()
177+
} catch (_: EOFException) {
178+
}
179+
}
180+
}
181+
if (monitorSerializationSucceded) {
182+
this.monitor = parsedMonitor
183+
this.dryRun = sin.readBoolean()
184+
this.monitorMetadata = MonitorMetadata.readFrom(sin)
185+
this.executionId = sin.readString()
186+
this.shardIds = sin.readList(::ShardId)
187+
this.concreteIndicesSeenSoFar = sin.readStringList()
188+
this.workflowRunContext = if (sin.readBoolean()) {
189+
WorkflowRunContext(sin)
190+
} else {
191+
null
192+
}
193+
indexExecutionContext = IndexExecutionContext(sin)
194+
this.hasSerializationFailed = false == monitorSerializationSucceded
195+
} else {
196+
this.monitor = parsedMonitor
197+
this.dryRun = false
198+
this.monitorMetadata = MonitorMetadata(
199+
"failed_serde",
200+
SequenceNumbers.UNASSIGNED_SEQ_NO,
201+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
202+
"failed_serde",
203+
emptyList(),
204+
emptyMap(),
205+
mutableMapOf()
206+
)
207+
this.executionId = ""
208+
this.shardIds = emptyList()
209+
this.concreteIndicesSeenSoFar = emptyList()
210+
this.workflowRunContext = null
211+
this.indexExecutionContext = null
212+
this.hasSerializationFailed = false == monitorSerializationSucceded
213+
}
214+
}
215+
216+
private fun getDummyMonitor() = Monitor(
217+
"failed_serde", NO_VERSION, "failed_serde", true,
218+
IntervalSchedule(1, ChronoUnit.MINUTES), Instant.now(), Instant.now(), "",
219+
null, NO_SCHEMA_VERSION, emptyList(), emptyList(), emptyMap(),
220+
DataSources(), false, false, "failed"
209221
)
210222

211223
@Throws(IOException::class)

src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt

Lines changed: 1 addition & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package org.opensearch.commons.alerting.action
77

88
import org.junit.Assert.assertEquals
99
import org.junit.Assert.assertFalse
10-
import org.junit.Assert.assertNull
1110
import org.junit.Assert.assertTrue
1211
import org.junit.jupiter.api.Test
1312
import org.opensearch.common.io.stream.BytesStreamOutput
@@ -164,65 +163,6 @@ class DocLevelMonitorFanOutRequestTests {
164163
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
165164
val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery))
166165

167-
val trigger = randomDocumentLevelTrigger(condition = Script("return true"))
168-
val monitor = randomDocumentLevelMonitor(
169-
inputs = listOf(docLevelInput),
170-
triggers = listOf(trigger),
171-
enabled = true,
172-
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
173-
)
174-
val monitorMetadata = MonitorMetadata(
175-
"test",
176-
SequenceNumbers.UNASSIGNED_SEQ_NO,
177-
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
178-
Monitor.NO_ID,
179-
listOf(ActionExecutionTime("", Instant.now())),
180-
mutableMapOf("index" to mutableMapOf("1" to "1")),
181-
mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001")
182-
)
183-
val indexExecutionContext = IndexExecutionContext(
184-
listOf(docQuery),
185-
mutableMapOf("index" to mutableMapOf("1" to "1")),
186-
mutableMapOf("index" to mutableMapOf("1" to "1")),
187-
"test-index",
188-
"test-index",
189-
listOf("test-index"),
190-
listOf("test-index"),
191-
listOf("test-field"),
192-
listOf("1", "2")
193-
)
194-
val workflowRunContext = WorkflowRunContext(
195-
Workflow.NO_ID,
196-
Workflow.NO_ID,
197-
Monitor.NO_ID,
198-
mutableMapOf("index" to listOf("1")),
199-
true,
200-
listOf("finding1")
201-
)
202-
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
203-
monitor,
204-
false,
205-
monitorMetadata,
206-
UUID.randomUUID().toString(),
207-
indexExecutionContext,
208-
listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)),
209-
listOf("test-index"),
210-
workflowRunContext
211-
)
212-
val out = BytesStreamOutput()
213-
monitor.writeTo(out)
214-
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
215-
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
216-
assertNull(newDocLevelMonitorFanOutRequest.indexExecutionContext)
217-
assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
218-
assertEquals(sin.read(), -1)
219-
}
220-
221-
@Test
222-
fun `test doc level monitor fan out request as stream when there are additional bytes left to handle`() {
223-
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
224-
val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery))
225-
226166
val trigger = randomDocumentLevelTrigger(condition = Script("return true"))
227167
val monitor = randomDocumentLevelMonitor(
228168
inputs = listOf(docLevelInput),
@@ -268,16 +208,10 @@ class DocLevelMonitorFanOutRequestTests {
268208
workflowRunContext
269209
)
270210
val out = BytesStreamOutput()
211+
out.writeString(UUID.randomUUID().toString())
271212
docLevelMonitorFanOutRequest.writeTo(out)
272-
out.writeByte(Byte.MIN_VALUE)
273213
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
274214
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
275-
assertEquals(docLevelMonitorFanOutRequest.monitor, newDocLevelMonitorFanOutRequest.monitor)
276-
assertEquals(docLevelMonitorFanOutRequest.executionId, newDocLevelMonitorFanOutRequest.executionId)
277-
assertEquals(docLevelMonitorFanOutRequest.monitorMetadata, newDocLevelMonitorFanOutRequest.monitorMetadata)
278-
assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext)
279-
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
280-
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
281215
assertEquals(sin.read(), 0)
282216
assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
283217
}

0 commit comments

Comments
 (0)