Skip to content

Conversation

@DDShantanuBadmanji
Copy link

@DDShantanuBadmanji DDShantanuBadmanji commented Sep 29, 2025

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.

Summary by CodeRabbit

  • New Features

    • Config providers now run as plugins with built-in metrics support, exposing provider-scoped metrics with standardized tags across Connect and MirrorMaker.
  • Documentation

    • Updated guidance for config providers to clarify monitoring support and automatic metric tagging.
  • Refactor

    • Unified handling of config providers via plugin wrappers for consistent initialization and cleanup.
  • Tests

    • Expanded and updated tests to validate plugin-wrapped providers, metric propagation, and backward-compatible transformation behavior.

@coderabbitai
Copy link

coderabbitai bot commented Sep 29, 2025

Walkthrough

Replaces direct ConfigProvider usage with Plugin wrappers across config resolution and Connect runtime. Updates constructors, fields, and maps to hold plugins, adjusts transform and close flows to use plugin.get() and plugin-aware cleanup, extends Plugins.newConfigProvider to return wrapped plugins with metrics, and adds tests plus a monitorable provider.

Changes

Cohort / File(s) Summary
Core config resolution: plugin-wrapped providers
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java, clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
Switch maps from ConfigProvider to Plugin; construct ConfigTransformer with plugins; resolve via providerPlugin.get(); update instantiation to Plugin.wrapInstance(...); adjust close messaging to “config provider plugin”.
ConfigProvider documentation
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
Javadoc augmented to mention Monitorable and automatic metric tags; no API changes.
Connect runtime integration
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java, connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
Migrate provider maps and transformer construction to Plugin; Plugins.newConfigProvider now returns Plugin and accepts providerName and Metrics; wrap instances with Plugin.wrapInstance(...); update cleanup to close plugins.
Client-side tests and helpers
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java, clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
Tests updated to pass Plugin-wrapped providers (Plugin.wrapInstance(...)); replace Collections.singletonMap with Map.of; add MonitorableConfigProvider test implementation exposing a metric and basic lifecycle hooks.
Connect runtime tests
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java, connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java, connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
Adapt tests to plugin-wrapped providers; add tests for MonitorableConfigProvider metrics tagging and plugin metrics wiring; introduce CustomMonitorableConfigProvider for assertions; adjust API usage to Plugins.newConfigProvider returning Plugin.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor App as Caller
  participant AC as AbstractConfig
  participant P as Plugins
  participant PL as Plugin<ConfigProvider>
  participant CP as ConfigProvider
  participant CT as ConfigTransformer

  App->>AC: resolveConfigVariables(props)
  AC->>P: newConfigProvider(config, providerName, usage, metrics)
  P-->>AC: PL = Plugin.wrapInstance(CP, metrics, CONFIG_PROVIDERS_CONFIG, {provider})
  AC->>CT: new ConfigTransformer(Map<String, PL>)
  note over CT: Transformer holds Plugin-wrapped providers
  App->>CT: transform(values)
  CT->>PL: get()
  PL-->>CT: CP
  CT->>CP: get(path, keys)
  CP-->>CT: ConfigData
  CT-->>App: transformed values
  App->>PL: close()
  note over PL: Closed via plugin-aware cleanup
Loading
sequenceDiagram
  autonumber
  actor W as Worker/MirrorMaker
  participant P as Plugins
  participant PL as Plugin<ConfigProvider>
  participant WCT as WorkerConfigTransformer
  participant CT as ConfigTransformer

  W->>P: newConfigProvider(config, providerName, usage, metrics)
  P-->>W: PL
  W->>WCT: new(..., Map<String, PL>)
  WCT->>CT: new ConfigTransformer(Map<String, PL>)
  note over WCT,CT: Transform flow uses plugin.get() to access provider
  W->>WCT: close()
  WCT->>PL: close() (Utils.closeQuietly, "config provider plugin")
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

In burrows of code I bounce with delight,
Wrapping providers snug, metrics in sight.
Hop, hop—plugins now carry the load,
get() then close() on the leafy road.
Carrots of tests sprout green and bright—
Transformers hum through the Kafka night. 🥕✨

Pre-merge checks and finishing touches

❌ Failed checks (3 warnings)
Check name Status Explanation Resolution
Title Check ⚠️ Warning The title “Clone kafka 18894” does not describe the substantive changes in the pull request and instead references an internal tracking number; it is too generic to convey that the PR is wrapping ConfigProvider instances in Plugin and updating related code paths. Please update the title to a clear, concise summary of the primary change, for example “Wrap ConfigProvider instances in Plugin for resolver and transformer flows.”
Description Check ⚠️ Warning The pull request description is still the unedited template placeholder and does not include any details about what was changed, why those changes were made, or how they were tested, so it fails to meet the repository’s description requirements. Please replace the placeholder text with a detailed description that outlines the changes made, the rationale behind them, and the testing strategy (including unit or integration tests) used to verify the behavior.
Docstring Coverage ⚠️ Warning Docstring coverage is 8.89% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch clone-KAFKA-18894

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ast-grep (0.39.5)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link

Summary of Changes

Hello @DDShantanuBadmanji, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly refactors how ConfigProvider instances are handled within Kafka and Kafka Connect by introducing a Plugin wrapper. This change streamlines the management of ConfigProvider lifecycles and, more importantly, allows these providers to expose operational metrics through the Monitorable interface. The Plugin wrapper automatically registers these metrics with relevant tags, thereby improving observability into ConfigProvider activities. The modifications are applied consistently across core configuration classes, the Kafka Connect runtime, and their respective test suites to ensure robust integration and validation of the new plugin management and metrics features.

Highlights

  • Plugin Wrapper for ConfigProvider: Introduced a Plugin wrapper for ConfigProvider instances, standardizing their lifecycle management and enabling enhanced capabilities.
  • Metrics for ConfigProviders: Enabled ConfigProvider implementations to expose metrics by implementing the Monitorable interface, with automatic registration and tagging (e.g., config, class, provider).
  • Refactored ConfigProvider Usage: Updated various components including AbstractConfig, ConfigTransformer, MirrorMakerConfig, Worker, WorkerConfigTransformer, and Plugins to work with Plugin<ConfigProvider> instead of raw ConfigProvider instances.
  • New Test Utility and Cases: Added a new MonitorableConfigProvider test utility and new test cases to verify the metrics registration and proper functioning of the Plugin wrapper for ConfigProviders.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors ConfigProvider instantiation to use a Plugin wrapper, enabling metrics for config providers via the Monitorable interface. The changes are well-integrated across the client, connect-runtime, and mirror-maker modules, and include corresponding test updates. This is a good improvement for observability. I have a few minor suggestions to improve code clarity and robustness in a test utility.

}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better readability and maintainability, it's good practice to use more descriptive variable names in lambda expressions. Using plugin instead of x would make the code's intent clearer.

Suggested change
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
providerPlugins.values().forEach(plugin -> Utils.closeQuietly(plugin, "config provider plugin"));

Comment on lines +41 to +48
public ConfigData get(String path) {
return null;
}

@Override
public ConfigData get(String path, Set<String> keys) {
return null;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The get methods return null, which can lead to a NullPointerException in callers like ConfigTransformer that don't expect a null ConfigData. To make this test utility more robust and align with the implicit contract of ConfigProvider, it's better to return an empty ConfigData object.

Suggested change
public ConfigData get(String path) {
return null;
}
@Override
public ConfigData get(String path, Set<String> keys) {
return null;
}
@Override
public ConfigData get(String path) {
return new ConfigData(java.util.Collections.emptyMap());
}
@Override
public ConfigData get(String path, Set<String> keys) {
return new ConfigData(java.util.Collections.emptyMap());
}

ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
Map<String, String> transformed = transformer.transform(props).data();
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better readability and maintainability, it's good practice to use more descriptive variable names in lambda expressions. Using plugin instead of x would make the code's intent clearer.

Suggested change
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
providerPlugins.values().forEach(plugin -> Utils.closeQuietly(plugin, "config provider plugin"));

@Override
public void close() {
configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better readability and maintainability, it's good practice to use more descriptive variable names in lambda expressions. Using plugin instead of x would make the code's intent clearer.

Suggested change
configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
configProviderPlugins.values().forEach(plugin -> Utils.closeQuietly(plugin, "config provider plugin"));

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)

632-650: Change wrap order to before configuration
Wrap the provider with Plugin.wrapInstance before calling configure() so that Monitorable providers receive metrics during their configure(...) call.

🧹 Nitpick comments (8)
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1)

32-35: Clarify when metrics and tags are attached to providers

Tags are added only when the provider is wrapped via Plugins.newConfigProvider and a non-null Metrics is supplied; the class tag is the simple class name. Consider tightening the Javadoc to avoid implying unconditional tagging.

- * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
- * The following tags are automatically added to all metrics registered: <code>config</code> set to
- * <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name,
- * and <code>provider</code> set to the provider name.
+ * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics
+ * when created via {@code Plugins.newConfigProvider(...)} with a non-null {@code Metrics}.
+ * The following tags are added to all metrics registered: <code>config</code> = <code>config.providers</code>,
+ * <code>class</code> = the provider's simple class name, and <code>provider</code> = the configured provider name.
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)

270-287: Avoid null entries in providerPlugins; filter before put and close

newConfigProvider may return null; skip adding those and you won’t rely on Utils.closeQuietly handling nulls. Keeps the map clean and intent clear.

-        Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
+        Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
         for (String name : providerNames) {
             Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
                     this,
                     name,
                     Plugins.ClassLoaderUsage.PLUGINS,
                     null
             );
-            providerPlugins.put(name, configProviderPlugin);
+            if (configProviderPlugin != null) {
+                providerPlugins.put(name, configProviderPlugin);
+            }
         }
         ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
         Map<String, String> transformed = transformer.transform(props).data();
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        providerPlugins.values().forEach(p -> Utils.closeQuietly(p, "config provider plugin"));
         return transformed;
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (1)

624-633: Add provider name as a Plugin metrics tag for better observability

Include the provider’s logical name in Plugin tags to aid metrics/log correlation.

-                Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
+                Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(
+                        provider,
+                        null,
+                        CONFIG_PROVIDERS_CONFIG,
+                        Collections.singletonMap("name", entry.getKey())
+                );
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1)

210-223: Clean up partially created provider plugins if initialization fails

If creating a later provider or constructing WorkerConfigTransformer throws, earlier plugins remain unclosed. Add failure-path cleanup.

     private WorkerConfigTransformer initConfigTransformer() {
         final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
-        Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
-        for (String providerName : providerNames) {
-            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
-                config,
-                providerName,
-                ClassLoaderUsage.PLUGINS,
-                metrics.metrics()
-            );
-            providerPluginMap.put(providerName, configProviderPlugin);
-        }
-        return new WorkerConfigTransformer(this, providerPluginMap);
+        Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
+        try {
+            for (String providerName : providerNames) {
+                Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
+                        config,
+                        providerName,
+                        ClassLoaderUsage.PLUGINS,
+                        metrics.metrics()
+                );
+                providerPluginMap.put(providerName, configProviderPlugin);
+            }
+            return new WorkerConfigTransformer(this, providerPluginMap);
+        } catch (Throwable t) {
+            providerPluginMap.values().forEach(p -> Utils.closeQuietly(p, "config provider plugin"));
+            throw t;
+        }
     }
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)

23-30: Close the transformer in tests to exercise/verify plugin lifecycle

Add @AfterEach to close configTransformer; future-proofs tests if providers acquire resources.

@@
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterEach;
@@
     public void setup() {
         configTransformer = new WorkerConfigTransformer(worker, Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
     }
+
+    @AfterEach
+    public void tearDown() {
+        if (configTransformer != null) {
+            configTransformer.close();
+        }
+    }

Also applies to: 67-71

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (1)

46-52: Plugin-wrapped providers: wiring looks correct

Constructor and field migration to Map<String, Plugin> aligns with the Plugin API; passing the plugin map to ConfigTransformer is consistent with the PR direction. Consider guarding against external mutation by copying/wrapping the map.

Apply within this hunk:

-        this.configProviderPlugins = configProviderPlugins;
-        this.configTransformer = new ConfigTransformer(configProviderPlugins);
+        this.configProviderPlugins = java.util.Collections.unmodifiableMap(new java.util.HashMap<>(configProviderPlugins));
+        this.configTransformer = new ConfigTransformer(this.configProviderPlugins);

Confirm ConfigTransformer’s constructor now accepts Map<String, Plugin> to avoid a cross-module signature mismatch.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (1)

2904-2927: New test asserts provider metrics exposure — ensure init timing

The assertions read metrics immediately after Worker construction (without worker.start()). If Worker’s provider initialization ever moves to start(), this test could become flaky.

Options:

  • Call worker.start() and worker.stop() in this test, or
  • Add a small accessor/trigger that initializes provider plugins deterministically for tests.

Verify this test passes consistently across configurations (no reliance on side effects in the constructor).

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (1)

404-412: withPluginMetrics-after-configure ordering is validated

Creating a monitorable provider with Metrics and asserting instance type relies on overridden assertion for order — good. Consider also asserting at least one “plugins” metric exists to strengthen the test.

Example follow-up within this test:

  • Inspect new Metrics instance for a MetricName with group "plugins" and class tag CustomMonitorableConfigProvider.
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and b92a2bb.

📒 Files selected for processing (12)
  • clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (4 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (3 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (5 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1 hunks)
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (3 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
  • withPluginMetrics (6299-6301)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
  • WorkerConfig (58-463)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
  • Utils (93-1700)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
  • Plugins (59-696)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
🔇 Additional comments (10)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)

26-26: Import looks correct for Plugin usage

Using org.apache.kafka.common.internals.Plugin aligns with the wrapped-provider approach introduced in this PR.

clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)

98-105: LGTM: correct use of plugin.get() for provider access

Null-safe access and delegation to ConfigProvider#get(path, keys) via the wrapper looks good.

clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)

49-55: Tests updated appropriately for Plugin-wrapped providers

Wrapping TestConfigProvider with Plugin and switching to Map.of() reads clean and validates the new ConfigTransformer contract.

Also applies to: 63-68, 72-75, 79-82, 86-89, 93-98, 102-105

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (1)

101-101: Close the plugin, not just the provider — good

Closing the Plugin ensures both provider and associated plugin metrics are torn down. LGTM.

clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)

35-38: Validates PluginMetrics hookup

Using PluginMetrics.metricName and registering a measurable is appropriate for exercising plugin-scoped metrics in tests. LGTM.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (3)

382-389: Mocks return Plugin correctly

Wrapping MockFileConfigProvider with Plugin and tagging by provider=file matches the new API. Passing null Metrics intentionally avoids metric binding for this mock. LGTM.

Please confirm the intent is to skip plugin metrics for the file provider in these tests (null metrics).


2930-2935: Expected tag map helper — clear and reusable

Helper builds stable tag set for plugin metrics; matches Plugin.tags key/class plus extra. LGTM.


2937-2951: Metric assertions are precise

Filtering by group "plugins" and verifying name/description and exact tags gives strong coverage of PluginMetrics wiring. LGTM.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (2)

381-402: Adaptation to Plugin API

Using providerName and asserting on plugin.get() class while keeping classloader checks is correct. LGTM.


810-816: Custom monitorable provider for ordering check

Overriding withPluginMetrics to assert configured=true neatly validates configure-then-metrics sequence. LGTM.

Comment on lines +550 to 560
Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);

if (!providers.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providers);
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Always close provider plugins on exceptions during variable resolution

If ConfigTransformer.transform(...) throws, providers aren’t closed, leaking resources. Wrap usage in try/finally and close in the finally block.

-        Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
-
-        if (!providerPlugins.isEmpty()) {
-            ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
-            ConfigTransformerResult result = configTransformer.transform(indirectVariables);
-            if (!result.data().isEmpty()) {
-                resolvedOriginals.putAll(result.data());
-            }
-        }
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        Map<String, Plugin<ConfigProvider>> providerPlugins =
+                instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
+        try {
+            if (!providerPlugins.isEmpty()) {
+                ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
+                ConfigTransformerResult result = configTransformer.transform(indirectVariables);
+                if (!result.data().isEmpty()) {
+                    resolvedOriginals.putAll(result.data());
+                }
+            }
+        } finally {
+            providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
if (!providers.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providers);
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
Map<String, Plugin<ConfigProvider>> providerPlugins =
instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
try {
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
} finally {
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
}
🤖 Prompt for AI Agents
In clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
around lines 550 to 560, instantiateConfigProviders is called and then
ConfigTransformer.transform(...) is invoked without guaranteeing provider
plugins are closed on exceptions; to fix, move the providerPlugins usage into a
try/finally: keep providerPlugins created before the try, call new
ConfigTransformer(providerPlugins) and transform(...) inside the try, apply
resolvedOriginals.putAll(...) only if result.data() is non-empty, and in the
finally block always iterate providerPlugins.values() and
Utils.closeQuietly(...) each to ensure plugins are closed even if transform
throws.

Comment on lines +65 to 69
* @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances.
*/
public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
this.configProviders = configProviders;
public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
this.configProviderPlugins = configProviderPlugins;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix constructor @param Javadoc to reflect Plugin-wrapped providers

The parameter now accepts Map<String, Plugin>; the doc still mentions raw ConfigProvider instances.

- * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances.
+ * @param configProviderPlugins a Map of provider names to Plugin-wrapped {@link ConfigProvider} instances.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
* @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances.
*/
public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
this.configProviders = configProviders;
public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
this.configProviderPlugins = configProviderPlugins;
}
* @param configProviderPlugins a Map of provider names to Plugin-wrapped {@link ConfigProvider} instances.
*/
public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
this.configProviderPlugins = configProviderPlugins;
}
🤖 Prompt for AI Agents
In clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
around lines 65 to 69, the constructor Javadoc @param incorrectly says the map
contains raw ConfigProvider instances; update the @param description to state it
accepts a Map of provider names to Plugin-wrapped ConfigProvider instances
(Map<String, Plugin<ConfigProvider>>), briefly mentioning that each value is a
Plugin<ConfigProvider> rather than a direct ConfigProvider to match the
constructor signature.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants