Skip to content

Commit 0dd8471

Browse files
authored
KAFKA-19748: fix metrics leak in Kafka Streams (#20633)
This PR fixes a leak in StreamsMetricImpl not removing a store-level-metric correctly, and thus leaking objects. Reviewers: Eduwer Camacaro <[email protected]>, Bill Bejeck <[email protected]>
1 parent 7ddd0d7 commit 0dd8471

File tree

4 files changed

+34
-6
lines changed

4 files changed

+34
-6
lines changed

streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public void add(final MeteredIterator iterator) {
6262

6363
public void remove(final MeteredIterator iterator) {
6464
if (openIterators.size() == 1) {
65-
streamsMetrics.removeMetric(metricName);
65+
streamsMetrics.removeStoreLevelMetric(metricName);
6666
}
6767
openIterators.remove(iterator);
6868
updateOldestStartTimestamp();

streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,14 @@
4141
import java.util.HashMap;
4242
import java.util.LinkedHashMap;
4343
import java.util.LinkedList;
44+
import java.util.List;
4445
import java.util.Map;
4546
import java.util.Objects;
4647
import java.util.concurrent.ConcurrentHashMap;
4748
import java.util.concurrent.ConcurrentMap;
4849
import java.util.concurrent.TimeUnit;
4950
import java.util.function.Supplier;
51+
import java.util.stream.Collectors;
5052

5153
public class StreamsMetricsImpl implements StreamsMetrics {
5254

@@ -339,6 +341,30 @@ public void removeMetric(final MetricName metricName) {
339341
metrics.removeMetric(metricName);
340342
}
341343

344+
public void removeStoreLevelMetric(final MetricName metricName) {
345+
metrics.removeMetric(metricName);
346+
347+
final List<String> metricsScopeCandidates = metricName.tags().keySet().stream()
348+
.filter(tag -> !tag.equals(THREAD_ID_TAG) && !tag.equals(TASK_ID_TAG))
349+
.collect(Collectors.toList());
350+
if (metricsScopeCandidates.size() != 1) {
351+
// should never happen
352+
throw new IllegalStateException("Expected exactly one metric scope tag, but found " + metricsScopeCandidates);
353+
}
354+
355+
final Deque<MetricName> metricsForStore = storeLevelMetrics.get(
356+
storeSensorPrefix(
357+
metricName.tags().get(THREAD_ID_TAG),
358+
metricName.tags().get(TASK_ID_TAG),
359+
metricName.tags().get(metricsScopeCandidates.get(0))
360+
)
361+
);
362+
363+
if (metricsForStore != null) {
364+
metricsForStore.remove(metricName);
365+
}
366+
}
367+
342368
public Map<String, String> taskLevelTagMap(final String threadId, final String taskId) {
343369
final Map<String, String> tagMap = new LinkedHashMap<>();
344370
tagMap.put(THREAD_ID_TAG, threadId);

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -487,9 +487,11 @@ private class MeteredKeyValueTimestampedIterator implements KeyValueIterator<K,
487487
private final long startTimestamp;
488488
private final Function<byte[], V> valueDeserializer;
489489

490-
private MeteredKeyValueTimestampedIterator(final KeyValueIterator<Bytes, byte[]> iter,
491-
final Sensor sensor,
492-
final Function<byte[], V> valueDeserializer) {
490+
private MeteredKeyValueTimestampedIterator(
491+
final KeyValueIterator<Bytes, byte[]> iter,
492+
final Sensor sensor,
493+
final Function<byte[], V> valueDeserializer
494+
) {
493495
this.iter = iter;
494496
this.sensor = sensor;
495497
this.valueDeserializer = valueDeserializer;

streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,11 @@ public void shouldCalculateOldestStartTimestampCorrectly() {
5858
assertThat(gauge.value(null, 0), is(2L));
5959

6060
openIterators.remove(meteredIterator2);
61-
verify(streamsMetrics, never()).removeMetric(any());
61+
verify(streamsMetrics, never()).removeStoreLevelMetric(any());
6262
assertThat(gauge.value(null, 0), is(5L));
6363

6464
openIterators.remove(meteredIterator1);
65-
verify(streamsMetrics).removeMetric(any());
65+
verify(streamsMetrics).removeStoreLevelMetric(any());
6666
assertThat(gauge.value(null, 0), is(5L));
6767

6868
openIterators.add(meteredIterator3);

0 commit comments

Comments
 (0)