Skip to content

Commit 70b8037

Browse files
authored
Merge pull request #3064 from artembilan/GH-3062
GH-3062: Fix `KafkaBinderMetrics` for resource leaks
2 parents 534277f + f259c55 commit 70b8037

File tree

3 files changed

+85
-15
lines changed

3 files changed

+85
-15
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,16 +20,15 @@
2020
import java.util.LinkedList;
2121
import java.util.List;
2222
import java.util.Map;
23-
import java.util.Optional;
2423
import java.util.concurrent.ConcurrentHashMap;
2524
import java.util.concurrent.ExecutionException;
26-
import java.util.concurrent.ExecutorService;
2725
import java.util.concurrent.Executors;
2826
import java.util.concurrent.Future;
2927
import java.util.concurrent.ScheduledExecutorService;
3028
import java.util.concurrent.ScheduledThreadPoolExecutor;
3129
import java.util.concurrent.TimeUnit;
3230
import java.util.concurrent.TimeoutException;
31+
import java.util.concurrent.atomic.AtomicBoolean;
3332
import java.util.concurrent.locks.ReentrantLock;
3433
import java.util.function.ToDoubleFunction;
3534

@@ -50,10 +49,12 @@
5049
import org.springframework.cloud.stream.binder.kafka.common.TopicInformation;
5150
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
5251
import org.springframework.context.ApplicationListener;
52+
import org.springframework.context.Lifecycle;
5353
import org.springframework.kafka.core.ConsumerFactory;
5454
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
5555
import org.springframework.lang.Nullable;
5656
import org.springframework.util.ObjectUtils;
57+
import org.springframework.util.ReflectionUtils;
5758

5859
/**
5960
* Metrics for Kafka binder.
@@ -72,7 +73,7 @@
7273
* @author Omer Celik
7374
*/
7475
public class KafkaBinderMetrics
75-
implements MeterBinder, ApplicationListener<BindingCreatedEvent>, AutoCloseable {
76+
implements MeterBinder, ApplicationListener<BindingCreatedEvent>, AutoCloseable, Lifecycle {
7677

7778
private static final int DEFAULT_TIMEOUT = 5;
7879

@@ -101,6 +102,8 @@ public class KafkaBinderMetrics
101102

102103
private final ReentrantLock consumerFactoryLock = new ReentrantLock();
103104

105+
private final AtomicBoolean running = new AtomicBoolean();
106+
104107
public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
105108
KafkaBinderConfigurationProperties binderConfigurationProperties,
106109
ConsumerFactory<?, ?> defaultConsumerFactory,
@@ -125,14 +128,14 @@ public void setTimeout(int timeout) {
125128

126129
@Override
127130
public void bindTo(MeterRegistry registry) {
128-
/**
131+
/*
129132
* We can't just replace one scheduler with another.
130133
* Before and even after the old one is gathered by GC, its threads still exist and consume memory and CPU resources for context switching.
131134
* Theoretically, as a result of processing n topics, there will be about (1+n)*n/2 threads alive at the same time.
132135
*/
133136
if (this.scheduler != null) {
134137
LOG.info("Try to shutdown the old scheduler with " + ((ScheduledThreadPoolExecutor) scheduler).getPoolSize() + " threads");
135-
this.scheduler.shutdown();
138+
this.scheduler.shutdownNow();
136139
}
137140

138141
this.scheduler = Executors.newScheduledThreadPool(this.binder.getTopicsInUse().size());
@@ -278,10 +281,50 @@ public void onApplicationEvent(BindingCreatedEvent event) {
278281
}
279282

280283
@Override
281-
public void close() throws Exception {
282-
if (this.meterRegistry != null) {
283-
this.meterRegistry.find(OFFSET_LAG_METRIC_NAME).meters().forEach(this.meterRegistry::remove);
284+
public void close() {
285+
if (this.scheduler != null) {
286+
this.consumerFactoryLock.lock();
287+
try {
288+
if (this.meterRegistry != null) {
289+
this.meterRegistry.find(OFFSET_LAG_METRIC_NAME).meters().forEach(this.meterRegistry::remove);
290+
}
291+
this.scheduler.shutdownNow();
292+
try {
293+
this.scheduler.awaitTermination(
294+
binderConfigurationProperties.getMetrics().getOffsetLagMetricsInterval().toSeconds(),
295+
TimeUnit.SECONDS);
296+
}
297+
catch (InterruptedException ex) {
298+
Thread.currentThread().interrupt();
299+
ReflectionUtils.rethrowRuntimeException(ex);
300+
}
301+
}
302+
finally {
303+
this.scheduler = null;
304+
this.metadataConsumers.values().forEach(Consumer::close);
305+
this.metadataConsumers.clear();
306+
this.consumerFactoryLock.unlock();
307+
}
284308
}
285-
Optional.ofNullable(scheduler).ifPresent(ExecutorService::shutdown);
286309
}
310+
311+
@Override
312+
public void start() {
313+
this.running.set(true);
314+
// Nothing else to do here. The 'bindTo()' is called from the 'onApplicationEvent()',
315+
// which, in turn, handles the event emitted from the 'AbstractBindingLifecycle.start()' logic.
316+
}
317+
318+
@Override
319+
public void stop() {
320+
if (this.running.compareAndSet(true, false)) {
321+
close();
322+
}
323+
}
324+
325+
@Override
326+
public boolean isRunning() {
327+
return this.running.get();
328+
}
329+
287330
}

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -57,6 +57,7 @@
5757
* @author Tomek Szmytka
5858
* @author Nico Heller
5959
* @author Kurt Hong
60+
* @author Artem Bilan
6061
*/
6162
class KafkaBinderMetricsTest {
6263

@@ -346,10 +347,11 @@ public void usesBeginningOffsetIfNoCommittedOffsetFound() {
346347
}
347348

348349
@Test
349-
public void shouldShutdownSchedulerOnClose() throws Exception {
350+
public void shouldShutdownSchedulerOnClose() {
350351
metrics.bindTo(meterRegistry);
352+
assertThat(metrics.scheduler).isNotNull();
351353
metrics.close();
352-
assertThat(metrics.scheduler.isShutdown()).isTrue();
354+
assertThat(metrics.scheduler).isNull();
353355
}
354356

355357
@Test

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2023 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
3131
import java.util.Map.Entry;
3232
import java.util.Properties;
3333
import java.util.Set;
34+
import java.util.concurrent.atomic.AtomicBoolean;
3435
import java.util.concurrent.locks.ReentrantLock;
3536
import java.util.stream.Stream;
3637

@@ -47,6 +48,7 @@
4748
import org.springframework.context.ApplicationEvent;
4849
import org.springframework.context.ApplicationListener;
4950
import org.springframework.context.ConfigurableApplicationContext;
51+
import org.springframework.context.SmartLifecycle;
5052
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
5153
import org.springframework.context.support.GenericApplicationContext;
5254
import org.springframework.core.convert.converter.Converter;
@@ -78,7 +80,7 @@
7880
* @author Byungjun You
7981
* @author Omer Celik
8082
*/
81-
public class DefaultBinderFactory implements BinderFactory, DisposableBean, ApplicationContextAware {
83+
public class DefaultBinderFactory implements BinderFactory, DisposableBean, ApplicationContextAware, SmartLifecycle {
8284

8385
protected final Log logger = LogFactory.getLog(getClass());
8486

@@ -94,6 +96,8 @@ public class DefaultBinderFactory implements BinderFactory, DisposableBean, Appl
9496

9597
private final BinderCustomizer binderCustomizer;
9698

99+
private final AtomicBoolean running = new AtomicBoolean();
100+
97101
private volatile ConfigurableApplicationContext context;
98102

99103
private Collection<Listener> listeners;
@@ -144,6 +148,27 @@ public void destroy() {
144148
this.defaultBinderForBindingTargetType.clear();
145149
}
146150

151+
@Override
152+
public void start() {
153+
// This is essentially used when a CRaC checkpoint is restored
154+
if (this.running.compareAndSet(false, true)) {
155+
this.binderInstanceCache.values().stream().map(Entry::getValue).forEach(ConfigurableApplicationContext::start);
156+
}
157+
}
158+
159+
@Override
160+
public void stop() {
161+
// Makes sense for a CRaC checkpoint
162+
if (this.running.compareAndSet(true, false)) {
163+
this.binderInstanceCache.values().stream().map(Entry::getValue).forEach(ConfigurableApplicationContext::stop);
164+
}
165+
}
166+
167+
@Override
168+
public boolean isRunning() {
169+
return this.running.get();
170+
}
171+
147172
@SuppressWarnings({ "unchecked", "rawtypes" })
148173
@Override
149174
public <T> Binder<T, ?, ?> getBinder(String name, Class<? extends T> bindingTargetType) {

0 commit comments

Comments
 (0)