Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,6 @@ public interface AggregationPolicy<S, T, U> {
*/
T build(U accumulator);


int getBatchSize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public List<S> build(List<S> accumulator) {
return accumulator;
}

@Override
public int getBatchSize() {
return m_completionSize;
}

@Override
public int getCompletionSize() {
return m_completionSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public T build(T accumulator) {
return accumulator;
}

@Override
public int getBatchSize() {
return 1;
}

@Override
public int getCompletionSize() {
return 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,8 @@ public T build(T accumulator) {
return accumulator;
}

@Override
public int getBatchSize() {
return 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ public AsyncDispatcherImpl(DispatcherState<W, S, T> state, AsyncPolicy asyncPoli

state.getMetrics().register(queueSizeMetricName(), (Gauge<Integer>) activeDispatchers::get);

if (state.getModule() != null) {
state.getMetrics().register(
batchSizeMetricName(),
(Gauge<Integer>) () -> state.getModule().getAggregationPolicy().getBatchSize()
);
}
droppedCounter = state.getMetrics().counter(MetricRegistry.name(state.getModule().getId(), "dropped"));

executor = Executors.newFixedThreadPool(asyncPolicy.getNumThreads(),
Expand All @@ -115,6 +121,11 @@ private String queueSizeMetricName() {
return MetricRegistry.name(state.getModule().getId(), "queue-size");
}

private String batchSizeMetricName() {
return MetricRegistry.name(state.getModule().getId(), "batch-size");
}


private void dispatchFromQueue() {
while (true) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ public UDPPacketLog aggregate(UDPPacketLog oldPacketLog, UDPPacket newPacket) {
public UDPPacketLog build(UDPPacketLog accumulator) {
return accumulator;
}

@Override
public int getBatchSize() {
return COMPLETION_SIZE;
}

};
}
}
Expand Down Expand Up @@ -359,6 +365,13 @@ public UDPPacketLog aggregate(UDPPacketLog oldPacketLog, UDPPacket newPacket) {
public UDPPacketLog build(UDPPacketLog accumulator) {
return accumulator;
}

@Override
public int getBatchSize() {
return COMPLETION_SIZE;
}


};
}
}
Expand Down Expand Up @@ -398,6 +411,10 @@ public UDPPacketLog aggregate(UDPPacketLog oldPacketLog, UDPPacket newPacket) {
public UDPPacketLog build(UDPPacketLog accumulator) {
return accumulator;
}
@Override
public int getBatchSize() {
return 1;
}
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ public Log aggregate(Log eventLog, Event event) {
public Log build(Log accumulator) {
return accumulator;
}

@Override
public int getBatchSize() {
return m_config.getBatchSize();
}

};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public SyslogMessageLogDTO aggregate(SyslogMessageLogDTO accumulator, SyslogConn
public SyslogMessageLogDTO build(SyslogMessageLogDTO accumulator) {
return accumulator;
}
@Override
public int getBatchSize() {
return config.getBatchSize();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ public TrapLogDTO aggregate(TrapLogDTO accumulator, TrapInformationWrapper newMe
public TrapLogDTO build(TrapLogDTO accumulator) {
return accumulator;
}

@Override
public int getBatchSize() {
return config.getBatchSize();
}

};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ public TelemetryProtos.TelemetryMessageLog.Builder aggregate(TelemetryProtos.Tel
public TelemetryProtos.TelemetryMessageLog build(TelemetryProtos.TelemetryMessageLog.Builder accumulator) {
return accumulator.build();
}
@Override
public int getBatchSize() {
return TelemetrySinkModule.this.queueConfig.getBatchSize().orElse(DEFAULT_BATCH_SIZE);
}
};
}

Expand Down
Loading