diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 12d7daf0b..27d5ee774 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -75,6 +75,7 @@ import org.opensearch.commons.alerting.AlertingPluginInterface import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutAction import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutRequest import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutResponse +import org.opensearch.commons.alerting.action.PublishBatchFindingsRequest import org.opensearch.commons.alerting.action.PublishFindingsRequest import org.opensearch.commons.alerting.action.SubscribeFindingsResponse import org.opensearch.commons.alerting.model.ActionExecutionResult @@ -613,9 +614,7 @@ class TransportDocLevelMonitorFanOutAction if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) { try { - findings.forEach { finding -> - publishFinding(monitor, finding) - } + publishBatchFindings(monitor, findings) } catch (e: Exception) { // suppress exception log.error("Optional finding callback failed", e) @@ -663,6 +662,27 @@ class TransportDocLevelMonitorFanOutAction ) } + private fun publishBatchFindings( + monitor: Monitor, + findings: List + ) { + val publishBatchFindingsRequest = PublishBatchFindingsRequest(monitor.id, findings) + log.debug("publishing {} findings from node {}", findings.size, clusterService.localNode().id) + AlertingPluginInterface.publishBatchFindings( + client as NodeClient, + publishBatchFindingsRequest, + object : ActionListener { + override fun onResponse(response: SubscribeFindingsResponse) { + log.debug("findings published successfully") + } + + override fun onFailure(e: Exception) { + log.error("publishing findings failed", e) + } + } + ) + } + suspend fun runAction( action: Action, ctx: TriggerExecutionContext,