Skip to content

Commit fe1392e

Browse files
authored
KafkaBinderMetrics' metrics should be unregistered before it's thread
* KafkaBinderMetrics' metrics should be unregistered before it's threadpool is shutdown. * update authors and copyright years
1 parent 8e5f67a commit fe1392e

File tree

2 files changed

+21
-3
lines changed

2 files changed

+21
-3
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -67,6 +67,7 @@
6767
* @author Lars Bilger
6868
* @author Tomek Szmytka
6969
* @author Nico Heller
70+
* @author Kurt Hong
7071
*/
7172
public class KafkaBinderMetrics
7273
implements MeterBinder, ApplicationListener<BindingCreatedEvent>, AutoCloseable {
@@ -263,6 +264,9 @@ public void onApplicationEvent(BindingCreatedEvent event) {
263264

264265
@Override
265266
public void close() throws Exception {
267+
if (this.meterRegistry != null) {
268+
this.meterRegistry.find(OFFSET_LAG_METRIC_NAME).meters().forEach(this.meterRegistry::remove);
269+
}
266270
Optional.ofNullable(scheduler).ifPresent(ExecutorService::shutdown);
267271
}
268272
}

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,6 +56,7 @@
5656
* @author Lars Bilger
5757
* @author Tomek Szmytka
5858
* @author Nico Heller
59+
* @author Kurt Hong
5960
*/
6061
class KafkaBinderMetricsTest {
6162

@@ -91,7 +92,7 @@ public void setup() {
9192
org.mockito.BDDMockito.given(kafkaBinderConfigurationProperties.getMetrics().getOffsetLagMetricsInterval())
9293
.willReturn(Duration.ofSeconds(60));
9394
metrics = new KafkaBinderMetrics(binder, kafkaBinderConfigurationProperties,
94-
consumerFactory, null
95+
consumerFactory, meterRegistry
9596
);
9697
org.mockito.BDDMockito
9798
.given(consumer.endOffsets(ArgumentMatchers.anyCollection()))
@@ -351,6 +352,19 @@ public void shouldShutdownSchedulerOnClose() throws Exception {
351352
assertThat(metrics.scheduler.isShutdown()).isTrue();
352353
}
353354

355+
@Test
356+
public void shouldUnregisterMetersOnClose() throws Exception {
357+
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
358+
topicsInUse.put(
359+
TEST_TOPIC,
360+
new TopicInformation("group4-metrics", partitions, false)
361+
);
362+
metrics.bindTo(meterRegistry);
363+
assertThat(meterRegistry.find(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME).meters()).hasSize(1);
364+
metrics.close();
365+
assertThat(meterRegistry.find(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME).meters()).isEmpty();
366+
}
367+
354368
private List<PartitionInfo> partitions(Node... nodes) {
355369
List<PartitionInfo> partitions = new ArrayList<>();
356370
for (int i = 0; i < nodes.length; i++) {

0 commit comments

Comments
 (0)