
Conversation


@DDShantanuBadmanji DDShantanuBadmanji commented Oct 3, 2025

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.

Summary by CodeRabbit

  • New Features

    • Config providers in Connect now expose metrics when monitorable, with standardized tags for easier observability.
    • Configuration transformation uses plugin-based providers for improved isolation and lifecycle management (transparent to users).
  • Documentation

    • Clarified that config providers can register metrics and the tags applied.
  • Refactor

    • Migrated config provider handling across components to plugin-wrapped providers for consistency and safer resource management.
  • Tests

    • Added tests validating metrics emission for monitorable config providers and updated transformer tests.


coderabbitai bot commented Oct 3, 2025

Walkthrough

Switches config provider handling from direct instances to Plugin-wrapped providers across the core clients and the Connect runtime. Constructors, maps, and close logic are updated to operate on Plugin<ConfigProvider>. Adds a Monitorable-capable test provider plus metrics wiring and tests. Includes minor Javadoc and formatting updates. Tests are adjusted to use Plugin wrappers and Map.of.
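
For readers new to the Plugin wrapper, here is a minimal sketch of the wiring described above. It assumes the Plugin.wrapInstance(...) and Plugin#get() signatures quoted elsewhere in this thread plus the new ConfigTransformer constructor that accepts Map<String, Plugin<ConfigProvider>>; the FileConfigProvider path and placeholder key are illustrative, not taken from the PR.

import java.util.Map;

import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigTransformerResult;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.provider.FileConfigProvider;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Utils;

public class ConfigProviderPluginSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        ConfigProvider provider = new FileConfigProvider();
        provider.configure(Map.of());

        // Wrap the configured provider; a Monitorable provider gets its metrics registered here.
        Plugin<ConfigProvider> plugin = Plugin.wrapInstance(provider, metrics, "config.providers");
        Map<String, Plugin<ConfigProvider>> providerPlugins = Map.of("file", plugin);

        ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
        try {
            ConfigTransformerResult result = transformer.transform(
                    Map.of("db.password", "${file:/etc/app/secrets.properties:password}"));
            System.out.println(result.data());
        } finally {
            // Closing the plugin closes the provider and releases any plugin metrics.
            providerPlugins.values().forEach(p -> Utils.closeQuietly(p, "config provider plugin"));
            metrics.close();
        }
    }
}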

Changes

  • Vagrant housekeeping (Vagrantfile): Inserted an extra blank line; no behavioral change.
  • Core config provider plumbing, clients (clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java, clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java, clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java): Replace plain ConfigProvider maps with Map<String, Plugin<ConfigProvider>> in resolution and transformation; update construction and closing to use plugins; Javadoc now mentions Monitorable support and metric tags.
  • Client-side tests (clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java, clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java): Tests updated to wrap providers with Plugin.wrapInstance(...) and use Map.of(...); adds a new MonitorableConfigProvider test class that implements Monitorable and registers a metric.
  • Connect runtime, provider instantiation and wiring (connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java, connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java): Plugins.newConfigProvider(...) now returns Plugin<ConfigProvider> and accepts Metrics; Worker, WorkerConfigTransformer, and MirrorMakerConfig are updated to build, store, and close provider plugins and to pass them to ConfigTransformer.
  • Connect tests (connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java, connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java, connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java): Tests migrated to Plugin-wrapped providers and Map.of(...); adds tests for MonitorableConfigProvider metrics and the plugin metrics lifecycle; introduces a custom monitorable provider for verification.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Admin as Admin/Worker.init
  participant Plugins as Plugins
  participant Plugin as Plugin<ConfigProvider>
  participant Provider as ConfigProvider
  participant WCT as WorkerConfigTransformer
  participant CT as ConfigTransformer
  participant Metrics as Metrics

  Admin->>Plugins: newConfigProvider(config, providerName, classLoaderUsage, Metrics)
  Plugins-->>Admin: Plugin.wrapInstance(provider, metrics, tags)
  Admin->>WCT: new WorkerConfigTransformer(worker, Map{name -> Plugin})
  WCT->>CT: new ConfigTransformer(Map{name -> Plugin})

  note over CT: During transform
  CT->>Plugin: get()
  Plugin-->>CT: ConfigProvider
  CT->>Provider: get(path/keys)
  Provider-->>CT: ConfigData
  CT-->>WCT: Transformed configs

  note over Plugins,Provider: If provider implements Monitorable
  Plugins->>Provider: withPluginMetrics(PluginMetrics)
  Provider-->>Metrics: register gauges/counters
sequenceDiagram
  autonumber
  actor MM as MirrorMakerConfig.transform
  participant Plugins as Plugins
  participant Plugin as Plugin<ConfigProvider>
  participant CT as ConfigTransformer

  MM->>Plugins: newConfigProvider(config, providerName, usage, Metrics)
  Plugins-->>MM: Plugin<ConfigProvider>
  MM->>CT: new ConfigTransformer(Map{name -> Plugin})
  MM->>CT: transform(configs)
  CT-->>MM: Transformed configs
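
The Monitorable hook shown in the first diagram (withPluginMetrics) is what a provider implements to opt into metrics. Below is a minimal sketch of such a provider; it assumes the Monitorable/PluginMetrics API referenced throughout this review (withPluginMetrics, metricName, addMetric) and is illustrative rather than the PR's MonitorableConfigProvider test class.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;

public class ExampleMonitorableConfigProvider implements ConfigProvider, Monitorable {
    private final AtomicLong lookups = new AtomicLong();

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        // Metrics registered here also pick up the standard plugin tags (config, class, provider).
        MetricName name = metrics.metricName("lookups", "Number of get() calls", Map.of());
        metrics.addMetric(name, (Measurable) (config, now) -> (double) lookups.get());
    }

    @Override
    public ConfigData get(String path) {
        lookups.incrementAndGet();
        return new ConfigData(Map.of());
    }

    @Override
    public ConfigData get(String path, Set<String> keys) {
        lookups.incrementAndGet();
        return new ConfigData(Map.of());
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}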

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I thump my paw at plugins new,
Wrap the configs, metrics too.
Providers hop through lazy gates,
Tags in tow, they meet their fates.
With gentle nibbles, tests all pass—
Transformations sprout like grass.
Hippity hop, shipped at last! 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (3 warnings)
  • Title Check (⚠️ Warning): The title "Clone kafka 18894" does not describe the substantive change in this PR, which is migrating ConfigProvider usage to Plugin<ConfigProvider> wrappers across multiple modules; it merely references an issue number. Resolution: rename the pull request to a concise summary of the primary change, for example "Migrate config providers to Plugin<ConfigProvider> wrappers", so it clearly reflects the main code modifications.
  • Description Check (⚠️ Warning): The description is untouched template placeholder text and provides no details about what was changed, why, or how it was tested. Resolution: replace the placeholder with a detailed description that outlines the changes (e.g., the shift to Plugin<ConfigProvider>), the rationale, and the testing strategy, including any new or updated unit and integration tests.
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 8.89%, which is below the required threshold of 80.00%. Resolution: run `@coderabbitai generate docstrings` to improve docstring coverage.

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ast-grep (0.39.5)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java


@gemini-code-assist

Summary of Changes

Hello @DDShantanuBadmanji, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a fundamental refactoring to how ConfigProvider instances are managed and instantiated within Kafka Connect. By wrapping ConfigProvider objects in a new Plugin class, the system gains a more unified approach to handling plugin lifecycles and enables ConfigProvider implementations to expose metrics through the Monitorable interface. This change enhances the extensibility and observability of Kafka Connect's configuration management, allowing for better monitoring of custom configuration providers.

Highlights

  • Refactored ConfigProvider Management: ConfigProvider instances are now consistently wrapped within a new Plugin object across the codebase. This change centralizes their lifecycle management and provides a unified mechanism for handling various plugin types within Kafka Connect.
  • Enabled Metrics for ConfigProviders: ConfigProvider implementations can now expose metrics by implementing the Monitorable interface. This enhancement significantly improves the observability of configuration providers, allowing for better monitoring of their operational status and performance.
  • Improved Plugin Instantiation: The Plugins utility class has been updated to return Plugin<ConfigProvider> and now accepts a Metrics instance during ConfigProvider creation. This facilitates the automatic registration of metrics for monitorable config providers.
  • Enhanced Test Coverage: New test cases and a dedicated MonitorableConfigProvider test utility class have been introduced to thoroughly validate the new metrics functionality and the proper wrapping of ConfigProvider plugins.


refacto-test bot commented Oct 3, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.


refacto-test bot commented Oct 3, 2025



Clone kafka 18894: Add metrics support for ConfigProvider plugins

TL;DR: Enhances ConfigProvider with metrics support through the Plugin wrapper, enabling monitoring capabilities for config providers.


Refacto PR Summary

Adds metrics support to ConfigProvider plugins via the Plugin wrapper class and Monitorable interface integration.
This PR enables ConfigProvider implementations to expose metrics by implementing the Monitorable interface. The implementation wraps ConfigProvider instances in a Plugin class that handles metrics registration with appropriate tags (config, class, provider). The change modifies the ConfigTransformer to work with Plugin-wrapped ConfigProviders instead of direct instances, maintaining backward compatibility while adding observability. This enhancement allows operators to monitor config provider performance and behavior through standard metrics interfaces.
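
As a concrete (hedged) illustration of that tagging, the sketch below uses the Plugin.wrapInstance overload with an explicit tag map, following the review suggestion further down this thread; the "vault" alias is a made-up example, not part of the PR.

import java.util.Map;

import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;

final class ProviderPluginTagsSketch {
    // Wraps a configured provider so that, if it is Monitorable, its metrics carry tags like
    // config=config.providers, class=<provider class>, provider=<alias>.
    static Plugin<ConfigProvider> wrap(ConfigProvider provider, Metrics metrics, String alias) {
        return Plugin.wrapInstance(provider, metrics, "config.providers", Map.of("provider", alias));
    }
}

// Example call (the alias "vault" and variable names are hypothetical):
// Plugin<ConfigProvider> plugin = ProviderPluginTagsSketch.wrap(vaultProvider, metrics, "vault");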

Change Highlights

  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java: Modified to work with Plugin-wrapped ConfigProviders
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java: Added documentation for Monitorable implementation
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java: New test class for metrics support
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: Updated to use Plugin-wrapped ConfigProviders with metrics
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: Modified newConfigProvider to return Plugin-wrapped instances
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java: Updated to work with Plugin-wrapped providers
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java: Added test for MonitorableConfigProvider

Sequence Diagram

sequenceDiagram
    participant Worker
    participant Plugins
    participant ConfigProvider
    participant Plugin
    participant Metrics
    
    Worker->>Plugins: newConfigProvider(config, providerName, ClassLoaderUsage, metrics)
    Plugins->>Plugins: Load and configure ConfigProvider
    Plugins->>Plugin: wrapInstance(provider, metrics, CONFIG_PROVIDERS_CONFIG, tags)
    Plugin->>ConfigProvider: Check if implements Monitorable
    ConfigProvider->>Metrics: Register metrics (if Monitorable)
    Plugins-->>Worker: Return Plugin<ConfigProvider>
    Worker->>Worker: Create ConfigTransformer with Plugin-wrapped providers

Testing Guide

  1. Implement a custom ConfigProvider that implements the Monitorable interface
  2. Configure Kafka Connect with the custom provider in the worker config (a sketch of such a configuration follows this list)
  3. Verify metrics are properly registered with correct tags (config, class, provider)
  4. Check metrics values are exposed through JMX or metrics endpoint
  5. Test that existing ConfigProvider implementations continue to work without changes
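
A minimal sketch of steps 1-3, assuming a hypothetical provider class and the standard config.providers worker properties (the alias "monitored" and the class name are placeholders):

import java.util.HashMap;
import java.util.Map;

final class MonitorableProviderTestSketch {
    // Builds worker properties that register a hypothetical Monitorable provider under the
    // alias "monitored"; the property keys are the standard config.providers keys.
    static Map<String, String> workerPropsWithMonitoredProvider(Map<String, String> baseWorkerProps) {
        Map<String, String> props = new HashMap<>(baseWorkerProps);
        props.put("config.providers", "monitored");
        props.put("config.providers.monitored.class",
                "org.example.MyMonitorableConfigProvider"); // placeholder class name
        // Once the worker wires this provider, metrics it registers should carry tags such as
        // config=config.providers, class=MyMonitorableConfigProvider, provider=monitored.
        return props;
    }
}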


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a mechanism to expose metrics from ConfigProvider implementations by wrapping them in a Plugin object. The changes are consistently applied across the client and connect modules, enabling metrics for config providers in Kafka Connect. The core logic is sound, and tests are updated and added to cover the new functionality. I have one suggestion to improve the consistency of metric tagging for config providers used outside of the Kafka Connect runtime, which will also align the implementation with the updated documentation.

ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
provider.configure(configProperties);
configProviderInstances.put(entry.getKey(), provider);
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);


medium

To ensure consistent metric tagging for ConfigProviders across different use cases (client-side vs. Kafka Connect), it would be beneficial to also add the provider tag here. This would align with the behavior in connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java and make the updated Javadoc in ConfigProvider.java accurate for all scenarios. The provider name is available here as entry.getKey().

Suggested change
-Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
+Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG, Map.of("provider", entry.getKey()));

@DDShantanuBadmanji
Author

/refacto-test


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)

270-286: Close provider plugins in a finally block.

If transformer.transform(props) throws, the current code never reaches the forEach that closes the Plugin<ConfigProvider> instances, leaking both the provider and any attached plugin metrics. Wrapping the transform call in a try/finally restores the previous cleanup guarantees and matches our usual pattern for plugin lifecycle management.

-        ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
-        Map<String, String> transformed = transformer.transform(props).data();
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
-        return transformed;
+        ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
+        try {
+            return transformer.transform(props).data();
+        } finally {
+            providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        }

(Returning from inside the try keeps the finally solely for cleanup.)

clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (1)

532-560: Ensure config provider plugins are always closed.

ConfigTransformer.transform(indirectVariables) can throw (bad provider output, parse errors, etc.), and when it does we currently leak every Plugin<ConfigProvider> created in this method. This regression matters now that plugins may allocate metric resources. Wrapping the transform call in a try/finally keeps the lifecycle symmetrical:

-        if (!providerPlugins.isEmpty()) {
-            ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
-            ConfigTransformerResult result = configTransformer.transform(indirectVariables);
-            if (!result.data().isEmpty()) {
-                resolvedOriginals.putAll(result.data());
-            }
-        }
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        if (!providerPlugins.isEmpty()) {
+            ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
+            try {
+                ConfigTransformerResult result = configTransformer.transform(indirectVariables);
+                if (!result.data().isEmpty()) {
+                    resolvedOriginals.putAll(result.data());
+                }
+            } finally {
+                providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+            }
+        }

This mirrors our other plugin-wrapping sites and prevents resource leakage on failure.

clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)

65-69: Refresh the constructor Javadoc.

The parameter doc still refers to raw ConfigProvider instances, but the signature now takes Plugin<ConfigProvider>. Please update the wording so the documentation matches the API and avoids confusion.
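
For example, the constructor doc could be reworded along these lines (a suggestion only, not the PR's actual Javadoc; the parameter and field names are assumed):

/**
 * Creates a ConfigTransformer with the given providers.
 *
 * @param configProviderPlugins a map from provider name to the {@link Plugin}-wrapped
 *        {@link ConfigProvider} used to resolve variables that reference that name
 */
public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
    this.configProviderPlugins = configProviderPlugins;
}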

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and 5989a26.

📒 Files selected for processing (13)
  • Vagrantfile (1 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (4 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (3 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (5 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1 hunks)
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (3 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
  • withPluginMetrics (6299-6301)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (2)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
  • WorkerConfig (58-463)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
  • Utils (93-1700)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
  • Plugins (59-696)

Comment on lines 147 to 158
     public ConfigData get(String path, Set<String> keys) {
         if (path.equals(TEST_PATH)) {
             if (keys.contains(TEST_KEY)) {
-                return new ConfigData(Collections.singletonMap(TEST_KEY, TEST_RESULT));
+                return new ConfigData(Map.of(TEST_KEY, TEST_RESULT));
             } else if (keys.contains(TEST_KEY_WITH_TTL)) {
-                return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L);
+                return new ConfigData(Map.of(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L);
             } else if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) {
-                return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L);
+                return new ConfigData(Map.of(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L);
             }
         }
-        return new ConfigData(Collections.emptyMap());
+        return new ConfigData(Map.of());
     }

⚠️ Potential issue | 🟠 Major

Restore support for multi-key lookups in the test provider.

ConfigTransformer aggregates all keys per provider/path into a single Set and calls ConfigProvider#get(path, keys) once. The new implementation returns immediately on the first match, so if two keys share the same path (e.g., ${test:testPath:testKey} and ${test:testPath:testKeyWithTTL} in the same config), only one value is supplied and the other substitution silently fails. The previous version accumulated all matches in a map, so the regression drops coverage for the core multi-key behavior we rely on in production.
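
For example, with two placeholders that share the provider alias and path used by this test, the transformer makes a single call and expects both keys in the returned ConfigData (the config property names below are made up):

// Both variables resolve through provider "test" and path "testPath", so ConfigTransformer
// calls provider.get("testPath", Set.of("testKey", "testKeyWithTTL")) exactly once.
Map<String, String> props = Map.of(
        "some.config", "${test:testPath:testKey}",
        "other.config", "${test:testPath:testKeyWithTTL}");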

Please revert to building the result map before returning, so every requested key is populated and TTL is derived consistently, e.g.:

-            if (path.equals(TEST_PATH)) {
-                if (keys.contains(TEST_KEY)) {
-                    return new ConfigData(Map.of(TEST_KEY, TEST_RESULT));
-                } else if (keys.contains(TEST_KEY_WITH_TTL)) {
-                    return new ConfigData(Map.of(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L);
-                } else if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) {
-                    return new ConfigData(Map.of(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L);
-                }
-            }
-            return new ConfigData(Map.of());
+            Map<String, String> data = new HashMap<>();
+            Long ttl = null;
+            if (path.equals(TEST_PATH)) {
+                if (keys.contains(TEST_KEY)) {
+                    data.put(TEST_KEY, TEST_RESULT);
+                }
+                if (keys.contains(TEST_KEY_WITH_TTL)) {
+                    data.put(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL);
+                    ttl = 1L;
+                }
+                if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) {
+                    data.put(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL);
+                    ttl = 10L;
+                }
+            }
+            return new ConfigData(data, ttl);

This keeps the stub faithful to the contract and ensures the worker transformer tests continue to cover multi-key resolutions.

📝 Committable suggestion


Suggested change
-    public ConfigData get(String path, Set<String> keys) {
-        if (path.equals(TEST_PATH)) {
-            if (keys.contains(TEST_KEY)) {
-                return new ConfigData(Map.of(TEST_KEY, TEST_RESULT));
-            } else if (keys.contains(TEST_KEY_WITH_TTL)) {
-                return new ConfigData(Map.of(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L);
-            } else if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) {
-                return new ConfigData(Map.of(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L);
-            }
-        }
-        return new ConfigData(Map.of());
-    }
+    public ConfigData get(String path, Set<String> keys) {
+        Map<String, String> data = new HashMap<>();
+        Long ttl = null;
+        if (path.equals(TEST_PATH)) {
+            if (keys.contains(TEST_KEY)) {
+                data.put(TEST_KEY, TEST_RESULT);
+            }
+            if (keys.contains(TEST_KEY_WITH_TTL)) {
+                data.put(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL);
+                ttl = 1L;
+            }
+            if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) {
+                data.put(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL);
+                ttl = 10L;
+            }
+        }
+        return new ConfigData(data, ttl);
+    }
🤖 Prompt for AI Agents
In
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
around lines 147-158, the test ConfigProvider#get implementation currently
returns on the first matching key and thus fails multi-key lookups; change it to
build a result map for all requested keys for the TEST_PATH by iterating the
provided keys set, adding each matching key/value pair to the map and collecting
any TTLs, then return a single ConfigData containing the full map and a
consistently derived TTL (e.g., compute and pass the appropriate TTL after
collecting all matches) so every requested key is populated instead of returning
early.

@DDShantanuBadmanji
Author

/refacto-bot

