diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java index dd043851cd..dc192bf3e6 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java @@ -46,7 +46,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.test.EmbeddedKafkaBroker; -import org.springframework.kafka.test.EmbeddedKafkaKraftBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.stereotype.Component; import org.springframework.test.annotation.DirtiesContext; @@ -58,12 +58,14 @@ * @author Gary Russell * @author Artem Bilan * @author Soby Chacko + * @author Sanghyeok An * * @since 2.2 * */ @SpringJUnitConfig @DirtiesContext +@EmbeddedKafka(partitions = 1, topics = "alias.tests") public class AliasPropertiesTests { @Autowired @@ -109,6 +111,9 @@ public void testAliasFor() throws Exception { @EnableKafka public static class Config { + @Autowired + EmbeddedKafkaBroker broker; + final CountDownLatch latch = new CountDownLatch(1); static AtomicBoolean orderedCalledFirst = new AtomicBoolean(true); @@ -137,11 +142,6 @@ public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() { return new KafkaListenerEndpointRegistry(); } - @Bean - public EmbeddedKafkaBroker embeddedKafka() { - return new EmbeddedKafkaKraftBroker(1, 1, "alias.tests"); - } - @Bean public KafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = @@ -158,7 +158,7 @@ public DefaultKafkaConsumerFactory consumerFactory() { @Bean public Map consumerConfigs() { Map consumerProps = - KafkaTestUtils.consumerProps("myAliasGroup", "false", embeddedKafka()); + KafkaTestUtils.consumerProps(this.broker.getBrokersAsString(), "myAliasGroup", "false"); return consumerProps; } @@ -174,7 +174,7 @@ public ProducerFactory producerFactory() { @Bean public Map producerConfigs() { - return KafkaTestUtils.producerProps(embeddedKafka()); + return KafkaTestUtils.producerProps(this.broker.getBrokersAsString()); } @MyListener(id = "onMethodInConfigClass", value = "alias.tests") diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java index b0198e2a84..ebc4409678 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java @@ -51,7 +51,7 @@ import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; import org.springframework.kafka.support.serializer.SerializationUtils; import org.springframework.kafka.test.EmbeddedKafkaBroker; -import org.springframework.kafka.test.EmbeddedKafkaKraftBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -62,14 +62,16 @@ /** * @author Gary Russell + * @author Sanghyeok An * @since 2.2 * */ @SpringJUnitConfig @DirtiesContext +@EmbeddedKafka(partitions = 1, topics = ErrorHandlingDeserializerTests.TOPIC) public class ErrorHandlingDeserializerTests { - private static final String TOPIC = "ehdt"; + static final String TOPIC = "ehdt"; @Autowired public Config config; @@ -180,6 +182,9 @@ public boolean supports(Class clazz) { @EnableKafka public static class Config { + @Autowired + EmbeddedKafkaBroker broker; + private final CountDownLatch latch = new CountDownLatch(6); private final AtomicInteger goodCount = new AtomicInteger(); @@ -202,11 +207,6 @@ public void listen2(ConsumerRecord record) { this.latch.countDown(); } - @Bean - public EmbeddedKafkaBroker embeddedKafka() { - return new EmbeddedKafkaKraftBroker(1, 1, TOPIC); - } - @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { return factory(cf()); @@ -245,7 +245,7 @@ else if (r.key() == null && t.getCause() instanceof DeserializationException) { @Bean public ConsumerFactory cf() { - Map props = KafkaTestUtils.consumerProps(TOPIC + ".g1", "false", embeddedKafka()); + Map props = KafkaTestUtils.consumerProps(this.broker.getBrokersAsString(), TOPIC + ".g1", "false"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ExtendedEHD.class.getName()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, FailSometimesDeserializer.class); @@ -255,7 +255,7 @@ public ConsumerFactory cf() { @Bean public ConsumerFactory cfWithExplicitDeserializers() { - Map props = KafkaTestUtils.consumerProps(TOPIC + ".g2", "false", embeddedKafka()); + Map props = KafkaTestUtils.consumerProps(this.broker.getBrokersAsString(), TOPIC + ".g2", "false"); return new DefaultKafkaConsumerFactory<>(props, new ErrorHandlingDeserializer(new FailSometimesDeserializer()).keyDeserializer(true), new ErrorHandlingDeserializer(new FailSometimesDeserializer())); @@ -263,7 +263,7 @@ public ConsumerFactory cfWithExplicitDeserializers() { @Bean public ProducerFactory pf() { - Map props = KafkaTestUtils.producerProps(embeddedKafka()); + Map props = KafkaTestUtils.producerProps(broker.getBrokersAsString()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(props); }