Skip to content

Conversation

@DDShantanuBadmanji
Copy link

@DDShantanuBadmanji DDShantanuBadmanji commented Sep 29, 2025

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.

Summary by CodeRabbit

  • New Features

    • Added metrics exposure for configuration providers, including standard tags (config, class, provider) and integration across Connect and MirrorMaker components.
    • Support for monitorable configuration providers to register metrics automatically.
  • Refactor

    • Unified plugin-based handling of configuration providers for consistent lifecycle management and observability.
  • Documentation

    • Expanded guidance on configuration provider metrics and monitorability.
  • Tests

    • New tests validating metrics registration for monitorable providers and plugin-based provider handling.

@coderabbitai
Copy link

coderabbitai bot commented Sep 29, 2025

Walkthrough

Refactors config provider handling to use Plugin-wrapped ConfigProvider instances across clients and Connect. Updates constructors, fields, and call sites to pass Map<String, Plugin>; adapts provider instantiation, transformation, and cleanup flows; adds metrics-aware provider creation; expands Javadoc; and introduces tests for monitorable providers and metrics.

Changes

Cohort / File(s) Summary
Core config transformation (clients)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java, clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
Switches from direct ConfigProvider to Plugin in provider instantiation, wiring, and cleanup; updates ConfigTransformer constructor, fields, and access to use plugin.get().
Clients tests and docs
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java, clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java, clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
Tests updated to wrap providers with Plugin and use Map.of; adds Monitorable test provider; extends ConfigProvider Javadoc about Monitorable and metric tags.
Connect runtime: worker and transformer
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
Migrates provider maps to Plugin; updates construction of WorkerConfigTransformer and closes plugin wrappers.
Connect isolation and provider creation
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
Changes newConfigProvider to return Plugin; derives prefix from provider name; adds Metrics parameter; configures provider, passes PluginMetrics, and wraps via Plugin.wrapInstance.
Connect MirrorMaker
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
Updates transform path to use Plugin; adjusts provider creation, transformer construction, and plugin cleanup.
Connect tests
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java, connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java, connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
Aligns tests with Plugin-wrapped providers; adds tests for Monitorable provider metrics and Plugin-based creation; introduces helper assertions and a custom monitorable provider subclass.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Caller as Worker/MirrorMakerConfig/AbstractConfig
  participant Plugins as Plugins.newConfigProvider
  participant Prov as ConfigProvider (impl)
  participant Metrics as PluginMetrics
  participant Wrapper as Plugin<ConfigProvider>

  Caller->>Plugins: newConfigProvider(config, providerName, classLoaderUsage, metrics)
  Plugins->>Plugins: Resolve class & instantiate
  Plugins->>Prov: configure(configs)
  alt Provider implements Monitorable
    Plugins->>Prov: withPluginMetrics(Metrics)
  end
  Plugins->>Wrapper: Plugin.wrapInstance(Prov, metrics)
  Plugins-->>Caller: Wrapper
Loading
sequenceDiagram
  autonumber
  participant C as Component (Worker/AbstractConfig/…)
  participant CT as ConfigTransformer
  participant Map as Map<String, Plugin<ConfigProvider>>
  participant P as Plugin<ConfigProvider>
  participant Prov as ConfigProvider

  C->>CT: new ConfigTransformer(Map)
  C->>CT: transform(config)
  loop For each ${provider:path}
    CT->>Map: get(provider)
    alt Found
      Map-->>CT: P
      CT->>P: get()
      P-->>CT: Prov
      CT->>Prov: get(path[, keys])
      Prov-->>CT: ConfigData
    else Missing
      CT-->>C: error for unknown provider
    end
  end
  C->>Map: close all
  Map->>P: close()
  P-->>C: closed
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I wrapped a key in a plugin coat,
With metrics tags on every note.
Providers hopped to measured beats,
Transforming paths in tidy fleets.
Close the wrappers, carrots stacked—
A rabbit’s merge: precise, exact. 🥕✨

Pre-merge checks and finishing touches

❌ Failed checks (3 warnings)
Check name Status Explanation Resolution
Title Check ⚠️ Warning The pull request title “Clone kafka 18894” does not describe the primary changes which involve introducing Plugin wrappers for ConfigProvider instances, updating the config resolution path in AbstractConfig, ConfigTransformer, MirrorMakerConfig, Worker, and associated tests. Instead it references an issue number and a clone action without conveying the functional scope or impact of the changes. This makes it difficult for reviewers or future maintainers to quickly understand the intent of the PR. Please rename the title to a concise summary of the main change, for example “Wrap ConfigProvider instances in Plugin wrappers for configuration resolution,” so that the PR clearly communicates its purpose at a glance.
Description Check ⚠️ Warning The pull request description is still the default placeholder template and lacks any actual summary of changes, motivation, or testing strategy as required by the repository’s description template. It does not explain what has been modified, why these changes were made, or how they have been tested, leaving reviewers without critical context. All required sections from the description template remain unfilled. Please replace the placeholder text with a detailed description that includes a summary of the changes (such as wrapping ConfigProvider instances in Plugin wrappers and updating related classes and tests), the motivation for these updates, and a description of the testing strategy including unit and integration tests. Ensure that each section from the repository’s description template is fully completed.
Docstring Coverage ⚠️ Warning Docstring coverage is 8.89% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch clone-KAFKA-18894

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ast-grep (0.39.5)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link

Summary of Changes

Hello @DDShantanuBadmanji, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request refactors the handling of ConfigProvider instances within Kafka by introducing a generic Plugin abstraction. This change standardizes the lifecycle management of configuration providers and, crucially, enables automatic metric registration for providers that implement the Monitorable interface. The update ensures better observability and a more extensible architecture for external plugins across Kafka's client and Connect components.

Highlights

  • ConfigProvider Plugin Abstraction: Introduced a new Plugin wrapper for ConfigProvider instances across various Kafka components, including AbstractConfig, ConfigTransformer, MirrorMakerConfig, Worker, and WorkerConfigTransformer. This refactoring standardizes the management and lifecycle of configuration providers.
  • Metrics Integration for ConfigProviders: Enabled automatic metric registration for ConfigProvider implementations that also implement the Monitorable interface. The Plugin wrapper now handles the withPluginMetrics callback, allowing providers to expose operational metrics with predefined tags.
  • Updated Plugin Instantiation Logic: The Plugins utility class now returns Plugin<ConfigProvider> instead of raw ConfigProvider instances and accepts Metrics as an argument during instantiation, facilitating the new metrics integration.
  • Test Coverage for New Functionality: Added new test cases and updated existing ones in ConfigTransformerTest, WorkerTest, and PluginsTest to validate the correct behavior of the Plugin wrapper, metric registration, and the updated ConfigProvider instantiation flow.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the ConfigProvider instantiation logic to wrap providers in a Plugin instance. This change enables metrics for ConfigProvider implementations that are Monitorable. The changes are applied consistently across AbstractConfig, MirrorMakerConfig, and the Connect runtime.

My main feedback is to ensure that when wrapping ConfigProviders in AbstractConfig, the provider name is included as a tag for metrics. This will prevent metric collisions when multiple providers of the same class are used and aligns with the implementation in other parts of the codebase.

ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
provider.configure(configProperties);
configProviderInstances.put(entry.getKey(), provider);
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The ConfigProvider is being wrapped in a Plugin without including the provider name as a tag. This could lead to metric collisions if multiple config providers of the same class are configured, as they would share the same metric tags.

To ensure metrics are uniquely identifiable per provider instance, the provider name should be added as a tag. This is consistent with the implementation in connect/runtime and the updated Javadoc for ConfigProvider.

Suggested change
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG, Map.of("provider", entry.getKey()));

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (9)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)

49-49: Correctly wrapping TestConfigProvider with Plugin; key tag choice is fine.

Passing metrics as null is appropriate here; no monitorable behavior is exercised in this test. If you want extra coverage later, consider a small, separate test with a Monitorable provider to validate metrics wiring and close semantics, but not required for this class.

clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1)

32-35: Clarify metrics tag guarantees in Javadoc

The “provider” tag is added because callers wrap providers with Plugin.wrapInstance(..., key, Map.of("provider", name)). If any caller uses the simpler overload without extraTags, “provider” won’t be present. Suggest scoping the statement to “when wrapped via Plugin in Kafka components (e.g., Connect)”.

Consider this tweak:

- * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
- * The following tags are automatically added to all metrics registered: <code>config</code> set to
- * <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name,
- * and <code>provider</code> set to the provider name.
+ * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
+ * When wrapped via Kafka’s plugin mechanism (e.g., Connect), metrics will include tags:
+ * <code>config</code>=<code>config.providers</code>, <code>class</code>=simple name,
+ * and <code>provider</code>=configured provider name.
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)

632-650: Wrap looks good; document nullability and contract

Method can return null when no config.providers.<name>.class is set. Please document this (and optionally annotate as nullable) to ensure all callers guard against nulls. The extra “provider” tag in wrapInstance matches the new Javadoc guidance.

Verify all call sites (Worker, MirrorMaker, tests) handle a null return and close the plugin wrapper in a finally block to avoid leaks on transform failures.

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1)

212-223: Provider plugin wiring looks correct; consider tiny improvements

  • Passing metrics into plugins.newConfigProvider(..., metrics.metrics()) and handing Map<String, Plugin<ConfigProvider>> to the transformer is consistent with the Plugin-based flow. LGTM.
  • Minor: pre-size the HashMap to avoid a resize on large provider lists.

Example:

-Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
+Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>(providerNames.size());

Also, please confirm WorkerConfigTransformer.close() now closes the Plugin-wrapped providers so Worker.stop doesn’t leak provider resources.

clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)

29-58: Test helper is fine; consider returning a non-null ConfigData

Current get(...) methods return null. If these are ever used outside metrics-only tests, a null could cause NPEs. Optional: return an empty ConfigData instead.

-    public ConfigData get(String path) {
-        return null;
-    }
+    public ConfigData get(String path) {
+        return new ConfigData(Map.of());
+    }
...
-    public ConfigData get(String path, Set<String> keys) {
-        return null;
-    }
+    public ConfigData get(String path, Set<String> keys) {
+        return new ConfigData(Map.of());
+    }

Otherwise, looks good for validating Monitorable metrics wiring.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (2)

382-389: Unify provider stubbing to propagate Metrics into Plugin.wrapInstance.

Current stub wraps with null metrics; not wrong for MockFileConfigProvider, but using the passed Metrics mirrors production wiring and future‑proofs tests.

Apply:

-        Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(mockFileConfigProvider,
-            null,
-            WorkerConfig.CONFIG_PROVIDERS_CONFIG,
-            Map.of("provider", "file"));
-        when(plugins.newConfigProvider(any(AbstractConfig.class),
-            eq("file"),
-            any(ClassLoaderUsage.class),
-            any(Metrics.class))).thenReturn(providerPlugin);
+        when(plugins.newConfigProvider(any(AbstractConfig.class),
+            eq("file"),
+            any(ClassLoaderUsage.class),
+            any(Metrics.class))).thenAnswer(inv -> {
+                Metrics m = inv.getArgument(3);
+                return Plugin.wrapInstance(
+                    mockFileConfigProvider,
+                    m,
+                    WorkerConfig.CONFIG_PROVIDERS_CONFIG,
+                    Map.of("provider", "file"));
+            });

2904-2928: Close Worker to avoid metrics/JMX leakage between tests.

Add a stop at the end of the test.

     assertMetrics(metrics,
         1,
         expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable2")));
-}
+        worker.stop();
+}
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (2)

391-402: Close the returned Plugin to keep tests hermetic.

Use try‑with‑resources so the provider and its plugin metrics are closed.

-        Plugin<ConfigProvider> plugin = plugins.newConfigProvider(
-            config,
-            providerName,
-            ClassLoaderUsage.PLUGINS,
-            null
-        );
-
-        assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot collect samples");
-        Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin.get()).flatten();
+        try (Plugin<ConfigProvider> plugin = plugins.newConfigProvider(
+            config, providerName, ClassLoaderUsage.PLUGINS, null)) {
+            assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot collect samples");
+            Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin.get()).flatten();
             assertTrue(samples.containsKey("configure"));
-        assertPluginClassLoaderAlwaysActive(plugin.get());
+            assertPluginClassLoaderAlwaysActive(plugin.get());
+        }

404-413: Also close the Plugin here.

Mirror the pattern to ensure proper cleanup.

-        Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics());
-        assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());
+        try (Plugin<ConfigProvider> plugin =
+                 plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics())) {
+            assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());
+        }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and b92a2bb.

📒 Files selected for processing (12)
  • clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (4 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (3 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (5 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1 hunks)
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (3 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
  • withPluginMetrics (6299-6301)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
  • Utils (93-1700)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
  • Plugins (59-696)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
  • WorkerConfig (58-463)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (4)
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala (1)
  • MockFileConfigProvider (1636-1641)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
  • Utils (93-1700)
🔇 Additional comments (11)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (2)

20-20: Import of Plugin aligns test with new provider wrapping.

Good adjustment to the updated constructor signature expecting Plugin-wrapped providers.


54-54: Migration to Map.of for input maps looks good.

Immutable inputs improve clarity. Keeping Collections.singletonMap for the null-value case (Line 109) is correct since Map.of disallows nulls.

Also applies to: 63-63, 72-72, 79-79, 86-86, 93-93, 102-102

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (2)

69-70: Good: tests now exercise Plugin‑wrapped providers

Using Plugin.wrapInstance(..., "config.providers") aligns tests with runtime behavior and future metrics wiring.


150-158: Good: Map.of(...) and explicit TTLs in ConfigData

The switch to Map.of(...) is concise and immutable; TTL cases remain clear.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (2)

40-48: Imports for plugin-wrapped providers and metrics look correct.

Adds ConfigProvider, MonitorableConfigProvider, Plugin, Metrics, and LinkedHashMap where needed.

Also applies to: 117-118


2929-2951: Metrics assertion helpers are solid.

Deterministic tag construction and group/name/description checks read well.

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (2)

46-52: API migration to Plugin is correct.

Field, ctor, and transformer wiring align with the new plugin wrapper.


100-102: Close plugin wrappers on shutdown.

Using Utils.closeQuietly on the Plugin instances is the right lifecycle behavior.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (2)

25-29: Imports for monitorable and plugin metrics are appropriate.


810-816: CustomMonitorableConfigProvider assertion is precise.

Asserting withPluginMetrics runs after configure enforces the intended contract.

clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)

60-69: Switch to Plugin and access via get() is correct.

Constructor, field, and provider resolution changes are consistent; transformation logic remains unchanged.

Also applies to: 98-105

Comment on lines +550 to 561
Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);

if (!providers.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providers);
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));

return new ResolvingMap<>(resolvedOriginals, originals);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Close provider plugins in a finally block to avoid leaks on transform failure

If ConfigTransformer.transform(...) throws, providerPlugins.values().forEach(closeQuietly) is skipped, leaking provider resources. Wrap the transform block in try/finally.

-        Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
-
-        if (!providerPlugins.isEmpty()) {
-            ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
-            ConfigTransformerResult result = configTransformer.transform(indirectVariables);
-            if (!result.data().isEmpty()) {
-                resolvedOriginals.putAll(result.data());
-            }
-        }
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        Map<String, Plugin<ConfigProvider>> providerPlugins =
+                instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
+        try {
+            if (!providerPlugins.isEmpty()) {
+                ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
+                ConfigTransformerResult result = configTransformer.transform(indirectVariables);
+                if (!result.data().isEmpty()) {
+                    resolvedOriginals.putAll(result.data());
+                }
+            }
+        } finally {
+            providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
if (!providers.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providers);
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
return new ResolvingMap<>(resolvedOriginals, originals);
Map<String, Plugin<ConfigProvider>> providerPlugins =
instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
try {
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
} finally {
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
}
return new ResolvingMap<>(resolvedOriginals, originals);
🤖 Prompt for AI Agents
In clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
around lines 550 to 561, the provider plugins are instantiated and used to
transform config but closed only after the transform call; if transform throws
their close is skipped and resources leak. Fix by wrapping the transform and any
use of providerPlugins in a try block and moving
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider
plugin")) into a finally block so plugins are always closed (rethrow the
original exception if one occurs).

Comment on lines +624 to 633
Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
for (Map.Entry<String, String> entry : providerMap.entrySet()) {
try {
String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
provider.configure(configProperties);
configProviderInstances.put(entry.getKey(), provider);
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
configProviderPluginInstances.put(entry.getKey(), providerPlugin);
} catch (ClassNotFoundException e) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Close provider instance if configure() throws to prevent resource leaks

If provider.configure(...) throws, the newly created provider isn’t closed. Ensure we close it before rethrowing.

-                ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
-                provider.configure(configProperties);
-                Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
+                ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
+                try {
+                    provider.configure(configProperties);
+                } catch (RuntimeException e) {
+                    Utils.closeQuietly(provider, "config provider");
+                    throw e;
+                }
+                Plugin<ConfigProvider> providerPlugin =
+                        Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
                 configProviderPluginInstances.put(entry.getKey(), providerPlugin);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
for (Map.Entry<String, String> entry : providerMap.entrySet()) {
try {
String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
provider.configure(configProperties);
configProviderInstances.put(entry.getKey(), provider);
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
configProviderPluginInstances.put(entry.getKey(), providerPlugin);
} catch (ClassNotFoundException e) {
ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
try {
provider.configure(configProperties);
} catch (RuntimeException e) {
Utils.closeQuietly(provider, "config provider");
throw e;
}
Plugin<ConfigProvider> providerPlugin =
Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
configProviderPluginInstances.put(entry.getKey(), providerPlugin);

Comment on lines 270 to 287
Map<String, String> transform(Map<String, String> props) {
// transform worker config according to config.providers
List<String> providerNames = configProviders();
Map<String, ConfigProvider> providers = new HashMap<>();
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
for (String name : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
CONFIG_PROVIDERS_CONFIG + "." + name,
Plugins.ClassLoaderUsage.PLUGINS
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
providers.put(name, configProvider);
providerPlugins.put(name, configProviderPlugin);
}
ConfigTransformer transformer = new ConfigTransformer(providers);
ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
Map<String, String> transformed = transformer.transform(props).data();
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
return transformed;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix NPE risk and ensure plugins are always closed

  • If a provider name lacks config.providers.<name>.class, newConfigProvider(...) returns null; adding it to the map can NPE inside ConfigTransformer.
  • Close plugin wrappers in a finally block to avoid leaks if transform throws.

Apply:

-        Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
+        Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
         for (String name : providerNames) {
             Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
                     this,
                     name,
                     Plugins.ClassLoaderUsage.PLUGINS,
                     null
             );
-            providerPlugins.put(name, configProviderPlugin);
+            if (configProviderPlugin != null) {
+                providerPlugins.put(name, configProviderPlugin);
+            }
         }
-        ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
-        Map<String, String> transformed = transformer.transform(props).data();
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
-        return transformed;
+        ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
+        Map<String, String> transformed;
+        try {
+            transformed = transformer.transform(props).data();
+        } finally {
+            for (Plugin<ConfigProvider> p : providerPlugins.values()) {
+                Utils.closeQuietly(p, "config provider plugin");
+            }
+        }
+        return transformed;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Map<String, String> transform(Map<String, String> props) {
// transform worker config according to config.providers
List<String> providerNames = configProviders();
Map<String, ConfigProvider> providers = new HashMap<>();
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
for (String name : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
CONFIG_PROVIDERS_CONFIG + "." + name,
Plugins.ClassLoaderUsage.PLUGINS
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
providers.put(name, configProvider);
providerPlugins.put(name, configProviderPlugin);
}
ConfigTransformer transformer = new ConfigTransformer(providers);
ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
Map<String, String> transformed = transformer.transform(props).data();
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
return transformed;
}
Map<String, String> transform(Map<String, String> props) {
// transform worker config according to config.providers
List<String> providerNames = configProviders();
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
for (String name : providerNames) {
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
if (configProviderPlugin != null) {
providerPlugins.put(name, configProviderPlugin);
}
}
ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
Map<String, String> transformed;
try {
transformed = transformer.transform(props).data();
} finally {
for (Plugin<ConfigProvider> p : providerPlugins.values()) {
Utils.closeQuietly(p, "config provider plugin");
}
}
return transformed;
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants