1717package org .springframework .kafka .annotation ;
1818
1919import static org .assertj .core .api .Assertions .assertThat ;
20+ import static org .mockito .Mockito .never ;
21+ import static org .mockito .Mockito .spy ;
22+ import static org .mockito .Mockito .verify ;
2023
2124import java .util .ArrayList ;
2225import java .util .Collection ;
2831import java .util .concurrent .TimeUnit ;
2932import java .util .stream .Collectors ;
3033
34+ import org .apache .kafka .clients .admin .AdminClientConfig ;
3135import org .apache .kafka .clients .consumer .ConsumerConfig ;
3236import org .apache .kafka .common .serialization .ByteArraySerializer ;
3337import org .apache .kafka .common .serialization .BytesDeserializer ;
4448import org .springframework .kafka .config .KafkaListenerContainerFactory ;
4549import org .springframework .kafka .core .DefaultKafkaConsumerFactory ;
4650import org .springframework .kafka .core .DefaultKafkaProducerFactory ;
51+ import org .springframework .kafka .core .KafkaAdmin ;
4752import org .springframework .kafka .core .KafkaTemplate ;
4853import org .springframework .kafka .core .ProducerFactory ;
4954import org .springframework .kafka .listener .BatchListenerFailedException ;
@@ -110,7 +115,7 @@ private void doTest(Listener listener, String topic) throws InterruptedException
110115 }
111116
112117 @ Test
113- public void testBatchOfPojoMessages () throws Exception {
118+ public void testBatchOfPojoMessages (@ Autowired KafkaAdmin admin ) throws Exception {
114119 String topic = "blc3" ;
115120 this .template .send (new GenericMessage <>(
116121 new Foo ("bar" ), Collections .singletonMap (KafkaHeaders .TOPIC , topic )));
@@ -119,6 +124,7 @@ public void testBatchOfPojoMessages() throws Exception {
119124 assertThat (listener .received .size ()).isGreaterThan (0 );
120125 assertThat (listener .received .get (0 ).getPayload ()).isInstanceOf (Foo .class );
121126 assertThat (listener .received .get (0 ).getPayload ().getBar ()).isEqualTo ("bar" );
127+ verify (admin , never ()).clusterId ();
122128 }
123129
124130 @ Test
@@ -152,6 +158,11 @@ void conversionError() throws InterruptedException {
152158 @ EnableKafka
153159 public static class Config {
154160
161+ @ Bean
162+ KafkaAdmin admin (EmbeddedKafkaBroker broker ) {
163+ return spy (new KafkaAdmin (Map .of (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , broker .getBrokersAsString ())));
164+ }
165+
155166 @ Bean
156167 public KafkaListenerContainerFactory <?> kafkaListenerContainerFactory (EmbeddedKafkaBroker embeddedKafka ,
157168 KafkaTemplate <Integer , Object > template ) {
0 commit comments