1717package org .springframework .kafka .retrytopic ;
1818
1919import java .time .Clock ;
20- import java .util .Collections ;
2120import java .util .Comparator ;
2221import java .util .HashSet ;
2322import java .util .List ;
@@ -94,7 +93,7 @@ public class ListenerContainerFactoryConfigurer {
9493
9594 private final Clock clock ;
9695
97- ListenerContainerFactoryConfigurer (KafkaConsumerBackoffManager kafkaConsumerBackoffManager ,
96+ public ListenerContainerFactoryConfigurer (KafkaConsumerBackoffManager kafkaConsumerBackoffManager ,
9897 DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory ,
9998 @ Qualifier (RetryTopicInternalBeanNames
10099 .INTERNAL_BACKOFF_CLOCK_BEAN_NAME ) Clock clock ) {
@@ -116,7 +115,7 @@ public class ListenerContainerFactoryConfigurer {
116115 ConcurrentKafkaListenerContainerFactory <?, ?> containerFactory , Configuration configuration ) {
117116 return isCached (containerFactory )
118117 ? containerFactory
119- : addToCache (doConfigure (containerFactory , configuration . backOffValues ));
118+ : addToCache (doConfigure (containerFactory , configuration , true ));
120119 }
121120
122121 /**
@@ -126,14 +125,14 @@ public class ListenerContainerFactoryConfigurer {
126125 * @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
127126 * @return the configured factory instance.
128127 * @deprecated in favor of
129- * {@link #decorateFactoryWithoutBackOffValues (ConcurrentKafkaListenerContainerFactory, Configuration)}.
128+ * {@link #decorateFactoryWithoutSettingContainerProperties (ConcurrentKafkaListenerContainerFactory, Configuration)}.
130129 */
131130 @ Deprecated
132131 public ConcurrentKafkaListenerContainerFactory <?, ?> configureWithoutBackOffValues (
133132 ConcurrentKafkaListenerContainerFactory <?, ?> containerFactory , Configuration configuration ) {
134133 return isCached (containerFactory )
135134 ? containerFactory
136- : doConfigure (containerFactory , Collections . emptyList () );
135+ : doConfigure (containerFactory , configuration , false );
137136 }
138137
139138 /**
@@ -144,7 +143,7 @@ public class ListenerContainerFactoryConfigurer {
144143 */
145144 public KafkaListenerContainerFactory <?> decorateFactory (ConcurrentKafkaListenerContainerFactory <?, ?> factory ,
146145 Configuration configuration ) {
147- return new RetryTopicListenerContainerFactoryDecorator (factory , configuration . backOffValues );
146+ return new RetryTopicListenerContainerFactoryDecorator (factory , configuration , true );
148147 }
149148
150149 /**
@@ -154,18 +153,19 @@ public KafkaListenerContainerFactory<?> decorateFactory(ConcurrentKafkaListenerC
154153 * @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
155154 * @return the decorated factory instance.
156155 */
157- public KafkaListenerContainerFactory <?> decorateFactoryWithoutBackOffValues (
156+ public KafkaListenerContainerFactory <?> decorateFactoryWithoutSettingContainerProperties (
158157 ConcurrentKafkaListenerContainerFactory <?, ?> factory , Configuration configuration ) {
159- return new RetryTopicListenerContainerFactoryDecorator (factory , Collections . emptyList () );
158+ return new RetryTopicListenerContainerFactoryDecorator (factory , configuration , false );
160159 }
161160
162161 private ConcurrentKafkaListenerContainerFactory <?, ?> doConfigure (
163- ConcurrentKafkaListenerContainerFactory <?, ?> containerFactory , List <Long > backOffValues ) {
162+ ConcurrentKafkaListenerContainerFactory <?, ?> containerFactory , Configuration configuration ,
163+ boolean isSetContainerProperties ) {
164164
165165 containerFactory
166- .setContainerCustomizer (container -> setupBackoffAwareMessageListenerAdapter (container , backOffValues ));
166+ .setContainerCustomizer (container -> setupBackoffAwareMessageListenerAdapter (container , configuration , isSetContainerProperties ));
167167 containerFactory
168- .setCommonErrorHandler (createErrorHandler (this .deadLetterPublishingRecovererFactory .create ()));
168+ .setCommonErrorHandler (createErrorHandler (this .deadLetterPublishingRecovererFactory .create (), configuration ));
169169 return containerFactory ;
170170 }
171171
@@ -191,7 +191,8 @@ public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerC
191191 this .errorHandlerCustomizer = errorHandlerCustomizer ;
192192 }
193193
194- private CommonErrorHandler createErrorHandler (DeadLetterPublishingRecoverer deadLetterPublishingRecoverer ) {
194+ protected CommonErrorHandler createErrorHandler (DeadLetterPublishingRecoverer deadLetterPublishingRecoverer ,
195+ Configuration configuration ) {
195196 DefaultErrorHandler errorHandler = new DefaultErrorHandler (deadLetterPublishingRecoverer ,
196197 new FixedBackOff (0 , 0 ));
197198 errorHandler .setCommitRecovered (true );
@@ -200,52 +201,52 @@ private CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer dead
200201 return errorHandler ;
201202 }
202203
203- private void setupBackoffAwareMessageListenerAdapter (ConcurrentMessageListenerContainer <?, ?> container ,
204- List < Long > backOffValues ) {
204+ protected void setupBackoffAwareMessageListenerAdapter (ConcurrentMessageListenerContainer <?, ?> container ,
205+ Configuration configuration , boolean isSetContainerProperties ) {
205206 AcknowledgingConsumerAwareMessageListener <?, ?> listener = checkAndCast (container .getContainerProperties ()
206207 .getMessageListener (), AcknowledgingConsumerAwareMessageListener .class );
207208
208- configurePollTimeoutAndIdlePartitionInterval (container , backOffValues );
209+ if (isSetContainerProperties && !configuration .backOffValues .isEmpty ()) {
210+ configurePollTimeoutAndIdlePartitionInterval (container , configuration );
211+ }
209212
210213 container .setupMessageListener (new KafkaBackoffAwareMessageListenerAdapter <>(listener ,
211214 this .kafkaConsumerBackoffManager , container .getListenerId (), this .clock )); // NOSONAR
212215
213216 this .containerCustomizer .accept (container );
214217 }
215218
216- private void configurePollTimeoutAndIdlePartitionInterval (ConcurrentMessageListenerContainer <?, ?> container ,
217- List <Long > backOffValues ) {
218- if (backOffValues .isEmpty ()) {
219- return ;
220- }
219+ protected void configurePollTimeoutAndIdlePartitionInterval (ConcurrentMessageListenerContainer <?, ?> container ,
220+ Configuration configuration ) {
221221
222222 ContainerProperties containerProperties = container .getContainerProperties ();
223223
224- long pollTimeoutValue = getPollTimeoutValue (containerProperties , backOffValues );
224+ long pollTimeoutValue = getPollTimeoutValue (containerProperties , configuration );
225225 long idlePartitionEventInterval = getIdlePartitionInterval (containerProperties , pollTimeoutValue );
226226
227227 LOGGER .debug (() -> "pollTimeout and idlePartitionEventInterval for back off values "
228- + backOffValues + " will be set to " + pollTimeoutValue
228+ + configuration . backOffValues + " will be set to " + pollTimeoutValue
229229 + " and " + idlePartitionEventInterval );
230230
231231 containerProperties
232232 .setIdlePartitionEventInterval (idlePartitionEventInterval );
233233 containerProperties .setPollTimeout (pollTimeoutValue );
234234 }
235235
236- private long getIdlePartitionInterval (ContainerProperties containerProperties , long pollTimeoutValue ) {
236+ protected long getIdlePartitionInterval (ContainerProperties containerProperties , long pollTimeoutValue ) {
237237 Long idlePartitionEventInterval = containerProperties .getIdlePartitionEventInterval ();
238238 return idlePartitionEventInterval != null && idlePartitionEventInterval > 0
239239 ? idlePartitionEventInterval
240240 : pollTimeoutValue ;
241241 }
242242
243- private long getPollTimeoutValue (ContainerProperties containerProperties , List <Long > backOffValues ) {
244- if (containerProperties .getPollTimeout () != ContainerProperties .DEFAULT_POLL_TIMEOUT ) {
243+ protected long getPollTimeoutValue (ContainerProperties containerProperties , Configuration configuration ) {
244+ if (containerProperties .getPollTimeout () != ContainerProperties .DEFAULT_POLL_TIMEOUT
245+ || configuration .backOffValues .isEmpty ()) {
245246 return containerProperties .getPollTimeout ();
246247 }
247248
248- Long lowestBackOff = backOffValues
249+ Long lowestBackOff = configuration . backOffValues
249250 .stream ()
250251 .min (Comparator .naturalOrder ())
251252 .orElseThrow (() -> new IllegalArgumentException ("No back off values found!" ));
@@ -267,14 +268,19 @@ private <T> T checkAndCast(Object obj, Class<T> clazz) {
267268 return (T ) obj ;
268269 }
269270
270- private class RetryTopicListenerContainerFactoryDecorator implements KafkaListenerContainerFactory <ConcurrentMessageListenerContainer <?, ?>> {
271+ private class RetryTopicListenerContainerFactoryDecorator
272+ implements KafkaListenerContainerFactory <ConcurrentMessageListenerContainer <?, ?>> {
271273
272274 private final ConcurrentKafkaListenerContainerFactory <?, ?> delegate ;
273- private final List <Long > backOffValues ;
275+ private final Configuration configuration ;
276+ private final boolean isSetContainerProperties ;
274277
275- RetryTopicListenerContainerFactoryDecorator (ConcurrentKafkaListenerContainerFactory <?, ?> delegate , List <Long > backOffValues ) {
278+ RetryTopicListenerContainerFactoryDecorator (ConcurrentKafkaListenerContainerFactory <?, ?> delegate ,
279+ Configuration configuration ,
280+ boolean isSetContainerProperties ) {
276281 this .delegate = delegate ;
277- this .backOffValues = backOffValues ;
282+ this .configuration = configuration ;
283+ this .isSetContainerProperties = isSetContainerProperties ;
278284 }
279285
280286 @ Override
@@ -283,10 +289,11 @@ private class RetryTopicListenerContainerFactoryDecorator implements KafkaListen
283289 }
284290
285291 private ConcurrentMessageListenerContainer <?, ?> decorate (ConcurrentMessageListenerContainer <?, ?> listenerContainer ) {
286- setupBackoffAwareMessageListenerAdapter (listenerContainer , this .backOffValues );
287292 listenerContainer
288293 .setCommonErrorHandler (createErrorHandler (
289- ListenerContainerFactoryConfigurer .this .deadLetterPublishingRecovererFactory .create ()));
294+ ListenerContainerFactoryConfigurer .this .deadLetterPublishingRecovererFactory .create (),
295+ this .configuration ));
296+ setupBackoffAwareMessageListenerAdapter (listenerContainer , this .configuration , this .isSetContainerProperties );
290297 return listenerContainer ;
291298 }
292299
0 commit comments