11/*
2- * Copyright 2016-2017 the original author or authors.
2+ * Copyright 2016-2018 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
2020import static org .mockito .ArgumentMatchers .anyMap ;
2121import static org .mockito .ArgumentMatchers .anyString ;
2222import static org .mockito .BDDMockito .willAnswer ;
23+ import static org .mockito .BDDMockito .willReturn ;
24+ import static org .mockito .BDDMockito .willThrow ;
2325import static org .mockito .Mockito .mock ;
2426import static org .mockito .Mockito .spy ;
2527
5052import org .springframework .context .annotation .Configuration ;
5153import org .springframework .context .event .EventListener ;
5254import org .springframework .context .support .PropertySourcesPlaceholderConfigurer ;
55+ import org .springframework .core .convert .converter .Converter ;
5356import org .springframework .kafka .config .ConcurrentKafkaListenerContainerFactory ;
5457import org .springframework .kafka .config .KafkaListenerContainerFactory ;
5558import org .springframework .kafka .config .KafkaListenerEndpointRegistry ;
104107 * @author Artem Bilan
105108 * @author Dariusz Szablinski
106109 * @author Venil Noronha
110+ * @author Dimitri Penner
107111 */
108112@ ContextConfiguration
109113@ RunWith (SpringJUnit4ClassRunner .class )
@@ -120,7 +124,7 @@ public class EnableKafkaIntegrationTests {
120124 "annotated18" , "annotated19" , "annotated20" , "annotated21" , "annotated21reply" , "annotated22" ,
121125 "annotated22reply" , "annotated23" , "annotated23reply" , "annotated24" , "annotated24reply" ,
122126 "annotated25" , "annotated25reply1" , "annotated25reply2" , "annotated26" , "annotated27" , "annotated28" ,
123- "annotated29" , "annotated30" , "annotated30reply" , "annotated31" );
127+ "annotated29" , "annotated30" , "annotated30reply" , "annotated31" , "annotated32" );
124128
125129// @Rule
126130// public Log4jLevelAdjuster adjuster = new Log4jLevelAdjuster(Level.TRACE,
@@ -159,6 +163,9 @@ public class EnableKafkaIntegrationTests {
159163 @ Autowired
160164 private List <?> quxGroup ;
161165
166+ @ Autowired
167+ private FooConverter fooConverter ;
168+
162169 @ Test
163170 public void testAnonymous () {
164171 MessageListenerContainer container = this .registry
@@ -598,6 +605,25 @@ public void testBadAckConfig() throws Exception {
598605 + "the listener container must have a MANUAL Ackmode to populate the Acknowledgment." );
599606 }
600607
608+ @ Test
609+ public void testConverterBean () throws Exception {
610+ @ SuppressWarnings ("unchecked" )
611+ Converter <String , Foo > converterDelegate = mock (Converter .class );
612+ fooConverter .setDelegate (converterDelegate );
613+
614+ Foo foo = new Foo ();
615+ willReturn (foo ).given (converterDelegate ).convert ("{'bar':'foo'}" );
616+ template .send ("annotated32" , 0 , 1 , "{'bar':'foo'}" );
617+ assertThat (this .listener .latch20 .await (10 , TimeUnit .SECONDS )).isTrue ();
618+ assertThat (this .listener .listen16foo ).isEqualTo (foo );
619+
620+ willThrow (new RuntimeException ()).given (converterDelegate ).convert ("foobar" );
621+ template .send ("annotated32" , 0 , 1 , "foobar" );
622+ assertThat (this .config .listen16ErrorLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
623+ assertThat (this .config .listen16Exception ).isNotNull ();
624+ assertThat (this .config .listen16Message ).isEqualTo ("foobar" );
625+ }
626+
601627 @ Configuration
602628 @ EnableKafka
603629 @ EnableTransactionManagement (proxyTargetClass = true )
@@ -769,6 +795,19 @@ public KafkaListenerContainerFactory<?> batchManualFactory2() {
769795 return factory ;
770796 }
771797
798+ @ Bean
799+ public KafkaListenerContainerFactory <ConcurrentMessageListenerContainer <Integer , String >>
800+ recordAckListenerContainerFactory () {
801+ ConcurrentKafkaListenerContainerFactory <Integer , String > factory =
802+ new ConcurrentKafkaListenerContainerFactory <>();
803+ factory .setConsumerFactory (manualConsumerFactory ("clientIdViaProps4" ));
804+ ContainerProperties props = factory .getContainerProperties ();
805+ props .setAckMode (AckMode .RECORD );
806+ props .setAckOnError (true );
807+ props .setErrorHandler (listen16ErrorHandler ());
808+ return factory ;
809+ }
810+
772811 @ Bean
773812 public DefaultKafkaConsumerFactory <Integer , String > consumerFactory () {
774813 return new DefaultKafkaConsumerFactory <>(consumerConfigs ());
@@ -976,6 +1015,26 @@ public KafkaListenerErrorHandler voidSendToErrorHandler() {
9761015 };
9771016 }
9781017
1018+ private Throwable listen16Exception ;
1019+
1020+ private Object listen16Message ;
1021+
1022+ private CountDownLatch listen16ErrorLatch = new CountDownLatch (1 );
1023+
1024+ @ Bean
1025+ public ConsumerAwareErrorHandler listen16ErrorHandler () {
1026+ return (e , r , c ) -> {
1027+ listen16Exception = e ;
1028+ listen16Message = r .value ();
1029+ listen16ErrorLatch .countDown ();
1030+ };
1031+ }
1032+
1033+ @ Bean
1034+ public FooConverter fooConverter () {
1035+ return new FooConverter ();
1036+ }
1037+
9791038 }
9801039
9811040 static class Listener implements ConsumerSeekAware {
@@ -1020,6 +1079,8 @@ static class Listener implements ConsumerSeekAware {
10201079
10211080 private final CountDownLatch latch19 = new CountDownLatch (1 );
10221081
1082+ private final CountDownLatch latch20 = new CountDownLatch (1 );
1083+
10231084 private final CountDownLatch eventLatch = new CountDownLatch (1 );
10241085
10251086 private volatile Integer partition ;
@@ -1036,6 +1097,8 @@ static class Listener implements ConsumerSeekAware {
10361097
10371098 private Foo foo ;
10381099
1100+ private Foo listen16foo ;
1101+
10391102 private volatile ListenerContainerIdleEvent event ;
10401103
10411104 private volatile List <Integer > keys ;
@@ -1214,6 +1277,13 @@ public void listen15(List<Message<?>> list, Acknowledgment ack) {
12141277 this .latch15 .countDown ();
12151278 }
12161279
1280+ @ KafkaListener (id = "converter" , topics = "annotated32" , containerFactory = "recordAckListenerContainerFactory" ,
1281+ groupId = "converter.explicitGroupId" )
1282+ public void listen16 (Foo foo ) {
1283+ this .listen16foo = foo ;
1284+ this .latch20 .countDown ();
1285+ }
1286+
12171287 @ KafkaListener (id = "errorHandler" , topics = "annotated20" , errorHandler = "consumeException" )
12181288 public String errorHandler (String data ) throws Exception {
12191289 throw new Exception ("return this" );
@@ -1405,4 +1475,22 @@ public boolean filter(ConsumerRecord<Integer, String> consumerRecord) {
14051475
14061476 }
14071477
1478+ public static class FooConverter implements Converter <String , Foo > {
1479+
1480+ private Converter <String , Foo > delegate ;
1481+
1482+ public Converter <String , Foo > getDelegate () {
1483+ return delegate ;
1484+ }
1485+
1486+ public void setDelegate (
1487+ Converter <String , Foo > delegate ) {
1488+ this .delegate = delegate ;
1489+ }
1490+
1491+ @ Override
1492+ public Foo convert (String source ) {
1493+ return delegate .convert (source );
1494+ }
1495+ }
14081496}
0 commit comments