Skip to content

Commit fc21172

Browse files
committed
Fix failed Junits
1 parent 8baa676 commit fc21172

File tree

6 files changed

+225
-65
lines changed

6 files changed

+225
-65
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public abstract class AbstractMessageListenerContainer<K, V>
9494

9595
protected final ReentrantLock lifecycleLock = new ReentrantLock(); // NOSONAR
9696

97-
protected final AtomicBoolean enforceRebalanceRequested = new AtomicBoolean();
97+
private final AtomicBoolean enforceRebalanceRequested = new AtomicBoolean();
9898

9999
private final Set<TopicPartition> pauseRequestedPartitions = ConcurrentHashMap.newKeySet();
100100

@@ -286,11 +286,11 @@ public boolean isRunning() {
286286
return this.running;
287287
}
288288

289-
protected void setFenced(boolean fenced) {
289+
void setFenced(boolean fenced) {
290290
this.fenced = fenced;
291291
}
292292

293-
boolean isFenced() {
293+
protected boolean isFenced() {
294294
return this.fenced;
295295
}
296296

@@ -299,6 +299,14 @@ protected boolean isPaused() {
299299
return this.paused;
300300
}
301301

302+
protected boolean isEnforceRebalanceRequested() {
303+
return this.enforceRebalanceRequested.get();
304+
}
305+
306+
protected void setEnforceRebalanceRequested(boolean enforceRebalance) {
307+
this.enforceRebalanceRequested.set(enforceRebalance);
308+
}
309+
302310
@Override
303311
public boolean isPartitionPauseRequested(TopicPartition topicPartition) {
304312
return this.pauseRequestedPartitions.contains(topicPartition);
@@ -734,7 +742,7 @@ public void onPartitionsLost(Collection<TopicPartition> partitions) {
734742
protected void publishContainerStoppedEvent() {
735743
ApplicationEventPublisher eventPublisher = getApplicationEventPublisher();
736744
if (eventPublisher != null) {
737-
eventPublisher.publishEvent(new ContainerStoppedEvent(this, parentContainerOrThis()));
745+
eventPublisher.publishEvent(new ContainerStoppedEvent(this, parentOrThis()));
738746
}
739747
}
740748

@@ -747,20 +755,6 @@ protected void publishContainerStoppedEvent() {
747755
return this;
748756
}
749757

750-
/**
751-
* Return the actual {@link ConcurrentMessageListenerContainer} if the parent is instance of
752-
* {@link ConcurrentMessageListenerContainerRef}.
753-
*
754-
* @return the parent or this
755-
* @since 3.3
756-
*/
757-
AbstractMessageListenerContainer<?, ?> parentContainerOrThis() {
758-
if (parentOrThis() instanceof ConcurrentMessageListenerContainerRef) {
759-
return ((ConcurrentMessageListenerContainerRef) parentOrThis()).getConcurrentContainer();
760-
}
761-
return parentOrThis();
762-
}
763-
764758
/**
765759
* Make any default consumer override properties explicit properties.
766760
* @return the properties.

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -308,12 +308,12 @@ private KafkaMessageListenerContainer<K, V> constructContainer(ContainerProperti
308308
ConcurrentMessageListenerContainerRef concurrentMessageListenerContainerRef =
309309
new ConcurrentMessageListenerContainerRef<>(this, this.lifecycleLock);
310310
if (topicPartitions == null) {
311-
container = new KafkaMessageListenerContainer<>(concurrentMessageListenerContainerRef, this.consumerFactory,
312-
containerProperties); // NOSONAR
311+
container = new KafkaMessageListenerContainer<>(concurrentMessageListenerContainerRef, this,
312+
this.consumerFactory, containerProperties); // NOSONAR
313313
}
314314
else {
315-
container = new KafkaMessageListenerContainer<>(concurrentMessageListenerContainerRef, this.consumerFactory,
316-
containerProperties, partitionSubset(containerProperties, i)); // NOSONAR
315+
container = new KafkaMessageListenerContainer<>(concurrentMessageListenerContainerRef, this,
316+
this.consumerFactory, containerProperties, partitionSubset(containerProperties, i)); // NOSONAR
317317
}
318318
concurrentMessageListenerContainerRef.setKafkaMessageListenerContainer(container);
319319
return container;

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerRef.java

Lines changed: 136 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,19 @@
1919
import java.util.Collection;
2020
import java.util.Map;
2121
import java.util.concurrent.locks.ReentrantLock;
22+
import java.util.function.Function;
2223

2324
import org.apache.commons.logging.LogFactory;
2425
import org.apache.kafka.common.Metric;
2526
import org.apache.kafka.common.MetricName;
2627
import org.apache.kafka.common.TopicPartition;
2728

29+
import org.springframework.context.ApplicationContext;
30+
import org.springframework.context.ApplicationEventPublisher;
2831
import org.springframework.core.log.LogAccessor;
32+
import org.springframework.kafka.core.KafkaAdmin;
2933
import org.springframework.kafka.event.ConsumerStoppedEvent;
34+
import org.springframework.lang.Nullable;
3035

3136
/**
3237
* Reference of {@link ConcurrentMessageListenerContainer} to be passed to the {@link KafkaMessageListenerContainer}.
@@ -268,18 +273,141 @@ else if (this.concurrentMessageListenerContainer.isFenced() &&
268273
}
269274
}
270275

271-
AbstractMessageListenerContainer<?, ?> getConcurrentContainer() {
272-
return this.concurrentMessageListenerContainer;
276+
@Nullable
277+
protected ApplicationContext getApplicationContext() {
278+
return this.concurrentMessageListenerContainer.getApplicationContext();
273279
}
274280

275-
@Override
276-
public int hashCode() {
277-
return this.concurrentMessageListenerContainer.hashCode();
281+
/**
282+
* Get the event publisher.
283+
* @return the publisher
284+
*/
285+
@Nullable
286+
public ApplicationEventPublisher getApplicationEventPublisher() {
287+
return this.concurrentMessageListenerContainer.getApplicationEventPublisher();
278288
}
279289

280-
@Override
281-
public boolean equals(Object obj) {
282-
return this.concurrentMessageListenerContainer.equals(obj);
290+
/**
291+
* Get the {@link CommonErrorHandler}.
292+
* @return the handler.
293+
* @since 2.8
294+
*/
295+
@Nullable
296+
public CommonErrorHandler getCommonErrorHandler() {
297+
return this.concurrentMessageListenerContainer.getCommonErrorHandler();
298+
}
299+
300+
protected boolean isStoppedNormally() {
301+
return this.concurrentMessageListenerContainer.isStoppedNormally();
302+
}
303+
304+
protected void setStoppedNormally(boolean stoppedNormally) {
305+
this.concurrentMessageListenerContainer.setStoppedNormally(stoppedNormally);
306+
}
307+
308+
protected void setRunning(boolean running) {
309+
this.concurrentMessageListenerContainer.setRunning(running);
310+
}
311+
312+
protected boolean isEnforceRebalanceRequested() {
313+
return this.concurrentMessageListenerContainer.isEnforceRebalanceRequested();
314+
}
315+
316+
protected void setEnforceRebalanceRequested(boolean enforceRebalance) {
317+
this.concurrentMessageListenerContainer.setEnforceRebalanceRequested(enforceRebalance);
318+
}
319+
320+
/**
321+
* Return the currently configured {@link AfterRollbackProcessor}.
322+
* @return the after rollback processor.
323+
* @since 2.2.14
324+
*/
325+
public AfterRollbackProcessor<? super K, ? super V> getAfterRollbackProcessor() {
326+
return this.concurrentMessageListenerContainer.getAfterRollbackProcessor();
327+
}
328+
329+
public boolean isChangeConsumerThreadName() {
330+
return this.concurrentMessageListenerContainer.isChangeConsumerThreadName();
331+
}
332+
333+
/**
334+
* Set to true to instruct the container to change the consumer thread name during
335+
* initialization.
336+
* @param changeConsumerThreadName true to change.
337+
* @since 3.0.1
338+
* @see #setThreadNameSupplier(Function)
339+
*/
340+
public void setChangeConsumerThreadName(boolean changeConsumerThreadName) {
341+
this.concurrentMessageListenerContainer.setChangeConsumerThreadName(changeConsumerThreadName);
342+
}
343+
344+
/**
345+
* Return the {@link KafkaAdmin}, used to find the cluster id for observation, if
346+
* present.
347+
* @return the kafkaAdmin
348+
* @since 3.0.5
349+
*/
350+
@Nullable
351+
public KafkaAdmin getKafkaAdmin() {
352+
return this.concurrentMessageListenerContainer.getKafkaAdmin();
353+
}
354+
355+
public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
356+
this.concurrentMessageListenerContainer.setKafkaAdmin(kafkaAdmin);
357+
}
358+
359+
protected RecordInterceptor<K, V> getRecordInterceptor() {
360+
return this.concurrentMessageListenerContainer.getRecordInterceptor();
361+
}
362+
363+
/**
364+
* Set an interceptor to be called before calling the record listener.
365+
* Does not apply to batch listeners.
366+
* @param recordInterceptor the interceptor.
367+
* @since 2.2.7
368+
* @see #setInterceptBeforeTx(boolean)
369+
*/
370+
public void setRecordInterceptor(RecordInterceptor recordInterceptor) {
371+
this.concurrentMessageListenerContainer.setRecordInterceptor(recordInterceptor);
372+
}
373+
374+
protected BatchInterceptor<K, V> getBatchInterceptor() {
375+
return this.concurrentMessageListenerContainer.getBatchInterceptor();
376+
}
377+
378+
/**
379+
* Set an interceptor to be called before calling the record listener.
380+
* @param batchInterceptor the interceptor.
381+
* @since 2.6.6
382+
* @see #setInterceptBeforeTx(boolean)
383+
*/
384+
public void setBatchInterceptor(BatchInterceptor batchInterceptor) {
385+
this.concurrentMessageListenerContainer.setBatchInterceptor(batchInterceptor);
386+
}
387+
388+
protected boolean isInterceptBeforeTx() {
389+
return this.concurrentMessageListenerContainer.isInterceptBeforeTx();
390+
}
391+
392+
/**
393+
* When false, invoke the interceptor after the transaction starts.
394+
* @param interceptBeforeTx false to intercept within the transaction.
395+
* Default true since 2.8.
396+
* @since 2.3.4
397+
* @see #setRecordInterceptor(RecordInterceptor)
398+
* @see #setBatchInterceptor(BatchInterceptor)
399+
*/
400+
public void setInterceptBeforeTx(boolean interceptBeforeTx) {
401+
this.concurrentMessageListenerContainer.setInterceptBeforeTx(interceptBeforeTx);
402+
}
403+
404+
/**
405+
* Return this or a parent container if this has a parent.
406+
* @return the parent or this.
407+
* @since 2.2.1
408+
*/
409+
protected AbstractMessageListenerContainer<?, ?> parentOrThis() {
410+
return this;
283411
}
284412

285413
}

0 commit comments

Comments
 (0)