Skip to content

Commit 8347b95

Browse files
garyrussellartembilan
authored andcommitted
Support switching bootstrap servers at runtime
1 parent 62c5692 commit 8347b95

File tree

10 files changed

+263
-19
lines changed

10 files changed

+263
-19
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
import java.util.function.Supplier;
21+
22+
import org.springframework.util.Assert;
23+
24+
/**
25+
* A {@link Supplier} for bootstrap servers that can toggle between 2 lists of servers.
26+
*
27+
* @author Gary Russell
28+
* @since 2.5
29+
*
30+
*/
31+
public class ABSwitchCluster implements Supplier<String> {
32+
33+
private final AtomicBoolean which = new AtomicBoolean(true);
34+
35+
private final String primary;
36+
37+
private final String secondary;
38+
39+
/**
40+
* Construct an instance with primary and secondary bootstrap servers.
41+
* @param primary the primary.
42+
* @param secondary the secondary.
43+
*/
44+
public ABSwitchCluster(String primary, String secondary) {
45+
Assert.hasText(primary, "'primary' is required");
46+
Assert.hasText(secondary, "'secondary' is required");
47+
this.primary = primary;
48+
this.secondary = secondary;
49+
}
50+
51+
@Override
52+
public String get() {
53+
return this.which.get() ? this.primary : this.secondary;
54+
}
55+
56+
/**
57+
* Get whether or not the primary cluster is active.
58+
* @return true for primary, false for secondary.
59+
*/
60+
public boolean isPrimary() {
61+
return this.which.get();
62+
}
63+
64+
/**
65+
* Use the primary cluster.
66+
*/
67+
public void primary() {
68+
this.which.set(true);
69+
}
70+
71+
/**
72+
* Use the secondary cluster.
73+
*/
74+
public void secondary() {
75+
this.which.set(false);
76+
}
77+
78+
}

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@
6969
* @author Artem Bilan
7070
* @author Chris Gilbert
7171
*/
72-
public class DefaultKafkaConsumerFactory<K, V> implements ConsumerFactory<K, V>, BeanNameAware {
72+
public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory
73+
implements ConsumerFactory<K, V>, BeanNameAware {
7374

7475
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DefaultKafkaConsumerFactory.class));
7576

@@ -144,7 +145,9 @@ public void setValueDeserializer(@Nullable Deserializer<V> valueDeserializer) {
144145

145146
@Override
146147
public Map<String, Object> getConfigurationProperties() {
147-
return Collections.unmodifiableMap(this.configs);
148+
Map<String, Object> configs2 = new HashMap<>(this.configs);
149+
checkBootstrap(configs2);
150+
return Collections.unmodifiableMap(configs2);
148151
}
149152

150153
@Override
@@ -236,7 +239,7 @@ protected Consumer<K, V> createKafkaConsumer(@Nullable String groupId, @Nullable
236239
if (groupId == null
237240
&& (properties == null || properties.stringPropertyNames().size() == 0)
238241
&& !shouldModifyClientId) {
239-
return createKafkaConsumer(this.configs);
242+
return createKafkaConsumer(new HashMap<>(this.configs));
240243
}
241244
else {
242245
return createConsumerWithAdjustedProperties(groupId, clientIdPrefix, properties, overrideClientIdPrefix,
@@ -279,6 +282,7 @@ private void checkForUnsupportedProps(Properties properties) {
279282

280283
@SuppressWarnings("resource")
281284
protected Consumer<K, V> createKafkaConsumer(Map<String, Object> configProps) {
285+
checkBootstrap(configProps);
282286
Consumer<K, V> kafkaConsumer = createRawConsumer(configProps);
283287

284288
if (this.listeners.size() > 0) {

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@
108108
* @author Artem Bilan
109109
* @author Chris Gilbert
110110
*/
111-
public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>, ApplicationContextAware,
111+
public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
112+
implements ProducerFactory<K, V>, ApplicationContextAware,
112113
BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean {
113114

114115
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DefaultKafkaProducerFactory.class));
@@ -329,7 +330,9 @@ public Supplier<Serializer<V>> getValueSerializerSupplier() {
329330
*/
330331
@Override
331332
public Map<String, Object> getConfigurationProperties() {
332-
return Collections.unmodifiableMap(this.configs);
333+
Map<String, Object> configs2 = new HashMap<>(this.configs);
334+
checkBootstrap(configs2);
335+
return Collections.unmodifiableMap(configs2);
333336
}
334337

335338
/**
@@ -507,15 +510,17 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
507510
* @return the producer.
508511
*/
509512
protected Producer<K, V> createKafkaProducer() {
513+
Map<String, Object> newConfigs;
510514
if (this.clientIdPrefix == null) {
511-
return createRawProducer(this.configs);
515+
newConfigs = new HashMap<>(this.configs);
512516
}
513517
else {
514-
Map<String, Object> newConfigs = new HashMap<>(this.configs);
518+
newConfigs = new HashMap<>(this.configs);
515519
newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
516520
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
517-
return createRawProducer(newConfigs);
518521
}
522+
checkBootstrap(newConfigs);
523+
return createRawProducer(newConfigs);
519524
}
520525

521526
protected Producer<K, V> createTransactionalProducerForPartition() {
@@ -630,6 +635,7 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
630635
newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
631636
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
632637
}
638+
checkBootstrap(newProducerConfigs);
633639
newProducer = createRawProducer(newProducerConfigs);
634640
newProducer.initTransactions();
635641
CloseSafeProducer<K, V> closeSafeProducer =

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,7 +56,7 @@
5656
*
5757
* @since 1.3
5858
*/
59-
public class KafkaAdmin implements ApplicationContextAware, SmartInitializingSingleton {
59+
public class KafkaAdmin extends KafkaResourceFactory implements ApplicationContextAware, SmartInitializingSingleton {
6060

6161
/**
6262
* The default close timeout duration as 10 seconds.
@@ -67,7 +67,7 @@ public class KafkaAdmin implements ApplicationContextAware, SmartInitializingSin
6767

6868
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(KafkaAdmin.class));
6969

70-
private final Map<String, Object> config;
70+
private final Map<String, Object> configs;
7171

7272
private ApplicationContext applicationContext;
7373

@@ -87,7 +87,7 @@ public class KafkaAdmin implements ApplicationContextAware, SmartInitializingSin
8787
* @param config the configuration for the {@link AdminClient}.
8888
*/
8989
public KafkaAdmin(Map<String, Object> config) {
90-
this.config = new HashMap<>(config);
90+
this.configs = new HashMap<>(config);
9191
}
9292

9393
@Override
@@ -129,12 +129,25 @@ public void setAutoCreate(boolean autoCreate) {
129129
this.autoCreate = autoCreate;
130130
}
131131

132+
132133
/**
133134
* Get an unmodifiable copy of this admin's configuration.
134135
* @return the configuration map.
136+
* @deprecated in favor of {@link #getConfigurationProperties()}.
135137
*/
138+
@Deprecated
136139
public Map<String, Object> getConfig() {
137-
return Collections.unmodifiableMap(this.config);
140+
return getConfigurationProperties();
141+
}
142+
143+
/**
144+
* Get an unmodifiable copy of this admin's configuration.
145+
* @return the configuration map.
146+
*/
147+
public Map<String, Object> getConfigurationProperties() {
148+
Map<String, Object> configs2 = new HashMap<>(this.configs);
149+
checkBootstrap(configs2);
150+
return Collections.unmodifiableMap(configs2);
138151
}
139152

140153
@Override
@@ -159,7 +172,9 @@ public final boolean initialize() {
159172
if (newTopics.size() > 0) {
160173
AdminClient adminClient = null;
161174
try {
162-
adminClient = AdminClient.create(this.config);
175+
Map<String, Object> configs2 = new HashMap<>(this.configs);
176+
checkBootstrap(configs2);
177+
adminClient = AdminClient.create(configs2);
163178
}
164179
catch (Exception e) {
165180
if (!this.initializingContext || this.fatalIfBrokerNotAvailable) {
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import java.util.Map;
20+
import java.util.function.Supplier;
21+
22+
import org.apache.kafka.clients.CommonClientConfigs;
23+
24+
import org.springframework.lang.Nullable;
25+
26+
/**
27+
* Base class for consumer/producer/admin creators.
28+
*
29+
* @author Gary Russell
30+
* @since 2.5
31+
*
32+
*/
33+
public abstract class KafkaResourceFactory {
34+
35+
private Supplier<String> bootstrapServersSupplier;
36+
37+
@Nullable
38+
protected String getBootstrapServers() {
39+
return this.bootstrapServersSupplier == null ? null : this.bootstrapServersSupplier.get();
40+
}
41+
42+
/**
43+
* Set a supplier for the bootstrap server list to override any configured in a
44+
* subclass.
45+
* @param bootstrapServersSupplier the supplier.
46+
*/
47+
public void setBootstrapServersSupplier(Supplier<String> bootstrapServersSupplier) {
48+
this.bootstrapServersSupplier = bootstrapServersSupplier;
49+
}
50+
51+
/**
52+
* Enhance the properties by calling the
53+
* {@link #setBootstrapServersSupplier(Supplier)} amd replace the bootstrap servers
54+
* properties.
55+
* @param configs the configs.
56+
*/
57+
protected void checkBootstrap(Map<String, Object> configs) {
58+
String bootstrapServers = getBootstrapServers();
59+
if (bootstrapServers != null) {
60+
configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
61+
}
62+
}
63+
64+
}

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,25 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
105105
assertThat(configPassedToKafkaConsumer).isEqualTo(originalConfig);
106106
}
107107

108+
@Test
109+
public void testBootstrapServersSupplier() {
110+
Map<String, Object> originalConfig = Collections.emptyMap();
111+
final Map<String, Object> configPassedToKafkaConsumer = new HashMap<>();
112+
DefaultKafkaConsumerFactory<String, String> target =
113+
new DefaultKafkaConsumerFactory<String, String>(originalConfig) {
114+
115+
116+
@Override
117+
protected Consumer<String, String> createRawConsumer(Map<String, Object> configProps) {
118+
configPassedToKafkaConsumer.putAll(configProps);
119+
return null;
120+
}
121+
};
122+
target.setBootstrapServersSupplier(() -> "foo");
123+
target.createConsumer(null, null, null, null);
124+
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)).isEqualTo("foo");
125+
}
126+
108127
@Test
109128
public void testPropertyOverridesWhenCreatingConsumer() {
110129
Map<String, Object> originalConfig = Stream

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import org.apache.kafka.clients.producer.Callback;
4141
import org.apache.kafka.clients.producer.Producer;
42+
import org.apache.kafka.clients.producer.ProducerConfig;
4243
import org.apache.kafka.common.KafkaException;
4344
import org.apache.kafka.common.Metric;
4445
import org.apache.kafka.common.MetricName;
@@ -369,4 +370,29 @@ public void producerRemoved(String id, Producer producer) {
369370
assertThat(removals).hasSize(5);
370371
}
371372

373+
@Test
374+
@SuppressWarnings({ "rawtypes", "unchecked" })
375+
void testBootstrapSupplier() {
376+
final Producer producer = mock(Producer.class);
377+
final Map<String, Object> configPassedToKafkaConsumer = new HashMap<>();
378+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
379+
380+
@Override
381+
protected Producer createRawProducer(Map configs) {
382+
configPassedToKafkaConsumer.clear();
383+
configPassedToKafkaConsumer.putAll(configs);
384+
return producer;
385+
}
386+
387+
};
388+
pf.setBootstrapServersSupplier(() -> "foo");
389+
Producer aProducer = pf.createProducer();
390+
assertThat(configPassedToKafkaConsumer.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)).isEqualTo("foo");
391+
pf.setBootstrapServersSupplier(() -> "bar");
392+
pf.setTransactionIdPrefix("tx.");
393+
aProducer = pf.createProducer();
394+
assertThat(configPassedToKafkaConsumer.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)).isEqualTo("bar");
395+
pf.destroy();
396+
}
397+
372398
}

0 commit comments

Comments
 (0)