Skip to content

Commit bc45df9

Browse files
committed
Fix inconsistent sync on fields in embedded broker
1 parent db1ca60 commit bc45df9

File tree

2 files changed

+153
-2
lines changed

2 files changed

+153
-2
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ public void setZkPort(int zkPort) {
260260
* @return the {@link EmbeddedKafkaBroker}.
261261
* @since 2.4
262262
*/
263-
public EmbeddedKafkaBroker zkConnectionTimeout(int zkConnectionTimeout) {
263+
public synchronized EmbeddedKafkaBroker zkConnectionTimeout(int zkConnectionTimeout) {
264264
this.zkConnectionTimeout = zkConnectionTimeout;
265265
return this;
266266
}
@@ -271,7 +271,7 @@ public EmbeddedKafkaBroker zkConnectionTimeout(int zkConnectionTimeout) {
271271
* @return the {@link EmbeddedKafkaBroker}.
272272
* @since 2.4
273273
*/
274-
public EmbeddedKafkaBroker zkSessionTimeout(int zkSessionTimeout) {
274+
public synchronized EmbeddedKafkaBroker zkSessionTimeout(int zkSessionTimeout) {
275275
this.zkSessionTimeout = zkSessionTimeout;
276276
return this;
277277
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
/**
20+
* @author Gary Russell
21+
* @since 5.2
22+
*
23+
*/
24+
import static org.assertj.core.api.Assertions.assertThat;
25+
26+
import java.util.Map;
27+
28+
import org.apache.kafka.clients.consumer.ConsumerConfig;
29+
import org.apache.kafka.clients.producer.ProducerConfig;
30+
import org.apache.kafka.common.serialization.IntegerDeserializer;
31+
import org.junit.jupiter.api.Test;
32+
33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.context.annotation.Bean;
35+
import org.springframework.context.annotation.Configuration;
36+
import org.springframework.kafka.annotation.EnableKafka;
37+
import org.springframework.kafka.annotation.KafkaListener;
38+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
39+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
40+
import org.springframework.kafka.core.ConsumerFactory;
41+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
42+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
43+
import org.springframework.kafka.core.KafkaTemplate;
44+
import org.springframework.kafka.core.ProducerFactory;
45+
import org.springframework.kafka.support.KafkaHeaders;
46+
import org.springframework.kafka.support.serializer.JsonDeserializer;
47+
import org.springframework.kafka.support.serializer.JsonSerializer;
48+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
49+
import org.springframework.kafka.test.context.EmbeddedKafka;
50+
import org.springframework.kafka.test.utils.KafkaTestUtils;
51+
import org.springframework.messaging.handler.annotation.Header;
52+
import org.springframework.test.annotation.DirtiesContext;
53+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
54+
import org.springframework.util.backoff.FixedBackOff;
55+
56+
@SpringJUnitConfig
57+
@EmbeddedKafka(topics = "sr2", controlledShutdown = true, partitions = 1, count = 1)
58+
@DirtiesContext
59+
public class RetryTests {
60+
61+
private static final String DEFAULT_TEST_GROUP_ID = "statefulRetry";
62+
63+
private static int MAX_RETRY = 2;
64+
65+
@Autowired
66+
private Config config;
67+
68+
@Autowired
69+
private KafkaTemplate<Integer, String> template;
70+
71+
@Test
72+
public void testStatefulRetry() throws Exception {
73+
this.template.send("sr2", "foo");
74+
Thread.sleep(10000);
75+
assertThat(this.config.listener1().count).isEqualTo(MAX_RETRY + 1);
76+
}
77+
78+
@Configuration
79+
@EnableKafka
80+
public static class Config {
81+
@Autowired
82+
private EmbeddedKafkaBroker embeddedKafkaBroker;
83+
84+
@Bean
85+
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
86+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
87+
factory.setConsumerFactory(consumerFactory());
88+
factory.setBatchListener(true);
89+
SeekToCurrentBatchErrorHandler errorHandler = new SeekToCurrentBatchErrorHandler();
90+
FixedBackOff backOff = new FixedBackOff(500, MAX_RETRY);
91+
errorHandler.setBackOff(backOff);
92+
factory.setBatchErrorHandler(errorHandler);
93+
return factory;
94+
}
95+
96+
@Bean
97+
public ConsumerFactory<Integer, String> consumerFactory() {
98+
return new DefaultKafkaConsumerFactory<>(consumerConfigs(200),
99+
new IntegerDeserializer(),
100+
new JsonDeserializer<>(String.class));
101+
}
102+
103+
private Map<String, Object> consumerConfigs(int maxPollRecord) {
104+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false",
105+
embeddedKafkaBroker);
106+
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
107+
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecord);
108+
consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
109+
return consumerProps;
110+
}
111+
112+
@Bean
113+
public KafkaTemplate<Integer, String> template() {
114+
KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
115+
return kafkaTemplate;
116+
}
117+
118+
@Bean
119+
public ProducerFactory<Integer, String> producerFactory() {
120+
return new DefaultKafkaProducerFactory<>(producerConfigs());
121+
}
122+
123+
@Bean
124+
public Map<String, Object> producerConfigs() {
125+
Map<String, Object> config = KafkaTestUtils.producerProps(embeddedKafkaBroker);
126+
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
127+
return config;
128+
}
129+
130+
@Bean
131+
public Listener listener1() {
132+
return new Listener();
133+
}
134+
135+
}
136+
137+
public static class Listener {
138+
139+
public volatile int count = 0;
140+
141+
@KafkaListener(topics = "sr2", groupId = "sr1")
142+
public void listen1(final String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
143+
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
144+
@Header(KafkaHeaders.OFFSET) long offset) {
145+
count++;
146+
throw new RuntimeException("consumer throw exception");
147+
}
148+
149+
}
150+
151+
}

0 commit comments

Comments
 (0)