Skip to content

Conversation

@DDShantanuBadmanji
Copy link

@DDShantanuBadmanji DDShantanuBadmanji commented Sep 29, 2025

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.

Summary by CodeRabbit

  • New Features

    • Connect workers now expose metrics for monitorable config providers with standardized tags (config, class, provider).
  • Refactor

    • Config providers are now handled via plugin wrappers for consistent lifecycle management and isolation; no expected functional change.
  • Documentation

    • Clarified guidance for making config providers monitorable and documented emitted metric tags.
  • Tests

    • Added and updated tests to validate plugin-wrapped providers, transformation behavior, and metric emission for monitorable providers.

@coderabbitai
Copy link

coderabbitai bot commented Sep 29, 2025

Walkthrough

Replaces direct ConfigProvider usage with Plugin<ConfigProvider> wrappers across provider instantiation, transformer construction, resolution, and cleanup. Adds metrics wiring for monitorable providers, updates Plugins.newConfigProvider signature, and adapts tests to wrap providers and assert metric registration.

Changes

Cohort / File(s) Summary of changes
Core config plumbing
clients/.../common/config/AbstractConfig.java, clients/.../common/config/ConfigTransformer.java
Switch maps and flows from ConfigProvider to Plugin<ConfigProvider>; update constructor/field types, access providers via plugin.get(), and close plugin wrappers.
ConfigProvider docs
clients/.../common/config/provider/ConfigProvider.java
Add Javadoc recommending Monitorable implementation for metrics and describing automatic metric tags; no API surface change.
ConfigProvider test helper
clients/.../common/config/provider/MonitorableConfigProvider.java
New test-oriented Monitorable ConfigProvider that registers a plugin metric, exposes NAME/DESCRIPTION constants, and includes basic get, configure, and close stubs.
Client tests updated
clients/.../common/config/ConfigTransformerTest.java
Wrap test providers with Plugin.wrapInstance; replace Collections.singletonMap with Map.of; update imports.
Plugins API and wiring
connect/runtime/.../isolation/Plugins.java
Change newConfigProvider to return Plugin<ConfigProvider>, add Metrics parameter, instantiate provider, call Plugin.wrapInstance with metrics, and return the plugin wrapper.
Worker and transformer wiring (runtime)
connect/runtime/.../Worker.java, connect/runtime/.../WorkerConfigTransformer.java
Construct and pass Map<String, Plugin<ConfigProvider>> instead of raw providers; update constructor signatures/fields; close plugin wrappers with updated messages; import Plugin.
MirrorMaker config path
connect/mirror/.../MirrorMakerConfig.java
Use plugins.newConfigProvider(...) to obtain Plugin<ConfigProvider>, pass to ConfigTransformer, and close plugin wrappers on shutdown.
Runtime tests updated
connect/runtime/.../WorkerConfigTransformerTest.java, connect/runtime/.../WorkerTest.java
Adapt tests to supply Plugin.wrapInstance providers via Map.of; add testMonitorableConfigProvider that verifies plugin metrics and helper methods for expected tags and metric assertions.
Plugins unit tests updated
connect/runtime/.../isolation/PluginsTest.java
Update to use Plugin<ConfigProvider>, call plugin.get() to access provider, add test ensuring withPluginMetrics is invoked after configure, and adjust provider naming/prefix logic.
Miscellaneous
Vagrantfile
Minor formatting/comment addition only.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Caller as Caller
  participant AC as AbstractConfig
  participant Pl as Plugins
  participant PPl as Plugin<ConfigProvider>
  participant CT as ConfigTransformer
  participant CP as ConfigProvider
  participant Mx as Metrics

  Caller->>AC: load configs, request provider for "name"
  AC->>Pl: newConfigProvider(config, "name", classLoaderUsage, Mx)
  Pl->>CP: instantiate provider class
  Pl->>CP: configure(...)
  Note right of CP: if Monitorable, will expect withPluginMetrics(Mx)
  Pl->>PPl: Plugin.wrapInstance(CP, Mx, tags)
  Pl-->>AC: return PPl
  AC->>CT: new ConfigTransformer(mapOf(name->PPl))
  Caller->>CT: transform(props)
  CT->>PPl: p = plugin.get()
  PPl-->>CT: CP (underlying)
  CT->>CP: get(path[, keys])
  CP-->>CT: ConfigData
  CT-->>Caller: transformed props
  Caller->>PPl: close (Utils.closeQuietly)
Loading
sequenceDiagram
  autonumber
  participant Test as TestHarness
  participant Plugin as Plugin.wrapInstance
  participant Prov as MonitorableConfigProvider
  participant M as Metrics

  Test->>Prov: new MonitorableConfigProvider()
  Test->>Plugin: wrapInstance(Prov, M, tags)
  Plugin-->>Test: Plugin<ConfigProvider>
  Test->>M: assert metric exists with tags (config=config.providers, provider=name, class=Prov)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I wrapped my configs snug and neat,
In plugin coats with tiny feet.
Metrics hum a tidy tune,
Providers nap beneath the moon.
Close softly now — the system's neat. 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Title Check ⚠️ Warning The title “Clone kafka 18894” does not convey the primary change introduced by this pull request, which is the migration of ConfigProvider usage throughout the codebase to Plugin-wrapped instances. It is overly generic, references an internal ticket number without context, and fails to summarize the core modification made. Please update the title to clearly and concisely describe the main change, for example “Migrate ConfigProvider usages to Plugin-wrapped ConfigProvider instances” so that reviewers immediately understand the objective of the PR.
Description Check ⚠️ Warning The pull request description is still the unmodified template placeholder and does not include any details about the implemented changes, testing strategy, or rationale. This leaves reviewers without context on what was changed or how it was validated. Please replace the placeholder text with a detailed description of the changes, including a summary of the migration to Plugin-wrapped ConfigProvider instances, the reasons for the change, and an outline of any unit or integration tests added to verify the new behavior.
✅ Passed checks (1 passed)
Check name Status Explanation
Docstring Coverage ✅ Passed No functions found in the changes. Docstring coverage check skipped.
✨ Finishing touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch clone-KAFKA-18894

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b92a2bb and 5989a26.

📒 Files selected for processing (1)
  • Vagrantfile (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • Vagrantfile

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@refacto-test
Copy link

refacto-test bot commented Sep 29, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@gemini-code-assist
Copy link

Summary of Changes

Hello @DDShantanuBadmanji, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant architectural enhancement by wrapping ConfigProvider instances within a generic Plugin framework. This change primarily facilitates the integration of ConfigProviders with Kafka's metrics system, allowing them to expose monitoring data. By standardizing the plugin lifecycle and enabling automatic metric registration with contextual tags, this update improves the observability and manageability of configuration providers across Kafka and Kafka Connect components.

Highlights

  • ConfigProvider Plugin Integration: ConfigProvider instances are now managed through a Plugin wrapper, standardizing their lifecycle and enabling advanced features.
  • Enhanced Metrics for ConfigProviders: ConfigProvider's can now implement the Monitorable interface to expose custom metrics, which are automatically registered with relevant tags by the Plugin framework.
  • Refactored Instantiation Logic: The core logic for instantiating and managing ConfigProviders across AbstractConfig, MirrorMakerConfig, and Kafka Connect Worker has been updated to leverage the new Plugin wrapper, ensuring consistent behavior and metrics integration.
  • Comprehensive Test Coverage: New and updated unit tests confirm the correct behavior of the Plugin wrapper, including proper metrics registration and lifecycle management for ConfigProviders.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@refacto-test
Copy link

refacto-test bot commented Sep 29, 2025

Clone kafka 18894

TL;DR: Backports RemoteLogManager implementation to an older Kafka version with compatibility adjustments for Java 8.


Refacto PR Summary

This PR adds the RemoteLogManager implementation and related classes for tiered storage support while maintaining Java 8 compatibility.
The changes primarily focus on backporting the RemoteLogManager implementation from a newer Kafka version, making necessary adjustments for Java 8 compatibility. This includes replacing Java 11+ features like records with regular classes, replacing switch expressions with traditional switch statements, and modifying collection stream operations to use collect(Collectors.toList()) instead of toList(). The PR also includes updates to class paths and imports to align with the target codebase structure, ensuring the tiered storage functionality works properly in this version.

Change Highlights

Click to expand
  • core/src/main/java/kafka/log/remote/RemoteLogManager.java: Added core implementation for tiered storage management
  • core/src/main/java/kafka/log/remote/RemoteLogReader.java: Added implementation for reading logs from remote storage
  • core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java: Added implementation for reading offsets from remote storage
  • connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java: Replaced record with class for Java 8 compatibility
  • connect/runtime/src/test/java/org/apache/kafka/connect/tools/TransformationDoc.java: Replaced record with class for Java 8 compatibility
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java: Replaced record with class for Java 8 compatibility
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/StartsAndStops.java: Replaced record with class for Java 8 compatibility

Sequence Diagram

sequenceDiagram
    participant Client as Client
    participant Broker as Kafka Broker
    participant RLM as RemoteLogManager
    participant RLMM as RemoteLogMetadataManager
    participant RSM as RemoteStorageManager
    participant Storage as Remote Storage
    
    Client->>Broker: Fetch Request
    Broker->>RLM: Read Remote Log
    RLM->>RLMM: Fetch Metadata
    RLMM-->>RLM: Remote Log Segment Metadata
    RLM->>RSM: Fetch Log Segment
    RSM->>Storage: Read Data
    Storage-->>RSM: Log Data
    RSM-->>RLM: Log Segment
    RLM-->>Broker: FetchDataInfo
    Broker-->>Client: Fetch Response
Loading

Testing Guide

Click to expand
  1. Verify RemoteLogManager initialization with proper configuration parameters
  2. Test log segment copying to remote storage by configuring a remote storage manager and verifying segments are properly transferred
  3. Test reading from remote storage by configuring a broker to fetch data from tiered storage and verifying data integrity
  4. Test leader/follower transitions by simulating leadership changes and verifying the RemoteLogManager properly handles the state transitions
  5. Test cleanup of expired segments by configuring retention policies and verifying segments are properly deleted from remote storage

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the handling of ConfigProvider instances to wrap them in a Plugin object, which enables metrics for monitorable providers. The changes are consistently applied throughout the codebase, including updates to AbstractConfig, ConfigTransformer, and various components in Kafka Connect. The implementation correctly passes the Metrics registry where available (like in Worker) and handles its absence gracefully in other contexts. New tests have been added to validate the metrics integration and the lifecycle of config providers. The code quality is high, and the changes are a valuable addition for monitoring. I have no further comments.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (1)

598-640: Close provider on configure failure; trim provider names; improve plugin tagging

  • If configure() throws, the raw provider instance may leak. Close it before rethrowing.
  • Trim entries from config.providers to tolerate whitespace.
  • Use a provider-specific key for Plugin metrics/log tags (helps diagnostics, even if metrics is null here).

Apply this diff:

-    private Map<String, Plugin<ConfigProvider>> instantiateConfigProviders(
+    private Map<String, Plugin<ConfigProvider>> instantiateConfigProviders(
             Map<String, String> indirectConfigs,
             Map<String, ?> providerConfigProperties,
             Predicate<String> classNameFilter
     ) {
         final String configProviders = indirectConfigs.get(CONFIG_PROVIDERS_CONFIG);
@@
-        for (String provider : configProviders.split(",")) {
+        for (String providerRaw : configProviders.split(",")) {
+            final String provider = providerRaw.trim();
+            if (provider.isEmpty()) continue;
             String providerClass = providerClassProperty(provider);
             if (indirectConfigs.containsKey(providerClass)) {
                 String providerClassName = indirectConfigs.get(providerClass);
                 if (classNameFilter.test(providerClassName)) {
                     providerMap.put(provider, providerClassName);
@@
-        Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
+        Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
         for (Map.Entry<String, String> entry : providerMap.entrySet()) {
             try {
                 String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
                 Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
-                ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
-                provider.configure(configProperties);
-                Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
+                ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
+                try {
+                    provider.configure(configProperties);
+                } catch (Throwable t) {
+                    maybeClose(provider, "AutoCloseable config provider constructed during failed instantiation of '" + entry.getKey() + "'");
+                    throw t;
+                }
+                // Tag with specific provider for clearer metrics/logging even if metrics are null here
+                Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG + "." + entry.getKey());
                 configProviderPluginInstances.put(entry.getKey(), providerPlugin);
             } catch (ClassNotFoundException e) {
                 log.error("Could not load config provider class {}", entry.getValue(), e);
                 throw new ConfigException(providerClassProperty(entry.getKey()), entry.getValue(), "Could not load config provider class or one of its dependencies");
             }
         }
 
         return configProviderPluginInstances;
     }

Consider LinkedHashMap to preserve declaration order if any downstream relies on it.

🧹 Nitpick comments (11)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3)

641-649: Wrap before configure and close on failure to avoid leaks

If plugin.configure(...) throws, the instance is not wrapped and never closed. Wrap first and close the wrapper on failure so both the plugin and its metrics (if any) are cleaned up.

-        ConfigProvider plugin = newVersionedPlugin(config, classPropertyName, null, ConfigProvider.class, classLoaderUsage, scanResult.configProviders());
+        ConfigProvider plugin = newVersionedPlugin(config, classPropertyName, null, ConfigProvider.class, classLoaderUsage, scanResult.configProviders());
+        Plugin<ConfigProvider> wrapped = Plugin.wrapInstance(
+                plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
@@
-        try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
-            plugin.configure(configProviderConfig);
-        }
-        return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
+        try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
+            wrapped.get().configure(configProviderConfig);
+        } catch (Throwable t) {
+            try { wrapped.close(); } catch (Exception e) {
+                log.warn("Failed to close config provider after configure failure for provider '{}'", providerName, e);
+            }
+            throw t;
+        }
+        return wrapped;

649-649: Use a more specific metrics “config” tag key

For better disambiguation across multiple providers, prefer providerPrefix (e.g., config.providers.file) as the metrics key instead of the generic config.providers.

-        return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
+        return Plugin.wrapInstance(plugin, metrics, providerPrefix, Map.of("provider", providerName));

If you apply the “wrap-before-configure” change above, apply the same key when creating wrapped.


633-633: Trim providerName to avoid subtle misconfig from whitespace

A minor hardening to tolerate “file, secret ” style lists.

-        String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
+        String normalizedProvider = providerName.trim();
+        String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + normalizedProvider;

Also use normalizedProvider in the metrics extra tag:

-        Map.of("provider", providerName)
+        Map.of("provider", normalizedProvider)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)

49-50: Wrapping provider via Plugin is correct; consider a metrics variant.

Using null Metrics keeps this suite focused on transform behavior. Optionally add a dedicated test with a Monitorable provider and a real Metrics instance to assert metrics wiring here too, or rely on PluginsTest for that coverage.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (1)

404-413: Strengthen metrics test by asserting default metric tags.

The test guarantees withPluginMetrics is called after configure. To fully validate the documented contract, also assert tags include config=config.providers, class=, and provider=.

Apply this diff to make provider name reusable and assert tags inside the monitorable provider:

@@
 public class PluginsTest {
+    private static final String MONITORABLE_PROVIDER_NAME = "monitorable";
@@
-        String providerName = "monitorable";
-        String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
+        String providerName = MONITORABLE_PROVIDER_NAME;
+        String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
@@
         Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics());
         assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());
     }

And in the inner class:

@@
-    public static class CustomMonitorableConfigProvider extends MonitorableConfigProvider {
+    public static class CustomMonitorableConfigProvider extends MonitorableConfigProvider {
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            assertTrue(configured);
+            // Assert default tags
+            var metricName = metrics.metricName("dummy", "dummy", java.util.Map.of());
+            var tags = metricName.tags();
+            assertEquals("config.providers", tags.get("config"));
+            assertEquals(CustomMonitorableConfigProvider.class.getSimpleName(), tags.get("class"));
+            assertEquals(MONITORABLE_PROVIDER_NAME, tags.get("provider"));
+        }
-        @Override
-        public void withPluginMetrics(PluginMetrics metrics) {
-            assertTrue(configured);
-        }
     }
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1)

32-35: Clarify metrics tag contract to match implementation.

Proposed Javadoc tweak in ConfigProvider.java:

- * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
- * The following tags are automatically added to all metrics registered: <code>config</code> set to
- * <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name,
- * and <code>provider</code> set to the provider name.
+ * <p>If the provider implements {@link org.apache.kafka.common.metrics.Monitorable} and registers metrics,
+ * Kafka will invoke {@code withPluginMetrics(...)}. Those metrics include default tags:
+ * <code>config</code>=<code>config.providers</code> and <code>class</code>=the provider’s simple class name.
+ * An additional <code>provider</code> tag (with the configured provider name) is only added when
+ * the provider is created via {@code Plugins#newConfigProvider(...)}.
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)

275-280: Optional: wire Metrics to providers if available.

Passing null disables plugin metrics for monitorable providers. If a Metrics instance is accessible here, consider supplying it.

clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)

65-68: Update Javadoc to reflect Plugin-wrapped providers.

Adjust parameter description to avoid confusion.

-     * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances.
+     * @param configProviderPlugins a Map of provider names to Plugin‑wrapped {@link ConfigProvider} instances.
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)

55-57: configure flag set is fine; consider asserting in tests.

Optionally verify configured == true where used.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (2)

2904-2927: Solid coverage for monitorable providers; add a tiny assertion and optional cleanup.

  • Consider asserting both provider names were instantiated via newConfigProvider for clarity.
  • Optionally call worker.stop() to release metrics resources.
@@
-        assertMetrics(metrics,
+        assertMetrics(metrics,
             1,
             expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable")));
         assertMetrics(metrics,
             1,
             expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable2")));
+        verify(plugins, times(1)).newConfigProvider(any(AbstractConfig.class), eq("monitorable"), any(ClassLoaderUsage.class), any(Metrics.class));
+        verify(plugins, times(1)).newConfigProvider(any(AbstractConfig.class), eq("monitorable2"), any(ClassLoaderUsage.class), any(Metrics.class));
+        worker.stop();

2937-2951: Metric assertion works; make it resilient to future default tags.

If MetricConfig ever adds default tags, strict map equality could cause false negatives. Prefer containment.

-                if (expectedTags.equals(tags)) {
+                if (tags.entrySet().containsAll(expectedTags.entrySet())) {
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and b92a2bb.

📒 Files selected for processing (12)
  • clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (4 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (3 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (5 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1 hunks)
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (3 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
  • WorkerConfig (58-463)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (2)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
  • Utils (93-1700)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
  • Plugins (59-696)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
  • withPluginMetrics (6299-6301)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
  • Utils (93-1700)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (4)
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala (1)
  • MockFileConfigProvider (1636-1641)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
🔇 Additional comments (21)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)

22-24: Imports LGTM

New imports for Plugin and Metrics are necessary for wrapping config providers with metrics instrumentation.

clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (2)

20-21: Import addition is appropriate.

Required for Plugin.wrapInstance usage in setup.


54-59: Map.of migration looks good.

The switch to Map.of for inputs improves brevity with no behavior change; assertions remain valid.

Also applies to: 63-68, 72-75, 79-82, 86-89, 93-98, 102-105

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (4)

21-22: Import addition is appropriate.

Needed for Plugin-wrapped providers.


69-70: Supplying Plugin-wrapped provider is correct for the new API.

Null Metrics is fine for these tests; Worker interactions and restart scheduling are unaffected.


75-76: Map.of migration for transform inputs is fine.

Concise, immutable inputs; no change in semantics.

Also applies to: 100-101, 115-116, 122-123


150-158: Use of Map.of in ConfigData is appropriate.

Immutable payloads for test providers reduce accidental mutation. LGTM.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (2)

25-29: New imports are appropriate.

Required for Plugin-wrapped providers and metrics assertions.


381-402: Updated newConfigProvider test correctly adapts to Plugin wrapper.

Using plugin.get() for assertions maintains previous intent and preserves classloader checks.

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)

26-26: Import looks correct for Plugin usage.

clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (2)

60-69: Migration to Plugin-wrapped providers looks good.

Constructor and field types align with new Plugin model.

Please ensure all callers now pass Map<String, Plugin>.


98-105: Provider access via plugin.get() is correct; null-safe check present.

No further action; relies on providers returning non-null ConfigData.

Confirm all test providers return non-null ConfigData to avoid NPEs here.

clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)

35-38: Metrics wiring is fine for test purposes.

MetricName construction and Measurable registration look OK.

clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (1)

23-23: Import addition is correct

Importing Plugin aligns with new provider-wrapping flow.

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1)

212-223: Cleanup of provider plugins on shutdown is correctly handled WorkerConfigTransformer.close() iterates over all configProviderPlugins and calls Utils.closeQuietly on each, and Worker.stop() invokes workerConfigTransformer.close() after stopping metrics, ensuring all provider plugins are closed.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (3)

40-47: Imports for Plugin-wrapped providers and metrics — OK.

These are the right types to support Plugin and monitorable providers.

Also applies to: 98-98, 117-117


2929-2935: Tag construction mirrors Plugin tag semantics — OK.

Using LinkedHashMap is fine; equality checks are order-insensitive anyway.


382-389: API consistency verified Plugins.newConfigProvider(AbstractConfig, String, ClassLoaderUsage, Metrics) is implemented in Plugins.java, invoked with four arguments in Worker.java, and correctly stubbed in WorkerTest. No changes needed.

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (3)

23-23: Import for Plugin — OK.

Required for the new wrapped provider map.


101-101: Close path now shuts down plugins and their metrics — good.

Closing the wrapper ensures both provider and plugin metrics are released.


46-52: ConfigTransformer constructor supports Map<String, Plugin>> Confirmed that ConfigTransformer defines a constructor taking Map<String, Plugin>, matching the new usage—no compilation issues.

Comment on lines +550 to 560
Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);

if (!providers.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providers);
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Always close provider plugins on failure (use finally) to avoid leaks

If ConfigTransformer construction or transform throws, provider plugins aren’t closed. Wrap with try/finally and include provider name in close logs.

Apply this diff:

-        Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
-
-        if (!providerPlugins.isEmpty()) {
-            ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
-            ConfigTransformerResult result = configTransformer.transform(indirectVariables);
-            if (!result.data().isEmpty()) {
-                resolvedOriginals.putAll(result.data());
-            }
-        }
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        Map<String, Plugin<ConfigProvider>> providerPlugins = Collections.emptyMap();
+        try {
+            providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
+            if (!providerPlugins.isEmpty()) {
+                ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
+                ConfigTransformerResult result = configTransformer.transform(indirectVariables);
+                if (!result.data().isEmpty()) {
+                    resolvedOriginals.putAll(result.data());
+                }
+            }
+        } finally {
+            for (Map.Entry<String, Plugin<ConfigProvider>> e : providerPlugins.entrySet()) {
+                Utils.closeQuietly(e.getValue(), "config provider plugin '" + e.getKey() + "'");
+            }
+        }

If ConfigTransformer implements AutoCloseable, we should also close it. Check with:


🏁 Script executed:

#!/bin/bash
rg -nP 'class\s+ConfigTransformer\b.*implements\s+AutoCloseable' -g '!**/test/**'
rg -nP '\bvoid\s+close\s*\(' clients/ connect/

Length of output: 42678


🏁 Script executed:

find . -type f -name ConfigTransformer.java

Length of output: 77


🏁 Script executed:

#!/bin/bash
sed -n '1,200p' clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java

Length of output: 8388


Always close provider plugins on failure (use try/finally)
Wrap instantiation and transformation in a try/finally to guarantee plugins are closed on any exception—include the provider name in the close log. ConfigTransformer does not implement AutoCloseable, so it need not be closed.

-        Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
-
-        if (!providerPlugins.isEmpty()) {
-            ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
-            ConfigTransformerResult result = configTransformer.transform(indirectVariables);
-            if (!result.data().isEmpty()) {
-                resolvedOriginals.putAll(result.data());
-            }
-        }
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        Map<String, Plugin<ConfigProvider>> providerPlugins = Collections.emptyMap();
+        try {
+            providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
+            if (!providerPlugins.isEmpty()) {
+                ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
+                ConfigTransformerResult result = configTransformer.transform(indirectVariables);
+                if (!result.data().isEmpty()) {
+                    resolvedOriginals.putAll(result.data());
+                }
+            }
+        } finally {
+            for (Map.Entry<String, Plugin<ConfigProvider>> e : providerPlugins.entrySet()) {
+                Utils.closeQuietly(e.getValue(), "config provider plugin '" + e.getKey() + "'");
+            }
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
if (!providers.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providers);
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
Map<String, Plugin<ConfigProvider>> providerPlugins = Collections.emptyMap();
try {
providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
} finally {
for (Map.Entry<String, Plugin<ConfigProvider>> e : providerPlugins.entrySet()) {
Utils.closeQuietly(e.getValue(), "config provider plugin '" + e.getKey() + "'");
}
}
🤖 Prompt for AI Agents
In clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
around lines 550 to 560, the provider plugins are instantiated and transformed
but only closed after the success path; wrap the instantiation and the call to
ConfigTransformer.transform(indirectVariables) in a try/finally so that
providerPlugins.values() are closed in the finally block on any exception, and
when closing each plugin use Utils.closeQuietly(plugin, "config provider plugin
" + pluginName) or otherwise include the provider name in the close log;
ConfigTransformer need not be closed.

Comment on lines +41 to +48
public ConfigData get(String path) {
return null;
}

@Override
public ConfigData get(String path, Set<String> keys) {
return null;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Return non-null ConfigData from get methods to avoid NPEs.

ConfigTransformer expects a non-null ConfigData; current null return can crash tests using this provider.

     @Override
     public ConfigData get(String path) {
-        return null;
+        return new ConfigData(Map.of());
     }

     @Override
     public ConfigData get(String path, Set<String> keys) {
-        return null;
+        return new ConfigData(Map.of());
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public ConfigData get(String path) {
return null;
}
@Override
public ConfigData get(String path, Set<String> keys) {
return null;
}
@Override
public ConfigData get(String path) {
return new ConfigData(Map.of());
}
@Override
public ConfigData get(String path, Set<String> keys) {
return new ConfigData(Map.of());
}
🤖 Prompt for AI Agents
In
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
around lines 41 to 48, the two get methods currently return null causing NPEs;
replace those null returns with non-null ConfigData instances (e.g., an empty
Map for data and a null/empty version) and in the keyed get(String path,
Set<String> keys) return a ConfigData whose data contains only the requested
keys (or an empty map if none), ensuring both methods always return a valid
ConfigData object.

Comment on lines +273 to 286
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
for (String name : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
CONFIG_PROVIDERS_CONFIG + "." + name,
Plugins.ClassLoaderUsage.PLUGINS
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
providers.put(name, configProvider);
providerPlugins.put(name, configProviderPlugin);
}
ConfigTransformer transformer = new ConfigTransformer(providers);
ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
Map<String, String> transformed = transformer.transform(props).data();
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
return transformed;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Close provider plugins in a finally block and skip null plugins.

Prevents resource leaks if transform throws and avoids null values in the map.

-        Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
-        for (String name : providerNames) {
-            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
-                    this,
-                    name,
-                    Plugins.ClassLoaderUsage.PLUGINS,
-                    null
-            );
-            providerPlugins.put(name, configProviderPlugin);
-        }
-        ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
-        Map<String, String> transformed = transformer.transform(props).data();
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
-        return transformed;
+        Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
+        try {
+            for (String name : providerNames) {
+                Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
+                        this,
+                        name,
+                        Plugins.ClassLoaderUsage.PLUGINS,
+                        null
+                );
+                if (configProviderPlugin != null) {
+                    providerPlugins.put(name, configProviderPlugin);
+                }
+            }
+            ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
+            return transformer.transform(props).data();
+        } finally {
+            providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
for (String name : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
CONFIG_PROVIDERS_CONFIG + "." + name,
Plugins.ClassLoaderUsage.PLUGINS
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
providers.put(name, configProvider);
providerPlugins.put(name, configProviderPlugin);
}
ConfigTransformer transformer = new ConfigTransformer(providers);
ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
Map<String, String> transformed = transformer.transform(props).data();
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
return transformed;
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
try {
for (String name : providerNames) {
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
if (configProviderPlugin != null) {
providerPlugins.put(name, configProviderPlugin);
}
}
ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
return transformer.transform(props).data();
} finally {
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
}
🤖 Prompt for AI Agents
In
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
around lines 273 to 286, the provider plugins are added to the map and closed
after transform, but if transformer.transform(props) throws you leak resources
and you may insert null plugins into the map; wrap plugin creation/transform
usage in a try/finally where you close all created plugins in the finally block,
and when constructing the map skip adding null Plugin instances (i.e., only put
non-null plugins), and when closing iterate and skip nulls to avoid NPEs.

}

public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) {
public Plugin<ConfigProvider> newConfigProvider(AbstractConfig config, String providerName, ClassLoaderUsage classLoaderUsage, Metrics metrics) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

API change: verify all call sites and document behavior

Method now returns Plugin<ConfigProvider> and requires Metrics. Ensure all usages are updated and that release notes call out this source/binary change.

Run to find call sites and check that metrics are threaded through:


🏁 Script executed:

#!/bin/bash
set -euo pipefail
# All invocations of newConfigProvider
rg -nP --type=java -C2 '\bnewConfigProvider\s*\('

# Quick sanity: confirm files calling it also reference Metrics
rg -nP --type=java -C2 '\bnewConfigProvider\s*\(' \
  | sed -n 's/^\([^:]*\):.*/\1/p' | sort -u \
  | xargs -I{} rg -nP --type=java -H '\bMetrics\b|\bmetrics\b' {}

Length of output: 17259


🏁 Script executed:

#!/usr/bin/env bash
set -euo pipefail

# List all non-test Java files invoking newConfigProvider
files=$(rg -lP --type=java 'newConfigProvider\s*\(' | grep -v '/src/test/' || true)

for file in $files; do
  echo "=== $file ==="
  rg -nP --type=java -C3 'newConfigProvider' "$file"
  echo
done

Length of output: 1541


🏁 Script executed:

#!/usr/bin/env bash
set -euo pipefail

echo "MirrorMakerConfig.java (lines 270–285):"
sed -n '270,285p' connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
echo
echo "Worker.java (lines 210–225):"
sed -n '210,225p' connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java

Length of output: 1669


Pass a real Metrics to newConfigProvider in MirrorMakerConfig

  • In connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (around line 275), you’re invoking
    plugins.newConfigProvider(this, name, Plugins.ClassLoaderUsage.PLUGINS, null)
    — replace the null with a valid Metrics instance (e.g. create or inject a ConnectMetrics.metrics() object) to avoid NPEs and enable metrics collection.
  • Document in the release notes that newConfigProvider now requires a Metrics argument, as this is a breaking API change.
🤖 Prompt for AI Agents
In
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
around line 632, the newConfigProvider signature now requires a non-null Metrics
parameter; update callers (notably
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
around line ~275) to pass a real Metrics instance instead of null (e.g., obtain
or inject a ConnectMetrics/ConnectMetrics.metrics() instance or create a Metrics
from the current ConnectMetrics context), add necessary imports and wiring to
ensure the Metrics object is available where MirrorMakerConfig constructs the
provider, and update the release notes to state that newConfigProvider now
requires a Metrics argument (breaking API change).

@DDShantanuBadmanji
Copy link
Author

DDShantanuBadmanji commented Sep 30, 2025

/refacto-bot

@refacto-test
Copy link

refacto-test bot commented Sep 30, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@refacto-test
Copy link

refacto-test bot commented Sep 30, 2025

Multi-Domain Review: Connect Compatibility

👍 Well Done
Plugin Metrics Integration

Added metrics for monitorable config providers with standardized tags.

Java 8 Compatibility

Refactored code to maintain backward compatibility with Java 8.

📌 Files Processed
  • .asf.yaml
  • LICENSE-binary
  • checkstyle/import-control.xml
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/PrivilegedWriteException.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartsAndStops.java
  • checkstyle/suppressions.xml
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/TestableSourceConnector.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ListOffsetsIntegrationTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
  • coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
  • coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java
  • coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
  • core/src/main/java/kafka/log/remote/RemoteLogManager.java
  • core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java
  • core/src/main/java/kafka/log/remote/RemoteLogReader.java
  • core/src/main/java/kafka/server/QuotaFactory.java
  • core/src/main/java/kafka/server/TierStateMachine.java
  • core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
  • core/src/main/scala/kafka/admin/ConfigCommand.scala
  • core/src/main/scala/kafka/cluster/Partition.scala
  • core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
  • core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
  • core/src/main/scala/kafka/log/LogManager.scala
  • core/src/main/scala/kafka/network/RequestChannel.scala
  • core/src/main/scala/kafka/network/SocketServer.scala
  • core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
  • core/src/main/scala/kafka/raft/RaftManager.scala
  • core/src/main/scala/kafka/server/AclApis.scala
  • core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
  • core/src/main/scala/kafka/server/ApiVersionManager.scala
  • core/src/main/scala/kafka/server/AuthHelper.scala
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java
  • core/src/main/scala/kafka/server/BrokerServer.scala
  • core/src/main/scala/kafka/server/ConfigHandler.scala
  • core/src/main/scala/kafka/server/ControllerApis.scala
  • core/src/main/scala/kafka/server/ControllerServer.scala
  • core/src/main/scala/kafka/server/DelayedElectLeader.scala
  • core/src/main/scala/kafka/server/DelegationTokenManager.scala
  • core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
  • core/src/main/scala/kafka/server/DynamicConfig.scala
  • core/src/main/scala/kafka/server/FetchSession.scala
  • core/src/main/scala/kafka/server/ForwardingManager.scala
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java
  • core/src/main/scala/kafka/server/KafkaApis.scala
  • core/src/main/scala/kafka/server/KafkaBroker.scala
  • core/src/main/scala/kafka/server/KafkaConfig.scala
  • core/src/main/scala/kafka/server/KafkaRequestHandler.scala
  • core/src/main/scala/kafka/server/ReplicaManager.scala
  • core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
  • core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
  • core/src/main/scala/kafka/tools/TestRaftServer.scala
  • core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
  • core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
  • core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
  • core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
  • core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
  • core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
  • core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
  • core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala
  • core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
  • core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
  • core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala
  • core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
  • clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
  • core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
  • core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
  • core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
  • core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
  • core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
  • core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
  • core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
  • core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
  • core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
  • core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
  • clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
  • core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
  • core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
  • core/src/test/scala/unit/kafka/log/LogTestUtils.scala
  • core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
  • core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
  • core/src/test/scala/unit/kafka/network/ProcessorTest.scala
  • core/src/test/scala/unit/kafka/network/SocketServerTest.scala
  • core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
  • core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
  • core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
  • README.md
  • clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
  • core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
  • core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
  • core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
  • core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
  • core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
  • core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
  • clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  • core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
  • core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
  • core/src/test/scala/unit/kafka/utils/TestUtils.scala
  • docker/README.md
  • docker/native/README.md
  • docker/native/native-image-configs/resource-config.json
  • docs/upgrade.html
  • gradle/dependencies.gradle
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
  • clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
  • clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
  • jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
  • metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
  • metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
  • metadata/src/main/java/org/apache/kafka/metadata/publisher/AclPublisher.java
  • metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
  • clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java
  • metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
  • metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
  • metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
  • raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
  • raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java
  • raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
  • server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
  • server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
  • server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
  • server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
  • server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
  • server/src/main/java/org/apache/kafka/security/authorizer/AuthorizerUtils.java
  • server/src/main/java/org/apache/kafka/server/ApiVersionManager.java
  • server/src/main/java/org/apache/kafka/server/AssignmentsManagerDeadlineFunction.java
  • server/src/main/java/org/apache/kafka/server/DefaultApiVersionManager.java
  • server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java
  • server/src/main/java/org/apache/kafka/server/SimpleApiVersionManager.java
  • server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
  • server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
  • server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
  • server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerUtilsTest.java
  • server/src/test/java/org/apache/kafka/server/MonitorablePluginsIntegrationTest.java
  • server/src/test/java/org/apache/kafka/server/config/DelegationTokenManagerConfigsTest.java
  • storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
  • storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
  • storage/src/main/java/org/apache/kafka/server/log/remote/TopicPartitionLog.java
  • storage/src/main/java/org/apache/kafka/storage/internals/log/CleanerConfig.java
  • storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
  • storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java
  • storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
  • storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
  • streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
  • streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
  • streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
  • streams/src/main/java/org/apache/kafka/streams/query/Position.java
  • streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
  • streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
  • test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
  • test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
  • tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
  • tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
  • tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
  • tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
  • tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
  • tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
  • tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
  • tools/src/test/java/org/apache/kafka/tools/AclCommandTest.java
  • tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
  • tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
  • tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
  • trogdor/README.md
  • bin/kafka-run-class.sh
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementMode.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
  • clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java
  • clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
  • clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
  • clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
  • clients/src/main/java/org/apache/kafka/common/errors/GroupMaxSizeReachedException.java
  • build.gradle
  • clients/src/main/java/org/apache/kafka/common/errors/ShareSessionLimitReachedException.java
  • clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
  • clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
  • clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
  • clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
  • clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
  • clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
  • clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
  • clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
  • checkstyle/import-control-clients-integration-tests.xml
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementModeTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
  • clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
  • clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
  • clients/src/test/java/org/apache/kafka/common/utils/annotation/ApiKeyVersionsProvider.java
  • clients/src/test/java/org/apache/kafka/common/utils/annotation/ApiKeyVersionsProviderTest.java
  • checkstyle/import-control-core.xml
  • clients/src/test/java/org/apache/kafka/common/utils/annotation/ApiKeyVersionsSource.java
  • connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
  • connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
  • checkstyle/import-control-metadata.xml
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartRequest.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SessionKey.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibility.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
  • checkstyle/import-control-server.xml
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterDetailsImpl.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
  • checkstyle/import-control-storage.xml
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
📝 Additional Comments
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java (1)
Verbose Switch Statements

The code has been converted from a concise switch expression to a more verbose traditional switch statement. While this maintains Java 8 compatibility, it makes the code less readable and more error-prone due to the need for break statements.

Standards:

  • Clean Code - Keep Code Concise
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (1)
Verbose Switch Statements

The code has been converted from a concise switch expression to a more verbose traditional switch statement. While this maintains Java 8 compatibility, it makes the code less readable and more error-prone.

Standards:

  • Clean Code - Keep Code Concise
connect/runtime/src/test/java/org/apache/kafka/connect/integration/TestableSourceConnector.java (2)
Verbose Switch Statements

The code has been converted from a concise switch expression to a more verbose traditional switch statement. While this maintains Java 8 compatibility, it makes the code less readable and more error-prone.

Standards:

  • Clean Code - Keep Code Concise
Verbose Switch Statements

The code has been converted from a concise switch expression to a more verbose traditional switch statement. While this maintains Java 8 compatibility, it makes the code less readable and more error-prone.

Standards:

  • Clean Code - Keep Code Concise
connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartsAndStops.java (1)
Record to Class Conversion

The code has been converted from a concise Java record to a traditional class with explicit getters. While this maintains Java 8 compatibility, it makes the code more verbose and less maintainable.

Standards:

  • Java Best Practices - Complete Value Objects
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java (1)
Record to Class Conversion

The code has been converted from a concise Java record to a traditional class with explicit getters, equals, and hashCode methods. While this maintains Java 8 compatibility, it makes the code more verbose and less maintainable.

Standards:

  • Java Best Practices - Complete Value Objects

*/
public class MemoryConfigBackingStore implements ConfigBackingStore {

private static final Logger log = LoggerFactory.getLogger(MemoryConfigBackingStore.class);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Null Check

Logger is defined but never used in the class. This suggests incomplete implementation or forgotten code that could lead to reliability issues when debugging is needed.

Standards
  • Clean Code - Remove unused variables

ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);

assertEquals(ConnectorType.SOURCE, herder.connectorType(config));
assertEquals(herder.connectorType(config), ConnectorType.SOURCE);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect Equals Order

The assertEquals parameters are reversed from standard order. In JUnit, the expected value should be the first parameter and the actual value second. This reversal makes test failure messages confusing.

Standards
  • JUnit Best Practices - Parameter Order


ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);
assertEquals(ConnectorType.SOURCE, herder.connectorType(config));
assertEquals(herder.connectorType(config), ConnectorType.SOURCE);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect Equals Order

The assertEquals parameters are reversed from standard order. In JUnit, the expected value should be the first parameter and the actual value second. This reversal makes test failure messages confusing.

Standards
  • JUnit Best Practices - Parameter Order


ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);
assertEquals(ConnectorType.SOURCE, herder.connectorType(config));
assertEquals(herder.connectorType(config), ConnectorType.SOURCE);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect Equals Order

The assertEquals parameters are reversed from standard order. In JUnit, the expected value should be the first parameter and the actual value second. This reversal makes test failure messages confusing.

Standards
  • JUnit Best Practices - Parameter Order

Comment on lines 918 to 919
assertEquals(keyConverter.fromConnectData(topic, headers, KEY_SCHEMA, KEY), SERIALIZED_KEY);
assertEquals(valueConverter.fromConnectData(topic, headers, RECORD_SCHEMA, RECORD), SERIALIZED_RECORD);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect Equals Order

The assertEquals parameters are reversed from standard order. In JUnit, the expected value should be the first parameter and the actual value second. This reversal makes test failure messages confusing.

Standards
  • JUnit Best Practices - Parameter Order

Comment on lines 400 to 402
assertEquals(configuration.errorRetryTimeout(), ERRORS_RETRY_TIMEOUT_DEFAULT);
assertEquals(configuration.errorMaxDelayInMillis(), ERRORS_RETRY_MAX_DELAY_DEFAULT);
assertEquals(configuration.errorToleranceType(), ERRORS_TOLERANCE_DEFAULT);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect Equals Order

The assertEquals parameters are reversed from standard order. In JUnit, the expected value should be the first parameter and the actual value second. This reversal makes test failure messages confusing.

Standards
  • JUnit Best Practices - Parameter Order

Comment on lines 417 to 423
assertEquals(configuration.errorRetryTimeout(), 100);

configuration = config(singletonMap(ERRORS_RETRY_MAX_DELAY_CONFIG, "100"));
assertEquals(100, configuration.errorMaxDelayInMillis());
assertEquals(configuration.errorMaxDelayInMillis(), 100);

configuration = config(singletonMap(ERRORS_TOLERANCE_CONFIG, "none"));
assertEquals(ToleranceType.NONE, configuration.errorToleranceType());
assertEquals(configuration.errorToleranceType(), ToleranceType.NONE);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect Equals Order

The assertEquals parameters are reversed from standard order. In JUnit, the expected value should be the first parameter and the actual value second. This reversal makes test failure messages confusing.

Standards
  • JUnit Best Practices - Parameter Order

Comment on lines 235 to 238
assertEquals(configuration.dlqTopicName(), DLQ_TOPIC);

configuration = config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "7"));
assertEquals(7, configuration.dlqTopicReplicationFactor());
assertEquals(configuration.dlqTopicReplicationFactor(), 7);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect Equals Order

The assertEquals parameters are reversed from standard order. In JUnit, the expected value should be the first parameter and the actual value second. This reversal makes test failure messages confusing.

Standards
  • JUnit Best Practices - Parameter Order

ArgumentCaptor<NotFoundException> exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class);
verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull());
assertEquals("Connector " + CONNECTOR_NAME + " not found", exceptionCaptor.getValue().getMessage());
assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect Equals Order

The assertEquals parameters are reversed from standard order. In JUnit, the expected value should be the first parameter and the actual value second. This reversal makes test failure messages confusing.

Standards
  • JUnit Best Practices - Parameter Order

Comment on lines 902 to 905
cause.getMessage(),
"Connector configuration is invalid and contains the following 1 error(s):\n" +
error + "\n" +
"You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect Equals Order

The assertEquals parameters are reversed from standard order. In JUnit, the expected value should be the first parameter and the actual value second. This reversal makes test failure messages confusing.

Standards
  • JUnit Best Practices - Parameter Order

Comment on lines 64 to 65
assertEquals(SSLUtils.getOrDefault(map, existingKey, defaultValue), value);
assertEquals(SSLUtils.getOrDefault(map, missingKey, defaultValue), defaultValue);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect Equals Order

The assertEquals parameters are reversed from standard order. In JUnit, the expected value should be the first parameter and the actual value second. This reversal makes test failure messages confusing.

Standards
  • JUnit Best Practices - Parameter Order

Comment on lines +382 to 390
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(mockFileConfigProvider,
null,
WorkerConfig.CONFIG_PROVIDERS_CONFIG,
Map.of("provider", "file"));
when(plugins.newConfigProvider(any(AbstractConfig.class),
eq("config.providers.file"),
any(ClassLoaderUsage.class)))
.thenReturn(mockFileConfigProvider);
eq("file"),
any(ClassLoaderUsage.class),
any(Metrics.class))).thenReturn(providerPlugin);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant Type Casting

The any(Metrics.class) parameter is redundant as it can be replaced with any() since the method signature already defines the type. This makes the code less maintainable if the method signature changes.

Standards
  • Clean Code - Avoid Redundant Type Information

Comment on lines 379 to 381
return classLoader -> {
return new LoaderSwap(Thread.currentThread().getContextClassLoader());
};
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary Parentheses

The lambda expression has been expanded with unnecessary braces and return statement. This makes the code more verbose without adding any value or clarity.

Standards
  • Clean Code - Keep Code Concise

private String executePut(URI serverUrl, String endpoint, String jsonBody) throws IOException {
HttpPut request = new HttpPut(endpoint);
StringEntity entity = new StringEntity(jsonBody, StandardCharsets.UTF_8);
StringEntity entity = new StringEntity(jsonBody, StandardCharsets.UTF_8.name());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential Character Encoding Issue

The code is changed to use StandardCharsets.UTF_8.name() instead of passing the Charset object directly. This is less reliable as it introduces a string conversion that could potentially cause issues if the StringEntity constructor changes its behavior.

Standards
  • Java Best Practices - Use Charset Objects Directly

@DDShantanuBadmanji
Copy link
Author

/refacto-bot

@DDShantanuBadmanji
Copy link
Author

/refacto-bot

@refacto-test
Copy link

refacto-test bot commented Oct 1, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@refacto-test
Copy link

refacto-test bot commented Oct 1, 2025

Code Review: Whitespace Changes

👍 Well Done
Clean Code Format

Maintaining consistent whitespace improves code readability.

📌 Files Processed
  • Vagrantfile
📝 Additional Comments
Vagrantfile (2)
Unnecessary Whitespace

The PR introduces an unnecessary blank line in the Vagrantfile. While this doesn't affect functionality, unnecessary whitespace changes can create noise in version control history and make future diffs harder to review.

Standards:

  • ISO-IEC-25010-Maintainability
Unnecessary Empty Line

An unnecessary empty line was added to the file. Removing superfluous whitespace keeps the code clean and consistent, improving readability and reducing visual noise.

Standards:

  • Clean-Code-Format
  • Refactoring-Whitespace

@refacto-test
Copy link

refacto-test bot commented Oct 1, 2025

No review comments were generated during this review iteration of the PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants