55
66package org.opensearch.commons.alerting.action
77
8+ import org.apache.logging.log4j.LogManager
89import org.opensearch.action.ActionRequest
910import org.opensearch.action.ActionRequestValidationException
1011import org.opensearch.commons.alerting.model.DataSources
@@ -22,11 +23,13 @@ import org.opensearch.core.xcontent.ToXContent
2223import org.opensearch.core.xcontent.ToXContentObject
2324import org.opensearch.core.xcontent.XContentBuilder
2425import org.opensearch.index.seqno.SequenceNumbers
26+ import java.io.EOFException
2527import java.io.IOException
2628import java.time.Instant
2729import java.time.temporal.ChronoUnit
2830
2931class DocLevelMonitorFanOutRequest : ActionRequest , ToXContentObject {
32+ private val log = LogManager .getLogger(DocLevelMonitorFanOutRequest ::class .java)
3033 val monitor: Monitor
3134 val dryRun: Boolean
3235 val monitorMetadata: MonitorMetadata
@@ -99,12 +102,30 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
99102 null
100103 }
101104
102- private fun safeReadIndexExecutionContext (sin : StreamInput ): IndexExecutionContext ? =
103- try {
104- IndexExecutionContext (sin)
105+ private fun safeReadIndexExecutionContext (sin : StreamInput ): IndexExecutionContext ? {
106+ var indexExecutionContext: IndexExecutionContext ? = null
107+ return try {
108+ indexExecutionContext = IndexExecutionContext (sin)
109+ while (sin.read() != - 1 ) {
110+ try { // read and throw bytes until stream is entirely consumed
111+ sin.readByte()
112+ } catch (_: EOFException ) {
113+ }
114+ }
115+ return indexExecutionContext
116+ } catch (e: EOFException ) {
117+ indexExecutionContext
118+
105119 } catch (e: Exception ) {
120+ while (sin.read() != - 1 ) {
121+ try { // read and throw bytes until stream is entirely consumed
122+ sin.readByte()
123+ } catch (_: EOFException ) {
124+ }
125+ }
106126 null
107127 }
128+ }
108129 }
109130
110131 constructor (
@@ -151,6 +172,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
151172 out .writeBoolean(workflowRunContext != null )
152173 workflowRunContext?.writeTo(out )
153174 indexExecutionContext?.writeTo(out )
175+ indexExecutionContext?.writeTo(out )
154176 }
155177
156178 override fun validate (): ActionRequestValidationException ? {
0 commit comments