1717package org .springframework .kafka .retrytopic ;
1818
1919import java .lang .reflect .Method ;
20- import java .util .Arrays ;
2120import java .util .Collection ;
2221import java .util .function .Consumer ;
23- import java .util .function .Function ;
24- import java .util .stream .Collectors ;
2522
2623import org .apache .commons .logging .LogFactory ;
2724import org .apache .kafka .clients .admin .NewTopic ;
3734import org .springframework .kafka .config .MethodKafkaListenerEndpoint ;
3835import org .springframework .kafka .config .MultiMethodKafkaListenerEndpoint ;
3936import org .springframework .kafka .listener .ListenerUtils ;
40- import org .springframework .kafka .retrytopic .RetryTopicNamesProviderFactory .RetryTopicNamesProvider ;
41- import org .springframework .kafka .support .TopicPartitionOffset ;
4237import org .springframework .lang .Nullable ;
4338
4439
@@ -284,7 +279,7 @@ private void configureEndpoints(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
284279 String defaultContainerFactoryBeanName ) {
285280 this .destinationTopicProcessor
286281 .processDestinationTopicProperties (destinationTopicProperties ->
287- processAndRegisterEndpoints (mainEndpoint ,
282+ processAndRegisterEndpoint (mainEndpoint ,
288283 endpointProcessor ,
289284 factory ,
290285 defaultContainerFactoryBeanName ,
@@ -295,7 +290,7 @@ private void configureEndpoints(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
295290 context );
296291 }
297292
298- private void processAndRegisterEndpoints (MethodKafkaListenerEndpoint <?, ?> mainEndpoint , EndpointProcessor endpointProcessor ,
293+ private void processAndRegisterEndpoint (MethodKafkaListenerEndpoint <?, ?> mainEndpoint , EndpointProcessor endpointProcessor ,
299294 KafkaListenerContainerFactory <?> factory ,
300295 String defaultFactoryBeanName ,
301296 KafkaListenerEndpointRegistrar registrar ,
@@ -321,14 +316,14 @@ private void processAndRegisterEndpoints(MethodKafkaListenerEndpoint<?, ?> mainE
321316 .forEach (topicNamesHolder ->
322317 this .destinationTopicProcessor
323318 .registerDestinationTopic (topicNamesHolder .getMainTopic (),
324- topicNamesHolder .getProcessedTopic (),
319+ topicNamesHolder .getCustomizedTopic (),
325320 destinationTopicProperties , context ));
326321
327322 registrar .registerEndpoint (endpoint , resolvedFactory );
328323 endpoint .setBeanFactory (this .beanFactory );
329324 }
330325
331- private EndpointHandlerMethod getEndpointHandlerMethod (MethodKafkaListenerEndpoint <?, ?> mainEndpoint ,
326+ protected EndpointHandlerMethod getEndpointHandlerMethod (MethodKafkaListenerEndpoint <?, ?> mainEndpoint ,
332327 RetryTopicConfiguration configuration ,
333328 DestinationTopic .Properties props ) {
334329 EndpointHandlerMethod dltHandlerMethod = configuration .getDltHandlerMethod ();
@@ -343,15 +338,15 @@ private Consumer<Collection<String>> getTopicCreationFunction(RetryTopicConfigur
343338 : topics -> { };
344339 }
345340
346- private void createNewTopicBeans (Collection <String > topics , RetryTopicConfiguration .TopicCreation config ) {
341+ protected void createNewTopicBeans (Collection <String > topics , RetryTopicConfiguration .TopicCreation config ) {
347342 topics .forEach (topic ->
348343 ((DefaultListableBeanFactory ) this .beanFactory )
349344 .registerSingleton (topic + "-topicRegistrationBean" ,
350345 new NewTopic (topic , config .getNumPartitions (), config .getReplicationFactor ()))
351346 );
352347 }
353348
354- private EndpointCustomizer createEndpointCustomizer (
349+ protected EndpointCustomizer createEndpointCustomizer (
355350 EndpointHandlerMethod endpointBeanMethod , DestinationTopic .Properties destinationTopicProperties ) {
356351
357352 return new EndpointCustomizerFactory (destinationTopicProperties ,
@@ -407,99 +402,6 @@ default void process(MethodKafkaListenerEndpoint<?, ?> listenerEndpoint) {
407402 }
408403 }
409404
410- private interface EndpointCustomizer extends Function <MethodKafkaListenerEndpoint <?, ?>, Collection <TopicNamesHolder >> {
411- default Collection <TopicNamesHolder > customizeEndpointAndCollectTopics (MethodKafkaListenerEndpoint <?, ?> listenerEndpoint ) {
412- return apply (listenerEndpoint );
413- }
414- }
415-
416-
417- static final class EndpointCustomizerFactory {
418-
419- private final DestinationTopic .Properties destinationProperties ;
420-
421- private final EndpointHandlerMethod beanMethod ;
422-
423- private final BeanFactory beanFactory ;
424-
425- private final RetryTopicNamesProviderFactory retryTopicNamesProviderFactory ;
426-
427- EndpointCustomizerFactory (DestinationTopic .Properties destinationProperties , EndpointHandlerMethod beanMethod ,
428- BeanFactory beanFactory , RetryTopicNamesProviderFactory retryTopicNamesProviderFactory ) {
429-
430- this .destinationProperties = destinationProperties ;
431- this .beanMethod = beanMethod ;
432- this .beanFactory = beanFactory ;
433- this .retryTopicNamesProviderFactory = retryTopicNamesProviderFactory ;
434- }
435-
436- public EndpointCustomizer createEndpointCustomizer () {
437- return addSuffixesAndMethod (this .destinationProperties , this .beanMethod .resolveBean (this .beanFactory ),
438- this .beanMethod .getMethod ());
439- }
440-
441- private EndpointCustomizer addSuffixesAndMethod (DestinationTopic .Properties properties , Object bean , Method method ) {
442- RetryTopicNamesProvider namesProvider = this .retryTopicNamesProviderFactory .createRetryTopicNamesProvider (properties );
443- return endpoint -> {
444- Collection <TopicNamesHolder > topics = customizeAndRegisterTopics (namesProvider , endpoint );
445- endpoint .setId (namesProvider .getEndpointId (endpoint ));
446- endpoint .setGroupId (namesProvider .getGroupId (endpoint ));
447- endpoint .setTopics (topics .stream ().map (TopicNamesHolder ::getProcessedTopic ).toArray (String []::new ));
448- endpoint .setClientIdPrefix (namesProvider .getClientIdPrefix (endpoint ));
449- endpoint .setGroup (namesProvider .getGroup (endpoint ));
450- endpoint .setBean (bean );
451- endpoint .setMethod (method );
452- return topics ;
453- };
454- }
455-
456- private Collection <TopicNamesHolder > customizeAndRegisterTopics (RetryTopicNamesProvider namesProvider ,
457- MethodKafkaListenerEndpoint <?, ?> endpoint ) {
458-
459- return getTopics (endpoint )
460- .stream ()
461- .map (topic -> new TopicNamesHolder (topic , namesProvider .getTopicName (topic )))
462- .collect (Collectors .toList ());
463- }
464-
465- private Collection <String > getTopics (MethodKafkaListenerEndpoint <?, ?> endpoint ) {
466- Collection <String > topics = endpoint .getTopics ();
467- if (topics .isEmpty ()) {
468- TopicPartitionOffset [] topicPartitionsToAssign = endpoint .getTopicPartitionsToAssign ();
469- if (topicPartitionsToAssign != null && topicPartitionsToAssign .length > 0 ) {
470- topics = Arrays .stream (topicPartitionsToAssign )
471- .map (TopicPartitionOffset ::getTopic )
472- .collect (Collectors .toList ());
473- }
474- }
475-
476- if (topics .isEmpty ()) {
477- throw new IllegalStateException ("No topics where provided for RetryTopicConfiguration." );
478- }
479- return topics ;
480- }
481- }
482-
483- private static final class TopicNamesHolder {
484-
485- private final String mainTopic ;
486-
487- private final String processedTopic ;
488-
489- TopicNamesHolder (String mainTopic , String processedTopic ) {
490- this .mainTopic = mainTopic ;
491- this .processedTopic = processedTopic ;
492- }
493-
494- String getMainTopic () {
495- return this .mainTopic ;
496- }
497-
498- String getProcessedTopic () {
499- return this .processedTopic ;
500- }
501- }
502-
503405 static class LoggingDltListenerHandlerMethod {
504406
505407 public static final String DEFAULT_DLT_METHOD_NAME = "logMessage" ;
0 commit comments