@@ -46,7 +46,7 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
-import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;
+import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.stereotype.Component;
import org.springframework.test.annotation.DirtiesContext;
@@ -58,12 +58,14 @@
* @author Gary Russell
* @author Artem Bilan
* @author Soby Chacko
+* @author Sanghyeok An
*
* @since 2.2
*
*/
@SpringJUnitConfig
@DirtiesContext
+@EmbeddedKafka(partitions = 1, topics = "alias.tests")
public class AliasPropertiesTests {

@Autowired
@@ -109,6 +111,9 @@ public void testAliasFor() throws Exception {
@EnableKafka
public static class Config {

+@Autowired
+EmbeddedKafkaBroker broker;
+
final CountDownLatch latch = new CountDownLatch(1);

static AtomicBoolean orderedCalledFirst = new AtomicBoolean(true);
@@ -137,11 +142,6 @@ public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
return new KafkaListenerEndpointRegistry();
}

-@Bean
-public EmbeddedKafkaBroker embeddedKafka() {
-return new EmbeddedKafkaKraftBroker(1, 1, "alias.tests");
-}
-
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
@@ -158,7 +158,7 @@ public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps("myAliasGroup", "false", embeddedKafka());
KafkaTestUtils.consumerProps(this.broker.getBrokersAsString(), "myAliasGroup", "false");
return consumerProps;
}

@@ -174,7 +174,7 @@ public ProducerFactory<Integer, String> producerFactory() {

@Bean
public Map<String, Object> producerConfigs() {
-return KafkaTestUtils.producerProps(embeddedKafka());
+return KafkaTestUtils.producerProps(this.broker.getBrokersAsString());
}

@MyListener(id = "onMethodInConfigClass", value = "alias.tests")
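The hunks above replace the in-config EmbeddedKafkaKraftBroker @Bean with the class-level @EmbeddedKafka annotation and an autowired EmbeddedKafkaBroker. The following is a minimal standalone sketch of that pattern, not code from this PR; the class name, topic, and group id are illustrative only.

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

// Illustrative sketch only: @EmbeddedKafka starts the broker for the test
// context, so the configuration class autowires EmbeddedKafkaBroker instead
// of declaring an embeddedKafka() @Bean of its own.
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = "example.topic")
public class ExampleEmbeddedKafkaTests {

	@Configuration
	@EnableKafka
	public static class Config {

		@Autowired
		EmbeddedKafkaBroker broker; // registered in the context by @EmbeddedKafka

		@Bean
		public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {
			// the bootstrap servers now come from the autowired broker,
			// not from a locally declared broker bean
			Map<String, Object> consumerProps =
					KafkaTestUtils.consumerProps(this.broker.getBrokersAsString(), "exampleGroup", "false");
			return new DefaultKafkaConsumerFactory<>(consumerProps);
		}

	}

}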
@@ -51,7 +51,7 @@
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
-import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;
+import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
@@ -62,14 +62,16 @@

/**
* @author Gary Russell
+* @author Sanghyeok An
* @since 2.2
*
*/
@SpringJUnitConfig
@DirtiesContext
+@EmbeddedKafka(partitions = 1, topics = ErrorHandlingDeserializerTests.TOPIC)
public class ErrorHandlingDeserializerTests {

-private static final String TOPIC = "ehdt";
+static final String TOPIC = "ehdt";

@Autowired
public Config config;
@@ -180,6 +182,9 @@ public boolean supports(Class<?> clazz) {
@EnableKafka
public static class Config {

+@Autowired
+EmbeddedKafkaBroker broker;
+
private final CountDownLatch latch = new CountDownLatch(6);

private final AtomicInteger goodCount = new AtomicInteger();
Expand All @@ -202,11 +207,6 @@ public void listen2(ConsumerRecord<String, String> record) {
this.latch.countDown();
}

-@Bean
-public EmbeddedKafkaBroker embeddedKafka() {
-return new EmbeddedKafkaKraftBroker(1, 1, TOPIC);
-}
-
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
return factory(cf());
@@ -245,7 +245,7 @@ else if (r.key() == null && t.getCause() instanceof DeserializationException) {

@Bean
public ConsumerFactory<String, String> cf() {
-Map<String, Object> props = KafkaTestUtils.consumerProps(TOPIC + ".g1", "false", embeddedKafka());
+Map<String, Object> props = KafkaTestUtils.consumerProps(this.broker.getBrokersAsString(), TOPIC + ".g1", "false");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ExtendedEHD.class.getName());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, FailSometimesDeserializer.class);
@@ -255,15 +255,15 @@ public ConsumerFactory<String, String> cf() {

@Bean
public ConsumerFactory<String, String> cfWithExplicitDeserializers() {
-Map<String, Object> props = KafkaTestUtils.consumerProps(TOPIC + ".g2", "false", embeddedKafka());
+Map<String, Object> props = KafkaTestUtils.consumerProps(this.broker.getBrokersAsString(), TOPIC + ".g2", "false");
return new DefaultKafkaConsumerFactory<>(props,
new ErrorHandlingDeserializer<String>(new FailSometimesDeserializer()).keyDeserializer(true),
new ErrorHandlingDeserializer<String>(new FailSometimesDeserializer()));
}

@Bean
public ProducerFactory<String, String> pf() {
-Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka());
+Map<String, Object> props = KafkaTestUtils.producerProps(broker.getBrokersAsString());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
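As a usage note, the producer side mirrors the pf() change above: producerProps now takes the bootstrap-server list instead of the EmbeddedKafkaBroker bean. The fragment below is a hedged sketch under the assumption of the same @EmbeddedKafka-provisioned context shown earlier, not code from either file; the topic name, the test method, and the imports it mentions are illustrative.

	// Fragment of a hypothetical test class; assumes org.junit.jupiter.api.Test,
	// KafkaTemplate, DefaultKafkaProducerFactory, EmbeddedKafkaBroker and
	// KafkaTestUtils are imported. The broker can be autowired into the test
	// class itself, not only into the @Configuration class.
	@Autowired
	EmbeddedKafkaBroker broker;

	@Test
	void sendsToEmbeddedBroker() throws Exception {
		// producerProps(brokers) defaults to Integer keys and String values
		Map<String, Object> props = KafkaTestUtils.producerProps(this.broker.getBrokersAsString());
		DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(props);
		KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
		template.send("example.topic", "hello from the embedded broker");
		template.flush();
		pf.destroy();
	}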