Skip to content

Conversation

@DDShantanuBadmanji
Copy link

@DDShantanuBadmanji DDShantanuBadmanji commented Sep 29, 2025

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.

Summary by CodeRabbit

  • New Features

    • Config providers now expose metrics when monitorable, improving observability for variable resolution in Connect and MirrorMaker.
    • Metrics include standard tags for easier filtering and analysis.
  • Documentation

    • Clarified that config providers can register metrics and which tags are included.
  • Refactor

    • Adopted plugin-wrapped config providers with improved lifecycle management and cleanup.
  • Tests

    • Added and updated tests to validate metrics propagation, plugin wrapping, and provider behavior.

@coderabbitai
Copy link

coderabbitai bot commented Sep 29, 2025

Walkthrough

Refactors config provider handling to use Plugin-wrapped ConfigProvider instances across core, Connect runtime, and MirrorMaker. Updates constructor and field types, resource cleanup, and instantiation paths to pass Plugin with metrics. Adds a monitorable provider test implementation and extends tests to validate plugin wrapping and metrics tagging.

Changes

Cohort / File(s) Summary
Core config resolution
clients/.../common/config/AbstractConfig.java, clients/.../common/config/ConfigTransformer.java, clients/.../common/config/provider/ConfigProvider.java
Switch maps from Map<String, ConfigProvider> to Map<String, Plugin<ConfigProvider>>, adjust constructors/usages, unwrap via plugin.get() during transform, update close messages, and add Javadoc about Monitorable metrics.
Connect runtime wiring
connect/.../runtime/Worker.java, connect/.../runtime/WorkerConfigTransformer.java, connect/.../runtime/isolation/Plugins.java
Propagate Plugin-wrapped providers through Worker and WorkerConfigTransformer; Plugins.newConfigProvider(...) now returns Plugin<ConfigProvider>, accepts Metrics, derives prefix internally, configures provider, then wraps with metrics tags.
MirrorMaker config
connect/mirror/.../MirrorMakerConfig.java
Adopts Plugin-wrapped config providers and passes them to ConfigTransformer; updates instantiation and cleanup.
Unit tests: core and runtime
clients/.../common/config/ConfigTransformerTest.java, connect/.../runtime/WorkerConfigTransformerTest.java, connect/.../runtime/WorkerTest.java, connect/.../runtime/isolation/PluginsTest.java
Update tests to use Plugin.wrapInstance(...), adapt assertions to unwrap via get(), migrate to Map.of(...), add metrics-based assertions, and new tests for monitorable providers.
Test support provider
clients/.../common/config/provider/MonitorableConfigProvider.java
Adds a monitorable test provider implementing ConfigProvider and Monitorable, registering a constant metric and tracking configuration state.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Admin as Admin/Worker
  participant Plugins as Plugins
  participant ProvPlg as Plugin<ConfigProvider>
  participant Prov as ConfigProvider
  participant Metrics as Metrics

  Admin->>Plugins: newConfigProvider(config, providerName, classLoaderUsage, metrics)
  Plugins->>Plugins: Load class via plugin loader
  Plugins->>Prov: configure(configSubset)
  Plugins->>Prov: withPluginMetrics(PluginMetrics) (if Monitorable)
  Plugins->>ProvPlg: wrapInstance(Prov, metrics, {config:providers, provider:providerName, class:Prov})
  Plugins-->>Admin: Plugin<ConfigProvider>
  note over Admin,ProvPlg: Caller stores Plugin-wrapped provider for later use
Loading
sequenceDiagram
  autonumber
  participant X as Caller (AbstractConfig/Worker/MirrorMaker)
  participant CT as ConfigTransformer
  participant Map as Map<String, Plugin<ConfigProvider>>
  participant ProvPlg as Plugin<ConfigProvider>
  participant Prov as ConfigProvider

  X->>CT: new ConfigTransformer(Map)
  X->>CT: transform(configs)
  CT->>Map: find provider by name
  Map-->>CT: Plugin<ConfigProvider>
  CT->>ProvPlg: get()
  ProvPlg-->>CT: ConfigProvider
  CT->>Prov: get(path, keys)
  Prov-->>CT: ConfigData(values, ttl)
  CT-->>X: Transformed configs + TTLs
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45–90 minutes

Poem

I hopped through plugins, neat and spry,
Wrapped providers with a metric’s eye.
Unwrap to fetch, then wrap to close,
A tidy warren where config flows.
With tags that tell the who and why—
Thump! The tests all pass. Oh my! 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (3 warnings)
Check name Status Explanation Resolution
Title Check ⚠️ Warning The title “Clone kafka 18894” does not describe the actual code changes in this pull request and instead references an action or issue number without conveying any meaningful information about the substantive modifications, making it misleading and unhelpful for reviewers or future readers. Please update the title to a concise, descriptive sentence that highlights the primary change—such as how ConfigProvider usage was refactored to use Plugin wrappers—so that it accurately reflects the core purpose of the pull request.
Description Check ⚠️ Warning The pull request description still contains the placeholder template text and lacks any details about what was changed, why the changes were made, or how they were tested, leaving reviewers without critical context or rationale for the updates. Replace the template placeholder with a detailed summary of the changes (including the shift from ConfigProvider to Plugin), explain the motivation for the refactor, list any impacted components or classes, and describe the testing strategy (unit or integration tests) used to validate the new behavior.
Docstring Coverage ⚠️ Warning Docstring coverage is 8.89% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch clone-KAFKA-18894

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ast-grep (0.39.5)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Upto 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link

Summary of Changes

Hello @DDShantanuBadmanji, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant refactoring to how ConfigProvider instances are managed within Kafka Connect by integrating them with a generic Plugin wrapper. This change aims to standardize the lifecycle of configuration providers and, crucially, enables a robust mechanism for these providers to register and expose metrics. This enhancement improves the observability of custom configuration providers by automatically attaching relevant tags to their metrics, such as the provider's name and class. The ConfigTransformer and various worker components have been adapted to utilize this new Plugin abstraction, with comprehensive tests added to ensure the correct operation of the metrics registration feature.

Highlights

  • Introduction of Plugin Wrapper for ConfigProvider: ConfigProvider instances are now wrapped in a generic Plugin object, standardizing their lifecycle management and enabling consistent handling across the system.
  • Metrics Integration for ConfigProviders: The new Plugin wrapper facilitates the registration of metrics for ConfigProviders that implement the Monitorable interface, with automatic tagging for config, class, and provider for enhanced observability.
  • Refactoring of ConfigProvider Instantiation: Methods responsible for instantiating ConfigProviders in AbstractConfig, MirrorMakerConfig, and Worker have been updated to return Map<String, Plugin<ConfigProvider>> and manage the wrapping process.
  • Updated ConfigTransformer Logic: The ConfigTransformer has been modified to operate directly with Plugin<ConfigProvider> instances, retrieving the underlying provider when configuration transformation is required.
  • New MonitorableConfigProvider for Testing: A new test class, MonitorableConfigProvider, has been added to demonstrate and verify the proper functioning of the new metrics registration capabilities for ConfigProvider plugins.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the ConfigProvider instantiation to wrap it in a Plugin object, which enables metrics for ConfigProvider implementations that are Monitorable. The changes are consistently applied across the codebase, including client code, Connect runtime, and tests. The introduction of Plugin.wrapInstance centralizes the logic for handling metrics for plugins, which is a good architectural improvement. I've found a couple of minor issues in the new tests related to resource management that should be addressed.

Comment on lines +391 to +401
Plugin<ConfigProvider> plugin = plugins.newConfigProvider(
config,
providerPrefix,
ClassLoaderUsage.PLUGINS
providerName,
ClassLoaderUsage.PLUGINS,
null
);

assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin.get()).flatten();
assertTrue(samples.containsKey("configure"));
assertPluginClassLoaderAlwaysActive(plugin);
assertPluginClassLoaderAlwaysActive(plugin.get());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The Plugin<ConfigProvider> returned by newConfigProvider is an AutoCloseable resource, but it is not being closed in this test. This can lead to resource leaks. It's a good practice to ensure all closeable resources are closed, even in tests.

Suggested change
Plugin<ConfigProvider> plugin = plugins.newConfigProvider(
config,
providerPrefix,
ClassLoaderUsage.PLUGINS
providerName,
ClassLoaderUsage.PLUGINS,
null
);
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin.get()).flatten();
assertTrue(samples.containsKey("configure"));
assertPluginClassLoaderAlwaysActive(plugin);
assertPluginClassLoaderAlwaysActive(plugin.get());
try (Plugin<ConfigProvider> plugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
null
)) {
assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin.get()).flatten();
assertTrue(samples.containsKey("configure"));
assertPluginClassLoaderAlwaysActive(plugin.get());
}

Comment on lines +410 to +411
Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics());
assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The Plugin<ConfigProvider> and Metrics objects are AutoCloseable resources that are not being closed in this test. This can lead to resource leaks. Using a try-with-resources block will ensure they are properly closed.

        try (Metrics metrics = new Metrics();
             Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, metrics)) {
            assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());
        }

@DDShantanuBadmanji
Copy link
Author

/refacto-test

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (1)

598-640: Ensure provider is closed if configure() fails

If provider.configure throws, the raw provider (not yet wrapped) isn’t closed. Close it on failure for parity with getConfiguredInstance().

-            try {
+            try {
                 String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
                 Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
                 ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
-                provider.configure(configProperties);
+                try {
+                    provider.configure(configProperties);
+                } catch (Exception e) {
+                    Utils.maybeCloseQuietly(provider, "config provider constructed during failed configure");
+                    throw e;
+                }
                 Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
                 configProviderPluginInstances.put(entry.getKey(), providerPlugin);
             } catch (ClassNotFoundException e) {
🧹 Nitpick comments (8)
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1)

32-35: Clarify how the “provider” tag is added

The Plugin wrapper auto-adds config and class tags; the provider tag is supplied by the caller (e.g., via Plugins.newConfigProvider/Plugin.wrapInstance extraTags). Suggest updating the text to avoid implying it’s always automatic.

Proposed Javadoc tweak:

- * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
- * The following tags are automatically added to all metrics registered: <code>config</code> set to
- * <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name,
- * and <code>provider</code> set to the provider name.
+ * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
+ * When wrapped with the plugin infrastructure, metrics registered via {@code PluginMetrics} include:
+ * <code>config</code>=<code>config.providers</code> and <code>class</code>=the provider’s simple class name.
+ * A <code>provider</code> tag may also be present when the caller supplies it (e.g., via extra tags during wrapping).
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (1)

2937-2950: Make tag assertions tolerant of default tags

MetricName.tags() may include default tags from MetricsContext. Matching on exact map equality can be brittle. Prefer subset matching.

Apply:

-            Map<String, String> tags = metricName.tags();
-            if (expectedTags.equals(tags)) {
+            Map<String, String> tags = metricName.tags();
+            if (tags.entrySet().containsAll(expectedTags.entrySet())) {
                 assertEquals(MonitorableConfigProvider.NAME, metricName.name());
                 assertEquals(MonitorableConfigProvider.DESCRIPTION, metricName.description());
                 found++;
             }
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (2)

379-402: Close Plugin in tests to avoid leaking provider/metrics resources

Wrap the returned Plugin in try-with-resources so plugin metrics and any AutoCloseable provider resources are closed.

-        Plugin<ConfigProvider> plugin = plugins.newConfigProvider(
-            config,
-            providerName,
-            ClassLoaderUsage.PLUGINS,
-            null
-        );
-
-        assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot collect samples");
-        Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin.get()).flatten();
-        assertTrue(samples.containsKey("configure"));
-        assertPluginClassLoaderAlwaysActive(plugin.get());
+        try (Plugin<ConfigProvider> plugin = plugins.newConfigProvider(
+                config,
+                providerName,
+                ClassLoaderUsage.PLUGINS,
+                null)) {
+            assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot collect samples");
+            Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin.get()).flatten();
+            assertTrue(samples.containsKey("configure"));
+            assertPluginClassLoaderAlwaysActive(plugin.get());
+        }

404-412: Also close Metrics and Plugin in the monitorable-provider test

Use try-with-resources for both Metrics and Plugin to avoid background thread/resource leaks in tests.

-        Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics());
-        assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());
+        try (Metrics m = new Metrics();
+             Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, m)) {
+            assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());
+        }
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (2)

60-69: Plugin‑wrapped providers wiring LGTM; tighten Javadoc and drop redundant null‑check

  • The switch to Map<String, Plugin> and use of providerPlugin.get() is correct.
  • Javadoc still says “Map of … ConfigProvider instances”; suggest clarifying they are Plugin‑wrapped.
  • keysByPath is never null in this loop; drop the extra check.

Apply:

-     * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances.
+     * @param configProviderPlugins a Map of provider names to Plugin‑wrapped {@link ConfigProvider} instances.
-            if (providerPlugin != null && keysByPath != null) {
+            if (providerPlugin != null) {

Optional micro‑opt (avoid repeated get()):

-            if (providerPlugin != null) {
-                for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {
+            if (providerPlugin != null) {
+                ConfigProvider provider = providerPlugin.get();
+                for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {
...
-                    ConfigData configData = providerPlugin.get().get(path, keys);
+                    ConfigData configData = provider.get(path, keys);

Also applies to: 98-105


104-113: Defensive null‑handling for provider data to prevent NPEs in replace()

If a misbehaving ConfigProvider returns ConfigData with a null data() map, later replace() will NPE on keyValues.get(...). Make this robust:

-                    ConfigData configData = providerPlugin.get().get(path, keys);
-                    Map<String, String> data = configData.data();
+                    ConfigData configData = providerPlugin.get().get(path, keys);
+                    Map<String, String> data = configData.data() != null ? configData.data() : java.util.Collections.emptyMap();

Note: using fully‑qualified Collections avoids a new import.

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (2)

99-102: Close() should also cancel pending scheduled restarts to avoid leaks/shutdown surprises

Outstanding HerderRequest entries in requests may still fire after shutdown. Cancel and clear before closing plugins.

     @Override
     public void close() {
-        configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        // cancel any scheduled restarts
+        requests.values().forEach(m -> m.values().forEach(HerderRequest::cancel));
+        requests.clear();
+        // then close provider plugins
+        configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
     }

76-84: If TTL keys become provider‑scoped, requests keying will remain stable

If you adopt providerName:path TTL keys (see ConfigTransformer suggestion), WorkerConfigTransformer will continue to treat the key opaquely and avoid cross‑provider cancellation collisions.

No code change required here if the upstream TTL key is updated, but consider logging the key to aid debugging:

-            log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl);
+            log.info("Scheduling a restart of connector {} for key '{}' in {} ms", connectorName, path, ttl);
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and b92a2bb.

📒 Files selected for processing (12)
  • clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (4 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (3 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (5 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1 hunks)
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (3 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (9)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
  • Utils (93-1700)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
  • Utils (93-1700)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
  • withPluginMetrics (6299-6301)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
  • WorkerConfig (58-463)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (4)
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala (1)
  • MockFileConfigProvider (1636-1641)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
🔇 Additional comments (10)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)

49-50: LGTM: tests aligned to Plugin-wrapped providers and Map.of

  • Using Plugin.wrapInstance in setup is correct for the new ConfigTransformer API.
  • Map.of conversions simplify setup and assertions; TTL expectations remain correct.

Also applies to: 54-59, 63-68, 72-75, 79-82, 86-98

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (2)

382-390: Good: provider plugin wraps with explicit “provider” tag

Returning a Plugin with extraTags ensures consistent tagging and mirrors production behavior. No issues.


2904-2927: Ensure the test triggers provider instantiation

This test asserts metrics without calling worker.start(). If provider creation ever moves out of the constructor, this could become flaky. Consider calling worker.start() or verifying plugins.newConfigProvider(...) was invoked to make the test robust.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)

69-70: LGTM: updated to Plugin-wrapped provider and Map.of

  • Plugin.wrapInstance in setup matches the new WorkerConfigTransformer API.
  • Returning ConfigData via Map.of simplifies the provider; behavior unchanged.

Also applies to: 75-76, 115-116, 122-123, 151-158

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)

273-286: Switch to Plugin looks correct; confirm metrics intent

Provider plugins are created and closed safely. You pass null for Metrics, so monitorable config providers won’t register metrics during MM2’s transform—which is probably fine since this is short‑lived. Please confirm this is intentional.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (1)

810-816: Good assertion of configure-before-metrics order

Overriding withPluginMetrics to assert configured ensures ordering is correct. LGTM.

clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)

29-58: Test helper looks good

Provides minimal Monitorable implementation and a flag for configuration; sufficient for the added tests.

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)

632-650: newConfigProvider API change and metrics wiring look correct

Correctly configures the provider, then wraps with Plugin and tags with provider name. No issues spotted.

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (1)

46-52: Approve—no legacy ConfigProvider usages found
Legacy Map<String, ConfigProvider> types and constructors for ConfigTransformer/WorkerConfigTransformer aren’t present in the codebase.

clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)

94-110: Namespace TTL keys by provider name to avoid collisions
No duplicates were found in placeholder scans, but path collisions remain a runtime risk when multiple providers use the same path. Use:

-                        ttls.put(path, ttl);
+                        ttls.put(providerName + ":" + path, ttl);

Ensure WorkerConfigTransformer remains compatible with the namespaced keys.

Comment on lines +550 to 561
Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);

if (!providers.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providers);
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));

return new ResolvingMap<>(resolvedOriginals, originals);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Close provider plugins even when transform throws (resource leak)

If ConfigTransformer.transform throws, providerPlugins are never closed. Wrap usage in try/finally to ensure closure.

-        Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
-
-        if (!providerPlugins.isEmpty()) {
-            ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
-            ConfigTransformerResult result = configTransformer.transform(indirectVariables);
-            if (!result.data().isEmpty()) {
-                resolvedOriginals.putAll(result.data());
-            }
-        }
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        Map<String, Plugin<ConfigProvider>> providerPlugins =
+            instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
+        try {
+            if (!providerPlugins.isEmpty()) {
+                ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
+                ConfigTransformerResult result = configTransformer.transform(indirectVariables);
+                if (!result.data().isEmpty()) {
+                    resolvedOriginals.putAll(result.data());
+                }
+            }
+        } finally {
+            providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
if (!providers.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providers);
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
return new ResolvingMap<>(resolvedOriginals, originals);
Map<String, Plugin<ConfigProvider>> providerPlugins =
instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
try {
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
} finally {
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
}
return new ResolvingMap<>(resolvedOriginals, originals);
🤖 Prompt for AI Agents
In clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
around lines 550-561, providerPlugins are instantiated but if
ConfigTransformer.transform(...) throws an exception they are never closed
causing a resource leak; wrap the use of providerPlugins (creation, transform
invocation, and result handling) in a try/finally where the finally calls
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider
plugin")) to ensure closure even on exceptions, keeping providerPlugins visible
to the finally block by declaring it before the try.

Comment on lines 210 to 223
private WorkerConfigTransformer initConfigTransformer() {
final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
Map<String, ConfigProvider> providerMap = new HashMap<>();
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
for (String providerName : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
config,
WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName,
ClassLoaderUsage.PLUGINS
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
providerMap.put(providerName, configProvider);
providerPluginMap.put(providerName, configProviderPlugin);
}
return new WorkerConfigTransformer(this, providerMap);
return new WorkerConfigTransformer(this, providerPluginMap);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Guard against null plugins returned for misconfigured providers

plugins.newConfigProvider can return null if the provider class isn’t defined; inserting null values risks NPEs downstream.

-        Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
+        Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
         for (String providerName : providerNames) {
             Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
                 config,
                 providerName,
                 ClassLoaderUsage.PLUGINS,
                 metrics.metrics()
             );
-            providerPluginMap.put(providerName, configProviderPlugin);
+            if (configProviderPlugin != null) {
+                providerPluginMap.put(providerName, configProviderPlugin);
+            } else {
+                log.warn("Config provider '{}' listed in '{}' but missing its '.class' property; skipping.",
+                        providerName, WorkerConfig.CONFIG_PROVIDERS_CONFIG);
+            }
         }
         return new WorkerConfigTransformer(this, providerPluginMap);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private WorkerConfigTransformer initConfigTransformer() {
final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
Map<String, ConfigProvider> providerMap = new HashMap<>();
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
for (String providerName : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
config,
WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName,
ClassLoaderUsage.PLUGINS
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
providerMap.put(providerName, configProvider);
providerPluginMap.put(providerName, configProviderPlugin);
}
return new WorkerConfigTransformer(this, providerMap);
return new WorkerConfigTransformer(this, providerPluginMap);
}
private WorkerConfigTransformer initConfigTransformer() {
final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
for (String providerName : providerNames) {
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
if (configProviderPlugin != null) {
providerPluginMap.put(providerName, configProviderPlugin);
} else {
log.warn("Config provider '{}' listed in '{}' but missing its '.class' property; skipping.",
providerName, WorkerConfig.CONFIG_PROVIDERS_CONFIG);
}
}
return new WorkerConfigTransformer(this, providerPluginMap);
}
🤖 Prompt for AI Agents
In connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
around lines 210 to 223, the code blindly inserts the result of
plugins.newConfigProvider into providerPluginMap which can be null for
misconfigured providers; change the loop to check the returned
Plugin<ConfigProvider> for null and handle it explicitly—either throw a
ConfigException (or IllegalArgumentException) with a clear message naming the
provider and the problem, or skip adding null entries and log a warning; only
put non-null plugin instances into providerPluginMap so downstream code cannot
encounter NPEs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants