Skip to content

Conversation

@DDShantanuBadmanji
Copy link

@DDShantanuBadmanji DDShantanuBadmanji commented Oct 4, 2025

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.

Summary by CodeRabbit

  • New Features
    • Config providers can now expose metrics when implementing a monitorable interface, enabling better observability of provider health and usage.
  • Documentation
    • Updated guidance to note that config providers may register metrics with standard tags.
  • Refactor
    • Internal handling of config providers now uses a plugin wrapper to standardize lifecycle and metrics integration. No user-visible behavior changes expected.
  • Tests
    • Expanded test coverage for config provider metrics and updated tests to align with the new plugin-based handling.

@refacto-test
Copy link

refacto-test bot commented Oct 4, 2025

⚠️ Subscription Expired

Your subscription expired on 10/2/2025.

To continue using Refacto code reviews, please renew or upgrade your subscription.

@coderabbitai
Copy link

coderabbitai bot commented Oct 4, 2025

Walkthrough

The change replaces direct ConfigProvider usage with Plugin-wrapped providers across clients and Connect. Constructors, fields, and wiring now accept Map<String, Plugin>. Metrics integration is added for monitorable providers. Tests are updated accordingly. Minor Vagrantfile formatting and Javadoc updates included.

Changes

Cohort / File(s) Summary
Housekeeping
Vagrantfile
Added a blank line and a comment; no behavioral change.
Clients — Config provider pluginization
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java, clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
Switched from raw ConfigProvider to Plugin<ConfigProvider> for instantiation, wiring, and transformation. Updated constructor, fields, internal maps, lookups, and close paths accordingly.
Clients — ConfigProvider docs
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
Javadoc mentions optional Monitorable implementation and default metric tags; no code changes.
Clients — Tests
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java, clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
Tests wrap providers with Plugin.wrapInstance and use Map.of. Added MonitorableConfigProvider test class with metrics hook and stubbed methods.
Connect — Runtime core
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
Refactored to use Plugin<ConfigProvider> throughout. Plugins.newConfigProvider now returns Plugin<ConfigProvider> and accepts providerName and Metrics. Worker and WorkerConfigTransformer updated to pass/hold plugin maps and to close plugins. MirrorMakerConfig builds transformers with plugin maps.
Connect — Runtime tests
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java, connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java, connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
Tests updated for Plugin<ConfigProvider> signatures, Map.of usage, and metrics assertions. Added tests for monitorable provider metrics and PluginMetrics callback ordering; adjusted invocations and assertions to use plugin.get().

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor App
  participant AC as AbstractConfig
  participant CT as ConfigTransformer
  participant PMap as Map<String, Plugin<ConfigProvider>>
  participant P as Plugin<ConfigProvider>
  participant CP as ConfigProvider

  App->>AC: resolveConfigVariables(...)
  AC->>AC: instantiateConfigProviders -> PMap
  AC->>CT: new ConfigTransformer(PMap)
  AC->>CT: transform(configs)
  CT->>PMap: find providerPlugin by name
  alt provider found
    CT->>P: get()
    P-->>CT: CP
    CT->>CP: get(path, keys)
    CP-->>CT: ConfigData
    CT-->>AC: transformed configs
  else provider missing
    CT-->>AC: error for missing provider
  end
  AC->>PMap: close plugins
Loading
sequenceDiagram
  autonumber
  actor Admin
  participant W as Worker
  participant Pl as Plugins
  participant Met as Metrics
  participant P as Plugin<ConfigProvider>
  participant WCT as WorkerConfigTransformer
  participant CT as ConfigTransformer

  Admin->>W: initConfigTransformer()
  loop for each providerName
    W->>Pl: newConfigProvider(config, providerName, usage, Met)
    Pl->>Pl: load & configure ConfigProvider
    Pl->>P: Plugin.wrapInstance(provider, Met, providersPrefix, {provider: name})
    Pl-->>W: P
  end
  W->>WCT: new WorkerConfigTransformer(W, Map<name,P>)
  WCT->>CT: new ConfigTransformer(Map<name,P>)
  Admin-->>W: ready
Loading
sequenceDiagram
  autonumber
  participant Mon as MonitorableConfigProvider
  participant PM as PluginMetrics
  note over Mon,PM: Metrics registration path
  Mon->>Mon: configure(configs)
  Mon->>PM: withPluginMetrics(metrics)
  PM-->>Mon: register metric {name, description, tags}
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I nibble wires, hop through code,
Wrap providers in a plugin mode.
Metrics bloom like clover bright,
Worker hums through day and night.
Map.of trails where singles stood—
Thump! A test that proves it’s good.
Carrot commits? Crunch—understood. 🥕🐇

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings, 1 inconclusive)
Check name Status Explanation Resolution
Description Check ⚠️ Warning The pull request description is left as the unedited template text and does not provide any summary of the changes, rationale, or testing strategy, so it fails to inform reviewers about the scope and intent of the patch. Please replace the placeholder text with a detailed description that summarizes the refactoring of ConfigProvider usage into Plugin wrappers, documents any API changes, and outlines your testing strategy and results.
Docstring Coverage ⚠️ Warning Docstring coverage is 8.89% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
Title Check ❓ Inconclusive The title “Clone kafka 18894” is generic and does not describe the primary change in the pull request, making it unclear to reviewers what the patch actually does. Please update the title to clearly and concisely reflect the main change, for example “Migrate ConfigProvider to use Plugin wrappers” so that reviewers can immediately understand the purpose of this PR.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch clone-KAFKA-18894

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ast-grep (0.39.5)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link

Summary of Changes

Hello @DDShantanuBadmanji, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances Kafka Connect's configuration provider mechanism by introducing the capability for ConfigProvider implementations to expose metrics. This is achieved through the integration of a new Plugin wrapper, which standardizes the instantiation, configuration, and lifecycle management of these providers, including the crucial step of metric registration. This change aims to provide greater insight into the operational aspects of custom configuration providers within the Kafka ecosystem.

Highlights

  • Metric Monitoring for Config Providers: ConfigProvider implementations can now register metrics by implementing the Monitorable interface, allowing for better observability into their performance and behavior.
  • Plugin Wrapper Integration: ConfigProvider instances are now consistently wrapped in a new Plugin object across various components, standardizing their lifecycle management and enabling metric collection.
  • API Updates for Plugin Management: The Plugins.newConfigProvider method signature has been updated to return a Plugin<ConfigProvider> and accept a Metrics instance, facilitating the new metric registration mechanism.
  • Enhanced Test Coverage: New and updated tests ensure that the metric registration and Plugin wrapping work as expected, including verifying the correct order of configure and withPluginMetrics calls for monitorable plugins.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces metrics for ConfigProvider implementations by wrapping them in a new Plugin class and leveraging the Monitorable interface. The changes are well-implemented across the codebase, including updates to AbstractConfig, ConfigTransformer, and various components in the Connect runtime. I've found one issue in AbstractConfig where the provider name is not being passed as a metric tag, which could lead to metric collisions. My detailed feedback is in the comment below.

ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
provider.configure(configProperties);
configProviderInstances.put(entry.getKey(), provider);
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When wrapping the ConfigProvider instance, the provider name should be included as a tag to ensure unique metrics when multiple providers of the same class are used. Without this, metrics from different provider instances of the same class could collide.1

Suggested change
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG, Map.of("provider", entry.getKey()));

Style Guide References

Footnotes

  1. The Javadoc for ConfigProvider, updated in this same pull request, states that the 'provider' tag is automatically added to all registered metrics. This change ensures that the provider name is correctly passed to be used as a tag, aligning with the documented behavior and preventing metric collisions.

@DDShantanuBadmanji
Copy link
Author

/refacto-bot

@refacto-test
Copy link

refacto-test bot commented Oct 4, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and 5989a26.

📒 Files selected for processing (13)
  • Vagrantfile (1 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (4 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (3 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (5 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1 hunks)
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (3 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (4)
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala (1)
  • MockFileConfigProvider (1636-1641)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
  • withPluginMetrics (6299-6301)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
  • Utils (93-1700)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
  • Plugins (59-696)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
  • Utils (93-1700)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
  • WorkerConfig (58-463)
🔇 Additional comments (3)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (3)

20-20: LGTM! Plugin wrapping aligns with the migration.

The addition of the Plugin import and wrapping of TestConfigProvider correctly aligns with the broader migration from direct ConfigProvider usage to Plugin. Passing null for metrics is appropriate for test scenarios, as the Plugin class handles this gracefully by skipping metrics registration.

Also applies to: 49-49


54-54: LGTM! Modern map construction improves readability.

Replacing Collections.singletonMap with Map.of is a cleaner, more concise approach for creating immutable single-entry maps with non-null values. This modernizes the test code without changing semantics.

Also applies to: 63-63, 72-72, 79-79, 86-86, 93-93, 102-102


109-109: LGTM! Correctly retains Collections.singletonMap for null value.

This test case must use Collections.singletonMap instead of Map.of because Map.of does not permit null values. The retention is correct and necessary for testing null config values.

Comment on lines +41 to +48
public ConfigData get(String path) {
return null;
}

@Override
public ConfigData get(String path, Set<String> keys) {
return null;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Return empty ConfigData instead of null

ConfigTransformer calls ConfigProvider#get(...).data() and will throw an NPE when these helpers hand back null. Please return an empty ConfigData instead so the tests stay robust.

Apply this diff:

     @Override
     public ConfigData get(String path) {
-        return null;
+        return new ConfigData(Map.of());
     }

     @Override
     public ConfigData get(String path, Set<String> keys) {
-        return null;
+        return new ConfigData(Map.of());
     }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
around lines 41 to 48, the two get(...) methods currently return null which
causes callers to NPE; change both methods to return an empty ConfigData
instance (i.e., a ConfigData whose data map is empty) instead of null so callers
can safely call .data() without checks.

Comment on lines 274 to 286
for (String name : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
CONFIG_PROVIDERS_CONFIG + "." + name,
Plugins.ClassLoaderUsage.PLUGINS
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
providers.put(name, configProvider);
providerPlugins.put(name, configProviderPlugin);
}
ConfigTransformer transformer = new ConfigTransformer(providers);
ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
Map<String, String> transformed = transformer.transform(props).data();
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
return transformed;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Skip null config provider plugins

Plugins.newConfigProvider(...) returns null when config.providers.<name>.class is absent. With the new loop we still insert that null, so ConfigTransformer trips over a NullPointerException when it dereferences the plugin. Please keep the old guard and only put non-null plugins into the map.

Apply this diff:

         for (String name : providerNames) {
             Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
                     this,
                     name,
                     Plugins.ClassLoaderUsage.PLUGINS,
                     null
             );
-            providerPlugins.put(name, configProviderPlugin);
+            if (configProviderPlugin != null) {
+                providerPlugins.put(name, configProviderPlugin);
+            }
         }
🤖 Prompt for AI Agents
In
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
around lines 274 to 286, the loop currently inserts null entries into
providerPlugins because Plugins.newConfigProvider(...) can return null; update
the loop to only call providerPlugins.put(name, configProviderPlugin) when
configProviderPlugin is non-null (i.e., guard with an if (configProviderPlugin
!= null) check) so ConfigTransformer does not receive null values, and ensure
that later cleanup/close logic only iterates over and closes the non-null plugin
instances.

Comment on lines 210 to 221
private WorkerConfigTransformer initConfigTransformer() {
final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
Map<String, ConfigProvider> providerMap = new HashMap<>();
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
for (String providerName : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
config,
WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName,
ClassLoaderUsage.PLUGINS
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
providerMap.put(providerName, configProvider);
providerPluginMap.put(providerName, configProviderPlugin);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Guard against null config provider plugins

Plugins.newConfigProvider(...) returns null when a provider name has no .class configured. We add that null to the map, and later WorkerConfigTransformer dereferences it, producing an NPE during worker startup. Please keep the null check and only register plugins that actually loaded.

Apply this diff:

         for (String providerName : providerNames) {
             Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
                 config,
                 providerName,
                 ClassLoaderUsage.PLUGINS,
                 metrics.metrics()
             );
-            providerPluginMap.put(providerName, configProviderPlugin);
+            if (configProviderPlugin != null) {
+                providerPluginMap.put(providerName, configProviderPlugin);
+            }
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private WorkerConfigTransformer initConfigTransformer() {
final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
Map<String, ConfigProvider> providerMap = new HashMap<>();
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
for (String providerName : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
config,
WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName,
ClassLoaderUsage.PLUGINS
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
providerMap.put(providerName, configProvider);
providerPluginMap.put(providerName, configProviderPlugin);
}
for (String providerName : providerNames) {
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
if (configProviderPlugin != null) {
providerPluginMap.put(providerName, configProviderPlugin);
}
}
🤖 Prompt for AI Agents
In connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
around lines 210 to 221, the loop unconditionally adds the result of
plugins.newConfigProvider(...) to providerPluginMap even though
newConfigProvider can return null when a provider has no .class configured; this
leads to an NPE later. Fix by checking the returned configProviderPlugin for
null and only put it into providerPluginMap if non-null (optionally log or warn
when a provider name yields null) so WorkerConfigTransformer will only receive
valid plugins.

@refacto-test
Copy link

refacto-test bot commented Oct 4, 2025

Code Review: ConfigProvider Plugin Metrics Integration

👍 Well Done
Plugin Architecture Implementation

Clean implementation of metrics integration for config providers using plugin architecture.

Resource Management

Proper resource cleanup with Utils.closeQuietly prevents resource leaks in provider implementations.

📌 Files Processed
  • Vagrantfile
  • clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
  • clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
📝 Additional Comments
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3)
Null Metrics Handling

The metrics parameter can be null as shown in method signature, but there's no null check before passing it to Plugin.wrapInstance(). This could lead to inconsistent behavior where some providers receive metrics capabilities while others don't, based on caller's implementation.

Standards:

  • Logic-Verification-Null-Safety
  • Algorithm-Correctness-Defensive-Programming
  • Business-Rule-Consistency
Error Message Clarity

Error message lacks context about which class failed to load. A more specific message including the className variable would improve troubleshooting. This makes maintenance more difficult when diagnosing configuration issues.

Standards:

  • Clean-Code-Error-Handling
  • Maintainability-Quality-Diagnostics
Metrics Null Handling

The method accepts a nullable Metrics parameter but doesn't handle the null case explicitly. If null metrics are passed, this could cause unexpected behavior when the Plugin attempts to register metrics, potentially leading to NullPointerException.

Standards:

  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
Explicit Plugin Parameters

The Plugin.wrapInstance() call uses null for metrics parameter without explanation. This creates maintenance ambiguity about whether metrics should be provided. Consider adding a comment explaining why metrics are null here or refactor to use a consistent approach.

Standards:

  • Clean-Code-Clarity
  • Maintainability-Quality-Intent
Map Initialization Size

Creating a HashMap without an initial capacity can lead to resizing operations when elements are added. Consider specifying an initial capacity based on the expected number of providers to avoid rehashing operations and improve performance.

Standards:

  • ISO-IEC-25010-Performance-Efficiency-Resource-Utilization
  • Optimization-Pattern-Collection-Sizing
  • Memory-Allocation-Optimization
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
Metrics Caching Opportunity

Creating MetricName objects in the withPluginMetrics method without caching them could lead to repeated object creation if the method is called frequently. Consider caching the MetricName in a class field to reduce object creation and GC pressure.

Standards:

  • ISO-IEC-25010-Performance-Efficiency-Resource-Utilization
  • Optimization-Pattern-Object-Reuse
  • Memory-Allocation-Optimization
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (1)
Plugin Metrics Validation

The metrics validation only checks for exact matches of metric name and description. A more robust approach would verify the metric value is as expected, ensuring the plugin metrics integration functions correctly under various conditions.

Standards:

  • ISO-IEC-25010-Reliability-Maturity
  • SRE-Observability
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1)
Metrics Security Context

Documentation indicates metrics are registered with provider name tags that could expose sensitive configuration provider identifiers. In security-sensitive environments, metric names might reveal internal system architecture details useful for targeted attacks. Consider adding guidance on securing sensitive provider names in metrics.

Standards:

  • CWE-200
  • OWASP-A04
  • NIST-SSDF-PW.1

}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource Leak Risk

The closeQuietly method is called on the Plugin wrapper rather than the ConfigProvider instance. This changes the resource management logic, as the Plugin.close() method may not properly delegate to the wrapped ConfigProvider's close() method, potentially causing resource leaks.

Standards
  • Logic-Verification-Resource-Management
  • Business-Rule-Lifecycle-Management
  • Algorithm-Correctness-Delegation-Pattern

Comment on lines 632 to 649
public Plugin<ConfigProvider> newConfigProvider(AbstractConfig config, String providerName, ClassLoaderUsage classLoaderUsage, Metrics metrics) {
String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
String classPropertyName = providerPrefix + ".class";
Map<String, String> originalConfig = config.originalsStrings();
if (!originalConfig.containsKey(classPropertyName)) {
// This configuration does not define the config provider via the specified property name
return null;
}

ConfigProvider plugin = newVersionedPlugin(config, classPropertyName, null, ConfigProvider.class, classLoaderUsage, scanResult.configProviders());

// Configure the ConfigProvider
String configPrefix = providerPrefix + ".param.";
Map<String, Object> configProviderConfig = config.originalsWithPrefix(configPrefix);
try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
plugin.configure(configProviderConfig);
}
return plugin;
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent Method Naming

Method signature changed from newConfigProvider to return Plugin but still uses 'plugin' variable name for the unwrapped instance. This creates confusion between the wrapped and unwrapped objects, making maintenance harder. Variable names should reflect their actual types.

Standards
  • Clean-Code-Naming
  • Maintainability-Quality-Consistency
  • Clean-Code-Variable-Naming

Comment on lines 647 to +649
plugin.configure(configProviderConfig);
}
return plugin;
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Variable Assignment

The configProviderConfig variable is used but never defined in the method. This will cause a compilation error and indicates incomplete refactoring. The code needs to extract configuration properties from the provided config parameter.

          Map<String, Object> configProviderConfig = config.originalsWithPrefix(providerPrefix + ".");
          try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
              plugin.configure(configProviderConfig);
          }
Commitable Suggestion
Suggested change
plugin.configure(configProviderConfig);
}
return plugin;
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
Map<String, Object> configProviderConfig = config.originalsWithPrefix(providerPrefix + ".");
try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
plugin.configure(configProviderConfig);
}
Standards
  • Clean-Code-Variable-Usage
  • Maintainability-Quality-Completeness

Comment on lines +69 to +75
configTransformer = new WorkerConfigTransformer(worker, Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
}

@Test
public void testReplaceVariable() {
// Execution
Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKey}"));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent Map Creation

The code inconsistently uses Map.of() in line 69 but still uses Collections.singletonMap() in line 75. This creates maintenance confusion with mixed styles. Standardize on the more modern Map.of() approach throughout the codebase for consistency.

Standards
  • Clean-Code-Consistency
  • Maintainability-Quality-Modernization

Comment on lines 41 to +42
import org.apache.kafka.common.config.provider.MockFileConfigProvider;
import org.apache.kafka.common.config.provider.MonitorableConfigProvider;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused Import

The ConfigProvider import is added but not directly used in the modified code. Unused imports increase maintenance burden by creating false dependencies. Remove unnecessary imports to improve code clarity and maintainability.

Standards
  • Clean-Code-Imports
  • Maintainability-Quality-Clarity

}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource Cleanup Risk

The closeQuietly method is now called on Plugin objects rather than ConfigProvider instances directly. This could lead to improper resource cleanup if the Plugin wrapper doesn't properly delegate close operations to the wrapped provider.

Standards
  • ISO-IEC-25010-Reliability-Recoverability
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Resource-Mgmt

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants