Skip to content

Commit 563f347

Browse files
committed
KAFKA-18894: Add KIP-877 support for ConfigProvider
1 parent 6dd2cc7 commit 563f347

File tree

12 files changed

+181
-64
lines changed

12 files changed

+181
-64
lines changed

clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.kafka.common.KafkaException;
2121
import org.apache.kafka.common.config.provider.ConfigProvider;
2222
import org.apache.kafka.common.config.types.Password;
23+
import org.apache.kafka.common.internals.Plugin;
2324
import org.apache.kafka.common.utils.Utils;
2425

2526
import org.slf4j.Logger;
@@ -546,16 +547,16 @@ private Map<String, String> extractPotentialVariables(Map<?, ?> configMap) {
546547
configProperties = configProviderProps;
547548
classNameFilter = ignored -> true;
548549
}
549-
Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
550+
Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
550551

551-
if (!providers.isEmpty()) {
552-
ConfigTransformer configTransformer = new ConfigTransformer(providers);
552+
if (!providerPlugins.isEmpty()) {
553+
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
553554
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
554555
if (!result.data().isEmpty()) {
555556
resolvedOriginals.putAll(result.data());
556557
}
557558
}
558-
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
559+
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
559560

560561
return new ResolvingMap<>(resolvedOriginals, originals);
561562
}
@@ -587,14 +588,14 @@ private Map<String, Object> configProviderProperties(String configProviderPrefix
587588
* config.providers : A comma-separated list of names for providers.
588589
* config.providers.{name}.class : The Java class name for a provider.
589590
* config.providers.{name}.param.{param-name} : A parameter to be passed to the above Java class on initialization.
590-
* returns a map of config provider name and its instance.
591+
* returns a map of config provider name and its instance wrapped in a {@link org.apache.kafka.common.internals.Plugin}.
591592
*
592593
* @param indirectConfigs The map of potential variable configs
593594
* @param providerConfigProperties The map of config provider configs
594595
* @param classNameFilter Filter for config provider class names
595-
* @return map of config provider name and its instance.
596+
* @return map of config provider name and its instance wrapped in a {@link org.apache.kafka.common.internals.Plugin}.
596597
*/
597-
private Map<String, ConfigProvider> instantiateConfigProviders(
598+
private Map<String, Plugin<ConfigProvider>> instantiateConfigProviders(
598599
Map<String, String> indirectConfigs,
599600
Map<String, ?> providerConfigProperties,
600601
Predicate<String> classNameFilter
@@ -620,21 +621,22 @@ private Map<String, ConfigProvider> instantiateConfigProviders(
620621
}
621622
}
622623
// Instantiate Config Providers
623-
Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
624+
Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
624625
for (Map.Entry<String, String> entry : providerMap.entrySet()) {
625626
try {
626627
String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
627628
Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
628629
ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
629630
provider.configure(configProperties);
630-
configProviderInstances.put(entry.getKey(), provider);
631+
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
632+
configProviderPluginInstances.put(entry.getKey(), providerPlugin);
631633
} catch (ClassNotFoundException e) {
632634
log.error("Could not load config provider class {}", entry.getValue(), e);
633635
throw new ConfigException(providerClassProperty(entry.getKey()), entry.getValue(), "Could not load config provider class or one of its dependencies");
634636
}
635637
}
636638

637-
return configProviderInstances;
639+
return configProviderPluginInstances;
638640
}
639641

640642
private static String providerClassProperty(String providerName) {

clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.common.config.provider.ConfigProvider;
2020
import org.apache.kafka.common.config.provider.FileConfigProvider;
21+
import org.apache.kafka.common.internals.Plugin;
2122

2223
import java.util.ArrayList;
2324
import java.util.HashMap;
@@ -56,15 +57,16 @@ public class ConfigTransformer {
5657
public static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{([^}]*?):(([^}]*?):)?([^}]*?)\\}");
5758
private static final String EMPTY_PATH = "";
5859

59-
private final Map<String, ConfigProvider> configProviders;
60+
private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;
6061

6162
/**
6263
* Creates a ConfigTransformer with the default pattern, of the form <code>${provider:[path:]key}</code>.
6364
*
64-
* @param configProviders a Map of provider names and {@link ConfigProvider} instances.
65+
* @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances, where each instance
66+
* is wrapped in a {@link org.apache.kafka.common.internals.Plugin}.
6567
*/
66-
public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
67-
this.configProviders = configProviders;
68+
public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
69+
this.configProviderPlugins = configProviderPlugins;
6870
}
6971

7072
/**
@@ -94,7 +96,7 @@ public ConfigTransformerResult transform(Map<String, String> configs) {
9496
Map<String, Long> ttls = new HashMap<>();
9597
for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) {
9698
String providerName = entry.getKey();
97-
ConfigProvider provider = configProviders.get(providerName);
99+
ConfigProvider provider = configProviderPlugins.get(providerName).get();
98100
Map<String, Set<String>> keysByPath = entry.getValue();
99101
if (provider != null && keysByPath != null) {
100102
for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {

clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@
2929
* <p>Kafka Connect discovers implementations of this interface using the Java {@link java.util.ServiceLoader} mechanism.
3030
* To support this, implementations of this interface should also contain a service provider configuration file in
3131
* {@code META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider}.
32+
* <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
33+
* The following tags are automatically added to all metrics registered: <code>config</code> set to
34+
* <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name,
35+
* and <code>provider</code> set to the provider name.
3236
*/
3337
public interface ConfigProvider extends Configurable, Closeable {
3438

clients/src/main/java/org/apache/kafka/common/internals/Plugin.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key
4444
return wrapInstance(instance, metrics, () -> tags(key, instance));
4545
}
4646

47+
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key, Map<String, String> extraTags) {
48+
Map<String, String> tags = tags(key, instance);
49+
tags.putAll(extraTags);
50+
return wrapInstance(instance, metrics, () -> tags);
51+
}
52+
4753
private static <T> Map<String, String> tags(String key, T instance) {
4854
Map<String, String> tags = new LinkedHashMap<>();
4955
tags.put("config", key);

clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.common.config;
1818

1919
import org.apache.kafka.common.config.provider.ConfigProvider;
20+
import org.apache.kafka.common.internals.Plugin;
2021

2122
import org.junit.jupiter.api.BeforeEach;
2223
import org.junit.jupiter.api.Test;
@@ -45,12 +46,12 @@ public class ConfigTransformerTest {
4546

4647
@BeforeEach
4748
public void setup() {
48-
configTransformer = new ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider()));
49+
configTransformer = new ConfigTransformer(Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
4950
}
5051

5152
@Test
5253
public void testReplaceVariable() {
53-
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
54+
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKey}"));
5455
Map<String, String> data = result.data();
5556
Map<String, Long> ttls = result.ttls();
5657
assertEquals(TEST_RESULT, data.get(MY_KEY));
@@ -59,7 +60,7 @@ public void testReplaceVariable() {
5960

6061
@Test
6162
public void testReplaceVariableWithTTL() {
62-
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
63+
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}"));
6364
Map<String, String> data = result.data();
6465
Map<String, Long> ttls = result.ttls();
6566
assertEquals(TEST_RESULT_WITH_TTL, data.get(MY_KEY));
@@ -68,28 +69,28 @@ public void testReplaceVariableWithTTL() {
6869

6970
@Test
7071
public void testReplaceMultipleVariablesInValue() {
71-
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
72+
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
7273
Map<String, String> data = result.data();
7374
assertEquals("hello, testResult; goodbye, testResultWithTTL!!!", data.get(MY_KEY));
7475
}
7576

7677
@Test
7778
public void testNoReplacement() {
78-
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:missingKey}"));
79+
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:missingKey}"));
7980
Map<String, String> data = result.data();
8081
assertEquals("${test:testPath:missingKey}", data.get(MY_KEY));
8182
}
8283

8384
@Test
8485
public void testSingleLevelOfIndirection() {
85-
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testIndirection}"));
86+
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testIndirection}"));
8687
Map<String, String> data = result.data();
8788
assertEquals("${test:testPath:testResult}", data.get(MY_KEY));
8889
}
8990

9091
@Test
9192
public void testReplaceVariableNoPath() {
92-
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testKey}"));
93+
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testKey}"));
9394
Map<String, String> data = result.data();
9495
Map<String, Long> ttls = result.ttls();
9596
assertEquals(TEST_RESULT_NO_PATH, data.get(MY_KEY));
@@ -98,7 +99,7 @@ public void testReplaceVariableNoPath() {
9899

99100
@Test
100101
public void testReplaceMultipleVariablesWithoutPathInValue() {
101-
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "first ${test:testKey}; second ${test:testKey}"));
102+
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "first ${test:testKey}; second ${test:testKey}"));
102103
Map<String, String> data = result.data();
103104
assertEquals("first testResultNoPath; second testResultNoPath", data.get(MY_KEY));
104105
}

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.common.config.ConfigDef.Type;
2424
import org.apache.kafka.common.config.ConfigTransformer;
2525
import org.apache.kafka.common.config.provider.ConfigProvider;
26+
import org.apache.kafka.common.internals.Plugin;
2627
import org.apache.kafka.common.security.auth.SecurityProtocol;
2728
import org.apache.kafka.common.utils.Utils;
2829
import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -269,18 +270,19 @@ List<String> configProviders() {
269270
Map<String, String> transform(Map<String, String> props) {
270271
// transform worker config according to config.providers
271272
List<String> providerNames = configProviders();
272-
Map<String, ConfigProvider> providers = new HashMap<>();
273+
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
273274
for (String name : providerNames) {
274-
ConfigProvider configProvider = plugins.newConfigProvider(
275+
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
275276
this,
276-
CONFIG_PROVIDERS_CONFIG + "." + name,
277-
Plugins.ClassLoaderUsage.PLUGINS
277+
name,
278+
Plugins.ClassLoaderUsage.PLUGINS,
279+
null
278280
);
279-
providers.put(name, configProvider);
281+
providerPlugins.put(name, configProviderPlugin);
280282
}
281-
ConfigTransformer transformer = new ConfigTransformer(providers);
283+
ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
282284
Map<String, String> transformed = transformer.transform(props).data();
283-
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
285+
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
284286
return transformed;
285287
}
286288

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -209,16 +209,17 @@ public Worker(
209209

210210
private WorkerConfigTransformer initConfigTransformer() {
211211
final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
212-
Map<String, ConfigProvider> providerMap = new HashMap<>();
212+
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
213213
for (String providerName : providerNames) {
214-
ConfigProvider configProvider = plugins.newConfigProvider(
215-
config,
216-
WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName,
217-
ClassLoaderUsage.PLUGINS
214+
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
215+
config,
216+
providerName,
217+
ClassLoaderUsage.PLUGINS,
218+
metrics.metrics()
218219
);
219-
providerMap.put(providerName, configProvider);
220+
providerPluginMap.put(providerName, configProviderPlugin);
220221
}
221-
return new WorkerConfigTransformer(this, providerMap);
222+
return new WorkerConfigTransformer(this, providerPluginMap);
222223
}
223224

224225
public WorkerConfigTransformer configTransformer() {

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.kafka.common.config.ConfigTransformer;
2121
import org.apache.kafka.common.config.ConfigTransformerResult;
2222
import org.apache.kafka.common.config.provider.ConfigProvider;
23+
import org.apache.kafka.common.internals.Plugin;
2324
import org.apache.kafka.common.utils.Utils;
2425
import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
2526
import org.apache.kafka.connect.util.Callback;
@@ -42,12 +43,12 @@ public class WorkerConfigTransformer implements AutoCloseable {
4243
private final Worker worker;
4344
private final ConfigTransformer configTransformer;
4445
private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>();
45-
private final Map<String, ConfigProvider> configProviders;
46+
private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;
4647

47-
public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> configProviders) {
48+
public WorkerConfigTransformer(Worker worker, Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
4849
this.worker = worker;
49-
this.configProviders = configProviders;
50-
this.configTransformer = new ConfigTransformer(configProviders);
50+
this.configProviderPlugins = configProviderPlugins;
51+
this.configTransformer = new ConfigTransformer(configProviderPlugins);
5152
}
5253

5354
public Map<String, String> transform(Map<String, String> configs) {
@@ -97,6 +98,6 @@ private void scheduleReload(String connectorName, String path, long ttl) {
9798

9899
@Override
99100
public void close() {
100-
configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
101+
configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
101102
}
102103
}

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.apache.kafka.common.Configurable;
2020
import org.apache.kafka.common.config.AbstractConfig;
2121
import org.apache.kafka.common.config.provider.ConfigProvider;
22+
import org.apache.kafka.common.internals.Plugin;
23+
import org.apache.kafka.common.metrics.Metrics;
2224
import org.apache.kafka.common.utils.Utils;
2325
import org.apache.kafka.connect.components.Versioned;
2426
import org.apache.kafka.connect.connector.Connector;
@@ -627,7 +629,8 @@ private <U> U newVersionedPlugin(
627629
return plugin;
628630
}
629631

630-
public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) {
632+
public Plugin<ConfigProvider> newConfigProvider(AbstractConfig config, String providerName, ClassLoaderUsage classLoaderUsage, Metrics metrics) {
633+
String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
631634
String classPropertyName = providerPrefix + ".class";
632635
Map<String, String> originalConfig = config.originalsStrings();
633636
if (!originalConfig.containsKey(classPropertyName)) {
@@ -643,7 +646,7 @@ public ConfigProvider newConfigProvider(AbstractConfig config, String providerPr
643646
try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
644647
plugin.configure(configProviderConfig);
645648
}
646-
return plugin;
649+
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
647650
}
648651

649652
/**

0 commit comments

Comments
 (0)