Skip to content

Commit 161b367

Browse files
garyrussellartembilan
authored andcommitted
GH-439: Container Lifecycle Fixes
Fixes #439 Ignore `stop()` if container is not running, `start()` if container is running. __cherry-pick to all branches__ * Use `doStop(Runnable)` optimization from the `stop()` # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java # spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java (cherry picked from commit 3b060d8)
1 parent 2336229 commit 161b367

File tree

1 file changed

+23
-15
lines changed

1 file changed

+23
-15
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -196,35 +196,43 @@ public void setupMessageListener(Object messageListener) {
196196
@Override
197197
public final void start() {
198198
synchronized (this.lifecycleMonitor) {
199-
Assert.isTrue(
200-
this.containerProperties.getMessageListener() instanceof KafkaDataListener,
201-
"A " + KafkaDataListener.class.getName() + " implementation must be provided");
202-
doStart();
199+
if (!isRunning()) {
200+
Assert.isTrue(
201+
this.containerProperties.getMessageListener() instanceof GenericMessageListener,
202+
"A " + GenericMessageListener.class.getName() + " implementation must be provided");
203+
doStart();
204+
}
203205
}
204206
}
205207

206208
protected abstract void doStart();
207209

208210
@Override
209211
public final void stop() {
210-
final CountDownLatch latch = new CountDownLatch(1);
211-
stop(new Runnable() {
212-
@Override
213-
public void run() {
214-
latch.countDown();
212+
synchronized (this.lifecycleMonitor) {
213+
if (isRunning()) {
214+
final CountDownLatch latch = new CountDownLatch(1);
215+
doStop(new Runnable() {
216+
@Override
217+
public void run() {
218+
latch.countDown();
219+
}
220+
});
221+
try {
222+
latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS);
223+
}
224+
catch (InterruptedException e) {
225+
}
215226
}
216-
});
217-
try {
218-
latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS);
219-
}
220-
catch (InterruptedException e) {
221227
}
222228
}
223229

224230
@Override
225231
public void stop(Runnable callback) {
226232
synchronized (this.lifecycleMonitor) {
227-
doStop(callback);
233+
if (isRunning()) {
234+
doStop(callback);
235+
}
228236
}
229237
}
230238

0 commit comments

Comments
 (0)