Skip to content

Conversation

@DDShantanuBadmanji
Copy link

@DDShantanuBadmanji DDShantanuBadmanji commented Oct 4, 2025

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.

Summary by CodeRabbit

  • New Features
    • Added monitoring support for configuration providers, exposing provider-level metrics with standard tags.
  • Refactor
    • Migrated configuration provider handling to a plugin-based lifecycle for consistent setup, metrics, and cleanup.
  • Documentation
    • Clarified guidance for configuration provider implementations to enable metrics registration.
  • Tests
    • Added tests validating monitorable configuration providers and metric exposure.
  • Chores
    • Minor whitespace tweak in development tooling file (no functional impact).

@refacto-test
Copy link

refacto-test bot commented Oct 4, 2025

ConfigProvider Plugin Metrics Integration

TL;DR: Enhances ConfigProvider with metrics support by wrapping providers in Plugin class for monitoring capabilities.


Refacto PR Summary

Refactors ConfigProvider implementation to support metrics collection through the Plugin wrapper class.
This PR modifies the ConfigProvider interface to enable metrics monitoring by implementing the Monitorable interface. It wraps ConfigProvider instances in the Plugin class to standardize metrics collection with automatic tagging. The implementation changes how config providers are instantiated, configured, and managed throughout the codebase, particularly in the Worker and ConfigTransformer classes, ensuring proper metrics registration while maintaining backward compatibility.

Change Highlights

Click to expand
  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java: Modified to use Plugin-wrapped ConfigProviders
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java: Added documentation for Monitorable implementation
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java: New test class for metrics-enabled providers
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: Updated to use Plugin-wrapped ConfigProviders with metrics
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: Modified newConfigProvider to return Plugin-wrapped instances
  • clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: Updated to handle Plugin-wrapped ConfigProviders
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java: Updated for Plugin-wrapped providers

Sequence Diagram

sequenceDiagram
    participant Worker as Worker
    participant Plugins as Plugins
    participant ConfigProvider as ConfigProvider
    participant Plugin as Plugin
    participant Metrics as Metrics
    
    Worker->>Plugins: newConfigProvider(config, providerName, ClassLoaderUsage, metrics)
    Plugins->>Plugins: Load ConfigProvider class
    Plugins->>ConfigProvider: instantiate
    Plugins->>ConfigProvider: configure(configs)
    Plugins->>Plugin: wrapInstance(provider, metrics, config, tags)
    Note over Plugin: Add standard tags:<br>config, class, provider
    Plugin->>ConfigProvider: withPluginMetrics(metrics) if Monitorable
    Plugins-->>Worker: Return Plugin<ConfigProvider>
    Worker->>Worker: Store in providerPluginMap
Loading

Testing Guide

Click to expand
  1. Run the WorkerTest.testMonitorableConfigProvider() test to verify metrics registration for ConfigProviders
  2. Implement a custom ConfigProvider that implements Monitorable interface and verify metrics appear in JMX
  3. Check that standard tags are applied to all ConfigProvider metrics: "config", "class", and "provider"
  4. Verify that existing ConfigProviders continue to work correctly with the new Plugin wrapper
  5. Test that ConfigProvider lifecycle (configure, close) works properly with the Plugin wrapper

@gemini-code-assist
Copy link

Summary of Changes

Hello @DDShantanuBadmanji, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant enhancement to how ConfigProviders are managed and monitored within Kafka Connect. By wrapping ConfigProvider instances in a new Plugin abstraction, the system can now automatically handle metric registration for providers that implement the Monitorable interface. This change allows for better observability into the behavior and performance of custom configuration providers, providing valuable insights into their operation and ensuring withPluginMetrics is called after configuration.

Highlights

  • Metric Integration for Config Providers: ConfigProvider implementations can now register metrics by implementing the Monitorable interface, allowing for better observability into their operation.
  • Plugin Wrapper for Config Providers: ConfigProvider instances are now wrapped in a Plugin object, standardizing their lifecycle management and enabling the new metric collection capabilities.
  • Enhanced Plugin Instantiation: The Plugins utility class has been updated to instantiate ConfigProviders as Plugin objects, facilitating automatic metric registration with appropriate tags during their creation.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@coderabbitai
Copy link

coderabbitai bot commented Oct 4, 2025

Walkthrough

Plugin-wrapped ConfigProvider replaces direct ConfigProvider usage across config resolution paths, adding metrics integration. Constructor signatures and maps update to Map<String, Plugin>. Worker/Plugins now create provider plugins with metrics, and closing logic targets plugins. Tests updated accordingly. One whitespace-only Vagrantfile change; Javadoc added.

Changes

Cohort / File(s) Summary
Vagrantfile whitespace
Vagrantfile
Inserted a single blank line; no functional change.
Common config plumbing
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java, clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
Switched from Map<String, ConfigProvider> to Map<String, Plugin>. Updated constructors, fields, resolution flow, and closing to operate on Plugin-wrapped providers.
ConfigProvider docs
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
Added Javadoc recommending Monitorable implementation and describing automatic metric tags. No API changes.
Common tests
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java, clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
Tests wrap providers with Plugin.wrapInstance and use Map.of. Added new test helper MonitorableConfigProvider implementing ConfigProvider and Monitorable with basic metric registration.
Connect runtime production
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
Migrated provider handling to Plugin. Plugins.newConfigProvider now returns Plugin and accepts Metrics; wraps instances and wires metrics. Init/close paths updated to operate on plugins. Transformer construction updated.
Connect runtime tests
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java, connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java, connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
Updated to use Plugin-wrapped providers and Map.of. Added tests for Monitorable providers and metrics tagging/integration. Introduced CustomMonitorableConfigProvider for metrics assertion.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Admin as Admin/Worker
  participant Plugins as Plugins
  participant Provider as ConfigProvider (impl)
  participant Plugin as Plugin<ConfigProvider>
  participant Metrics as Metrics
  participant WCT as WorkerConfigTransformer
  participant CT as ConfigTransformer

  Admin->>Plugins: newConfigProvider(config, providerName, classLoaderUsage, metrics)
  Plugins->>Provider: instantiate + configure
  Plugins->>Plugin: wrapInstance(provider, metrics, tags)
  Plugins-->>Admin: Plugin<ConfigProvider>

  Admin->>WCT: construct with Map{name -> Plugin<ConfigProvider>}
  WCT->>CT: new ConfigTransformer(providerPlugins)

  Admin->>CT: transform(configs)
  CT->>Plugin: get()
  Plugin-->>CT: ConfigProvider
  CT->>Provider: get(path, keys)
  Provider-->>CT: ConfigData
  CT-->>Admin: transformed configs

  Admin->>WCT: close()
  WCT->>Plugin: close() per provider
Loading
sequenceDiagram
  autonumber
  participant Plugins as Plugins
  participant MonProv as MonitorableConfigProvider
  participant Pn as Plugin<ConfigProvider>
  participant PM as PluginMetrics

  Plugins->>MonProv: configure(configs)
  Plugins->>Pn: wrapInstance(monProv, metrics, tags)
  note over Pn,MonProv: On wrap, withPluginMetrics is invoked
  Pn->>MonProv: withPluginMetrics(PluginMetrics)
  MonProv->>PM: register metric (config/class/provider tags)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I nibble through configs, hop plug-ins in tow,
Wrapping each provider with metrics aglow.
Tags on my whiskers, TTLs in the air,
Transformations ripple with careful hare flair.
Close the plugins softly—carrot-click, done!
Another sprint’s harvest under October’s sun. 🥕✨

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings, 1 inconclusive)
Check name Status Explanation Resolution
Description Check ⚠️ Warning The pull request description remains the default template placeholder and does not provide any details about the changes, objectives, or testing strategy as required by the repository template. Please replace the placeholder text with a detailed description that follows the repository template. Include a summary of what was modified and why, along with the testing strategy covering unit and/or integration tests. This information will guide reviewers and form the final commit message.
Docstring Coverage ⚠️ Warning Docstring coverage is 8.89% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
Title Check ❓ Inconclusive The title “Clone kafka 18894” is vague and does not clearly summarize the key changes in this pull request, instead relying on an internal issue reference that is not informative to reviewers. Please update the PR title to a concise sentence describing the primary change, such as wrapping ConfigProvider instances in Plugin across Kafka and Connect components. This will give reviewers an immediate understanding of what the PR accomplishes. Avoid using issue numbers or generic terms that do not convey the change’s purpose.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch clone-KAFKA-18894

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ast-grep (0.39.5)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@DDShantanuBadmanji
Copy link
Author

DDShantanuBadmanji commented Oct 4, 2025

/refacto-bot

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces metrics for ConfigProvider implementations by wrapping them in a Plugin object, which is a valuable addition for monitoring. The changes are consistently applied across the codebase, including refactoring existing code to use the Plugin<ConfigProvider> wrapper and adding new tests to verify the metrics integration. The code quality is high, and I've found one minor issue regarding metric tag consistency that I've commented on.

ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
provider.configure(configProperties);
configProviderInstances.put(entry.getKey(), provider);
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The Plugin wrapper for the ConfigProvider is created without the provider metric tag. The updated Javadoc for ConfigProvider states that the provider tag will be automatically added to all registered metrics. Other parts of the codebase, such as connect.runtime.isolation.Plugins#newConfigProvider, correctly add this tag.

To ensure consistency and adhere to the documented behavior, the provider name, which is available as entry.getKey(), should be included as a tag.

Suggested change
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG, Map.of("provider", entry.getKey()));

@refacto-test
Copy link

refacto-test bot commented Oct 4, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@refacto-test
Copy link

refacto-test bot commented Oct 4, 2025

Code Review: ConfigProvider Plugin Integration

👍 Well Done
Plugin Encapsulation

Properly wraps ConfigProvider instances in Plugin class for better resource management.

Metrics Integration

Added monitoring capability to ConfigProvider improves system observability.

📌 Files Processed
  • Vagrantfile
  • clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
  • clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
📝 Additional Comments
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
Resource Cleanup Pattern

The closeQuietly pattern on Plugin wrapper objects may hide resource leaks in the underlying ConfigProvider implementations. Consider adding explicit error logging within the Plugin wrapper's close method to improve visibility of resource cleanup issues while maintaining the quiet closure pattern.

Standards:

  • ISO-IEC-25010-Performance-Efficiency-Resource-Utilization
  • Resource-Management-Pattern
  • Error-Visibility-Standard
Resource Leak Risk

If provider.configure() throws an exception, the created provider instance may not be properly closed. This could lead to resource leaks when instantiating config providers. Consider using try-with-resources or ensuring proper cleanup on exceptions.

Standards:

  • CWE-772
  • OWASP-A06
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (4)
Error Handling Improvement

The method doesn't include specific error handling for Plugin.wrapInstance failures. Adding targeted exception handling would improve reliability by ensuring proper resource cleanup and more informative error messages when plugin wrapping fails.

Standards:

  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Error-Handling
  • SRE-Error-Handling
Explicit Parameter Documentation

The new metrics parameter lacks documentation about its purpose and whether it can be null. Adding parameter documentation would improve maintainability by clarifying expected parameter usage, especially since the method now accepts null metrics.

Standards:

  • Clean-Code-Documentation
  • Maintainability-Quality-API-Design
Extract Constants

String literal "provider" is used directly in code. Extracting this to a named constant would improve maintainability by centralizing the definition and making future changes to the key name easier to implement consistently.

Standards:

  • Clean-Code-Naming
  • Maintainability-Quality-Constants
Potential Null Reference

The metrics parameter can be null as indicated by the method signature, but no null check is performed before passing it to Plugin.wrapInstance(). If metrics is null when a MonitorableConfigProvider is used, this could lead to NullPointerException during metrics registration.

Standards:

  • CWE-476
  • OWASP-A06
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
Defensive Null Checking

The code checks if providerPlugin is not null but doesn't verify that providerPlugin.get() returns non-null before calling methods on it. This could lead to NullPointerException if the plugin wrapper contains a null provider instance.

Standards:

  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Defensive-Programming
  • SRE-Error-Handling

plugin.configure(configProviderConfig);
}
return plugin;
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource Leak Risk

The plugin instance is wrapped without closing it if an exception occurs during wrapping. If Plugin.wrapInstance throws an exception, the already configured plugin won't be properly closed, potentially causing resource leaks. This violates proper resource management patterns.

        try {
            return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
        } catch (Exception e) {
            Utils.closeQuietly(plugin, "config provider");
            throw e;
        }
Commitable Suggestion
Suggested change
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
try {
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
} catch (Exception e) {
Utils.closeQuietly(plugin, "config provider");
throw e;
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Resource-Mgmt
  • SRE-Error-Handling

}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource Leak Risk

The code attempts to close Plugin objects directly, but Plugin doesn't implement Closeable. This should call x.get() to retrieve the actual ConfigProvider instance before closing. Without this change, ConfigProvider resources won't be properly released.

        providerPlugins.values().forEach(x -> Utils.closeQuietly(x.get(), "config provider"));
Commitable Suggestion
Suggested change
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x.get(), "config provider"));
Standards
  • Logic-Verification-Resource-Management
  • Algorithm-Correctness-API-Usage
  • Business-Rule-Resource-Cleanup

plugin.configure(configProviderConfig);
}
return plugin;
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Null Metrics Risk

The metrics parameter can be null (as seen in method signature allowing null) but is passed directly to Plugin.wrapInstance without null checking. This could cause NullPointerException when a ConfigProvider implementing Monitorable tries to use metrics, reducing system reliability.

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Precondition
  • SRE-Error-Handling

Comment on lines +67 to 69
public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
this.configProviderPlugins = configProviderPlugins;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent Resource Cleanup

The ConfigTransformer now accepts Plugin instead of ConfigProvider but doesn't update its close method. This inconsistency could lead to improper resource cleanup when ConfigTransformer is closed, potentially causing resource leaks in long-running applications.

Standards
  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Resource-Mgmt
  • SRE-Resource-Management

@BeforeEach
public void setup() {
configTransformer = new ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider()));
configTransformer = new ConfigTransformer(Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Plugin Parameters

The Plugin.wrapInstance() call is missing the extraTags parameter that's used in other similar calls throughout the codebase. This inconsistency could lead to missing provider name tags in metrics, causing monitoring gaps for test providers.

Standards
  • Logic-Verification-API-Usage
  • Algorithm-Correctness-Parameter-Completeness

Comment on lines +632 to 634
public Plugin<ConfigProvider> newConfigProvider(AbstractConfig config, String providerName, ClassLoaderUsage classLoaderUsage, Metrics metrics) {
String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
String classPropertyName = providerPrefix + ".class";
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent Method Signature

Method signature changed from using providerPrefix to providerName, requiring internal reconstruction of the prefix. This creates inconsistency with previous implementation and increases maintenance complexity when caller code needs modification.

Standards
  • Clean-Code-Method-Design
  • Maintainability-Quality-Consistency

Comment on lines +631 to +632
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
configProviderPluginInstances.put(entry.getKey(), providerPlugin);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Security Validation

The provider instance is wrapped without validation before usage. Malicious providers could be instantiated and wrapped without proper security checks, potentially allowing untrusted code execution. This creates a security risk when loading external config providers.

Standards
  • CWE-20
  • OWASP-A03
  • NIST-SSDF-PW.1

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and 5989a26.

📒 Files selected for processing (13)
  • Vagrantfile (1 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (4 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (3 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (5 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1 hunks)
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (3 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
  • withPluginMetrics (6299-6301)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
  • Utils (93-1700)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
  • WorkerConfig (58-463)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
  • Utils (93-1700)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
  • Plugins (59-696)
🔇 Additional comments (3)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (3)

21-21: LGTM!

The import of Plugin and the field type change from Map<String, ConfigProvider> to Map<String, Plugin<ConfigProvider>> are correct and align with the broader refactoring to enable metrics integration for config providers.

Also applies to: 60-60


65-68: LGTM!

The constructor signature and Javadoc updates are correct. The parameter name change from configProviders to configProviderPlugins improves clarity and consistency with the new Plugin-based approach.


98-104: LGTM!

The changes to retrieve and use the Plugin-wrapped ConfigProvider are correct:

  • Line 98: Retrieves Plugin<ConfigProvider> from the map
  • Line 100: Null check on the plugin wrapper (correct scope)
  • Line 104: providerPlugin.get().get(path, keys) properly unwraps the plugin to access the ConfigProvider instance before calling its get method

The logic is sound and maintains the same behavior while enabling metrics support through the Plugin wrapper.

Comment on lines +631 to 633
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
configProviderPluginInstances.put(entry.getKey(), providerPlugin);
} catch (ClassNotFoundException e) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fix plugin tag key to avoid collapsing metrics

All provider plugins are wrapped with the constant key config.providers, so the resulting metrics tags become identical (config=config.providers, class=<providerSimpleName>). When the same provider class is configured under multiple aliases, their metrics now collide, hiding per-provider signals. Pass the provider alias (or another unique identifier) instead of the constant key so each plugin keeps distinct tags.

Apply this diff:

-                Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
+                Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, entry.getKey());
🤖 Prompt for AI Agents
In clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
around lines 631 to 633, the plugin wrapper is being created with the constant
CONFIG_PROVIDERS_CONFIG which collapses metrics for different aliases into the
same tag; change the call to Plugin.wrapInstance(...) to pass the provider alias
(e.g., entry.getKey()) or another unique identifier instead of
CONFIG_PROVIDERS_CONFIG so each provider instance keeps distinct metric tags
(ensure any corresponding map keys or subsequent uses expect the alias).

Comment on lines 274 to 282
for (String name : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
CONFIG_PROVIDERS_CONFIG + "." + name,
Plugins.ClassLoaderUsage.PLUGINS
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
providers.put(name, configProvider);
providerPlugins.put(name, configProviderPlugin);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Filter out missing config provider plugins

plugins.newConfigProvider(...) returns null when the provider name lacks a <name>.class declaration. By unconditionally inserting that null we hand ConfigTransformer a null plugin (and later try to close it), which leads to an immediate NullPointerException instead of the previous “just skip it” behavior. Please restore the null check before adding to the map.

-            providerPlugins.put(name, configProviderPlugin);
+            if (configProviderPlugin != null) {
+                providerPlugins.put(name, configProviderPlugin);
+            }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for (String name : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
CONFIG_PROVIDERS_CONFIG + "." + name,
Plugins.ClassLoaderUsage.PLUGINS
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
providers.put(name, configProvider);
providerPlugins.put(name, configProviderPlugin);
}
for (String name : providerNames) {
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
if (configProviderPlugin != null) {
providerPlugins.put(name, configProviderPlugin);
}
}
🤖 Prompt for AI Agents
In
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
around lines 274 to 282, the loop unconditionally inserts the result of
plugins.newConfigProvider(...) into providerPlugins, but newConfigProvider can
return null when the provider lacks a <name>.class entry; restore a null check
before putting the plugin into providerPlugins (i.e., if the returned
Plugin<ConfigProvider> is null, skip adding it and optionally log a debug/warn
message) so ConfigTransformer never receives a null plugin and later close() is
not invoked on null.

Comment on lines +215 to 221
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
providerMap.put(providerName, configProvider);
providerPluginMap.put(providerName, configProviderPlugin);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Skip null config provider plugins

Plugins.newConfigProvider returns null when the provider name is configured without a corresponding <name>.class. We now insert that null into providerPluginMap, so the first transform attempt dereferences a null plugin and blows up (previous code skipped these). Restore the guard so only successfully-instantiated providers are added.

-            providerPluginMap.put(providerName, configProviderPlugin);
+            if (configProviderPlugin != null) {
+                providerPluginMap.put(providerName, configProviderPlugin);
+            }
🤖 Prompt for AI Agents
In connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
around lines 215 to 221, the code unconditionally inserts the result of
Plugins.newConfigProvider into providerPluginMap even when that method returns
null; restore the previous null-guard so only non-null configProviderPlugin
instances are put into providerPluginMap (i.e., check if configProviderPlugin !=
null before providerPluginMap.put(...) and otherwise skip/continue, optionally
logging a debug/info message that the named provider had no class configured).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants