Skip to content

Commit ca6e76f

Browse files
committed
Exposed user metric for histrogam flow batch byte size
1 parent fbef946 commit ca6e76f

File tree

4 files changed

+30
-3
lines changed

4 files changed

+30
-3
lines changed

logstash-core/lib/logstash/api/commands/stats.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ def report(stats, extended_stats = nil, opts = {})
225225
:batch_delay => stats.dig(:config, :batch_delay),
226226
}
227227
}
228-
ret[:batch] = refine_batch_metrics(stats) if stats.include?(:batch)
228+
# ret[:batch] = refine_batch_metrics(stats) if stats.include?(:batch)
229+
ret[:batch] = stats[:batch] if stats.include?(:batch)
229230
ret[:dead_letter_queue] = stats[:dlq] if stats.include?(:dlq)
230231

231232
# if extended_stats were provided, enrich the return value

logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import org.logstash.ackedqueue.QueueFactoryExt;
77
import org.logstash.ext.JrubyEventExtLibrary;
88
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
9+
import org.logstash.instrument.metrics.HdrHistogramFlowMetric;
10+
import org.logstash.instrument.metrics.HistogramFlowMetric;
911
import org.logstash.instrument.metrics.counter.LongCounter;
1012
import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge;
1113

@@ -23,6 +25,7 @@ class QueueReadClientBatchMetrics {
2325
private LongCounter pipelineMetricBatchCount;
2426
private LongCounter pipelineMetricBatchByteSize;
2527
private LongCounter pipelineMetricBatchTotalEvents;
28+
private HistogramFlowMetric pipelineMetricBatchByteSizeFlowHistogram;
2629
private final SecureRandom random = new SecureRandom();
2730
private LazyDelegatingGauge currentBatchDimensions;
2831

@@ -39,6 +42,9 @@ public void setupMetrics(AbstractNamespacedMetricExt namespacedMetric) {
3942
pipelineMetricBatchTotalEvents = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_EVENTS);
4043
pipelineMetricBatchByteSize = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_BYTES);
4144
currentBatchDimensions = LazyDelegatingGauge.fromRubyBase(batchNamespace, BATCH_CURRENT_KEY);
45+
pipelineMetricBatchByteSizeFlowHistogram = batchNamespace.asApiMetric()
46+
.namespace("batch_byte_size")
47+
.register("histogram", HdrHistogramFlowMetric.FACTORY);
4248
}
4349
}
4450

@@ -75,6 +81,7 @@ private void updateBatchSizeMetric(QueueBatch batch) {
7581
pipelineMetricBatchTotalEvents.increment(batch.filteredSize());
7682
pipelineMetricBatchByteSize.increment(totalByteSize);
7783
currentBatchDimensions.set(Arrays.asList(batch.filteredSize(), totalByteSize));
84+
pipelineMetricBatchByteSizeFlowHistogram.recordValue(totalByteSize);
7885
} catch (IllegalArgumentException e) {
7986
LOG.error("Failed to calculate batch byte size for metrics", e);
8087
}

logstash-core/src/main/java/org/logstash/instrument/metrics/HdrHistogramFlowMetric.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.logstash.instrument.metrics;
2121

22+
import co.elastic.logstash.api.UserMetric;
2223
import org.HdrHistogram.Histogram;
2324
import org.HdrHistogram.Recorder;
2425
import org.apache.logging.log4j.LogManager;
@@ -114,6 +115,8 @@ HistogramMetricData computeAggregatedHistogramData() {
114115

115116
private static final Logger LOG = LogManager.getLogger(HdrHistogramFlowMetric.class);
116117

118+
public static UserMetric.Factory<HistogramFlowMetric> FACTORY = HistogramFlowMetric.PROVIDER.getFactory(HdrHistogramFlowMetric::new);
119+
117120
private static final List<FlowMetricRetentionPolicy> SUPPORTED_POLICIES = List.of(
118121
FlowMetricRetentionPolicy.BuiltInRetentionPolicy.LAST_1_MINUTE,
119122
FlowMetricRetentionPolicy.BuiltInRetentionPolicy.LAST_5_MINUTES,
@@ -139,7 +142,6 @@ public Map<String, HistogramMetricData> getValue() {
139142
final long currentTimeNanos = System.nanoTime();
140143
histogramsWindows.forEach((policy, window) -> {
141144
window.recordWindow.baseline(currentTimeNanos).ifPresent(baseline -> {
142-
LOG.info("getValue computing aggregated histogram");
143145
result.put(policy.policyName().toLowerCase(), window.computeAggregatedHistogramData());
144146
});
145147
});

logstash-core/src/main/java/org/logstash/instrument/metrics/HistogramFlowMetric.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,24 @@
2424

2525
public interface HistogramFlowMetric extends UserMetric<Map<String, HistogramMetricData>>,
2626
org.logstash.instrument.metrics.Metric<Map<String, HistogramMetricData>> {
27-
27+
28+
Provider<HistogramFlowMetric> PROVIDER = new Provider<>(HistogramFlowMetric.class, new HistogramFlowMetric() {
29+
@Override
30+
public Map<String, HistogramMetricData> getValue() {
31+
return Map.of();
32+
}
33+
34+
@Override
35+
public String getName() {
36+
return "NULL";
37+
}
38+
39+
@Override
40+
public void recordValue(long totalByteSize) {
41+
// no-op
42+
}
43+
});
44+
2845
void recordValue(long totalByteSize);
2946

3047
@Override

0 commit comments

Comments
 (0)