Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,6 +44,7 @@
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.core.log.LogAccessor;
Expand All @@ -57,6 +58,7 @@
* @author Gary Russell
* @author Hugo Wood
* @author Artem Bilan
* @author Sanghyeok An
*/
public final class KafkaTestUtils {

Expand All @@ -82,6 +84,17 @@ public static Map<String, Object> consumerProps(String group, String autoCommit,
return consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit);
}

/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param brokers the bootstrapServers property.
* @param group the group id.
* @return the properties.
* @since 3.3
*/
public static Map<String, Object> consumerProps(String brokers, String group) {
return consumerProps(brokers, group, "false");
}

/**
* Set up test properties for an {@code <Integer, String>} producer.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
Expand Down Expand Up @@ -128,6 +141,20 @@ public static Map<String, Object> producerProps(String brokers) {
return props;
}

/**
* Set up test properties for the Kafka Streams.
* @param applicationId the applicationId for the Kafka Streams.
* @param brokers the bootstrapServers property.
* @return the properties.
* @since 3.3
*/
public static Map<String, Object> streamsProps(String applicationId, String brokers) {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
return props;
}

/**
* Poll the consumer, expecting a single record for the specified topic.
* @param consumer the consumer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
* @author Wang ZhiYang
* @author Huijin Hong
* @author Soby Chacko
* @author Sanghyeok An
*/
public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerSeekAware, AsyncRepliesAware {

Expand Down Expand Up @@ -421,7 +422,7 @@ else if (this.hasMetadataParameter) {
return this.handlerMethod.invoke(message, data, ack, consumer);
}
}
catch (org.springframework.messaging.converter.MessageConversionException ex) {
catch (MessageConversionException ex) {
throw checkAckArg(ack, message, new MessageConversionException("Cannot handle message", ex));
}
catch (MethodArgumentNotValidException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -49,6 +48,7 @@
import org.springframework.kafka.streams.KafkaStreamsMicrometerListener;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

Expand All @@ -60,6 +60,7 @@
* @author Nurettin Yilmaz
* @author Artem Bilan
* @author Almog Gavra
* @author Sanghyeok An
*
* @since 2.1.5
*/
Expand Down Expand Up @@ -90,7 +91,7 @@ public void testKafkaStreamsCustomizer(@Autowired KafkaStreamsConfiguration conf
assertThat(STATE_LISTENER.getCurrentState()).isEqualTo(state);
Properties properties = configuration.asProperties();
assertThat(properties.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))
.isEqualTo(Collections.singletonList(config.broker.getBrokersAsString()));
.isEqualTo(config.broker.getBrokersAsString());
assertThat(properties.get(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG))
.isEqualTo(Foo.class);
assertThat(properties.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG))
Expand Down Expand Up @@ -163,10 +164,8 @@ public void configureTopology(Topology topology) {
@SuppressWarnings("deprecation")
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
Collections.singletonList(this.broker.getBrokersAsString()));
Map<String, Object> props =
KafkaTestUtils.streamsProps(APPLICATION_ID, this.broker.getBrokersAsString());
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, Foo.class);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1000);
return new KafkaStreamsConfiguration(props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.scheduling.TaskScheduler;
Expand All @@ -67,6 +68,7 @@
* Tests for <a href="https://github.com/spring-projects/spring-kafka/issues/1828">...</a>
*
* @author Wang Zhiyang
* @author Sanghyeok An
*
* @since 3.2
*/
Expand Down Expand Up @@ -284,9 +286,7 @@ static class KafkaProducerConfig {

@Bean
ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
Map<String, Object> configProps = KafkaTestUtils.producerProps(
this.broker.getBrokersAsString());
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down Expand Up @@ -319,13 +319,8 @@ KafkaAdmin kafkaAdmin() {

@Bean
ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.broker.getBrokersAsString());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"groupId");
Map<String, Object> props = KafkaTestUtils.consumerProps(
this.broker.getBrokersAsString(), "groupId");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.scheduling.TaskScheduler;
Expand All @@ -67,6 +68,7 @@
/**
* Tests for https://github.com/spring-projects/spring-kafka/issues/1828
* @author Deepesh Verma
* @author Sanghyeok An
* @since 2.7
*/
@SpringJUnitConfig
Expand Down Expand Up @@ -276,9 +278,7 @@ public static class KafkaProducerConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
Map<String, Object> configProps = KafkaTestUtils.producerProps(
this.broker.getBrokersAsString());
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down Expand Up @@ -311,13 +311,8 @@ public KafkaAdmin kafkaAdmin() {

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.broker.getBrokersAsString());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"groupId");
Map<String, Object> props = KafkaTestUtils.consumerProps(
this.broker.getBrokersAsString(), "groupId");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.scheduling.TaskScheduler;
Expand All @@ -66,6 +67,7 @@
* Test class level non-blocking retries.
*
* @author Wang Zhiyang
* @author Sanghyeok An
*
* @since 3.2
*/
Expand Down Expand Up @@ -417,9 +419,7 @@ static class KafkaProducerConfig {

@Bean
ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
Map<String, Object> configProps = KafkaTestUtils.producerProps(
this.broker.getBrokersAsString());
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down Expand Up @@ -452,13 +452,8 @@ KafkaAdmin kafkaAdmin() {

@Bean
ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.broker.getBrokersAsString());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"groupId");
Map<String, Object> props = KafkaTestUtils.consumerProps(
this.broker.getBrokersAsString(), "groupId");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.GenericMessageConverter;
Expand All @@ -94,6 +95,7 @@
*
* @author Wang Zhiyang
* @author Artem Bilan
* @author Sanghyeok An
*
* @since 3.2
*/
Expand Down Expand Up @@ -749,9 +751,7 @@ static class KafkaProducerConfig {

@Bean
ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
Map<String, Object> configProps = KafkaTestUtils.producerProps(
this.broker.getBrokersAsString());
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down Expand Up @@ -794,13 +794,8 @@ NewTopics topics() {

@Bean
ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.broker.getBrokersAsString());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"groupId");
Map<String, Object> props = KafkaTestUtils.consumerProps(
this.broker.getBrokersAsString(), "groupId");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.scheduling.TaskScheduler;
Expand All @@ -66,6 +67,7 @@
/**
* @author Tomaz Fernandes
* @author Wang Zhiyang
* @author Sanghyeok An
*
* @since 2.8.4
*/
Expand Down Expand Up @@ -431,9 +433,7 @@ public static class KafkaProducerConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
Map<String, Object> configProps = KafkaTestUtils.producerProps(
this.broker.getBrokersAsString());
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down Expand Up @@ -466,13 +466,8 @@ public KafkaAdmin kafkaAdmin() {

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.broker.getBrokersAsString());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"groupId");
Map<String, Object> props = KafkaTestUtils.consumerProps(
this.broker.getBrokersAsString(), "groupId");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.GenericMessageConverter;
Expand All @@ -94,6 +95,7 @@
* @author Tomaz Fernandes
* @author Gary Russell
* @author Wang Zhiyang
* @author Sanghyeok An
* @since 2.7
*/
@SpringJUnitConfig
Expand Down Expand Up @@ -776,9 +778,7 @@ public static class KafkaProducerConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
Map<String, Object> configProps = KafkaTestUtils.producerProps(
this.broker.getBrokersAsString());
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down Expand Up @@ -821,13 +821,8 @@ public NewTopics topics() {

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.broker.getBrokersAsString());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"groupId");
Map<String, Object> props = KafkaTestUtils.consumerProps(
this.broker.getBrokersAsString(), "groupId");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
Expand Down
Loading