Skip to content

Commit 1247ee1

Browse files
garyrussellartembilan
authored andcommitted
GH-786: DKPF: ContextStoppedEvent
Fixes #786 `DefaultKafkaProducerFactory` - use `ContextStoppedEvent` instead of `Lifecycle` to close producer(s). - add `reset` method - deprecate `Lifecycle` methods
1 parent 44aa81b commit 1247ee1

File tree

2 files changed

+115
-21
lines changed

2 files changed

+115
-21
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 62 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -42,26 +42,34 @@
4242
import org.apache.kafka.common.errors.ProducerFencedException;
4343
import org.apache.kafka.common.serialization.Serializer;
4444

45+
import org.springframework.beans.BeansException;
4546
import org.springframework.beans.factory.DisposableBean;
46-
import org.springframework.context.Lifecycle;
47+
import org.springframework.context.ApplicationContext;
48+
import org.springframework.context.ApplicationContextAware;
49+
import org.springframework.context.ApplicationListener;
50+
import org.springframework.context.event.ContextStoppedEvent;
4751
import org.springframework.lang.Nullable;
4852
import org.springframework.util.Assert;
4953

5054
/**
51-
* The {@link ProducerFactory} implementation for the {@code singleton} shared {@link Producer}
52-
* instance.
55+
* The {@link ProducerFactory} implementation for a {@code singleton} shared
56+
* {@link Producer} instance.
5357
* <p>
54-
* This implementation will produce a new {@link Producer} instance (if transactions are not enabled).
55-
* for provided {@link Map} {@code configs} and optional {@link Serializer} {@code keySerializer},
56-
* {@code valueSerializer} implementations on each {@link #createProducer()}
57-
* invocation.
58+
* This implementation will return the same {@link Producer} instance (if transactions are
59+
* not enabled) for the provided {@link Map} {@code configs} and optional {@link Serializer}
60+
* {@code keySerializer}, {@code valueSerializer} implementations on each
61+
* {@link #createProducer()} invocation.
5862
* <p>
59-
* The {@link Producer} instance is freed from the external {@link Producer#close()} invocation
60-
* with the internal wrapper. The real {@link Producer#close()} is called on the target
61-
* {@link Producer} during the {@link Lifecycle#stop()} or {@link DisposableBean#destroy()}.
63+
* The {@link Producer} is wrapped and the underlying {@link KafkaProducer} instance is
64+
* not actually closed when {@link Producer#close()} is invoked. The {@link KafkaProducer}
65+
* is physically closed when {@link DisposableBean#destroy()} is invoked or when the
66+
* application context publishes a {@link ContextStoppedEvent}. You can also invoke
67+
* {@link #reset()}.
6268
* <p>
63-
* Setting {@link #setTransactionIdPrefix(String)} enables transactions; in which case, a cache
64-
* of producers is maintained; closing the producer returns it to the cache.
69+
* Setting {@link #setTransactionIdPrefix(String)} enables transactions; in which case, a
70+
* cache of producers is maintained; closing a producer returns it to the cache. The
71+
* producers are closed and the cache is cleared when the factory is destroyed, the
72+
* application context stopped, or the {@link #reset()} method is called.
6573
*
6674
* @param <K> the key type.
6775
* @param <V> the value type.
@@ -70,7 +78,8 @@
7078
* @author Murali Reddy
7179
* @author Nakul Mishra
7280
*/
73-
public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>, Lifecycle, DisposableBean {
81+
public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>, ApplicationContextAware,
82+
ApplicationListener<ContextStoppedEvent>, DisposableBean {
7483

7584
private static final int DEFAULT_PHYSICAL_CLOSE_TIMEOUT = 30;
7685

@@ -92,7 +101,7 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
92101

93102
private String transactionIdPrefix;
94103

95-
private volatile boolean running;
104+
private ApplicationContext applicationContext;
96105

97106
/**
98107
* Construct a factory with the provided configuration.
@@ -116,6 +125,11 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
116125
this.valueSerializer = valueSerializer;
117126
}
118127

128+
@Override
129+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
130+
this.applicationContext = applicationContext;
131+
}
132+
119133
public void setKeySerializer(@Nullable Serializer<K> keySerializer) {
120134
this.keySerializer = keySerializer;
121135
}
@@ -192,26 +206,53 @@ public void destroy() throws Exception { //NOSONAR
192206
}
193207

194208
@Override
195-
public void start() {
196-
this.running = true;
209+
public void onApplicationEvent(ContextStoppedEvent event) {
210+
if (event.getApplicationContext().equals(this.applicationContext)) {
211+
reset();
212+
}
197213
}
198214

215+
/**
216+
* NoOp.
217+
* @deprecated {@link org.springframework.context.Lifecycle} is no longer implemented.
218+
*/
219+
@Deprecated
220+
public void start() {
221+
// NOSONAR
222+
}
199223

200-
@Override
224+
/**
225+
* NoOp.
226+
* @deprecated {@link org.springframework.context.Lifecycle} is no longer implemented;
227+
* use {@link #reset()} to close the {@link Producer}(s).
228+
*/
229+
@Deprecated
201230
public void stop() {
231+
reset();
232+
}
233+
234+
/**
235+
* Close the {@link Producer}(s) and clear the cache of transactional
236+
* {@link Producer}(s).
237+
* @since 2.2
238+
*/
239+
public void reset() {
202240
try {
203241
destroy();
204-
this.running = false;
205242
}
206243
catch (Exception e) {
207244
logger.error("Exception while closing producer", e);
208245
}
209246
}
210247

211-
212-
@Override
248+
/**
249+
* NoOp.
250+
* @return always true.
251+
* @deprecated {@link org.springframework.context.Lifecycle} is no longer implemented.
252+
*/
253+
@Deprecated
213254
public boolean isRunning() {
214-
return this.running;
255+
return true;
215256
}
216257

217258
@Override

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.mockito.Mockito.mock;
2424

2525
import java.util.HashMap;
26+
import java.util.Queue;
2627
import java.util.concurrent.BlockingQueue;
2728
import java.util.concurrent.atomic.AtomicInteger;
2829

@@ -31,6 +32,8 @@
3132
import org.junit.jupiter.api.Test;
3233
import org.mockito.InOrder;
3334

35+
import org.springframework.context.ApplicationContext;
36+
import org.springframework.context.event.ContextStoppedEvent;
3437
import org.springframework.kafka.test.utils.KafkaTestUtils;
3538
import org.springframework.kafka.transaction.KafkaTransactionManager;
3639
import org.springframework.transaction.CannotCreateTransactionException;
@@ -98,4 +101,54 @@ protected Producer createTransactionalProducer() {
98101
pf.destroy();
99102
}
100103

104+
@Test
105+
@SuppressWarnings({ "rawtypes", "unchecked" })
106+
public void testResetSingle() throws Exception {
107+
final Producer producer = mock(Producer.class);
108+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
109+
110+
@Override
111+
protected Producer createKafkaProducer() {
112+
return producer;
113+
}
114+
115+
};
116+
Producer aProducer = pf.createProducer();
117+
assertThat(aProducer).isNotNull();
118+
aProducer.close();
119+
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNotNull();
120+
Queue cache = KafkaTestUtils.getPropertyValue(pf, "cache", Queue.class);
121+
assertThat(cache.size()).isEqualTo(0);
122+
pf.reset();
123+
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
124+
}
125+
126+
@Test
127+
@SuppressWarnings({ "rawtypes", "unchecked" })
128+
public void testResetTx() throws Exception {
129+
final Producer producer = mock(Producer.class);
130+
ApplicationContext ctx = mock(ApplicationContext.class);
131+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
132+
133+
@Override
134+
protected Producer createTransactionalProducer() {
135+
producer.initTransactions();
136+
BlockingQueue<Producer> cache = getCache();
137+
Producer cached = cache.poll();
138+
return cached == null ? new CloseSafeProducer(producer, cache) : cached;
139+
}
140+
141+
};
142+
pf.setApplicationContext(ctx);
143+
pf.setTransactionIdPrefix("foo");
144+
Producer aProducer = pf.createProducer();
145+
assertThat(aProducer).isNotNull();
146+
aProducer.close();
147+
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
148+
Queue cache = KafkaTestUtils.getPropertyValue(pf, "cache", Queue.class);
149+
assertThat(cache.size()).isEqualTo(1);
150+
pf.onApplicationEvent(new ContextStoppedEvent(ctx));
151+
assertThat(cache.size()).isEqualTo(0);
152+
}
153+
101154
}

0 commit comments

Comments
 (0)