Skip to content

Commit 63b2bcb

Browse files
pszymczykgaryrussell
authored andcommitted
Configurable KafkaStreams cleanup execution
542 few minor fixes Docs, Javadocs
1 parent 2526509 commit 63b2bcb

File tree

4 files changed

+190
-1
lines changed

4 files changed

+190
-1
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import org.apache.kafka.streams.KafkaStreams;
20+
21+
/**
22+
* Specifies time of {@link KafkaStreams#cleanUp()} execution.
23+
*
24+
* @author Pawel Szymczyk
25+
*/
26+
public class CleanupConfig {
27+
28+
private final boolean onStart;
29+
private final boolean onStop;
30+
31+
public CleanupConfig() {
32+
this(false, true);
33+
}
34+
35+
public CleanupConfig(boolean onStart, boolean onStop) {
36+
this.onStart = onStart;
37+
this.onStop = onStop;
38+
}
39+
40+
public boolean cleanupOnStart() {
41+
return this.onStart;
42+
}
43+
44+
public boolean cleanupOnStop() {
45+
return this.onStop;
46+
}
47+
}

spring-kafka/src/main/java/org/springframework/kafka/core/StreamsBuilderFactoryBean.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
4444
private static final int DEFAULT_CLOSE_TIMEOUT = 10;
4545

4646
private final StreamsConfig streamsConfig;
47+
private final CleanupConfig cleanupConfig;
4748

4849
private KafkaStreams kafkaStreams;
4950

@@ -61,14 +62,46 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
6162

6263
private volatile boolean running;
6364

65+
/**
66+
* Construct an instance with the supplied streams configuration.
67+
* @param streamsConfig the streams configuration.
68+
*/
6469
public StreamsBuilderFactoryBean(StreamsConfig streamsConfig) {
70+
this(streamsConfig, new CleanupConfig());
71+
}
72+
73+
/**
74+
* Construct an instance with the supplied streams configuration and
75+
* clean up configuration.
76+
* @param streamsConfig the streams configuration.
77+
* @param cleanupConfig the cleanup configuration.
78+
* @since 2.1.2.
79+
*/
80+
public StreamsBuilderFactoryBean(StreamsConfig streamsConfig, CleanupConfig cleanupConfig) {
6581
Assert.notNull(streamsConfig, "'streamsConfig' must not be null");
6682
this.streamsConfig = streamsConfig;
83+
this.cleanupConfig = cleanupConfig;
6784
}
6885

86+
/**
87+
* Construct an instance with the supplied streams configuration.
88+
* @param streamsConfig the streams configuration.
89+
*/
6990
public StreamsBuilderFactoryBean(Map<String, Object> streamsConfig) {
91+
this(streamsConfig, new CleanupConfig());
92+
}
93+
94+
/**
95+
* Construct an instance with the supplied streams configuration and
96+
* clean up configuration.
97+
* @param streamsConfig the streams configuration.
98+
* @param cleanupConfig the cleanup configuration.
99+
* @since 2.1.2.
100+
*/
101+
public StreamsBuilderFactoryBean(Map<String, Object> streamsConfig, CleanupConfig cleanupConfig) {
70102
Assert.notNull(streamsConfig, "'streamsConfig' must not be null");
71103
this.streamsConfig = new StreamsConfig(streamsConfig);
104+
this.cleanupConfig = cleanupConfig;
72105
}
73106

74107
public void setClientSupplier(KafkaClientSupplier clientSupplier) {
@@ -133,6 +166,9 @@ public synchronized void start() {
133166
this.kafkaStreams = new KafkaStreams(getObject().build(), this.streamsConfig, this.clientSupplier);
134167
this.kafkaStreams.setStateListener(this.stateListener);
135168
this.kafkaStreams.setUncaughtExceptionHandler(this.exceptionHandler);
169+
if (this.cleanupConfig.cleanupOnStart()) {
170+
this.kafkaStreams.cleanUp();
171+
}
136172
this.kafkaStreams.start();
137173
this.running = true;
138174
}
@@ -148,7 +184,9 @@ public synchronized void stop() {
148184
try {
149185
if (this.kafkaStreams != null) {
150186
this.kafkaStreams.close(this.closeTimeout, TimeUnit.SECONDS);
151-
this.kafkaStreams.cleanUp();
187+
if (this.cleanupConfig.cleanupOnStop()) {
188+
this.kafkaStreams.cleanUp();
189+
}
152190
this.kafkaStreams = null;
153191
}
154192
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.io.IOException;
22+
import java.nio.file.Files;
23+
import java.nio.file.Path;
24+
import java.nio.file.Paths;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
28+
import org.apache.kafka.streams.StreamsConfig;
29+
import org.junit.BeforeClass;
30+
import org.junit.Test;
31+
import org.junit.runner.RunWith;
32+
33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.beans.factory.annotation.Value;
35+
import org.springframework.context.annotation.Bean;
36+
import org.springframework.context.annotation.Configuration;
37+
import org.springframework.kafka.annotation.EnableKafka;
38+
import org.springframework.kafka.annotation.EnableKafkaStreams;
39+
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
40+
import org.springframework.kafka.test.context.EmbeddedKafka;
41+
import org.springframework.kafka.test.rule.KafkaEmbedded;
42+
import org.springframework.test.annotation.DirtiesContext;
43+
import org.springframework.test.context.junit4.SpringRunner;
44+
45+
/**
46+
* @author Pawel Szymczyk
47+
*/
48+
@RunWith(SpringRunner.class)
49+
@DirtiesContext
50+
@EmbeddedKafka
51+
public class StreamsBuilderFactoryBeanTest {
52+
53+
private static final String APPLICATION_ID = "testCleanupStreams";
54+
55+
private static Path stateStoreDir;
56+
57+
@BeforeClass
58+
public static void setup() throws IOException {
59+
stateStoreDir = Files.createTempDirectory("test-state-dir");
60+
}
61+
62+
@Autowired
63+
private StreamsBuilderFactoryBean streamsBuilderFactoryBean;
64+
65+
@Test
66+
public void testCleanupStreams() throws IOException {
67+
Path stateStore = Files.createDirectory(Paths.get(stateStoreDir.toString(), APPLICATION_ID, "0_0"));
68+
assertThat(stateStore).exists();
69+
streamsBuilderFactoryBean.stop();
70+
assertThat(stateStore).doesNotExist();
71+
72+
stateStore = Files.createDirectory(Paths.get(stateStoreDir.toString(), APPLICATION_ID, "0_0"));
73+
assertThat(stateStore).exists();
74+
streamsBuilderFactoryBean.start();
75+
assertThat(stateStore).doesNotExist();
76+
}
77+
78+
@Configuration
79+
@EnableKafka
80+
@EnableKafkaStreams
81+
public static class KafkaStreamsConfiguration {
82+
83+
@Value("${" + KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
84+
private String brokerAddresses;
85+
86+
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
87+
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder() throws IOException {
88+
return new StreamsBuilderFactoryBean(kStreamsConfigs(), new CleanupConfig(true, true));
89+
}
90+
91+
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
92+
public StreamsConfig kStreamsConfigs() throws IOException {
93+
Map<String, Object> props = new HashMap<>();
94+
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
95+
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
96+
props.put(StreamsConfig.STATE_DIR_CONFIG, stateStoreDir.toString());
97+
return new StreamsConfig(props);
98+
}
99+
}
100+
101+
}

src/reference/asciidoc/streams.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,9 @@ Only you need is to declare `StreamsConfig` bean with the `defaultKafkaStreamsCo
119119
A `StreamsBuilder` bean with the `defaultKafkaStreamsBuilder` name will be declare in the application context automatically.
120120
Any additional `StreamsBuilderFactoryBean` beans can be declared and used as well.
121121

122+
By default, when the factory bean is stopped, the `KafkaStreams.cleanUp()` method is called.
123+
Starting with _version 2.1.2_, the factory bean has additional constructors, taking a `CleanupConfig` object that has properties to allow you to control whether the `cleanUp()` method is called during `start()`, `stop()`, or neither.
124+
122125
==== Kafka Streams Example
123126

124127
Putting it all together:

0 commit comments

Comments
 (0)