Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.opensearch.commons.alerting.action.IndexMonitorRequest
import org.opensearch.commons.alerting.action.IndexMonitorResponse
import org.opensearch.commons.alerting.action.IndexWorkflowRequest
import org.opensearch.commons.alerting.action.IndexWorkflowResponse
import org.opensearch.commons.alerting.action.PublishBatchFindingsRequest
import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SearchMonitorRequest
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
Expand Down Expand Up @@ -268,6 +269,24 @@ object AlertingPluginInterface {
)
}

fun publishBatchFindings(
client: NodeClient,
request: PublishBatchFindingsRequest,
listener: ActionListener<SubscribeFindingsResponse>
) {
client.execute(
AlertingActions.SUBSCRIBE_BATCH_FINDINGS_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
SubscribeFindingsResponse(
it
)
}
}
)
}

/**
* Acknowledge Chained Alerts interface.
* @param client Node client for making transport action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ object AlertingActions {
const val ACKNOWLEDGE_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/ack"
const val ACKNOWLEDGE_CHAINED_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/chained_alerts/ack"
const val SUBSCRIBE_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/subscribe"
const val SUBSCRIBE_BATCH_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/batch/subscribe"
const val GET_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/get"
const val SEARCH_MONITORS_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/search"
const val INDEX_COMMENT_ACTION_NAME = "cluster:admin/opensearch/alerting/comments/write"
Expand Down Expand Up @@ -65,6 +66,10 @@ object AlertingActions {
val SUBSCRIBE_FINDINGS_ACTION_TYPE =
ActionType(SUBSCRIBE_FINDINGS_ACTION_NAME, ::SubscribeFindingsResponse)

@JvmField
val SUBSCRIBE_BATCH_FINDINGS_ACTION_TYPE =
ActionType(SUBSCRIBE_BATCH_FINDINGS_ACTION_NAME, ::SubscribeFindingsResponse)

@JvmField
val ACKNOWLEDGE_CHAINED_ALERTS_ACTION_TYPE =
ActionType(ACKNOWLEDGE_CHAINED_ALERTS_ACTION_NAME, ::AcknowledgeAlertResponse)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import java.io.IOException
import java.util.Collections

class PublishBatchFindingsRequest : ActionRequest {

val monitorId: String

val findings: List<Finding>

constructor(
monitorId: String,
findings: List<Finding>
) : super() {
this.monitorId = monitorId
this.findings = findings
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
monitorId = sin.readString(),
findings = Collections.unmodifiableList(sin.readList(::Finding))
)

override fun validate(): ActionRequestValidationException? {
return null
}

override fun writeTo(out: StreamOutput) {
out.writeString(monitorId)
out.writeCollection(findings)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.opensearch.commons.alerting.action

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Test
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.commons.alerting.randomFinding
import org.opensearch.core.common.io.stream.StreamInput

class PublishBatchFindingsRequestTests {

@Test
fun `test publish batch findings request`() {
val findings = listOf(randomFinding(), randomFinding())
val monitorId = "mid"
val req = PublishBatchFindingsRequest(monitorId, findings)
assertNotNull(req)
assertEquals(monitorId, req.monitorId)
assertEquals(findings, req.findings)

val out = BytesStreamOutput()
req.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val newReq = PublishBatchFindingsRequest(sin)
assertEquals(monitorId, newReq.monitorId)
assertEquals(findings.size, newReq.findings.size)
assert(newReq.findings.zip(findings).all { (f1, f2) -> f1.id == f2.id })
}
}
Loading