Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

require 'socket'


# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
VAGRANTFILE_API_VERSION = "2"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.utils.Utils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -546,16 +547,16 @@ private Map<String, String> extractPotentialVariables(Map<?, ?> configMap) {
configProperties = configProviderProps;
classNameFilter = ignored -> true;
}
Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);

if (!providers.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providers);
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource Leak Risk

The code attempts to close Plugin objects directly, but Plugin doesn't implement Closeable. This should call x.get() to retrieve the actual ConfigProvider instance before closing. Without this change, ConfigProvider resources won't be properly released.

        providerPlugins.values().forEach(x -> Utils.closeQuietly(x.get(), "config provider"));
Commitable Suggestion
Suggested change
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x.get(), "config provider"));
Standards
  • Logic-Verification-Resource-Management
  • Algorithm-Correctness-API-Usage
  • Business-Rule-Resource-Cleanup


return new ResolvingMap<>(resolvedOriginals, originals);
}
Expand Down Expand Up @@ -594,7 +595,7 @@ private Map<String, Object> configProviderProperties(String configProviderPrefix
* @param classNameFilter Filter for config provider class names
* @return map of config provider name and its instance.
*/
private Map<String, ConfigProvider> instantiateConfigProviders(
private Map<String, Plugin<ConfigProvider>> instantiateConfigProviders(
Map<String, String> indirectConfigs,
Map<String, ?> providerConfigProperties,
Predicate<String> classNameFilter
Expand All @@ -620,21 +621,22 @@ private Map<String, ConfigProvider> instantiateConfigProviders(
}
}
// Instantiate Config Providers
Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
for (Map.Entry<String, String> entry : providerMap.entrySet()) {
try {
String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
provider.configure(configProperties);
configProviderInstances.put(entry.getKey(), provider);
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The Plugin wrapper for the ConfigProvider is created without the provider metric tag. The updated Javadoc for ConfigProvider states that the provider tag will be automatically added to all registered metrics. Other parts of the codebase, such as connect.runtime.isolation.Plugins#newConfigProvider, correctly add this tag.

To ensure consistency and adhere to the documented behavior, the provider name, which is available as entry.getKey(), should be included as a tag.

Suggested change
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG, Map.of("provider", entry.getKey()));

configProviderPluginInstances.put(entry.getKey(), providerPlugin);
Comment on lines +631 to +632
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Security Validation

The provider instance is wrapped without validation before usage. Malicious providers could be instantiated and wrapped without proper security checks, potentially allowing untrusted code execution. This creates a security risk when loading external config providers.

Standards
  • CWE-20
  • OWASP-A03
  • NIST-SSDF-PW.1

} catch (ClassNotFoundException e) {
Comment on lines +631 to 633
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fix plugin tag key to avoid collapsing metrics

All provider plugins are wrapped with the constant key config.providers, so the resulting metrics tags become identical (config=config.providers, class=<providerSimpleName>). When the same provider class is configured under multiple aliases, their metrics now collide, hiding per-provider signals. Pass the provider alias (or another unique identifier) instead of the constant key so each plugin keeps distinct tags.

Apply this diff:

-                Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
+                Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, entry.getKey());
🤖 Prompt for AI Agents
In clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
around lines 631 to 633, the plugin wrapper is being created with the constant
CONFIG_PROVIDERS_CONFIG which collapses metrics for different aliases into the
same tag; change the call to Plugin.wrapInstance(...) to pass the provider alias
(e.g., entry.getKey()) or another unique identifier instead of
CONFIG_PROVIDERS_CONFIG so each provider instance keeps distinct metric tags
(ensure any corresponding map keys or subsequent uses expect the alias).

log.error("Could not load config provider class {}", entry.getValue(), e);
throw new ConfigException(providerClassProperty(entry.getKey()), entry.getValue(), "Could not load config provider class or one of its dependencies");
}
}

return configProviderInstances;
return configProviderPluginInstances;
}

private static String providerClassProperty(String providerName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.provider.FileConfigProvider;
import org.apache.kafka.common.internals.Plugin;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -56,15 +57,15 @@ public class ConfigTransformer {
public static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{([^}]*?):(([^}]*?):)?([^}]*?)\\}");
private static final String EMPTY_PATH = "";

private final Map<String, ConfigProvider> configProviders;
private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;

/**
* Creates a ConfigTransformer with the default pattern, of the form <code>${provider:[path:]key}</code>.
*
* @param configProviders a Map of provider names and {@link ConfigProvider} instances.
* @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances.
*/
public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
this.configProviders = configProviders;
public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
this.configProviderPlugins = configProviderPlugins;
}
Comment on lines +67 to 69
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent Resource Cleanup

The ConfigTransformer now accepts Plugin instead of ConfigProvider but doesn't update its close method. This inconsistency could lead to improper resource cleanup when ConfigTransformer is closed, potentially causing resource leaks in long-running applications.

Standards
  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Resource-Mgmt
  • SRE-Resource-Management


/**
Expand Down Expand Up @@ -94,13 +95,13 @@ public ConfigTransformerResult transform(Map<String, String> configs) {
Map<String, Long> ttls = new HashMap<>();
for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) {
String providerName = entry.getKey();
ConfigProvider provider = configProviders.get(providerName);
Plugin<ConfigProvider> providerPlugin = configProviderPlugins.get(providerName);
Map<String, Set<String>> keysByPath = entry.getValue();
if (provider != null && keysByPath != null) {
if (providerPlugin != null && keysByPath != null) {
for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {
String path = pathWithKeys.getKey();
Set<String> keys = new HashSet<>(pathWithKeys.getValue());
ConfigData configData = provider.get(path, keys);
ConfigData configData = providerPlugin.get().get(path, keys);
Map<String, String> data = configData.data();
Long ttl = configData.ttl();
if (ttl != null && ttl >= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
* <p>Kafka Connect discovers implementations of this interface using the Java {@link java.util.ServiceLoader} mechanism.
* To support this, implementations of this interface should also contain a service provider configuration file in
* {@code META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider}.
* <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
* The following tags are automatically added to all metrics registered: <code>config</code> set to
* <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name,
* and <code>provider</code> set to the provider name.
*/
public interface ConfigProvider extends Configurable, Closeable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.config;

import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.internals.Plugin;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -45,12 +46,12 @@ public class ConfigTransformerTest {

@BeforeEach
public void setup() {
configTransformer = new ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider()));
configTransformer = new ConfigTransformer(Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Plugin Parameters

The Plugin.wrapInstance() call is missing the extraTags parameter that's used in other similar calls throughout the codebase. This inconsistency could lead to missing provider name tags in metrics, causing monitoring gaps for test providers.

Standards
  • Logic-Verification-API-Usage
  • Algorithm-Correctness-Parameter-Completeness

}

@Test
public void testReplaceVariable() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKey}"));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
assertEquals(TEST_RESULT, data.get(MY_KEY));
Expand All @@ -59,7 +60,7 @@ public void testReplaceVariable() {

@Test
public void testReplaceVariableWithTTL() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}"));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
assertEquals(TEST_RESULT_WITH_TTL, data.get(MY_KEY));
Expand All @@ -68,28 +69,28 @@ public void testReplaceVariableWithTTL() {

@Test
public void testReplaceMultipleVariablesInValue() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
Map<String, String> data = result.data();
assertEquals("hello, testResult; goodbye, testResultWithTTL!!!", data.get(MY_KEY));
}

@Test
public void testNoReplacement() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:missingKey}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:missingKey}"));
Map<String, String> data = result.data();
assertEquals("${test:testPath:missingKey}", data.get(MY_KEY));
}

@Test
public void testSingleLevelOfIndirection() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testIndirection}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testIndirection}"));
Map<String, String> data = result.data();
assertEquals("${test:testPath:testResult}", data.get(MY_KEY));
}

@Test
public void testReplaceVariableNoPath() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testKey}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testKey}"));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
assertEquals(TEST_RESULT_NO_PATH, data.get(MY_KEY));
Expand All @@ -98,7 +99,7 @@ public void testReplaceVariableNoPath() {

@Test
public void testReplaceMultipleVariablesWithoutPathInValue() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "first ${test:testKey}; second ${test:testKey}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "first ${test:testKey}; second ${test:testKey}"));
Map<String, String> data = result.data();
assertEquals("first testResultNoPath; second testResultNoPath", data.get(MY_KEY));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.config.provider;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

public class MonitorableConfigProvider implements ConfigProvider, Monitorable {
public static final String NAME = "name";
public static final String DESCRIPTION = "description";
protected boolean configured = false;

@Override
public void withPluginMetrics(PluginMetrics metrics) {
MetricName metricName = metrics.metricName(NAME, DESCRIPTION, Map.of());
metrics.addMetric(metricName, (Measurable) (config, now) -> 123);
}

@Override
public ConfigData get(String path) {
return null;
}

@Override
public ConfigData get(String path, Set<String> keys) {
return null;
}

@Override
public void close() throws IOException {
}

@Override
public void configure(Map<String, ?> configs) {
configured = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
Expand Down Expand Up @@ -269,18 +270,19 @@ List<String> configProviders() {
Map<String, String> transform(Map<String, String> props) {
// transform worker config according to config.providers
List<String> providerNames = configProviders();
Map<String, ConfigProvider> providers = new HashMap<>();
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
for (String name : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
CONFIG_PROVIDERS_CONFIG + "." + name,
Plugins.ClassLoaderUsage.PLUGINS
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
providers.put(name, configProvider);
providerPlugins.put(name, configProviderPlugin);
}
Comment on lines 274 to 282
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Filter out missing config provider plugins

plugins.newConfigProvider(...) returns null when the provider name lacks a <name>.class declaration. By unconditionally inserting that null we hand ConfigTransformer a null plugin (and later try to close it), which leads to an immediate NullPointerException instead of the previous “just skip it” behavior. Please restore the null check before adding to the map.

-            providerPlugins.put(name, configProviderPlugin);
+            if (configProviderPlugin != null) {
+                providerPlugins.put(name, configProviderPlugin);
+            }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for (String name : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
CONFIG_PROVIDERS_CONFIG + "." + name,
Plugins.ClassLoaderUsage.PLUGINS
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
providers.put(name, configProvider);
providerPlugins.put(name, configProviderPlugin);
}
for (String name : providerNames) {
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
if (configProviderPlugin != null) {
providerPlugins.put(name, configProviderPlugin);
}
}
🤖 Prompt for AI Agents
In
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
around lines 274 to 282, the loop unconditionally inserts the result of
plugins.newConfigProvider(...) into providerPlugins, but newConfigProvider can
return null when the provider lacks a <name>.class entry; restore a null check
before putting the plugin into providerPlugins (i.e., if the returned
Plugin<ConfigProvider> is null, skip adding it and optionally log a debug/warn
message) so ConfigTransformer never receives a null plugin and later close() is
not invoked on null.

ConfigTransformer transformer = new ConfigTransformer(providers);
ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
Map<String, String> transformed = transformer.transform(props).data();
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
return transformed;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,17 @@ public Worker(

private WorkerConfigTransformer initConfigTransformer() {
final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
Map<String, ConfigProvider> providerMap = new HashMap<>();
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
for (String providerName : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
config,
WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName,
ClassLoaderUsage.PLUGINS
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
providerMap.put(providerName, configProvider);
providerPluginMap.put(providerName, configProviderPlugin);
}
Comment on lines +215 to 221
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Skip null config provider plugins

Plugins.newConfigProvider returns null when the provider name is configured without a corresponding <name>.class. We now insert that null into providerPluginMap, so the first transform attempt dereferences a null plugin and blows up (previous code skipped these). Restore the guard so only successfully-instantiated providers are added.

-            providerPluginMap.put(providerName, configProviderPlugin);
+            if (configProviderPlugin != null) {
+                providerPluginMap.put(providerName, configProviderPlugin);
+            }
🤖 Prompt for AI Agents
In connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
around lines 215 to 221, the code unconditionally inserts the result of
Plugins.newConfigProvider into providerPluginMap even when that method returns
null; restore the previous null-guard so only non-null configProviderPlugin
instances are put into providerPluginMap (i.e., check if configProviderPlugin !=
null before providerPluginMap.put(...) and otherwise skip/continue, optionally
logging a debug/info message that the named provider had no class configured).

return new WorkerConfigTransformer(this, providerMap);
return new WorkerConfigTransformer(this, providerPluginMap);
}

public WorkerConfigTransformer configTransformer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigTransformerResult;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
import org.apache.kafka.connect.util.Callback;
Expand All @@ -42,12 +43,12 @@ public class WorkerConfigTransformer implements AutoCloseable {
private final Worker worker;
private final ConfigTransformer configTransformer;
private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>();
private final Map<String, ConfigProvider> configProviders;
private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;

public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> configProviders) {
public WorkerConfigTransformer(Worker worker, Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
this.worker = worker;
this.configProviders = configProviders;
this.configTransformer = new ConfigTransformer(configProviders);
this.configProviderPlugins = configProviderPlugins;
this.configTransformer = new ConfigTransformer(configProviderPlugins);
}

public Map<String, String> transform(Map<String, String> configs) {
Expand Down Expand Up @@ -97,6 +98,6 @@ private void scheduleReload(String connectorName, String path, long ttl) {

@Override
public void close() {
configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.Connector;
Expand Down Expand Up @@ -627,7 +629,8 @@ private <U> U newVersionedPlugin(
return plugin;
}

public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) {
public Plugin<ConfigProvider> newConfigProvider(AbstractConfig config, String providerName, ClassLoaderUsage classLoaderUsage, Metrics metrics) {
String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
String classPropertyName = providerPrefix + ".class";
Comment on lines +632 to 634
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent Method Signature

Method signature changed from using providerPrefix to providerName, requiring internal reconstruction of the prefix. This creates inconsistency with previous implementation and increases maintenance complexity when caller code needs modification.

Standards
  • Clean-Code-Method-Design
  • Maintainability-Quality-Consistency

Map<String, String> originalConfig = config.originalsStrings();
if (!originalConfig.containsKey(classPropertyName)) {
Expand All @@ -643,7 +646,7 @@ public ConfigProvider newConfigProvider(AbstractConfig config, String providerPr
try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
plugin.configure(configProviderConfig);
}
return plugin;
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource Leak Risk

The plugin instance is wrapped without closing it if an exception occurs during wrapping. If Plugin.wrapInstance throws an exception, the already configured plugin won't be properly closed, potentially causing resource leaks. This violates proper resource management patterns.

        try {
            return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
        } catch (Exception e) {
            Utils.closeQuietly(plugin, "config provider");
            throw e;
        }
Commitable Suggestion
Suggested change
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
try {
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
} catch (Exception e) {
Utils.closeQuietly(plugin, "config provider");
throw e;
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Resource-Mgmt
  • SRE-Error-Handling

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Null Metrics Risk

The metrics parameter can be null (as seen in method signature allowing null) but is passed directly to Plugin.wrapInstance without null checking. This could cause NullPointerException when a ConfigProvider implementing Monitorable tries to use metrics, reducing system reliability.

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Precondition
  • SRE-Error-Handling

}

/**
Expand Down
Loading
Loading