Skip to content

Commit 07efad9

Browse files
GH-1765 - Fix using same factory for retry & main (#1768)
Fixes #1765
1 parent 45be8f0 commit 07efad9

File tree

6 files changed

+16
-20
lines changed

6 files changed

+16
-20
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/PartitionPausingBackoffManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContaine
139139
@Override
140140
public void backOffIfNecessary(Context context) {
141141
long backoffTime = context.getDueTimestamp() - getCurrentMillisFromClock();
142+
LOGGER.debug(() -> "Back off time: " + backoffTime + " Context: " + context);
142143
if (backoffTime > 0) {
143144
pauseConsumptionAndThrow(context, backoffTime);
144145
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,10 @@ public class ListenerContainerFactoryConfigurer {
9898
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration) {
9999
return isCached(containerFactory)
100100
? containerFactory
101-
: doConfigure(containerFactory, configuration.backOffValues);
101+
: addToCache(doConfigure(containerFactory, configuration.backOffValues));
102102
}
103103

104-
public ConcurrentKafkaListenerContainerFactory<?, ?> configureWithoutBackOff(
104+
public ConcurrentKafkaListenerContainerFactory<?, ?> configureWithoutBackOffValues(
105105
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration) {
106106
return isCached(containerFactory)
107107
? containerFactory
@@ -114,7 +114,7 @@ public class ListenerContainerFactoryConfigurer {
114114
setupBackoffAwareMessageListenerAdapter(container, backOffValues));
115115
containerFactory
116116
.setErrorHandler(createErrorHandler(this.deadLetterPublishingRecovererFactory.create()));
117-
return addToCache(containerFactory);
117+
return containerFactory;
118118
}
119119

120120
private boolean isCached(ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory) {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(EndpointHandl
355355
.resolveFactoryForMainEndpoint(providedFactory, defaultFactoryBeanName,
356356
configuration.forContainerFactoryResolver());
357357
return this.listenerContainerFactoryConfigurer
358-
.configureWithoutBackOff(resolvedFactory, configuration.forContainerFactoryConfigurer());
358+
.configureWithoutBackOffValues(resolvedFactory, configuration.forContainerFactoryConfigurer());
359359
}
360360

361361
private ConcurrentKafkaListenerContainerFactory<?, ?> resolveAndConfigureFactoryForRetryEndpoint(

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ void shouldNotSetPolltimoutAndPartitionIdleIfNoBackOffProvided() {
304304
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
305305
deadLetterPublishingRecovererFactory, clock);
306306
configurer
307-
.configureWithoutBackOff(containerFactory, configuration.forContainerFactoryConfigurer());
307+
.configureWithoutBackOffValues(containerFactory, configuration.forContainerFactoryConfigurer());
308308

309309
// then
310310
then(containerFactory).should(times(1))

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ void shouldConfigureRetryEndpoints() {
225225
defaultFactoryBeanName, factoryResolverConfig);
226226
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).configure(containerFactory,
227227
lcfcConfiguration);
228-
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).configureWithoutBackOff(containerFactory,
228+
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).configureWithoutBackOffValues(containerFactory,
229229
lcfcConfiguration);
230230

231231
RetryTopicConfigurer configurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver,
Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,14 @@
7272
*/
7373
@SpringJUnitConfig
7474
@DirtiesContext
75-
@EmbeddedKafka(topics = { RetryableTopicIntegrationTests.FIRST_TOPIC,
76-
RetryableTopicIntegrationTests.SECOND_TOPIC,
77-
RetryableTopicIntegrationTests.THIRD_TOPIC,
78-
RetryableTopicIntegrationTests.FOURTH_TOPIC }, partitions = 1)
75+
@EmbeddedKafka(topics = { RetryTopicIntegrationTests.FIRST_TOPIC,
76+
RetryTopicIntegrationTests.SECOND_TOPIC,
77+
RetryTopicIntegrationTests.THIRD_TOPIC,
78+
RetryTopicIntegrationTests.FOURTH_TOPIC }, partitions = 1)
7979
@TestPropertySource(properties = "five.attempts=5")
80-
public class RetryableTopicIntegrationTests {
80+
public class RetryTopicIntegrationTests {
8181

82-
private static final Logger logger = LoggerFactory.getLogger(RetryableTopicIntegrationTests.class);
82+
private static final Logger logger = LoggerFactory.getLogger(RetryTopicIntegrationTests.class);
8383

8484
public final static String FIRST_TOPIC = "myRetryTopic1";
8585

@@ -183,7 +183,7 @@ static class ThirdTopicListener {
183183
CountDownLatchContainer container;
184184

185185
@RetryableTopic(attempts = "${five.attempts}",
186-
backoff = @Backoff(delay = 70, maxDelay = 120, multiplier = 1.5),
186+
backoff = @Backoff(delay = 250, maxDelay = 1000, multiplier = 1.5),
187187
numPartitions = "#{3}",
188188
timeout = "${missing.property:2000}",
189189
include = MyRetryException.class, kafkaTemplate = "kafkaTemplate")
@@ -207,7 +207,7 @@ static class FourthTopicListener {
207207
@Autowired
208208
CountDownLatchContainer container;
209209

210-
@RetryableTopic(dltStrategy = DltStrategy.NO_DLT, attempts = "4", backoff = @Backoff(50),
210+
@RetryableTopic(dltStrategy = DltStrategy.NO_DLT, attempts = "4", backoff = @Backoff(300),
211211
kafkaTemplate = "kafkaTemplate")
212212
@KafkaListener(topics = FOURTH_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY)
213213
public void listenNoDlt(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
@@ -320,7 +320,7 @@ public RetryTopicConfiguration firstRetryTopic(KafkaTemplate<String, String> tem
320320
public RetryTopicConfiguration secondRetryTopic(KafkaTemplate<String, String> template) {
321321
return RetryTopicConfigurationBuilder
322322
.newInstance()
323-
.exponentialBackoff(50, 2, 10000)
323+
.exponentialBackoff(500, 2, 10000)
324324
.retryOn(Arrays.asList(IllegalStateException.class, IllegalAccessException.class))
325325
.traversingCauses()
326326
.includeTopic(SECOND_TOPIC)
@@ -344,7 +344,6 @@ public ThirdTopicListener thirdTopicListener() {
344344
return new ThirdTopicListener();
345345
}
346346

347-
348347
@Bean
349348
public FourthTopicListener fourthTopicListener() {
350349
return new FourthTopicListener();
@@ -482,10 +481,6 @@ public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerCont
482481
ConsumerFactory<String, String> consumerFactory) {
483482

484483
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
485-
ContainerProperties props = factory.getContainerProperties();
486-
props.setIdleEventInterval(100L);
487-
props.setPollTimeout(50L);
488-
props.setIdlePartitionEventInterval(100L);
489484
factory.setConsumerFactory(consumerFactory);
490485
factory.setConcurrency(1);
491486
return factory;

0 commit comments

Comments
 (0)