diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java index b5b5d2c7c8..c4902dce2c 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsConfig; import org.springframework.beans.DirectFieldAccessor; import org.springframework.core.log.LogAccessor; @@ -57,6 +58,7 @@ * @author Gary Russell * @author Hugo Wood * @author Artem Bilan + * @author Sanghyeok An */ public final class KafkaTestUtils { @@ -82,6 +84,17 @@ public static Map consumerProps(String group, String autoCommit, return consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit); } + /** + * Set up test properties for an {@code } consumer. + * @param brokers the bootstrapServers property. + * @param group the group id. + * @return the properties. + * @since 3.3 + */ + public static Map consumerProps(String brokers, String group) { + return consumerProps(brokers, group, "false"); + } + /** * Set up test properties for an {@code } producer. * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance. @@ -128,6 +141,20 @@ public static Map producerProps(String brokers) { return props; } + /** + * Set up test properties for the Kafka Streams. + * @param applicationId the applicationId for the Kafka Streams. + * @param brokers the bootstrapServers property. + * @return the properties. + * @since 3.3 + */ + public static Map streamsProps(String applicationId, String brokers) { + Map props = new HashMap<>(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + return props; + } + /** * Poll the consumer, expecting a single record for the specified topic. * @param consumer the consumer. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 9ae1567e67..f92e5b823f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -91,6 +91,7 @@ * @author Wang ZhiYang * @author Huijin Hong * @author Soby Chacko + * @author Sanghyeok An */ public abstract class MessagingMessageListenerAdapter implements ConsumerSeekAware, AsyncRepliesAware { @@ -421,7 +422,7 @@ else if (this.hasMetadataParameter) { return this.handlerMethod.invoke(message, data, ack, consumer); } } - catch (org.springframework.messaging.converter.MessageConversionException ex) { + catch (MessageConversionException ex) { throw checkAckArg(ack, message, new MessageConversionException("Cannot handle message", ex)); } catch (MethodArgumentNotValidException ex) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java index 0efdc1d633..c12075ad62 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java @@ -19,7 +19,6 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,6 +48,7 @@ import org.springframework.kafka.streams.KafkaStreamsMicrometerListener; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -60,6 +60,7 @@ * @author Nurettin Yilmaz * @author Artem Bilan * @author Almog Gavra + * @author Sanghyeok An * * @since 2.1.5 */ @@ -90,7 +91,7 @@ public void testKafkaStreamsCustomizer(@Autowired KafkaStreamsConfiguration conf assertThat(STATE_LISTENER.getCurrentState()).isEqualTo(state); Properties properties = configuration.asProperties(); assertThat(properties.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)) - .isEqualTo(Collections.singletonList(config.broker.getBrokersAsString())); + .isEqualTo(config.broker.getBrokersAsString()); assertThat(properties.get(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)) .isEqualTo(Foo.class); assertThat(properties.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)) @@ -163,10 +164,8 @@ public void configureTopology(Topology topology) { @SuppressWarnings("deprecation") @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration kStreamsConfigs() { - Map props = new HashMap<>(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, - Collections.singletonList(this.broker.getBrokersAsString())); + Map props = + KafkaTestUtils.streamsProps(APPLICATION_ID, this.broker.getBrokersAsString()); props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, Foo.class); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1000); return new KafkaStreamsConfiguration(props); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicClassLevelIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicClassLevelIntegrationTests.java index d937518ef6..fa3e5d4a58 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicClassLevelIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicClassLevelIntegrationTests.java @@ -53,6 +53,7 @@ import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.handler.annotation.Header; import org.springframework.retry.annotation.Backoff; import org.springframework.scheduling.TaskScheduler; @@ -67,6 +68,7 @@ * Tests for ... * * @author Wang Zhiyang + * @author Sanghyeok An * * @since 3.2 */ @@ -284,9 +286,7 @@ static class KafkaProducerConfig { @Bean ProducerFactory producerFactory() { - Map configProps = new HashMap<>(); - configProps.put( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + Map configProps = KafkaTestUtils.producerProps( this.broker.getBrokersAsString()); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, @@ -319,13 +319,8 @@ KafkaAdmin kafkaAdmin() { @Bean ConsumerFactory consumerFactory() { - Map props = new HashMap<>(); - props.put( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - this.broker.getBrokersAsString()); - props.put( - ConsumerConfig.GROUP_ID_CONFIG, - "groupId"); + Map props = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), "groupId"); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java index 6241ef778e..d7e6bb72b4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java @@ -54,6 +54,7 @@ import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.handler.annotation.Header; import org.springframework.retry.annotation.Backoff; import org.springframework.scheduling.TaskScheduler; @@ -67,6 +68,7 @@ /** * Tests for https://github.com/spring-projects/spring-kafka/issues/1828 * @author Deepesh Verma + * @author Sanghyeok An * @since 2.7 */ @SpringJUnitConfig @@ -276,9 +278,7 @@ public static class KafkaProducerConfig { @Bean public ProducerFactory producerFactory() { - Map configProps = new HashMap<>(); - configProps.put( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + Map configProps = KafkaTestUtils.producerProps( this.broker.getBrokersAsString()); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, @@ -311,13 +311,8 @@ public KafkaAdmin kafkaAdmin() { @Bean public ConsumerFactory consumerFactory() { - Map props = new HashMap<>(); - props.put( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - this.broker.getBrokersAsString()); - props.put( - ConsumerConfig.GROUP_ID_CONFIG, - "groupId"); + Map props = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), "groupId"); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicClassLevelExceptionRoutingIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicClassLevelExceptionRoutingIntegrationTests.java index 6e0eb0eab4..6855b28c5f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicClassLevelExceptionRoutingIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicClassLevelExceptionRoutingIntegrationTests.java @@ -53,6 +53,7 @@ import org.springframework.kafka.support.converter.ConversionException; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.handler.annotation.Header; import org.springframework.retry.annotation.Backoff; import org.springframework.scheduling.TaskScheduler; @@ -66,6 +67,7 @@ * Test class level non-blocking retries. * * @author Wang Zhiyang + * @author Sanghyeok An * * @since 3.2 */ @@ -417,9 +419,7 @@ static class KafkaProducerConfig { @Bean ProducerFactory producerFactory() { - Map configProps = new HashMap<>(); - configProps.put( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + Map configProps = KafkaTestUtils.producerProps( this.broker.getBrokersAsString()); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, @@ -452,13 +452,8 @@ KafkaAdmin kafkaAdmin() { @Bean ConsumerFactory consumerFactory() { - Map props = new HashMap<>(); - props.put( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - this.broker.getBrokersAsString()); - props.put( - ConsumerConfig.GROUP_ID_CONFIG, - "groupId"); + Map props = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), "groupId"); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicClassLevelIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicClassLevelIntegrationTests.java index 359f53cb1d..b60ff50b56 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicClassLevelIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicClassLevelIntegrationTests.java @@ -75,6 +75,7 @@ import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.Message; import org.springframework.messaging.converter.CompositeMessageConverter; import org.springframework.messaging.converter.GenericMessageConverter; @@ -94,6 +95,7 @@ * * @author Wang Zhiyang * @author Artem Bilan + * @author Sanghyeok An * * @since 3.2 */ @@ -749,9 +751,7 @@ static class KafkaProducerConfig { @Bean ProducerFactory producerFactory() { - Map configProps = new HashMap<>(); - configProps.put( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + Map configProps = KafkaTestUtils.producerProps( this.broker.getBrokersAsString()); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, @@ -794,13 +794,8 @@ NewTopics topics() { @Bean ConsumerFactory consumerFactory() { - Map props = new HashMap<>(); - props.put( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - this.broker.getBrokersAsString()); - props.put( - ConsumerConfig.GROUP_ID_CONFIG, - "groupId"); + Map props = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), "groupId"); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java index 934e584157..5084ba44e8 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java @@ -54,6 +54,7 @@ import org.springframework.kafka.support.converter.ConversionException; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.handler.annotation.Header; import org.springframework.retry.annotation.Backoff; import org.springframework.scheduling.TaskScheduler; @@ -66,6 +67,7 @@ /** * @author Tomaz Fernandes * @author Wang Zhiyang + * @author Sanghyeok An * * @since 2.8.4 */ @@ -431,9 +433,7 @@ public static class KafkaProducerConfig { @Bean public ProducerFactory producerFactory() { - Map configProps = new HashMap<>(); - configProps.put( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + Map configProps = KafkaTestUtils.producerProps( this.broker.getBrokersAsString()); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, @@ -466,13 +466,8 @@ public KafkaAdmin kafkaAdmin() { @Bean public ConsumerFactory consumerFactory() { - Map props = new HashMap<>(); - props.put( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - this.broker.getBrokersAsString()); - props.put( - ConsumerConfig.GROUP_ID_CONFIG, - "groupId"); + Map props = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), "groupId"); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java index 5dd6de3bd5..be12c30a33 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java @@ -76,6 +76,7 @@ import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.Message; import org.springframework.messaging.converter.CompositeMessageConverter; import org.springframework.messaging.converter.GenericMessageConverter; @@ -94,6 +95,7 @@ * @author Tomaz Fernandes * @author Gary Russell * @author Wang Zhiyang + * @author Sanghyeok An * @since 2.7 */ @SpringJUnitConfig @@ -776,9 +778,7 @@ public static class KafkaProducerConfig { @Bean public ProducerFactory producerFactory() { - Map configProps = new HashMap<>(); - configProps.put( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + Map configProps = KafkaTestUtils.producerProps( this.broker.getBrokersAsString()); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, @@ -821,13 +821,8 @@ public NewTopics topics() { @Bean public ConsumerFactory consumerFactory() { - Map props = new HashMap<>(); - props.put( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - this.broker.getBrokersAsString()); - props.put( - ConsumerConfig.GROUP_ID_CONFIG, - "groupId"); + Map props = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), "groupId"); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java index 3917de3b9f..95608a2b28 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java @@ -21,7 +21,6 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -52,6 +51,7 @@ import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.handler.annotation.Header; import org.springframework.retry.annotation.Backoff; import org.springframework.stereotype.Component; @@ -63,6 +63,7 @@ * @author Tomaz Fernandes * @author Cenk Akin * @author Wang Zhiyang + * @author Sanghyeok An * * @since 2.8.3 */ @@ -353,9 +354,7 @@ private ConcurrentKafkaListenerContainerFactory createKafkaListe @Bean public ProducerFactory producerFactory() { - Map configProps = new HashMap<>(); - configProps.put( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + Map configProps = KafkaTestUtils.producerProps( this.broker.getBrokersAsString()); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, @@ -373,13 +372,8 @@ public KafkaTemplate kafkaTemplate() { @Bean public ConsumerFactory consumerFactory() { - Map props = new HashMap<>(); - props.put( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - this.broker.getBrokersAsString()); - props.put( - ConsumerConfig.GROUP_ID_CONFIG, - "groupId"); + Map props = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), "groupId"); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/Configurer1Tests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/Configurer1Tests.java index a473fed795..b4dd39b00f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/Configurer1Tests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/Configurer1Tests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -41,11 +40,13 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; /** * @author Gary Russell + * @author Sanghyeok An * @since 2.7 * */ @@ -75,9 +76,7 @@ public static class Config { @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration kStreamsConfigs() { - Map props = new HashMap<>(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "configurer1"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); + Map props = KafkaTestUtils.streamsProps("configurer1", this.brokerAddresses); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/Configurer2Tests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/Configurer2Tests.java index b1e7ffe71a..8a52eea463 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/Configurer2Tests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/Configurer2Tests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -41,11 +40,13 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; /** * @author Gary Russell + * @author Sanghyeok An * @since 2.7 * */ @@ -75,9 +76,7 @@ public static class Config { @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration kStreamsConfigs() { - Map props = new HashMap<>(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "configurer2"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); + Map props = KafkaTestUtils.streamsProps("configurer2", this.brokerAddresses); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsBranchTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsBranchTests.java index c145d29cd7..c6e5486e89 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsBranchTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsBranchTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -58,6 +57,7 @@ * @author Elliot Kennedy * @author Artem Bilan * @author Ivan Ponomarev + * @author Sanghyeok An * * @since 1.3.3 */ @@ -146,9 +146,7 @@ public Map producerConfigs() { @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration kStreamsConfigs() { - Map props = new HashMap<>(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); + Map props = KafkaTestUtils.streamsProps("testStreams", this.brokerAddresses); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); return new KafkaStreamsConfiguration(props); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java index f41ea0b0f7..afcd2829de 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.verify; import java.lang.reflect.Field; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -79,6 +78,7 @@ /** * @author Soby Chacko + * @author Sanghyeok An * @since 3.2.0 */ @SpringJUnitConfig @@ -248,9 +248,8 @@ public ConsumerFactory consumerFactory() { @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration kStreamsConfigs() { - Map props = new HashMap<>(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "iqs-testStreams"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); + Map props = KafkaTestUtils.streamsProps( + "iqs-testStreams", this.brokerAddresses); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsJsonSerializationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsJsonSerializationTests.java index 46112bfed4..a919ea5981 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsJsonSerializationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsJsonSerializationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ import static org.assertj.core.api.Assertions.assertThat; -import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -29,7 +28,6 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Printed; @@ -63,6 +61,7 @@ /** * @author Elliot Kennedy * @author Artem Bilan + * @author Sanghyeok An */ @SpringJUnitConfig @DirtiesContext @@ -204,9 +203,7 @@ public Map producerConfigs() { @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration kStreamsConfigs() { - Map props = new HashMap<>(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); + Map props = KafkaTestUtils.streamsProps("testStreams", this.brokerAddresses); return new KafkaStreamsConfiguration(props); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java index a58b17e318..58372857ff 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java @@ -83,6 +83,7 @@ * @author Gary Russell * @author Elliot Metsger * @author Zach Olauson + * @author Sanghyeok An * * @since 1.1.4 */ @@ -195,9 +196,7 @@ public KafkaTemplate template() { @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration kStreamsConfigs() { - Map props = new HashMap<>(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); + Map props = KafkaTestUtils.streamsProps("testStreams", this.brokerAddresses); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java index a01bcba8dd..4ab43aa63d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java @@ -70,6 +70,7 @@ /** * @author Gary Russell * @author Soby Chacko + * @author Sanghyeok An * @since 2.3 * */ @@ -186,9 +187,7 @@ public KafkaTemplate template() { @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration kStreamsConfigs() { - Map props = new HashMap<>(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); + Map props = KafkaTestUtils.streamsProps("testStreams", this.brokerAddresses); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, FailSerde.class); props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,