Skip to content

Commit 8fc053b

Browse files
committed
force consume stream in fan out request
Signed-off-by: Surya Sashank Nistala <[email protected]>
1 parent 4332937 commit 8fc053b

File tree

2 files changed

+35
-4
lines changed

2 files changed

+35
-4
lines changed

src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.commons.alerting.action
77

8+
import org.apache.logging.log4j.LogManager
89
import org.opensearch.action.ActionRequest
910
import org.opensearch.action.ActionRequestValidationException
1011
import org.opensearch.commons.alerting.model.DataSources
@@ -22,11 +23,13 @@ import org.opensearch.core.xcontent.ToXContent
2223
import org.opensearch.core.xcontent.ToXContentObject
2324
import org.opensearch.core.xcontent.XContentBuilder
2425
import org.opensearch.index.seqno.SequenceNumbers
26+
import java.io.EOFException
2527
import java.io.IOException
2628
import java.time.Instant
2729
import java.time.temporal.ChronoUnit
2830

2931
class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
32+
3033
val monitor: Monitor
3134
val dryRun: Boolean
3235
val monitorMetadata: MonitorMetadata
@@ -37,10 +40,12 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
3740
val workflowRunContext: WorkflowRunContext?
3841

3942
companion object {
43+
val log = LogManager.getLogger(DocLevelMonitorFanOutRequest::class.java)
4044
private fun safeReadMonitor(sin: StreamInput): Monitor =
4145
try {
4246
Monitor.readFrom(sin)!!
4347
} catch (e: Exception) {
48+
log.error("Error parsing monitor in Doc level monitor fanout request", e)
4449
Monitor(
4550
"failed_serde", NO_VERSION, "failed_serde", true,
4651
IntervalSchedule(1, ChronoUnit.MINUTES), Instant.now(), Instant.now(), "",
@@ -53,13 +58,15 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
5358
try {
5459
sin.readBoolean()
5560
} catch (e: Exception) {
61+
log.error("Error parsing boolean in Doc level monitor fanout request", e)
5662
false
5763
}
5864

5965
private fun safeReadMonitorMetadata(sin: StreamInput): MonitorMetadata =
6066
try {
6167
MonitorMetadata.readFrom(sin)
6268
} catch (e: Exception) {
69+
log.error("Error parsing monitor in Doc level monitor fanout request", e)
6370
MonitorMetadata(
6471
"failed_serde",
6572
SequenceNumbers.UNASSIGNED_SEQ_NO,
@@ -75,36 +82,59 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
7582
try {
7683
sin.readString()
7784
} catch (e: Exception) {
85+
log.error("Error parsing string in Doc level monitor fanout request", e)
7886
""
7987
}
8088

8189
private fun safeReadShardIds(sin: StreamInput): List<ShardId> =
8290
try {
8391
sin.readList(::ShardId)
8492
} catch (e: Exception) {
93+
log.error("Error parsing shardId list in Doc level monitor fanout request", e)
8594
listOf(ShardId("failed_serde", "failed_serde", 999999))
8695
}
8796

8897
private fun safeReadStringList(sin: StreamInput): List<String> =
8998
try {
9099
sin.readStringList()
91100
} catch (e: Exception) {
101+
log.error("Error parsing string list in Doc level monitor fanout request", e)
92102
emptyList()
93103
}
94104

95105
private fun safeReadWorkflowRunContext(sin: StreamInput): WorkflowRunContext? =
96106
try {
97107
if (sin.readBoolean()) WorkflowRunContext(sin) else null
98108
} catch (e: Exception) {
109+
log.error("Error parsing workflow context in Doc level monitor fanout request", e)
99110
null
100111
}
101112

102-
private fun safeReadIndexExecutionContext(sin: StreamInput): IndexExecutionContext? =
103-
try {
104-
IndexExecutionContext(sin)
113+
private fun safeReadIndexExecutionContext(sin: StreamInput): IndexExecutionContext? {
114+
var indexExecutionContext: IndexExecutionContext? = null
115+
return try {
116+
indexExecutionContext = IndexExecutionContext(sin)
117+
while (sin.read() != -1) {
118+
// read and discard bytes until stream is entirely consumed
119+
try {
120+
sin.readByte()
121+
} catch (_: EOFException) {
122+
}
123+
}
124+
return indexExecutionContext
125+
} catch (e: EOFException) {
126+
indexExecutionContext
105127
} catch (e: Exception) {
128+
log.error("Error parsing index execution context in Doc level monitor fanout request", e)
129+
while (sin.read() != -1) {
130+
try { // read and throw bytes until stream is entirely consumed
131+
sin.readByte()
132+
} catch (_: EOFException) {
133+
}
134+
}
106135
null
107136
}
137+
}
108138
}
109139

110140
constructor(

src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package org.opensearch.commons.alerting.action
77

88
import org.junit.Assert.assertEquals
99
import org.junit.Assert.assertNull
10-
import org.junit.Assert.assertTrue
1110
import org.junit.jupiter.api.Test
1211
import org.opensearch.common.io.stream.BytesStreamOutput
1312
import org.opensearch.commons.alerting.model.ActionExecutionTime
@@ -82,6 +81,7 @@ class DocLevelMonitorFanOutRequestTests {
8281
)
8382
val out = BytesStreamOutput()
8483
docLevelMonitorFanOutRequest.writeTo(out)
84+
monitor.writeTo(out)
8585
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
8686
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
8787
assertEquals(docLevelMonitorFanOutRequest.monitor, newDocLevelMonitorFanOutRequest.monitor)
@@ -90,6 +90,7 @@ class DocLevelMonitorFanOutRequestTests {
9090
assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext)
9191
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
9292
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
93+
assertEquals(sin.read(), -1)
9394
}
9495

9596
@Test

0 commit comments

Comments
 (0)