Skip to content

Commit 8baa676

Browse files
committed
Forbid fenced Container to stop ConcurContainer
[DRAFT] Fixes #GH-3448 #3448 Issue: Fenced Child Container could stop the running ConcurrentContainer Fix: Configure KafkaMessageListenerContainer (KMLC) to use ConcurrentMessagleListenerContainerRef instead ofConcurrentContainer. Internally, ConcurrentContainerRef checks if KMLC is fenced when stop operations are called on Concurrent Container. If KMLC is fenced, suppress the `stop` related operations. If KMLC is not fenced, delegate the stop call to ConcurrentContainer.
1 parent 6a7d02e commit 8baa676

File tree

6 files changed

+629
-24
lines changed

6 files changed

+629
-24
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,14 @@ protected AbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V>
193193
}
194194
}
195195

196+
/**
197+
* To be used only with {@link ConcurrentMessageListenerContainerRef}.
198+
*/
199+
AbstractMessageListenerContainer() {
200+
this.containerProperties = null;
201+
this.consumerFactory = null;
202+
}
203+
196204
@Override
197205
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
198206
this.applicationContext = applicationContext;
@@ -282,6 +290,10 @@ protected void setFenced(boolean fenced) {
282290
this.fenced = fenced;
283291
}
284292

293+
boolean isFenced() {
294+
return this.fenced;
295+
}
296+
285297
@Deprecated(since = "3.2", forRemoval = true)
286298
protected boolean isPaused() {
287299
return this.paused;
@@ -722,7 +734,7 @@ public void onPartitionsLost(Collection<TopicPartition> partitions) {
722734
protected void publishContainerStoppedEvent() {
723735
ApplicationEventPublisher eventPublisher = getApplicationEventPublisher();
724736
if (eventPublisher != null) {
725-
eventPublisher.publishEvent(new ContainerStoppedEvent(this, parentOrThis()));
737+
eventPublisher.publishEvent(new ContainerStoppedEvent(this, parentContainerOrThis()));
726738
}
727739
}
728740

@@ -735,6 +747,20 @@ protected void publishContainerStoppedEvent() {
735747
return this;
736748
}
737749

750+
/**
751+
* Return the actual {@link ConcurrentMessageListenerContainer} if the parent is instance of
752+
* {@link ConcurrentMessageListenerContainerRef}.
753+
*
754+
* @return the parent or this
755+
* @since 3.3
756+
*/
757+
AbstractMessageListenerContainer<?, ?> parentContainerOrThis() {
758+
if (parentOrThis() instanceof ConcurrentMessageListenerContainerRef) {
759+
return ((ConcurrentMessageListenerContainerRef) parentOrThis()).getConcurrentContainer();
760+
}
761+
return parentOrThis();
762+
}
763+
738764
/**
739765
* Make any default consumer override properties explicit properties.
740766
* @return the properties.

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -305,13 +305,17 @@ private KafkaMessageListenerContainer<K, V> constructContainer(ContainerProperti
305305
@Nullable TopicPartitionOffset[] topicPartitions, int i) {
306306

307307
KafkaMessageListenerContainer<K, V> container;
308+
ConcurrentMessageListenerContainerRef concurrentMessageListenerContainerRef =
309+
new ConcurrentMessageListenerContainerRef<>(this, this.lifecycleLock);
308310
if (topicPartitions == null) {
309-
container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties); // NOSONAR
311+
container = new KafkaMessageListenerContainer<>(concurrentMessageListenerContainerRef, this.consumerFactory,
312+
containerProperties); // NOSONAR
310313
}
311314
else {
312-
container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, // NOSONAR
313-
containerProperties, partitionSubset(containerProperties, i));
315+
container = new KafkaMessageListenerContainer<>(concurrentMessageListenerContainerRef, this.consumerFactory,
316+
containerProperties, partitionSubset(containerProperties, i)); // NOSONAR
314317
}
318+
concurrentMessageListenerContainerRef.setKafkaMessageListenerContainer(container);
315319
return container;
316320
}
317321

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.Collection;
20+
import java.util.Map;
21+
import java.util.concurrent.locks.ReentrantLock;
22+
23+
import org.apache.commons.logging.LogFactory;
24+
import org.apache.kafka.common.Metric;
25+
import org.apache.kafka.common.MetricName;
26+
import org.apache.kafka.common.TopicPartition;
27+
28+
import org.springframework.core.log.LogAccessor;
29+
import org.springframework.kafka.event.ConsumerStoppedEvent;
30+
31+
/**
32+
* Reference of {@link ConcurrentMessageListenerContainer} to be passed to the {@link KafkaMessageListenerContainer}.
33+
* This container is used for internal purpose. Detects if the {@link KafkaMessageListenerContainer} is fenced and
34+
* forbids `stop` calls on {@link ConcurrentMessageListenerContainer}
35+
*
36+
* @param <K> the key type.
37+
* @param <V> the value type.
38+
* @author Lokesh Alamuri
39+
*/
40+
class ConcurrentMessageListenerContainerRef<K, V> extends AbstractMessageListenerContainer {
41+
42+
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); // NOSONAR
43+
44+
private final ConcurrentMessageListenerContainer concurrentMessageListenerContainer;
45+
46+
private final ReentrantLock lifecycleLock;
47+
48+
private KafkaMessageListenerContainer kafkaMessageListenerContainer;
49+
50+
ConcurrentMessageListenerContainerRef(ConcurrentMessageListenerContainer concurrentMessageListenerContainer,
51+
ReentrantLock lifecycleLock) {
52+
super();
53+
this.concurrentMessageListenerContainer = concurrentMessageListenerContainer;
54+
this.lifecycleLock = lifecycleLock;
55+
}
56+
57+
void setKafkaMessageListenerContainer(KafkaMessageListenerContainer kafkaMessageListenerContainer) {
58+
this.kafkaMessageListenerContainer = kafkaMessageListenerContainer;
59+
}
60+
61+
@Override
62+
public void setupMessageListener(Object messageListener) {
63+
throw new UnsupportedOperationException("This container doesn't support setting up MessageListener");
64+
}
65+
66+
@Override
67+
public Map<String, Map<MetricName, ? extends Metric>> metrics() {
68+
return this.concurrentMessageListenerContainer.metrics();
69+
}
70+
71+
@Override
72+
public ContainerProperties getContainerProperties() {
73+
return this.concurrentMessageListenerContainer.getContainerProperties();
74+
}
75+
76+
@Override
77+
public Collection<TopicPartition> getAssignedPartitions() {
78+
return this.concurrentMessageListenerContainer.getAssignedPartitions();
79+
}
80+
81+
@Override
82+
public Map<String, Collection<TopicPartition>> getAssignmentsByClientId() {
83+
return this.concurrentMessageListenerContainer.getAssignmentsByClientId();
84+
}
85+
86+
@Override
87+
public void enforceRebalance() {
88+
this.concurrentMessageListenerContainer.enforceRebalance();
89+
}
90+
91+
@Override
92+
public void pause() {
93+
this.concurrentMessageListenerContainer.pause();
94+
}
95+
96+
@Override
97+
public void resume() {
98+
this.concurrentMessageListenerContainer.resume();
99+
}
100+
101+
@Override
102+
public void pausePartition(TopicPartition topicPartition) {
103+
this.concurrentMessageListenerContainer.pausePartition(topicPartition);
104+
}
105+
106+
@Override
107+
public void resumePartition(TopicPartition topicPartition) {
108+
this.concurrentMessageListenerContainer.resumePartition(topicPartition);
109+
}
110+
111+
@Override
112+
public boolean isPartitionPauseRequested(TopicPartition topicPartition) {
113+
return this.concurrentMessageListenerContainer.isPartitionPauseRequested(topicPartition);
114+
}
115+
116+
@Override
117+
public boolean isPartitionPaused(TopicPartition topicPartition) {
118+
return this.concurrentMessageListenerContainer.isPartitionPaused(topicPartition);
119+
}
120+
121+
@Override
122+
public boolean isPauseRequested() {
123+
return this.concurrentMessageListenerContainer.isPauseRequested();
124+
}
125+
126+
@Override
127+
public boolean isContainerPaused() {
128+
return this.concurrentMessageListenerContainer.isContainerPaused();
129+
}
130+
131+
@Override
132+
public String getGroupId() {
133+
return this.concurrentMessageListenerContainer.getGroupId();
134+
}
135+
136+
@Override
137+
public String getListenerId() {
138+
return this.concurrentMessageListenerContainer.getListenerId();
139+
}
140+
141+
@Override
142+
public String getMainListenerId() {
143+
return this.concurrentMessageListenerContainer.getMainListenerId();
144+
}
145+
146+
@Override
147+
public byte[] getListenerInfo() {
148+
return this.concurrentMessageListenerContainer.getListenerInfo();
149+
}
150+
151+
@Override
152+
public boolean isChildRunning() {
153+
return this.concurrentMessageListenerContainer.isChildRunning();
154+
}
155+
156+
@Override
157+
public boolean isInExpectedState() {
158+
return this.concurrentMessageListenerContainer.isInExpectedState();
159+
}
160+
161+
@Override
162+
public void stopAbnormally(Runnable callback) {
163+
this.lifecycleLock.lock();
164+
try {
165+
if (!this.kafkaMessageListenerContainer.isFenced()) {
166+
// kafkaMessageListenerContainer is not fenced. Allow stopAbnormally call on
167+
// concurrentMessageListenerContainer
168+
this.concurrentMessageListenerContainer.stopAbnormally(callback);
169+
}
170+
else if (this.concurrentMessageListenerContainer.isFenced() &&
171+
!this.concurrentMessageListenerContainer.isRunning()) {
172+
// kafkaMessageListenerContainer is fenced and concurrentMessageListenerContainer is not running. Allow
173+
// callback to run
174+
callback.run();
175+
}
176+
else {
177+
this.logger.error(() -> String.format("Suppressed `stopAbnormal` operation called by " +
178+
"MessageListenerContainer [" + this.kafkaMessageListenerContainer.getBeanName() + "]"));
179+
}
180+
}
181+
finally {
182+
this.lifecycleLock.unlock();
183+
}
184+
}
185+
186+
@Override
187+
protected void doStop(Runnable callback, boolean normal) {
188+
this.lifecycleLock.lock();
189+
try {
190+
if (!this.kafkaMessageListenerContainer.isFenced()) {
191+
// kafkaMessageListenerContainer is not fenced. Allow doStop call on
192+
// concurrentMessageListenerContainer
193+
this.concurrentMessageListenerContainer.doStop(callback, normal);
194+
}
195+
else if (this.concurrentMessageListenerContainer.isFenced() &&
196+
!this.concurrentMessageListenerContainer.isRunning()) {
197+
// kafkaMessageListenerContainer is fenced and concurrentMessageListenerContainer is not running. Allow
198+
// callback to run
199+
callback.run();
200+
}
201+
else {
202+
this.logger.error(() -> String.format("Suppressed `doStop` operation called by " +
203+
"MessageListenerContainer [" + this.kafkaMessageListenerContainer.getBeanName() + "]"));
204+
}
205+
}
206+
finally {
207+
this.lifecycleLock.unlock();
208+
}
209+
}
210+
211+
@Override
212+
public MessageListenerContainer getContainerFor(String topic, int partition) {
213+
return this.concurrentMessageListenerContainer.getContainerFor(topic, partition);
214+
}
215+
216+
@Override
217+
public void childStopped(MessageListenerContainer child, ConsumerStoppedEvent.Reason reason) {
218+
this.concurrentMessageListenerContainer.childStopped(child, reason);
219+
}
220+
221+
@Override
222+
public void childStarted(MessageListenerContainer child) {
223+
this.concurrentMessageListenerContainer.childStarted(child);
224+
}
225+
226+
@Override
227+
protected void doStart() {
228+
this.concurrentMessageListenerContainer.doStart();
229+
}
230+
231+
@Override
232+
public boolean isRunning() {
233+
return this.concurrentMessageListenerContainer.isRunning();
234+
}
235+
236+
@Override
237+
public boolean isAutoStartup() {
238+
return this.concurrentMessageListenerContainer.isAutoStartup();
239+
}
240+
241+
@Override
242+
public void setAutoStartup(boolean autoStartup) {
243+
throw new UnsupportedOperationException("This container doesn't support `setAutoStartup`");
244+
}
245+
246+
@Override
247+
public void stop(Runnable callback) {
248+
this.lifecycleLock.lock();
249+
try {
250+
if (!this.kafkaMessageListenerContainer.isFenced()) {
251+
// kafkaMessageListenerContainer is not fenced. Allow stop call on
252+
// concurrentMessageListenerContainer
253+
this.concurrentMessageListenerContainer.stop(callback);
254+
}
255+
else if (this.concurrentMessageListenerContainer.isFenced() &&
256+
!this.concurrentMessageListenerContainer.isRunning()) {
257+
// kafkaMessageListenerContainer is fenced and concurrentMessageListenerContainer is not running. Allow
258+
// callback to run
259+
callback.run();
260+
}
261+
else {
262+
this.logger.error(() -> String.format("Suppressed `stop` operation called by " +
263+
"MessageListenerContainer [" + this.kafkaMessageListenerContainer.getBeanName() + "]"));
264+
}
265+
}
266+
finally {
267+
this.lifecycleLock.unlock();
268+
}
269+
}
270+
271+
AbstractMessageListenerContainer<?, ?> getConcurrentContainer() {
272+
return this.concurrentMessageListenerContainer;
273+
}
274+
275+
@Override
276+
public int hashCode() {
277+
return this.concurrentMessageListenerContainer.hashCode();
278+
}
279+
280+
@Override
281+
public boolean equals(Object obj) {
282+
return this.concurrentMessageListenerContainer.equals(obj);
283+
}
284+
285+
}

0 commit comments

Comments
 (0)