1 change: 1 addition & 0 deletions Vagrantfile
@@ -17,6 +17,7 @@

require 'socket'


# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
VAGRANTFILE_API_VERSION = "2"

clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -20,6 +20,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.utils.Utils;

import org.slf4j.Logger;
@@ -546,16 +547,16 @@ private Map<String, String> extractPotentialVariables(Map<?, ?> configMap) {
configProperties = configProviderProps;
classNameFilter = ignored -> true;
}
-        Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
+        Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);

-        if (!providers.isEmpty()) {
-            ConfigTransformer configTransformer = new ConfigTransformer(providers);
+        if (!providerPlugins.isEmpty()) {
+            ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
-        providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));

Comment on lines +550 to 560

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Always close provider plugins on failure (use finally) to avoid leaks

If ConfigTransformer construction or transform throws, provider plugins aren’t closed. Wrap with try/finally and include provider name in close logs.

Apply this diff:

-        Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
-
-        if (!providerPlugins.isEmpty()) {
-            ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
-            ConfigTransformerResult result = configTransformer.transform(indirectVariables);
-            if (!result.data().isEmpty()) {
-                resolvedOriginals.putAll(result.data());
-            }
-        }
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        Map<String, Plugin<ConfigProvider>> providerPlugins = Collections.emptyMap();
+        try {
+            providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
+            if (!providerPlugins.isEmpty()) {
+                ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
+                ConfigTransformerResult result = configTransformer.transform(indirectVariables);
+                if (!result.data().isEmpty()) {
+                    resolvedOriginals.putAll(result.data());
+                }
+            }
+        } finally {
+            for (Map.Entry<String, Plugin<ConfigProvider>> e : providerPlugins.entrySet()) {
+                Utils.closeQuietly(e.getValue(), "config provider plugin '" + e.getKey() + "'");
+            }
+        }

If ConfigTransformer implements AutoCloseable, we should also close it. Check with:


🏁 Script executed:

#!/bin/bash
rg -nP 'class\s+ConfigTransformer\b.*implements\s+AutoCloseable' -g '!**/test/**'
rg -nP '\bvoid\s+close\s*\(' clients/ connect/

Length of output: 42678


🏁 Script executed:

find . -type f -name ConfigTransformer.java

Length of output: 77


🏁 Script executed:

#!/bin/bash
sed -n '1,200p' clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java

Length of output: 8388


Always close provider plugins on failure (use try/finally)
Wrap instantiation and transformation in a try/finally to guarantee plugins are closed on any exception—include the provider name in the close log. ConfigTransformer does not implement AutoCloseable, so it need not be closed.

-        Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
-
-        if (!providerPlugins.isEmpty()) {
-            ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
-            ConfigTransformerResult result = configTransformer.transform(indirectVariables);
-            if (!result.data().isEmpty()) {
-                resolvedOriginals.putAll(result.data());
-            }
-        }
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        Map<String, Plugin<ConfigProvider>> providerPlugins = Collections.emptyMap();
+        try {
+            providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
+            if (!providerPlugins.isEmpty()) {
+                ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
+                ConfigTransformerResult result = configTransformer.transform(indirectVariables);
+                if (!result.data().isEmpty()) {
+                    resolvedOriginals.putAll(result.data());
+                }
+            }
+        } finally {
+            for (Map.Entry<String, Plugin<ConfigProvider>> e : providerPlugins.entrySet()) {
+                Utils.closeQuietly(e.getValue(), "config provider plugin '" + e.getKey() + "'");
+            }
+        }
🤖 Prompt for AI Agents
In clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
around lines 550 to 560, the provider plugins are instantiated and transformed
but only closed after the success path; wrap the instantiation and the call to
ConfigTransformer.transform(indirectVariables) in a try/finally so that
providerPlugins.values() are closed in the finally block on any exception, and
when closing each plugin use Utils.closeQuietly(plugin, "config provider plugin
" + pluginName) or otherwise include the provider name in the close log;
ConfigTransformer need not be closed.

return new ResolvingMap<>(resolvedOriginals, originals);
}
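The close-on-every-path pattern recommended in the review can be sketched independently of Kafka. Below, `Provider` and `closeQuietly` are illustrative stand-ins for `ConfigProvider` and `Utils.closeQuietly`, not the project's API; the point is that the `finally` block closes every provider whether or not the transform step throws.

```java
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;

public class CloseOnAllPaths {

    // Stand-in for a resource-holding config provider (not Kafka's API).
    static final class Provider implements Closeable {
        boolean closed = false;
        @Override public void close() { closed = true; }
    }

    // Mirrors the spirit of Utils.closeQuietly: log and swallow, so one
    // failing close cannot skip the remaining closes.
    static void closeQuietly(Closeable c, String name) {
        try {
            if (c != null) c.close();
        } catch (Throwable t) {
            System.err.println("Failed to close " + name + ": " + t);
        }
    }

    // Instantiates providers, runs a transform step that may throw, and
    // closes every provider in finally, naming each one in the close log.
    static Map<String, Provider> resolve(boolean transformFails) {
        Map<String, Provider> providers = new HashMap<>();
        try {
            providers.put("env", new Provider());
            providers.put("file", new Provider());
            if (transformFails) {
                throw new IllegalStateException("transform failed");
            }
            return providers; // callers can inspect the (now closed) providers
        } finally {
            for (Map.Entry<String, Provider> e : providers.entrySet()) {
                closeQuietly(e.getValue(), "config provider '" + e.getKey() + "'");
            }
        }
    }
}
```

Note that the providers are closed on both the success path and the exception path, which is exactly the leak the comment flags in the original code.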
@@ -594,7 +595,7 @@ private Map<String, Object> configProviderProperties(String configProviderPrefix
* @param classNameFilter Filter for config provider class names
* @return map of config provider name and its instance.
*/
-    private Map<String, ConfigProvider> instantiateConfigProviders(
+    private Map<String, Plugin<ConfigProvider>> instantiateConfigProviders(
Map<String, String> indirectConfigs,
Map<String, ?> providerConfigProperties,
Predicate<String> classNameFilter
@@ -620,21 +621,22 @@ private Map<String, ConfigProvider> instantiateConfigProviders(
}
}
// Instantiate Config Providers
-        Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
+        Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
for (Map.Entry<String, String> entry : providerMap.entrySet()) {
try {
String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
provider.configure(configProperties);
-                configProviderInstances.put(entry.getKey(), provider);
+                Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
+                configProviderPluginInstances.put(entry.getKey(), providerPlugin);
} catch (ClassNotFoundException e) {
log.error("Could not load config provider class {}", entry.getValue(), e);
throw new ConfigException(providerClassProperty(entry.getKey()), entry.getValue(), "Could not load config provider class or one of its dependencies");
}
}

-        return configProviderInstances;
+        return configProviderPluginInstances;
}

private static String providerClassProperty(String providerName) {
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
@@ -18,6 +18,7 @@

import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.provider.FileConfigProvider;
+import org.apache.kafka.common.internals.Plugin;

import java.util.ArrayList;
import java.util.HashMap;
@@ -56,15 +57,15 @@ public class ConfigTransformer {
public static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{([^}]*?):(([^}]*?):)?([^}]*?)\\}");
private static final String EMPTY_PATH = "";

-    private final Map<String, ConfigProvider> configProviders;
+    private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;

/**
* Creates a ConfigTransformer with the default pattern, of the form <code>${provider:[path:]key}</code>.
*
-     * @param configProviders a Map of provider names and {@link ConfigProvider} instances.
+     * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances.
*/
-    public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
-        this.configProviders = configProviders;
+    public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
+        this.configProviderPlugins = configProviderPlugins;
}

/**
@@ -94,13 +95,13 @@ public ConfigTransformerResult transform(Map<String, String> configs) {
Map<String, Long> ttls = new HashMap<>();
for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) {
String providerName = entry.getKey();
-            ConfigProvider provider = configProviders.get(providerName);
+            Plugin<ConfigProvider> providerPlugin = configProviderPlugins.get(providerName);
Map<String, Set<String>> keysByPath = entry.getValue();
-            if (provider != null && keysByPath != null) {
+            if (providerPlugin != null && keysByPath != null) {
for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {
String path = pathWithKeys.getKey();
Set<String> keys = new HashSet<>(pathWithKeys.getValue());
-                    ConfigData configData = provider.get(path, keys);
+                    ConfigData configData = providerPlugin.get().get(path, keys);
Map<String, String> data = configData.data();
Long ttl = configData.ttl();
if (ttl != null && ttl >= 0) {
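The variable syntax resolved above, `${provider:[path:]key}`, can be exercised in isolation. The regex below is copied verbatim from `ConfigTransformer.DEFAULT_PATTERN`; the `parse` helper is illustrative, not part of Kafka:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class VariablePatternDemo {
    // Same regex as ConfigTransformer.DEFAULT_PATTERN: ${provider:[path:]key}
    // Group 1 = provider, group 3 = optional path, group 4 = key.
    static final Pattern DEFAULT_PATTERN =
            Pattern.compile("\\$\\{([^}]*?):(([^}]*?):)?([^}]*?)\\}");

    // Returns "provider|path|key" for the first variable found (empty path
    // when the optional segment is absent), or null if no variable matches.
    static String parse(String value) {
        Matcher m = DEFAULT_PATTERN.matcher(value);
        if (!m.find()) {
            return null;
        }
        String provider = m.group(1);
        String path = m.group(3) == null ? "" : m.group(3);
        String key = m.group(4);
        return provider + "|" + path + "|" + key;
    }
}
```

The reluctant quantifiers (`*?`) keep each segment from swallowing the `:` separators, which is why `${test:testPath:testKey}` splits into three parts while `${test:testKey}` leaves the path group empty.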
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
@@ -29,6 +29,10 @@
* <p>Kafka Connect discovers implementations of this interface using the Java {@link java.util.ServiceLoader} mechanism.
* To support this, implementations of this interface should also contain a service provider configuration file in
* {@code META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider}.
+ * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
+ * The following tags are automatically added to all metrics registered: <code>config</code> set to
+ * <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name,
+ * and <code>provider</code> set to the provider name.
*/
public interface ConfigProvider extends Configurable, Closeable {

clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.config;

import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.internals.Plugin;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -45,12 +46,12 @@ public class ConfigTransformerTest {

@BeforeEach
public void setup() {
-        configTransformer = new ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider()));
+        configTransformer = new ConfigTransformer(Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
}

@Test
public void testReplaceVariable() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKey}"));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
assertEquals(TEST_RESULT, data.get(MY_KEY));
@@ -59,7 +60,7 @@ public void testReplaceVariable() {

@Test
public void testReplaceVariableWithTTL() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}"));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
assertEquals(TEST_RESULT_WITH_TTL, data.get(MY_KEY));
@@ -68,28 +69,28 @@ public void testReplaceVariableWithTTL() {

@Test
public void testReplaceMultipleVariablesInValue() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
Map<String, String> data = result.data();
assertEquals("hello, testResult; goodbye, testResultWithTTL!!!", data.get(MY_KEY));
}

@Test
public void testNoReplacement() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:missingKey}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:missingKey}"));
Map<String, String> data = result.data();
assertEquals("${test:testPath:missingKey}", data.get(MY_KEY));
}

@Test
public void testSingleLevelOfIndirection() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testIndirection}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testIndirection}"));
Map<String, String> data = result.data();
assertEquals("${test:testPath:testResult}", data.get(MY_KEY));
}

@Test
public void testReplaceVariableNoPath() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testKey}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testKey}"));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
assertEquals(TEST_RESULT_NO_PATH, data.get(MY_KEY));
@@ -98,7 +99,7 @@ public void testReplaceVariableNoPath() {

@Test
public void testReplaceMultipleVariablesWithoutPathInValue() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "first ${test:testKey}; second ${test:testKey}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "first ${test:testKey}; second ${test:testKey}"));
Map<String, String> data = result.data();
assertEquals("first testResultNoPath; second testResultNoPath", data.get(MY_KEY));
}
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (new file)
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.config.provider;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

public class MonitorableConfigProvider implements ConfigProvider, Monitorable {
public static final String NAME = "name";
public static final String DESCRIPTION = "description";
protected boolean configured = false;

@Override
public void withPluginMetrics(PluginMetrics metrics) {
MetricName metricName = metrics.metricName(NAME, DESCRIPTION, Map.of());
metrics.addMetric(metricName, (Measurable) (config, now) -> 123);
}

@Override
public ConfigData get(String path) {
return null;
}

@Override
public ConfigData get(String path, Set<String> keys) {
return null;
}
Comment on lines +41 to +48

⚠️ Potential issue | 🟠 Major

Return non-null ConfigData from get methods to avoid NPEs.

ConfigTransformer expects a non-null ConfigData; current null return can crash tests using this provider.

     @Override
     public ConfigData get(String path) {
-        return null;
+        return new ConfigData(Map.of());
     }

     @Override
     public ConfigData get(String path, Set<String> keys) {
-        return null;
+        return new ConfigData(Map.of());
     }
🤖 Prompt for AI Agents
In
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
around lines 41 to 48, the two get methods currently return null causing NPEs;
replace those null returns with non-null ConfigData instances (e.g., an empty
Map for data and a null/empty version) and in the keyed get(String path,
Set<String> keys) return a ConfigData whose data contains only the requested
keys (or an empty map if none), ensuring both methods always return a valid
ConfigData object.
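The suggested fix boils down to two rules: never return null, and honor the requested key set. A standalone sketch, using a simplified stand-in for Kafka's `ConfigData` (data map only, no TTL):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class NonNullGetDemo {
    // Minimal stand-in for Kafka's ConfigData: just the data map.
    record ConfigData(Map<String, String> data) {}

    // Backing store the provider serves from (illustrative).
    static final Map<String, String> BACKING = Map.of("a", "1", "b", "2");

    // get(path): never null; empty data when there is nothing to return.
    static ConfigData get(String path) {
        return new ConfigData(Map.of());
    }

    // get(path, keys): returns only the requested keys that actually exist,
    // so callers like ConfigTransformer never see null or extra entries.
    static ConfigData get(String path, Set<String> keys) {
        Map<String, String> out = new HashMap<>();
        for (String k : keys) {
            String v = BACKING.get(k);
            if (v != null) {
                out.put(k, v);
            }
        }
        return new ConfigData(out);
    }
}
```

Returning an empty `ConfigData` instead of null keeps the caller's `configData.data()` dereference safe without any special-casing.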


@Override
public void close() throws IOException {
}

@Override
public void configure(Map<String, ?> configs) {
configured = true;
}
}
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -23,6 +23,7 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -269,18 +270,19 @@ List<String> configProviders() {
Map<String, String> transform(Map<String, String> props) {
// transform worker config according to config.providers
List<String> providerNames = configProviders();
-        Map<String, ConfigProvider> providers = new HashMap<>();
+        Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
for (String name : providerNames) {
-            ConfigProvider configProvider = plugins.newConfigProvider(
+            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
                     this,
-                    CONFIG_PROVIDERS_CONFIG + "." + name,
-                    Plugins.ClassLoaderUsage.PLUGINS
+                    name,
+                    Plugins.ClassLoaderUsage.PLUGINS,
+                    null
             );
-            providers.put(name, configProvider);
+            providerPlugins.put(name, configProviderPlugin);
}
-        ConfigTransformer transformer = new ConfigTransformer(providers);
+        ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
         Map<String, String> transformed = transformer.transform(props).data();
-        providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
return transformed;
Comment on lines +273 to 286

⚠️ Potential issue | 🔴 Critical

Close provider plugins in a finally block and skip null plugins.

Prevents resource leaks if transform throws and avoids null values in the map.

-        Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
-        for (String name : providerNames) {
-            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
-                    this,
-                    name,
-                    Plugins.ClassLoaderUsage.PLUGINS,
-                    null
-            );
-            providerPlugins.put(name, configProviderPlugin);
-        }
-        ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
-        Map<String, String> transformed = transformer.transform(props).data();
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
-        return transformed;
+        Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
+        try {
+            for (String name : providerNames) {
+                Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
+                        this,
+                        name,
+                        Plugins.ClassLoaderUsage.PLUGINS,
+                        null
+                );
+                if (configProviderPlugin != null) {
+                    providerPlugins.put(name, configProviderPlugin);
+                }
+            }
+            ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
+            return transformer.transform(props).data();
+        } finally {
+            providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        }
🤖 Prompt for AI Agents
In
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
around lines 273 to 286, the provider plugins are added to the map and closed
after transform, but if transformer.transform(props) throws you leak resources
and you may insert null plugins into the map; wrap plugin creation/transform
usage in a try/finally where you close all created plugins in the finally block,
and when constructing the map skip adding null Plugin instances (i.e., only put
non-null plugins), and when closing iterate and skip nulls to avoid NPEs.
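Two details of the suggested rewrite are easy to miss: a `return` inside the `try` block does not skip the `finally` block, and null plugins are filtered out before they can enter the map. A toy sketch of both — the `newProvider` factory here is hypothetical, standing in for `plugins.newConfigProvider`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReturnWithFinallyDemo {

    // Records which plugins were "closed" by the finally block.
    static final List<String> CLOSED = new ArrayList<>();

    // Hypothetical factory: returns null when a provider cannot be created.
    static String newProvider(String name) {
        return name.startsWith("bad") ? null : name;
    }

    static Map<String, String> build(List<String> names) {
        Map<String, String> plugins = new HashMap<>();
        try {
            for (String n : names) {
                String p = newProvider(n);
                if (p != null) {
                    plugins.put(n, p); // skip nulls: no NPE when closing later
                }
            }
            // Returning from inside try does NOT skip the finally block.
            return new HashMap<>(plugins);
        } finally {
            plugins.keySet().forEach(CLOSED::add); // "close" each created plugin
        }
    }
}
```

This is why the suggestion can `return transformer.transform(props).data();` directly from the `try` and still guarantee cleanup.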

}

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -209,16 +209,17 @@ public Worker(

private WorkerConfigTransformer initConfigTransformer() {
final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
-        Map<String, ConfigProvider> providerMap = new HashMap<>();
+        Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
for (String providerName : providerNames) {
-            ConfigProvider configProvider = plugins.newConfigProvider(
-                    config,
-                    WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName,
-                    ClassLoaderUsage.PLUGINS
+            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
+                    config,
+                    providerName,
+                    ClassLoaderUsage.PLUGINS,
+                    metrics.metrics()
             );
-            providerMap.put(providerName, configProvider);
+            providerPluginMap.put(providerName, configProviderPlugin);
}
-        return new WorkerConfigTransformer(this, providerMap);
+        return new WorkerConfigTransformer(this, providerPluginMap);
}

public WorkerConfigTransformer configTransformer() {
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -20,6 +20,7 @@
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigTransformerResult;
import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
import org.apache.kafka.connect.util.Callback;
@@ -42,12 +43,12 @@ public class WorkerConfigTransformer implements AutoCloseable {
private final Worker worker;
private final ConfigTransformer configTransformer;
private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>();
-    private final Map<String, ConfigProvider> configProviders;
+    private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;

-    public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> configProviders) {
+    public WorkerConfigTransformer(Worker worker, Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
         this.worker = worker;
-        this.configProviders = configProviders;
-        this.configTransformer = new ConfigTransformer(configProviders);
+        this.configProviderPlugins = configProviderPlugins;
+        this.configTransformer = new ConfigTransformer(configProviderPlugins);
}

public Map<String, String> transform(Map<String, String> configs) {
@@ -97,6 +98,6 @@ private void scheduleReload(String connectorName, String path, long ttl) {

@Override
public void close() {
-        configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+        configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
}
}
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -19,6 +19,8 @@
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.Connector;
@@ -627,7 +629,8 @@ private <U> U newVersionedPlugin(
return plugin;
}

-    public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) {
+    public Plugin<ConfigProvider> newConfigProvider(AbstractConfig config, String providerName, ClassLoaderUsage classLoaderUsage, Metrics metrics) {

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

API change: verify all call sites and document behavior

Method now returns Plugin<ConfigProvider> and requires Metrics. Ensure all usages are updated and that release notes call out this source/binary change.

Run to find call sites and check that metrics are threaded through:


🏁 Script executed:

#!/bin/bash
set -euo pipefail
# All invocations of newConfigProvider
rg -nP --type=java -C2 '\bnewConfigProvider\s*\('

# Quick sanity: confirm files calling it also reference Metrics
rg -nP --type=java -C2 '\bnewConfigProvider\s*\(' \
  | sed -n 's/^\([^:]*\):.*/\1/p' | sort -u \
  | xargs -I{} rg -nP --type=java -H '\bMetrics\b|\bmetrics\b' {}

Length of output: 17259


🏁 Script executed:

#!/usr/bin/env bash
set -euo pipefail

# List all non-test Java files invoking newConfigProvider
files=$(rg -lP --type=java 'newConfigProvider\s*\(' | grep -v '/src/test/' || true)

for file in $files; do
  echo "=== $file ==="
  rg -nP --type=java -C3 'newConfigProvider' "$file"
  echo
done

Length of output: 1541


🏁 Script executed:

#!/usr/bin/env bash
set -euo pipefail

echo "MirrorMakerConfig.java (lines 270–285):"
sed -n '270,285p' connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
echo
echo "Worker.java (lines 210–225):"
sed -n '210,225p' connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java

Length of output: 1669


Pass a real Metrics to newConfigProvider in MirrorMakerConfig

  • In connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (around line 275), you’re invoking
    plugins.newConfigProvider(this, name, Plugins.ClassLoaderUsage.PLUGINS, null)
    — replace the null with a valid Metrics instance (e.g. create or inject a ConnectMetrics.metrics() object) to avoid NPEs and enable metrics collection.
  • Document in the release notes that newConfigProvider now requires a Metrics argument, as this is a breaking API change.
🤖 Prompt for AI Agents
In
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
around line 632, the newConfigProvider signature now requires a non-null Metrics
parameter; update callers (notably
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
around line ~275) to pass a real Metrics instance instead of null (e.g., obtain
or inject a ConnectMetrics/ConnectMetrics.metrics() instance or create a Metrics
from the current ConnectMetrics context), add necessary imports and wiring to
ensure the Metrics object is available where MirrorMakerConfig constructs the
provider, and update the release notes to state that newConfigProvider now
requires a Metrics argument (breaking API change).

String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
String classPropertyName = providerPrefix + ".class";
Map<String, String> originalConfig = config.originalsStrings();
if (!originalConfig.containsKey(classPropertyName)) {
@@ -643,7 +646,7 @@ public ConfigProvider newConfigProvider(AbstractConfig config, String providerPr
try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
plugin.configure(configProviderConfig);
}
-        return plugin;
+        return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
}
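`Plugin.wrapInstance` is internal to Kafka, but the underlying idea — pair the delegate with the metric tags documented on `ConfigProvider` (`config`, `class`, `provider`) and close the delegate when the wrapper closes — can be sketched generically. This is an illustrative analogue, not the real API:

```java
import java.io.Closeable;
import java.util.LinkedHashMap;
import java.util.Map;

public class PluginWrapperDemo {

    // Illustrative analogue of Plugin.wrapInstance: holds a delegate plus
    // the tag set described in the ConfigProvider javadoc.
    static final class Plugin<T> implements Closeable {
        private final T delegate;
        private final Map<String, String> tags;

        Plugin(T delegate, String config, String providerName) {
            this.delegate = delegate;
            Map<String, String> t = new LinkedHashMap<>();
            t.put("config", config);                                  // e.g. "config.providers"
            t.put("class", delegate.getClass().getSimpleName());      // plugin class name
            t.put("provider", providerName);                          // per-provider tag
            this.tags = Map.copyOf(t);
        }

        T get() { return delegate; }
        Map<String, String> tags() { return tags; }

        @Override
        public void close() {
            // A real wrapper would also deregister the plugin's metrics here.
            if (delegate instanceof Closeable c) {
                try {
                    c.close();
                } catch (Exception ignored) {
                    // quiet close, mirroring Utils.closeQuietly
                }
            }
        }
    }
}
```

The `provider` tag is what distinguishes two instances of the same provider class registered under different names, which is why the real call passes `Map.of("provider", providerName)`.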

/**