-
Notifications
You must be signed in to change notification settings - Fork 0
Clone kafka 18894 #25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe change converts ConfigProvider handling to plugin-wrapped instances across clients and Connect. Maps switch from provider instances to Plugin, constructors and fields update accordingly, and cleanup now closes plugins. Plugins.newConfigProvider’s signature changes to return Plugin and accept Metrics. Tests adapt and add monitoring/metrics coverage. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Admin as Admin/Config
participant Plugins as Plugins
participant Worker as Worker
participant WCT as WorkerConfigTransformer
participant CT as ConfigTransformer
participant Prov as Plugin<ConfigProvider>
participant Provider as ConfigProvider
participant Metrics as Metrics
Admin->>Plugins: newConfigProvider(config, providerName, loader, Metrics)
Plugins->>Plugins: derive providerPrefix
Plugins->>Provider: instantiate & configure
Plugins->>Prov: Plugin.wrapInstance(Provider, Metrics, "config.providers", {provider: name})
Plugins-->>Worker: Plugin<ConfigProvider>
Worker->>WCT: new WorkerConfigTransformer(worker, Map<String, Plugin<ConfigProvider>>)
WCT->>CT: new ConfigTransformer(Map<String, Plugin<ConfigProvider>>)
Admin->>WCT: transform(props)
WCT->>CT: transform(props)
CT->>Prov: lookup by provider name
alt provider found
Prov->>Provider: get(path, keys)
Provider-->>Prov: ConfigData (+ TTL)
Prov-->>CT: ConfigData
CT-->>WCT: resolved props, TTLs
else provider missing
CT-->>WCT: unchanged props
end
note over Worker,Prov: Shutdown
Worker->>Prov: close()
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Pre-merge checks (1 warning, 2 inconclusive)❌ Failed checks (1 warning, 2 inconclusive)
Poem
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ast-grep (0.38.6)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.javaTip 👮 Agentic pre-merge checks are now available in preview!Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example: reviews:
pre_merge_checks:
custom_checks:
- name: "Undocumented Breaking Changes"
mode: "warning"
instructions: |
Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).Please share your feedback with us on this Discord post. ✨ Finishing touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @visz11, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces a significant internal refactoring to how ConfigProvider instances are handled, primarily by encapsulating them within a Plugin wrapper. This change is foundational for enabling robust metric collection and observability for ConfigProvider implementations, ensuring that their operational behavior can be monitored effectively within the Kafka ecosystem.
Highlights
- ConfigProvider Plugin Integration: The core change involves wrapping
ConfigProviderinstances within a newPluginclass. This refactoring affects howConfigProviderinstances are instantiated and managed acrossAbstractConfig,ConfigTransformer, and Kafka Connect'sWorkerandMirrorMakerConfig. - Enhanced Metrics for ConfigProviders: This change enables
ConfigProviderimplementations to register metrics by implementing theMonitorableinterface. ThePluginwrapper automatically handles metric registration with relevant tags likeconfig,class, andprovider. - API and Internal Method Signature Updates: Methods like
instantiateConfigProvidersinAbstractConfigand constructors/fields inConfigTransformerandWorkerConfigTransformernow useMap<String, Plugin<ConfigProvider>>instead ofMap<String, ConfigProvider>>. ThePlugins.newConfigProvidermethod has also been updated to return thePluginwrapper and acceptMetricsfor direct metric integration. - Test Suite Modernization and Expansion: Existing tests in
ConfigTransformerTest,WorkerConfigTransformerTest, andPluginsTesthave been updated to reflect the newPluginwrapping and leverageMap.offor cleaner test data. A newMonitorableConfigProvidertest class and corresponding tests inWorkerTestandPluginsTestvalidate the new metric registration capabilities.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
|
/refacto-test |
|
The user @visz11 does not have an active seat allocated for code reviews. Please contact your organization admin to assign a seat or upgrade your subscription. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces metrics for ConfigProvider implementations, which is a valuable enhancement for monitoring configuration resolution in Kafka Connect. The core change involves wrapping ConfigProvider instances within a generic Plugin object that handles metrics registration for providers that implement the Monitorable interface.
The refactoring is consistently applied across AbstractConfig, ConfigTransformer, MirrorMakerConfig, and the Connect runtime components like Worker and Plugins. The changes are clean, and variable names have been updated for clarity (e.g., providers to providerPlugins).
New tests have been added to verify the metrics registration and to ensure the correct lifecycle of Monitorable config providers (configure is called before withPluginMetrics). Existing tests have been updated to accommodate the new Plugin wrapper. The implementation correctly handles cases where a metrics registry is not available by passing null, ensuring broad compatibility.
Overall, this is a high-quality contribution that improves the observability of Kafka Connect. The changes are well-designed and thoroughly tested.
|
/refacto-test |
|
Refacto is reviewing this PR. Please wait for the review comments to be posted. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (13)
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1)
32-35: Clarify when theprovidermetric tag is present and that metrics require a non-nullMetrics.As implemented,
Plugin.wrapInstancealways addsconfigandclasstags; theprovidertag is only added by callers (e.g.,Plugins.newConfigProvider) that supply extra tags, and metrics are only registered when a non-nullMetricsis passed. The Javadoc currently reads as if all three tags are always present.Recommend tightening the wording to reflect actual behavior:
- * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics. - * The following tags are automatically added to all metrics registered: <code>config</code> set to - * <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name, - * and <code>provider</code> set to the provider name. + * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics + * when wrapped via the plugin mechanism with a non-null {@link org.apache.kafka.common.metrics.Metrics} instance. + * The following tags are automatically added: <code>config</code>=<code>config.providers</code> and + * <code>class</code>=the ConfigProvider simple class name. When the wrapper supplies a provider name + * (e.g., via {@code Plugins.newConfigProvider(...)}), the tag <code>provider</code>=<code><name></code> is also added.Please confirm all call sites that wrap config providers (e.g., in clients’ AbstractConfig) pass the provider name when available so docs and behavior stay aligned.
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)
273-281: Optional: propagate metrics for provider plugins if available.If MirrorMaker has a
Metricsinstance (or can access one via runtime), consider passing it instead ofnullto surface provider metrics consistent with the new Javadoc.Would a
Metricsinstance be available here (e.g., via the runtime/worker context) to enable provider metric registration?connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (2)
48-52: Null-safety for provider plugin mapGuard against accidental nulls and pass the canonical field to the transformer.
- public WorkerConfigTransformer(Worker worker, Map<String, Plugin<ConfigProvider>> configProviderPlugins) { - this.worker = worker; - this.configProviderPlugins = configProviderPlugins; - this.configTransformer = new ConfigTransformer(configProviderPlugins); - } + public WorkerConfigTransformer(Worker worker, Map<String, Plugin<ConfigProvider>> configProviderPlugins) { + this.worker = worker; + this.configProviderPlugins = java.util.Objects.requireNonNull(configProviderPlugins, "configProviderPlugins"); + this.configTransformer = new ConfigTransformer(this.configProviderPlugins); + }Additionally add (outside the range, imports block):
+import java.util.Objects;
100-102: Close-time hygiene: cancel scheduled restarts and minor readability nitCancel pending HerderRequests to avoid late restarts after shutdown; rename the lambda var.
@Override public void close() { - configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); + requests.values().forEach(m -> m.values().forEach(HerderRequest::cancel)); + configProviderPlugins.values().forEach(plugin -> Utils.closeQuietly(plugin, "config provider plugin")); }clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (2)
63-69: Doc and constructor null-safetyClarify the parameter type in Javadoc and defensively default to an empty map.
- /** - * Creates a ConfigTransformer with the default pattern, of the form <code>${provider:[path:]key}</code>. - * - * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances. - */ - public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) { - this.configProviderPlugins = configProviderPlugins; - } + /** + * Creates a ConfigTransformer with the default pattern, of the form <code>${provider:[path:]key}</code>. + * + * @param configProviderPlugins a Map of provider names and Plugin-wrapped {@link ConfigProvider} instances. + */ + public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) { + this.configProviderPlugins = configProviderPlugins != null ? configProviderPlugins : Map.of(); + }
98-105: Redundant null-checkkeysByPath comes from the map’s value and won’t be null; simplify the condition.
- Map<String, Set<String>> keysByPath = entry.getValue(); - if (providerPlugin != null && keysByPath != null) { + Map<String, Set<String>> keysByPath = entry.getValue(); + if (providerPlugin != null) {clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (2)
35-38: Clarify metric naming and guard against accidental collisionsThe second arg to metricName is a group; the constant name “DESCRIPTION” is misleading. Also, using addMetric can throw if the same metric name+tags is registered twice across runs. Prefer a clearer constant and addMetricIfAbsent (if available on PluginMetrics), or keep addMetric but document it’s intentional.
- public static final String DESCRIPTION = "description"; + public static final String GROUP = "config-provider-metrics"; @@ - MetricName metricName = metrics.metricName(NAME, DESCRIPTION, Map.of()); - metrics.addMetric(metricName, (Measurable) (config, now) -> 123); + MetricName metricName = metrics.metricName(NAME, GROUP, Map.of()); + metrics.addMetric(metricName, (Measurable) (config, now) -> 123);
41-48: Return empty ConfigData instead of nullReturning null can NPE downstream; for tests, an empty ConfigData is safer and equally inert.
- public ConfigData get(String path) { - return null; - } + public ConfigData get(String path) { + return new ConfigData(Map.of()); + } @@ - public ConfigData get(String path, Set<String> keys) { - return null; - } + public ConfigData get(String path, Set<String> keys) { + return new ConfigData(Map.of()); + }connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (2)
381-402: Close Plugin to avoid leaks; simplify via try-with-resourcesEven though metrics is null here, Plugin may hold closeable resources later. Use try-with-resources to ensure cleanup.
- Plugin<ConfigProvider> plugin = plugins.newConfigProvider( - config, - providerName, - ClassLoaderUsage.PLUGINS, - null - ); - - assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot collect samples"); - Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin.get()).flatten(); + try (Plugin<ConfigProvider> plugin = plugins.newConfigProvider( + config, + providerName, + ClassLoaderUsage.PLUGINS, + null + )) { + assertNotNull(plugin); + assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot collect samples"); + Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin.get()).flatten(); + assertTrue(samples.containsKey("configure")); + assertPluginClassLoaderAlwaysActive(plugin.get()); + } - assertTrue(samples.containsKey("configure")); - assertPluginClassLoaderAlwaysActive(plugin.get());
810-816: Track plugin-metrics injection to make the test definitiveAdd a flag that flips when withPluginMetrics runs, and assert on it in the test.
public static class CustomMonitorableConfigProvider extends MonitorableConfigProvider { + volatile boolean metricsInjected; @Override public void withPluginMetrics(PluginMetrics metrics) { assertTrue(configured); + metricsInjected = true; } }connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (2)
2904-2927: Stop the worker to release plugin metrics and avoid cross-test leakageThis test never stops the Worker. Call worker.stop() after assertions so PluginMetrics and providers are closed.
Apply:
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); Metrics metrics = worker.metrics().metrics(); @@ assertMetrics(metrics, 1, expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable2"))); + worker.stop(); }
2937-2951: Avoid brittle dependency on metrics group "plugins"If the group name changes, this becomes flaky. Consider matching by name/description + tags only.
- for (MetricName metricName : metrics.metrics().keySet()) { - if (metricName.group().equals("plugins")) { + for (MetricName metricName : metrics.metrics().keySet()) { + if (MonitorableConfigProvider.NAME.equals(metricName.name()) + && MonitorableConfigProvider.DESCRIPTION.equals(metricName.description())) { Map<String, String> tags = metricName.tags(); if (expectedTags.equals(tags)) { - assertEquals(MonitorableConfigProvider.NAME, metricName.name()); - assertEquals(MonitorableConfigProvider.DESCRIPTION, metricName.description()); found++; } } }connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1)
272-286: Close provider plugins before stopping metrics to ensure clean metric unregistrationPlugin.close() shuts down PluginMetricsImpl; doing this after metrics.stop() risks no-op/unregister races. Close workerConfigTransformer first.
globalOffsetBackingStore.stop(); - metrics.stop(); + workerConfigTransformer.close(); + metrics.stop(); log.info("Worker stopped"); workerMetricsGroup.close(); if (connectorStatusMetricsGroup != null) { connectorStatusMetricsGroup.close(); } - workerConfigTransformer.close(); ThreadUtils.shutdownExecutorServiceQuietly(executor, EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS); Utils.closeQuietly(internalKeyConverter, "internal key converter"); Utils.closeQuietly(internalValueConverter, "internal value converter");
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java(4 hunks)clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java(3 hunks)clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java(1 hunks)clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java(5 hunks)clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java(1 hunks)connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java(2 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java(1 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java(3 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java(3 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java(5 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java(5 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
withPluginMetrics(6299-6301)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
Plugins(59-696)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
WorkerConfig(58-463)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (2)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
MonitorableConfigProvider(29-58)clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
Utils(93-1700)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
MonitorableConfigProvider(29-58)clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
Metrics(70-696)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
Utils(93-1700)
🔇 Additional comments (15)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (3)
21-21: Good switch to plugin-wrapped providers in tests.Importing
Pluginand wrappingTestConfigProvidermirrors production wiring and keeps behavior realistic; passingnullmetrics is fine here.Also applies to: 69-69
75-75: UsingMap.of(...)for test inputs is clean and immutable.Nice modernization; the maps are not mutated downstream, so this is appropriate.
Also applies to: 100-100, 115-115, 122-122
150-158: Test provider responses updated toMap.of(...)look correct.TTL-bearing and non-TTL cases are preserved, and immutability simplifies the assertions.
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (3)
20-20: LGTM on adoptingPlugin.wrapInstancein client-side tests.This aligns with the production
ConfigTransformersignature that now expectsPlugin<ConfigProvider>.Also applies to: 49-49
63-68: TTL assertion addition is accurate.Asserting
ttls.get(TEST_PATH) == 1Lvalidates the new TTL propagation path.
54-54: Consistent use ofMap.of(...)in tests.Looks good. Keeping
Collections.singletonMapin the null-value test is also correct sinceMap.ofdisallows nulls.Also applies to: 72-72, 79-79, 86-86, 93-93, 102-102
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (2)
46-52: Switch to Plugin is correct and cleanup path looks goodUsing Plugin-wrapped providers and closing the plugins here aligns with the new lifecycle (providers + metrics).
Also applies to: 101-101
100-102: No duplicate-close found — Worker-owned config providers are closed only once.Worker creates providers in initConfigTransformer and WorkerConfigTransformer.close() is the sole close for those instances; other close sites (MirrorMakerConfig, AbstractConfig) close provider maps they instantiate locally.
- connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (initConfigTransformer)
- connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (close)
- connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (create & close locally)
- clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (instantiateConfigProviders / resolveConfigVariables — create & close locally)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (1)
550-560: LGTM on plugin-wrapped providers and proper cleanupInstantiating provider plugins, using ConfigTransformer with them, and closing the plugins after resolution is sound.
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
60-69: Field/type switch to Plugin looks correctLookup via providerPlugin.get() preserves behavior while enabling plugin lifecycle/metrics.
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (1)
25-29: LGTM: imports align with the new Plugin-wrapped providers APIconnect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2)
22-24: LGTM: imports required for Plugin wrapping and Metrics
632-650: Add back-compat overload and tighten metric tags
- Internal search shows all internal callers already use the new Plugin signature (Worker.java, MirrorMakerConfig.java, PluginsTest.java, WorkerTest.java). If you need to preserve source/binary compatibility for downstream users, add a 3-arg overload that delegates to the new method with null metrics:
// Back-compat helper: returns raw instance, no metrics. public ConfigProvider newConfigProvider(AbstractConfig config, String providerName, ClassLoaderUsage classLoaderUsage) { Plugin<ConfigProvider> plugin = newConfigProvider(config, providerName, classLoaderUsage, null); return plugin == null ? null : plugin.get(); }
- Tighten metric tags in Plugins.newConfigProvider: replace the current Map.of("provider", providerName) with a more specific tag set to improve uniqueness/readability, e.g. Map.of(
"config", providerPrefix,
"class", plugin.getClass().getSimpleName(),
"provider", providerName
) when calling Plugin.wrapInstance.connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (1)
382-390: Mock aligns with new Plugins.newConfigProvider contractStubbing to return Plugin and switching the matcher set (provider name + Metrics) looks correct for the updated API.
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1)
212-223: Confirmed — WorkerConfigTransformer accepts Plugin and closes plugins on close()Constructor takes Map<String, Plugin> and close() calls Utils.closeQuietly(x, "config provider plugin") for each plugin.
| Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>(); | ||
| for (Map.Entry<String, String> entry : providerMap.entrySet()) { | ||
| try { | ||
| String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM; | ||
| Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties); | ||
| ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class); | ||
| provider.configure(configProperties); | ||
| configProviderInstances.put(entry.getKey(), provider); | ||
| Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG); | ||
| configProviderPluginInstances.put(entry.getKey(), providerPlugin); | ||
| } catch (ClassNotFoundException e) { | ||
| log.error("Could not load config provider class {}", entry.getValue(), e); | ||
| throw new ConfigException(providerClassProperty(entry.getKey()), entry.getValue(), "Could not load config provider class or one of its dependencies"); | ||
| } | ||
| } | ||
|
|
||
| return configProviderInstances; | ||
| return configProviderPluginInstances; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Avoid resource leaks when instantiation/configuration fails; improve plugin tag key
Close already-constructed plugins on failure and include the provider name in the plugin tag key for better metrics attribution.
- Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
+ Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
for (Map.Entry<String, String> entry : providerMap.entrySet()) {
try {
String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
provider.configure(configProperties);
- Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
+ Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG + "." + entry.getKey());
configProviderPluginInstances.put(entry.getKey(), providerPlugin);
} catch (ClassNotFoundException e) {
log.error("Could not load config provider class {}", entry.getValue(), e);
+ // Close any providers constructed so far to avoid leaks
+ configProviderPluginInstances.values().forEach(p -> Utils.closeQuietly(p, "config provider plugin"));
throw new ConfigException(providerClassProperty(entry.getKey()), entry.getValue(), "Could not load config provider class or one of its dependencies");
+ } catch (RuntimeException e) {
+ // Close any providers constructed so far to avoid leaks
+ configProviderPluginInstances.values().forEach(p -> Utils.closeQuietly(p, "config provider plugin"));
+ throw e;
}
}
return configProviderPluginInstances;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>(); | |
| for (Map.Entry<String, String> entry : providerMap.entrySet()) { | |
| try { | |
| String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM; | |
| Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties); | |
| ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class); | |
| provider.configure(configProperties); | |
| configProviderInstances.put(entry.getKey(), provider); | |
| Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG); | |
| configProviderPluginInstances.put(entry.getKey(), providerPlugin); | |
| } catch (ClassNotFoundException e) { | |
| log.error("Could not load config provider class {}", entry.getValue(), e); | |
| throw new ConfigException(providerClassProperty(entry.getKey()), entry.getValue(), "Could not load config provider class or one of its dependencies"); | |
| } | |
| } | |
| return configProviderInstances; | |
| return configProviderPluginInstances; | |
| } | |
| Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>(); | |
| for (Map.Entry<String, String> entry : providerMap.entrySet()) { | |
| try { | |
| String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM; | |
| Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties); | |
| ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class); | |
| provider.configure(configProperties); | |
| Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG + "." + entry.getKey()); | |
| configProviderPluginInstances.put(entry.getKey(), providerPlugin); | |
| } catch (ClassNotFoundException e) { | |
| log.error("Could not load config provider class {}", entry.getValue(), e); | |
| // Close any providers constructed so far to avoid leaks | |
| configProviderPluginInstances.values().forEach(p -> Utils.closeQuietly(p, "config provider plugin")); | |
| throw new ConfigException(providerClassProperty(entry.getKey()), entry.getValue(), "Could not load config provider class or one of its dependencies"); | |
| } catch (RuntimeException e) { | |
| // Close any providers constructed so far to avoid leaks | |
| configProviderPluginInstances.values().forEach(p -> Utils.closeQuietly(p, "config provider plugin")); | |
| throw e; | |
| } | |
| } | |
| return configProviderPluginInstances; | |
| } |
🤖 Prompt for AI Agents
In clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
around lines 624-640, the code currently constructs Plugin<ConfigProvider>
instances but doesn’t close already-created plugin instances if a subsequent
step fails and uses a generic plugin tag key; update the instantiation flow so
that after creating and wrapping the provider you assign the plugin to a local
variable and only put it into the map on successful completion, and in the
catch/failure path close the created plugin (call its close method) to avoid
resource leaks. Also change the plugin tag key passed to Plugin.wrapInstance to
include the provider name (e.g. CONFIG_PROVIDERS_CONFIG + "." + entry.getKey())
so metrics/tags are attributed to the specific provider. Ensure the same
ConfigException is thrown on failure after closing the plugin.
| Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>(); | ||
| for (String name : providerNames) { | ||
| ConfigProvider configProvider = plugins.newConfigProvider( | ||
| Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( | ||
| this, | ||
| CONFIG_PROVIDERS_CONFIG + "." + name, | ||
| Plugins.ClassLoaderUsage.PLUGINS | ||
| name, | ||
| Plugins.ClassLoaderUsage.PLUGINS, | ||
| null | ||
| ); | ||
| providers.put(name, configProvider); | ||
| providerPlugins.put(name, configProviderPlugin); | ||
| } | ||
| ConfigTransformer transformer = new ConfigTransformer(providers); | ||
| ConfigTransformer transformer = new ConfigTransformer(providerPlugins); | ||
| Map<String, String> transformed = transformer.transform(props).data(); | ||
| providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); | ||
| providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | ||
| return transformed; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guard against null providers and ensure plugins are closed on exceptions.
Plugins.newConfigProvider(...) returns null when a provider name has no corresponding class property. The current code inserts the null into the map and will likely NPE inside ConfigTransformer. Also, if transformer.transform(...) throws, plugins won’t be closed.
Proposed fix: skip nulls and move closing into a finally to guarantee cleanup.
- Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
- for (String name : providerNames) {
- Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
- this,
- name,
- Plugins.ClassLoaderUsage.PLUGINS,
- null
- );
- providerPlugins.put(name, configProviderPlugin);
- }
- ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
- Map<String, String> transformed = transformer.transform(props).data();
- providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
- return transformed;
+ Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
+ try {
+ for (String name : providerNames) {
+ Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
+ this,
+ name,
+ Plugins.ClassLoaderUsage.PLUGINS,
+ null
+ );
+ if (configProviderPlugin != null) {
+ providerPlugins.put(name, configProviderPlugin);
+ }
+ }
+ ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
+ return transformer.transform(props).data();
+ } finally {
+ providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>(); | |
| for (String name : providerNames) { | |
| ConfigProvider configProvider = plugins.newConfigProvider( | |
| Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( | |
| this, | |
| CONFIG_PROVIDERS_CONFIG + "." + name, | |
| Plugins.ClassLoaderUsage.PLUGINS | |
| name, | |
| Plugins.ClassLoaderUsage.PLUGINS, | |
| null | |
| ); | |
| providers.put(name, configProvider); | |
| providerPlugins.put(name, configProviderPlugin); | |
| } | |
| ConfigTransformer transformer = new ConfigTransformer(providers); | |
| ConfigTransformer transformer = new ConfigTransformer(providerPlugins); | |
| Map<String, String> transformed = transformer.transform(props).data(); | |
| providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); | |
| providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | |
| return transformed; | |
| Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>(); | |
| try { | |
| for (String name : providerNames) { | |
| Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( | |
| this, | |
| name, | |
| Plugins.ClassLoaderUsage.PLUGINS, | |
| null | |
| ); | |
| if (configProviderPlugin != null) { | |
| providerPlugins.put(name, configProviderPlugin); | |
| } | |
| } | |
| ConfigTransformer transformer = new ConfigTransformer(providerPlugins); | |
| return transformer.transform(props).data(); | |
| } finally { | |
| providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | |
| } |
🤖 Prompt for AI Agents
In
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
around lines 273 to 286, the code currently inserts whatever
Plugins.newConfigProvider(...) returns (including null) into providerPlugins and
closes plugins only after transform, which can cause NPEs and leak plugin
resources if transform throws; change the loop to check the result of
plugins.newConfigProvider(...) and only put non-null plugins into
providerPlugins, and wrap the creation/transform/close sequence in a try/finally
so that providerPlugins.values() are closed in the finally block (skipping
nulls) to guarantee cleanup even if transformer.transform(...) throws.
| @Test | ||
| public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() { | ||
| String providerName = "monitorable"; | ||
| String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName; | ||
| props.put(providerPrefix + ".class", CustomMonitorableConfigProvider.class.getName()); | ||
| createConfig(); | ||
| Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics()); | ||
| assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get()); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Test doesn’t fail if withPluginMetrics is never called—add an explicit assertion and close resources
Currently the assertion lives inside withPluginMetrics; if it’s never invoked, the test still passes. Track invocation and assert it. Also, close Metrics and the Plugin.
- Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics());
- assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());
+ Metrics metrics = new Metrics();
+ try (Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, metrics)) {
+ assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());
+ CustomMonitorableConfigProvider p = (CustomMonitorableConfigProvider) plugin.get();
+ assertTrue(p.metricsInjected, "withPluginMetrics was not invoked");
+ } finally {
+ metrics.close();
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @Test | |
| public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() { | |
| String providerName = "monitorable"; | |
| String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName; | |
| props.put(providerPrefix + ".class", CustomMonitorableConfigProvider.class.getName()); | |
| createConfig(); | |
| Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics()); | |
| assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get()); | |
| } | |
| @Test | |
| public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() { | |
| String providerName = "monitorable"; | |
| String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName; | |
| props.put(providerPrefix + ".class", CustomMonitorableConfigProvider.class.getName()); | |
| createConfig(); | |
| Metrics metrics = new Metrics(); | |
| try (Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, metrics)) { | |
| assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get()); | |
| CustomMonitorableConfigProvider p = (CustomMonitorableConfigProvider) plugin.get(); | |
| assertTrue(p.metricsInjected, "withPluginMetrics was not invoked"); | |
| } finally { | |
| metrics.close(); | |
| } | |
| } |
🤖 Prompt for AI Agents
In
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
around lines 404 to 412, the test relies on an assertion inside the
CustomMonitorableConfigProvider.withPluginMetrics which may never be invoked so
the test can pass spuriously; modify the test to (1) wrap Metrics in a variable
and pass it to newConfigProvider, (2) add a boolean/AtomicBoolean flag that the
provider sets when withPluginMetrics is called and assert that flag after
construction to ensure invocation, (3) explicitly close the Metrics instance in
a finally block (or use try-with-resources if applicable), and (4)
close/unregister the returned Plugin (plugin.close() or plugin.release()) after
the assertion to clean up resources.
|
/refacto-test |
|
Refacto is reviewing this PR. Please wait for the review comments to be posted. |
Code Review: ConfigProvider Plugin Integration👍 Well Done
📌 Files Processed
📝 Additional Comments
|
| } | ||
| } | ||
| providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); | ||
| providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unchecked Plugin Closure
Closing Plugin directly instead of its wrapped instance. The Plugin wrapper itself isn't Closeable, but its wrapped ConfigProvider is. This could prevent proper resource cleanup.
| providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | |
| providerPlugins.values().forEach(x -> Utils.closeQuietly(x.get(), "config provider")); |
Standards
- ISO-IEC-25010-Reliability-Fault-Tolerance
- ISO-IEC-25010-Functional-Correctness-Appropriateness
- DbC-Resource-Mgmt
| @Override | ||
| public void close() { | ||
| configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider")); | ||
| configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unchecked Plugin Closure
Closing Plugin directly instead of its wrapped instance. The Plugin wrapper isn't Closeable, but the wrapped ConfigProvider is. This could prevent proper resource cleanup.
| configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | |
| configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x.get(), "config provider")); |
Standards
- ISO-IEC-25010-Reliability-Fault-Tolerance
- ISO-IEC-25010-Functional-Correctness-Appropriateness
- DbC-Resource-Mgmt
| String path = pathWithKeys.getKey(); | ||
| Set<String> keys = new HashSet<>(pathWithKeys.getValue()); | ||
| ConfigData configData = provider.get(path, keys); | ||
| ConfigData configData = providerPlugin.get().get(path, keys); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resource Leak Risk
The Plugin wrapper adds an additional layer of object access with providerPlugin.get().get(path, keys). This double method call pattern creates potential resource leaks if the plugin instance isn't properly closed. The ConfigProvider resources are now wrapped in Plugin objects but the close() method implementation doesn't account for this new structure.
| ConfigData configData = providerPlugin.get().get(path, keys); | |
| ConfigProvider provider = providerPlugin.get(); | |
| ConfigData configData = provider.get(path, keys); |
Standards
- ISO-IEC-25010-Performance-Resource-Utilization
- Netflix-Resource-Management
| Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(mockFileConfigProvider, | ||
| null, | ||
| WorkerConfig.CONFIG_PROVIDERS_CONFIG, | ||
| Map.of("provider", "file")); | ||
| when(plugins.newConfigProvider(any(AbstractConfig.class), | ||
| eq("config.providers.file"), | ||
| any(ClassLoaderUsage.class))) | ||
| .thenReturn(mockFileConfigProvider); | ||
| eq("file"), | ||
| any(ClassLoaderUsage.class), | ||
| any(Metrics.class))).thenReturn(providerPlugin); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Plugin Wrapper Duplication
Plugin wrapping logic is duplicated in test code rather than being encapsulated in the newConfigProvider method. This creates maintenance burden when plugin wrapping implementation changes.
| Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(mockFileConfigProvider, | |
| null, | |
| WorkerConfig.CONFIG_PROVIDERS_CONFIG, | |
| Map.of("provider", "file")); | |
| when(plugins.newConfigProvider(any(AbstractConfig.class), | |
| eq("config.providers.file"), | |
| any(ClassLoaderUsage.class))) | |
| .thenReturn(mockFileConfigProvider); | |
| eq("file"), | |
| any(ClassLoaderUsage.class), | |
| any(Metrics.class))).thenReturn(providerPlugin); | |
| MockFileConfigProvider mockFileConfigProvider = new MockFileConfigProvider(); | |
| mockFileConfigProvider.configure(Collections.singletonMap("testId", mockFileProviderTestId)); | |
| when(plugins.newConfigProvider(any(AbstractConfig.class), | |
| eq("file"), | |
| any(ClassLoaderUsage.class), | |
| any(Metrics.class))).thenAnswer(invocation -> { | |
| return Plugin.wrapInstance(mockFileConfigProvider, null, WorkerConfig.CONFIG_PROVIDERS_CONFIG, | |
| Map.of("provider", "file")); | |
| }); |
Standards
- Clean-Code-DRY
- Design-Pattern-Factory
|
/refacto-test |
Summary by CodeRabbit