5151import org .springframework .kafka .support .serializer .ErrorHandlingDeserializer ;
5252import org .springframework .kafka .support .serializer .SerializationUtils ;
5353import org .springframework .kafka .test .EmbeddedKafkaBroker ;
54- import org .springframework .kafka .test .EmbeddedKafkaKraftBroker ;
54+ import org .springframework .kafka .test .context . EmbeddedKafka ;
5555import org .springframework .kafka .test .utils .KafkaTestUtils ;
5656import org .springframework .test .annotation .DirtiesContext ;
5757import org .springframework .test .context .junit .jupiter .SpringJUnitConfig ;
6262
6363/**
6464 * @author Gary Russell
65+ * @author Sanghyeok An
6566 * @since 2.2
6667 *
6768 */
6869@ SpringJUnitConfig
6970@ DirtiesContext
71+ @ EmbeddedKafka (partitions = 1 , topics = ErrorHandlingDeserializerTests .TOPIC )
7072public class ErrorHandlingDeserializerTests {
7173
72- private static final String TOPIC = "ehdt" ;
74+ static final String TOPIC = "ehdt" ;
7375
7476 @ Autowired
7577 public Config config ;
@@ -180,6 +182,9 @@ public boolean supports(Class<?> clazz) {
180182 @ EnableKafka
181183 public static class Config {
182184
185+ @ Autowired
186+ EmbeddedKafkaBroker broker ;
187+
183188 private final CountDownLatch latch = new CountDownLatch (6 );
184189
185190 private final AtomicInteger goodCount = new AtomicInteger ();
@@ -202,11 +207,6 @@ public void listen2(ConsumerRecord<String, String> record) {
202207 this .latch .countDown ();
203208 }
204209
205- @ Bean
206- public EmbeddedKafkaBroker embeddedKafka () {
207- return new EmbeddedKafkaKraftBroker (1 , 1 , TOPIC );
208- }
209-
210210 @ Bean
211211 public ConcurrentKafkaListenerContainerFactory <String , String > kafkaListenerContainerFactory () {
212212 return factory (cf ());
@@ -245,7 +245,7 @@ else if (r.key() == null && t.getCause() instanceof DeserializationException) {
245245
246246 @ Bean
247247 public ConsumerFactory <String , String > cf () {
248- Map <String , Object > props = KafkaTestUtils .consumerProps (TOPIC + ".g1" , "false" , embeddedKafka () );
248+ Map <String , Object > props = KafkaTestUtils .consumerProps (this . broker . getBrokersAsString (), TOPIC + ".g1" , "false" );
249249 props .put (ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , ExtendedEHD .class .getName ());
250250 props .put (ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , ErrorHandlingDeserializer .class );
251251 props .put (ErrorHandlingDeserializer .KEY_DESERIALIZER_CLASS , FailSometimesDeserializer .class );
@@ -255,15 +255,15 @@ public ConsumerFactory<String, String> cf() {
255255
256256 @ Bean
257257 public ConsumerFactory <String , String > cfWithExplicitDeserializers () {
258- Map <String , Object > props = KafkaTestUtils .consumerProps (TOPIC + ".g2" , "false" , embeddedKafka () );
258+ Map <String , Object > props = KafkaTestUtils .consumerProps (this . broker . getBrokersAsString (), TOPIC + ".g2" , "false" );
259259 return new DefaultKafkaConsumerFactory <>(props ,
260260 new ErrorHandlingDeserializer <String >(new FailSometimesDeserializer ()).keyDeserializer (true ),
261261 new ErrorHandlingDeserializer <String >(new FailSometimesDeserializer ()));
262262 }
263263
264264 @ Bean
265265 public ProducerFactory <String , String > pf () {
266- Map <String , Object > props = KafkaTestUtils .producerProps (embeddedKafka ());
266+ Map <String , Object > props = KafkaTestUtils .producerProps (broker . getBrokersAsString ());
267267 props .put (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , StringSerializer .class );
268268 return new DefaultKafkaProducerFactory <>(props );
269269 }
0 commit comments