Skip to content

Commit fdeb6a0

Browse files
andseljsvd
andauthored
Implements current batch event count and byte size metrics (#18160)
Exposes the current batch size in terms of events and estimated memory consumption. Introduce a new gauge metric to collect list of values, used a a couple, the first element count of events in the batch and the second one is the estimated memory occupation of the batch. This list is later grabbed in the API layer to populate the two current values for `event_count` and `batch_size`. Co-authored-by: João Duarte <[email protected]>
1 parent 2fab5f4 commit fdeb6a0

File tree

10 files changed

+62
-11
lines changed

10 files changed

+62
-11
lines changed

logstash-core/lib/logstash/api/commands/stats.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,14 +173,20 @@ def plugin_stats(stats, plugin_type)
173173
end
174174

175175
def refine_batch_metrics(stats)
176+
# current is a tuple of [event_count, byte_size] store the reference locally to avoid repeatedly
177+
# reading and retrieve unrelated values
178+
current_data_point = stats[:batch][:current]
176179
{
177180
:event_count => {
181+
# current_data_point is an instance of org.logstash.instrument.metrics.gauge.LazyDelegatingGauge so need to invoke getValue() to obtain the actual value
182+
:current => current_data_point.value[0],
178183
:average => {
179184
# average return a FlowMetric which and we need to invoke getValue to obtain the map with metric details.
180185
:lifetime => stats[:batch][:event_count][:average].value["lifetime"] ? stats[:batch][:event_count][:average].value["lifetime"].round : 0
181186
}
182187
},
183188
:byte_size => {
189+
:current => current_data_point.value[1],
184190
:average => {
185191
:lifetime => stats[:batch][:byte_size][:average].value["lifetime"] ? stats[:batch][:byte_size][:average].value["lifetime"].round : 0
186192
}

logstash-core/spec/logstash/api/modules/node_stats_spec.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,13 @@
150150
},
151151
"batch" => {
152152
"event_count" => {
153+
"current" => Numeric,
153154
"average" => {
154155
"lifetime" => Numeric
155156
}
156157
},
157158
"byte_size" => {
159+
"current" => Numeric,
158160
"average" => {
159161
"lifetime" => Numeric
160162
}

logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
import org.logstash.instrument.metrics.MetricType;
9393
import org.logstash.instrument.metrics.NullMetricExt;
9494
import org.logstash.instrument.metrics.UpScaledMetric;
95+
import org.logstash.instrument.metrics.gauge.TextGauge;
9596
import org.logstash.instrument.metrics.timer.TimerMetric;
9697
import org.logstash.instrument.metrics.UptimeMetric;
9798
import org.logstash.instrument.metrics.counter.LongCounter;

logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
import org.logstash.ext.JrubyEventExtLibrary;
88
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
99
import org.logstash.instrument.metrics.counter.LongCounter;
10+
import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge;
1011

1112
import java.security.SecureRandom;
13+
import java.util.Arrays;
1214

1315
import static org.logstash.instrument.metrics.MetricKeys.*;
1416

@@ -22,6 +24,7 @@ class QueueReadClientBatchMetrics {
2224
private LongCounter pipelineMetricBatchByteSize;
2325
private LongCounter pipelineMetricBatchTotalEvents;
2426
private final SecureRandom random = new SecureRandom();
27+
private LazyDelegatingGauge currentBatchDimensions;
2528

2629
public QueueReadClientBatchMetrics(QueueFactoryExt.BatchMetricMode batchMetricMode) {
2730
this.batchMetricMode = batchMetricMode;
@@ -35,16 +38,18 @@ public void setupMetrics(AbstractNamespacedMetricExt namespacedMetric) {
3538
pipelineMetricBatchCount = LongCounter.fromRubyBase(batchNamespace, BATCH_COUNT);
3639
pipelineMetricBatchTotalEvents = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_EVENTS);
3740
pipelineMetricBatchByteSize = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_BYTES);
41+
currentBatchDimensions = LazyDelegatingGauge.fromRubyBase(batchNamespace, BATCH_CURRENT_KEY);
3842
}
3943
}
4044

4145
public void updateBatchMetrics(QueueBatch batch) {
42-
if (batch.events().isEmpty()) {
43-
// avoid to increment batch count for empty batches
46+
if (batchMetricMode == QueueFactoryExt.BatchMetricMode.DISABLED) {
4447
return;
4548
}
4649

47-
if (batchMetricMode == QueueFactoryExt.BatchMetricMode.DISABLED) {
50+
if (batch.events().isEmpty()) {
51+
// don't update averages for empty batches, but set current back to zero
52+
currentBatchDimensions.set(Arrays.asList(0L, 0L));
4853
return;
4954
}
5055

@@ -62,13 +67,14 @@ public void updateBatchMetrics(QueueBatch batch) {
6267
private void updateBatchSizeMetric(QueueBatch batch) {
6368
try {
6469
// if an error occurs in estimating the size of the batch, no counter has to be updated
65-
long totalSize = 0L;
70+
long totalByteSize = 0L;
6671
for (JrubyEventExtLibrary.RubyEvent rubyEvent : batch.events()) {
67-
totalSize += rubyEvent.getEvent().estimateMemory();
72+
totalByteSize += rubyEvent.getEvent().estimateMemory();
6873
}
6974
pipelineMetricBatchCount.increment();
7075
pipelineMetricBatchTotalEvents.increment(batch.filteredSize());
71-
pipelineMetricBatchByteSize.increment(totalSize);
76+
pipelineMetricBatchByteSize.increment(totalByteSize);
77+
currentBatchDimensions.set(Arrays.asList(batch.filteredSize(), totalByteSize));
7278
} catch (IllegalArgumentException e) {
7379
LOG.error("Failed to calculate batch byte size for metrics", e);
7480
}

logstash-core/src/main/java/org/logstash/instrument/metrics/MetricExt.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,15 @@ public final class MetricExt extends AbstractSimpleMetricExt {
4444

4545
private static final long serialVersionUID = 1L;
4646

47-
public static final RubySymbol COUNTER = RubyUtil.RUBY.newSymbol("counter");
47+
// These two metric type symbols need to be package-private because used in NamespacedMetricExt
48+
static final RubySymbol COUNTER = RubyUtil.RUBY.newSymbol("counter");
49+
static final RubySymbol GAUGE = RubyUtil.RUBY.newSymbol("gauge");
4850

4951
private static final RubyFixnum ONE = RubyUtil.RUBY.newFixnum(1);
5052

5153
private static final RubySymbol INCREMENT = RubyUtil.RUBY.newSymbol("increment");
5254

5355
private static final RubySymbol DECREMENT = RubyUtil.RUBY.newSymbol("decrement");
54-
55-
private static final RubySymbol GAUGE = RubyUtil.RUBY.newSymbol("gauge");
5656
private static final RubySymbol TIMER = RubyUtil.RUBY.newSymbol("timer");
5757
private static final RubySymbol SET = RubyUtil.RUBY.newSymbol("set");
5858
private static final RubySymbol GET = RubyUtil.RUBY.newSymbol("get");

logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,4 +131,6 @@ private MetricKeys() {
131131

132132
public static final RubySymbol BATCH_BYTE_SIZE_KEY = RubyUtil.RUBY.newSymbol("byte_size");
133133

134+
public static final RubySymbol BATCH_CURRENT_KEY = RubyUtil.RUBY.newSymbol("current");
135+
134136
}

logstash-core/src/main/java/org/logstash/instrument/metrics/NamespacedMetricExt.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,10 @@ protected IRubyObject getCounter(final ThreadContext context, final IRubyObject
7676
@Override
7777
protected IRubyObject getGauge(final ThreadContext context, final IRubyObject key,
7878
final IRubyObject value) {
79-
return metric.gauge(context, namespaceName, key, value);
79+
metric.gauge(context, namespaceName, key, value);
80+
return collector(context).callMethod(
81+
context, "get", new IRubyObject[]{namespaceName, key, MetricExt.GAUGE}
82+
);
8083
}
8184

8285
@Override

logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,16 @@
2323
import org.apache.logging.log4j.LogManager;
2424
import org.apache.logging.log4j.Logger;
2525
import org.jruby.RubyHash;
26+
import org.jruby.RubySymbol;
27+
import org.jruby.runtime.ThreadContext;
28+
import org.jruby.runtime.builtin.IRubyObject;
29+
import org.logstash.RubyUtil;
2630
import org.logstash.ext.JrubyTimestampExtLibrary.RubyTimestamp;
2731
import org.logstash.instrument.metrics.AbstractMetric;
32+
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
2833
import org.logstash.instrument.metrics.MetricType;
2934

35+
import java.util.Arrays;
3036
import java.util.List;
3137
import java.util.Optional;
3238

@@ -39,11 +45,27 @@ public class LazyDelegatingGauge extends AbstractMetric<Object> implements Gauge
3945

4046
private static final Logger LOGGER = LogManager.getLogger(LazyDelegatingGauge.class);
4147

48+
public static final LazyDelegatingGauge DUMMY_GAUGE = new LazyDelegatingGauge("dummy");
49+
4250
protected final String key;
4351

4452
@SuppressWarnings("rawtypes")
4553
private GaugeMetric lazyMetric;
4654

55+
56+
public static LazyDelegatingGauge fromRubyBase(final AbstractNamespacedMetricExt metric, final RubySymbol key) {
57+
final ThreadContext context = RubyUtil.RUBY.getCurrentContext();
58+
// just initialize an empty gauge
59+
final IRubyObject gauge = metric.gauge(context, key, context.runtime.newArray(context.runtime.newString("undefined"), context.runtime.newString("undefined")));
60+
final LazyDelegatingGauge javaGauge;
61+
if (LazyDelegatingGauge.class.isAssignableFrom(gauge.getJavaClass())) {
62+
javaGauge = gauge.toJava(LazyDelegatingGauge.class);
63+
} else {
64+
javaGauge = DUMMY_GAUGE;
65+
}
66+
return javaGauge;
67+
}
68+
4769
/**
4870
* Constructor - null initial value
4971
*

logstash-core/src/test/java/org/logstash/instrument/metrics/MockNamespacedMetric.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.jruby.runtime.builtin.IRubyObject;
1010
import org.logstash.RubyUtil;
1111
import org.logstash.instrument.metrics.counter.LongCounter;
12+
import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge;
1213
import org.logstash.instrument.metrics.timer.TimerMetric;
1314

1415
import java.util.Objects;
@@ -36,7 +37,9 @@ public static MockNamespacedMetric create() {
3637

3738
@Override
3839
protected IRubyObject getGauge(ThreadContext context, IRubyObject key, IRubyObject value) {
39-
return null;
40+
Objects.requireNonNull(key);
41+
requireRubySymbol(key, "key");
42+
return RubyUtil.toRubyObject(metrics.computeIfAbsent(key.asJavaString(), LazyDelegatingGauge::new));
4043
}
4144

4245
@Override

qa/integration/specs/monitoring_api_spec.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,11 +269,17 @@
269269
expect(batch_stats["event_count"]["average"]["lifetime"]).to be_a_kind_of(Numeric)
270270
expect(batch_stats["event_count"]["average"]["lifetime"]).to be > 0
271271

272+
expect(batch_stats["event_count"]["current"]).not_to be_nil
273+
expect(batch_stats["event_count"]["current"]).to be >= 0
274+
272275
expect(batch_stats["byte_size"]).not_to be_nil
273276
expect(batch_stats["byte_size"]["average"]).not_to be_nil
274277
expect(batch_stats["byte_size"]["average"]["lifetime"]).not_to be_nil
275278
expect(batch_stats["byte_size"]["average"]["lifetime"]).to be_a_kind_of(Numeric)
276279
expect(batch_stats["byte_size"]["average"]["lifetime"]).to be > 0
280+
281+
expect(batch_stats["byte_size"]["current"]).not_to be_nil
282+
expect(batch_stats["byte_size"]["current"]).to be >= 0
277283
end
278284
end
279285
end

0 commit comments

Comments
 (0)