diff --git a/core/ipc/sink/api/src/main/java/org/opennms/core/ipc/sink/api/AggregationPolicy.java b/core/ipc/sink/api/src/main/java/org/opennms/core/ipc/sink/api/AggregationPolicy.java index 3a538bebe2db..657675352f36 100644 --- a/core/ipc/sink/api/src/main/java/org/opennms/core/ipc/sink/api/AggregationPolicy.java +++ b/core/ipc/sink/api/src/main/java/org/opennms/core/ipc/sink/api/AggregationPolicy.java @@ -96,4 +96,6 @@ public interface AggregationPolicy { */ T build(U accumulator); + + int getBatchSize(); } diff --git a/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/aggregation/ArrayListAggregationPolicy.java b/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/aggregation/ArrayListAggregationPolicy.java index 5d95439babfe..7ce857b96649 100644 --- a/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/aggregation/ArrayListAggregationPolicy.java +++ b/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/aggregation/ArrayListAggregationPolicy.java @@ -63,6 +63,11 @@ public List build(List accumulator) { return accumulator; } + @Override + public int getBatchSize() { + return m_completionSize; + } + @Override public int getCompletionSize() { return m_completionSize; diff --git a/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/aggregation/IdentityAggregationPolicy.java b/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/aggregation/IdentityAggregationPolicy.java index 3f7888158050..fd82c9692565 100644 --- a/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/aggregation/IdentityAggregationPolicy.java +++ b/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/aggregation/IdentityAggregationPolicy.java @@ -47,6 +47,11 @@ public T build(T accumulator) { return accumulator; } + @Override + public int getBatchSize() { + return 1; + } + @Override public int getCompletionSize() { return 1; diff --git a/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/aggregation/MappingAggregationPolicy.java b/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/aggregation/MappingAggregationPolicy.java index 680b9acdaf22..e0f00bafe750 100644 --- a/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/aggregation/MappingAggregationPolicy.java +++ b/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/aggregation/MappingAggregationPolicy.java @@ -59,4 +59,8 @@ public T build(T accumulator) { return accumulator; } + @Override + public int getBatchSize() { + return 1; + } } diff --git a/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/common/AsyncDispatcherImpl.java b/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/common/AsyncDispatcherImpl.java index b30edd6bd520..cc1ad09545d6 100644 --- a/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/common/AsyncDispatcherImpl.java +++ b/core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/common/AsyncDispatcherImpl.java @@ -103,6 +103,12 @@ public AsyncDispatcherImpl(DispatcherState state, AsyncPolicy asyncPoli state.getMetrics().register(queueSizeMetricName(), (Gauge) activeDispatchers::get); + if (state.getModule() != null) { + state.getMetrics().register( + batchSizeMetricName(), + (Gauge) () -> state.getModule().getAggregationPolicy().getBatchSize() + ); + } droppedCounter = state.getMetrics().counter(MetricRegistry.name(state.getModule().getId(), "dropped")); executor = Executors.newFixedThreadPool(asyncPolicy.getNumThreads(), @@ -115,6 +121,11 @@ private String queueSizeMetricName() { return MetricRegistry.name(state.getModule().getId(), "queue-size"); } + private String batchSizeMetricName() { + return MetricRegistry.name(state.getModule().getId(), "batch-size"); + } + + private void dispatchFromQueue() { while (true) { try { diff --git a/core/ipc/sink/common/src/test/java/org/opennms/core/ipc/sink/aggregation/AggregationTest.java b/core/ipc/sink/common/src/test/java/org/opennms/core/ipc/sink/aggregation/AggregationTest.java index 7745a58b7e8c..57af19a84acb 100644 --- a/core/ipc/sink/common/src/test/java/org/opennms/core/ipc/sink/aggregation/AggregationTest.java +++ b/core/ipc/sink/common/src/test/java/org/opennms/core/ipc/sink/aggregation/AggregationTest.java @@ -321,6 +321,12 @@ public UDPPacketLog aggregate(UDPPacketLog oldPacketLog, UDPPacket newPacket) { public UDPPacketLog build(UDPPacketLog accumulator) { return accumulator; } + + @Override + public int getBatchSize() { + return COMPLETION_SIZE; + } + }; } } @@ -359,6 +365,13 @@ public UDPPacketLog aggregate(UDPPacketLog oldPacketLog, UDPPacket newPacket) { public UDPPacketLog build(UDPPacketLog accumulator) { return accumulator; } + + @Override + public int getBatchSize() { + return COMPLETION_SIZE; + } + + }; } } @@ -398,6 +411,10 @@ public UDPPacketLog aggregate(UDPPacketLog oldPacketLog, UDPPacket newPacket) { public UDPPacketLog build(UDPPacketLog accumulator) { return accumulator; } + @Override + public int getBatchSize() { + return 1; + } }; } } diff --git a/features/events/sink/dispatcher/src/main/java/org/opennms/features/events/sink/module/EventSinkModule.java b/features/events/sink/dispatcher/src/main/java/org/opennms/features/events/sink/module/EventSinkModule.java index d2dec72c5611..86b7245a5ee7 100644 --- a/features/events/sink/dispatcher/src/main/java/org/opennms/features/events/sink/module/EventSinkModule.java +++ b/features/events/sink/dispatcher/src/main/java/org/opennms/features/events/sink/module/EventSinkModule.java @@ -90,6 +90,12 @@ public Log aggregate(Log eventLog, Event event) { public Log build(Log accumulator) { return accumulator; } + + @Override + public int getBatchSize() { + return m_config.getBatchSize(); + } + }; } diff --git a/features/events/syslog/src/main/java/org/opennms/netmgt/syslogd/SyslogSinkModule.java b/features/events/syslog/src/main/java/org/opennms/netmgt/syslogd/SyslogSinkModule.java index 52c8d16c77ce..548d8207ebea 100644 --- a/features/events/syslog/src/main/java/org/opennms/netmgt/syslogd/SyslogSinkModule.java +++ b/features/events/syslog/src/main/java/org/opennms/netmgt/syslogd/SyslogSinkModule.java @@ -90,6 +90,10 @@ public SyslogMessageLogDTO aggregate(SyslogMessageLogDTO accumulator, SyslogConn public SyslogMessageLogDTO build(SyslogMessageLogDTO accumulator) { return accumulator; } + @Override + public int getBatchSize() { + return config.getBatchSize(); + } }; } diff --git a/features/events/traps/src/main/java/org/opennms/netmgt/trapd/TrapSinkModule.java b/features/events/traps/src/main/java/org/opennms/netmgt/trapd/TrapSinkModule.java index 3c0af9d09f15..bd8e24a2b93b 100644 --- a/features/events/traps/src/main/java/org/opennms/netmgt/trapd/TrapSinkModule.java +++ b/features/events/traps/src/main/java/org/opennms/netmgt/trapd/TrapSinkModule.java @@ -105,6 +105,12 @@ public TrapLogDTO aggregate(TrapLogDTO accumulator, TrapInformationWrapper newMe public TrapLogDTO build(TrapLogDTO accumulator) { return accumulator; } + + @Override + public int getBatchSize() { + return config.getBatchSize(); + } + }; } diff --git a/features/telemetry/common/src/main/java/org/opennms/netmgt/telemetry/common/ipc/TelemetrySinkModule.java b/features/telemetry/common/src/main/java/org/opennms/netmgt/telemetry/common/ipc/TelemetrySinkModule.java index 61f422226260..c10282941894 100644 --- a/features/telemetry/common/src/main/java/org/opennms/netmgt/telemetry/common/ipc/TelemetrySinkModule.java +++ b/features/telemetry/common/src/main/java/org/opennms/netmgt/telemetry/common/ipc/TelemetrySinkModule.java @@ -138,6 +138,10 @@ public TelemetryProtos.TelemetryMessageLog.Builder aggregate(TelemetryProtos.Tel public TelemetryProtos.TelemetryMessageLog build(TelemetryProtos.TelemetryMessageLog.Builder accumulator) { return accumulator.build(); } + @Override + public int getBatchSize() { + return TelemetrySinkModule.this.queueConfig.getBatchSize().orElse(DEFAULT_BATCH_SIZE); + } }; }