Skip to content

Conversation

@DDShantanuBadmanji
Copy link

@DDShantanuBadmanji DDShantanuBadmanji commented Oct 1, 2025

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.

Summary by CodeRabbit

  • New Features

    • Config providers now emit plugin metrics with standard tags, enabling improved monitoring in Kafka Connect and core configuration processing.
  • Documentation

    • Expanded guidance on ConfigProvider metrics and monitoring capabilities.
  • Refactor

    • Migrated config provider handling to a plugin-based approach across clients and Connect for consistent lifecycle and observability; no functional behavior changes.
  • Tests

    • Added tests validating plugin-wrapped providers, configuration transformation, and metrics emission.
  • Chores

    • Minor formatting/comment updates in development tooling files.

@refacto-test
Copy link

refacto-test bot commented Oct 1, 2025

⚠️ Organization Review Limit Reached

Your organization has reached the review limit (30/30 reviews used).

To continue using Refacto code reviews, please upgrade your subscription.

@coderabbitai
Copy link

coderabbitai bot commented Oct 1, 2025

Walkthrough

Replaces direct ConfigProvider usage with Plugin-wrapped providers across clients and Connect. Updates constructors, method signatures, and control flow to pass Plugin into ConfigTransformer and lifecycle management to close plugins. Adds metrics wiring via Plugins.newConfigProvider, test coverage for monitorable providers, and minor docs/comment/style tweaks.

Changes

Cohort / File(s) Summary
Dev environment
Vagrantfile
Added a blank line and a cautionary comment about Vagrantfile API/syntax; no functional change.
Clients – config resolution
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java, clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java, clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
Switched variable resolution to use Plugin<ConfigProvider> maps; updated constructor/signatures and internal usage; close plugins instead of raw providers; added JavaDoc about monitorable metrics; imports adjusted.
Clients – tests
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java, clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
Tests updated to wrap providers with Plugin.wrapInstance; assertions adjusted (including empty TTLs); introduced MonitorableConfigProvider test class emitting a metric via PluginMetrics.
Connect runtime – core
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
Public API change: Plugins.newConfigProvider now returns Plugin<ConfigProvider> and accepts Metrics; providers are wrapped with metrics/tags; worker paths propagate plugins to transformers; lifecycle closes provider plugins.
Connect runtime – tests
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java, connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java, connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
Tests updated for plugin-wrapped providers and metrics; added test validating withPluginMetrics invocation and provider metrics/tagging; minor replacements of Collections.singletonMap with Map.of.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor App as Application
  participant AC as AbstractConfig
  participant ICP as instantiateConfigProviders
  participant CT as ConfigTransformer
  participant P as Plugin<ConfigProvider>
  participant CP as ConfigProvider

  App->>AC: resolveConfigVariables(originals)
  AC->>ICP: instantiateConfigProviders(...)
  ICP-->>AC: Map<String, Plugin<ConfigProvider>> (providerPlugins)

  alt providerPlugins is empty
    AC-->>App: originals (no transformation)
  else providerPlugins present
    AC->>CT: new ConfigTransformer(providerPlugins)
    loop each variable
      CT->>P: get()
      P-->>CT: CP
      CT->>CP: get(path, keys)
      CP-->>CT: ConfigData
    end
    CT-->>AC: transformed configs + TTLs
    AC-->>App: ResolvingMap (merged)
  end

  note over AC,P: On shutdown, close provider plugins
Loading
sequenceDiagram
  autonumber
  actor W as Worker
  participant Pl as Plugins
  participant M as Metrics
  participant L as PluginLoader
  participant P as Plugin<ConfigProvider>
  participant CP as ConfigProvider
  participant WCT as WorkerConfigTransformer

  W->>Pl: newConfigProvider(config, providerName, usage, metrics)
  Pl->>L: load provider class for providerName
  L-->>Pl: ConfigProvider instance
  Pl->>CP: configure(config subset)
  Pl->>P: Plugin.wrapInstance(CP, metrics, baseTag, {provider: name})
  P-->>Pl: Plugin<ConfigProvider>
  Pl-->>W: Plugin<ConfigProvider>

  W->>WCT: new WorkerConfigTransformer(worker, {name: P})
  WCT->>P: pass to ConfigTransformer(...)
  note over P,CP: Metrics tags applied (config/class/provider)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I thump my paws on wires and wood,
Wrapped the configs like a bunny should.
Plugins hop with metrics bright,
Tags and TTLs in moonlit night.
With gentle nudge, the flows align—
Carrots cached, the builds divine. 🥕✨

Pre-merge checks and finishing touches

❌ Failed checks (3 warnings)
Check name Status Explanation Resolution
Title Check ⚠️ Warning The title “Clone kafka 18894” is vague and does not reflect the main change of migrating config providers to Plugin wrappers and updating related APIs across multiple modules, instead appearing to reference an issue number. It fails to convey the substantive alteration to config instantiation, transformation, and lifecycle management. A teammate scanning the commit history would not understand the purpose or scope of this pull request from the current title. Rename the pull request title to clearly summarize the primary change, for example “Migrate config providers to Plugin wrappers,” so that it accurately reflects the core API and implementation updates introduced by this PR.
Description Check ⚠️ Warning The pull request description remains the placeholder template text and has not been replaced with any details about what was changed, why, or how it was tested. As a result, reviewers lack context on the modifications, their rationale, and the testing strategy, and the description cannot serve as the eventual commit message. The author needs to complete the template with substantive information. Update the pull request description by replacing the placeholder text with a detailed summary of the changes, the rationale behind migrating to Plugin, the modules affected, and the testing strategy, following the repository’s description template.
Docstring Coverage ⚠️ Warning Docstring coverage is 8.89% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch clone-KAFKA-18894

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ast-grep (0.39.5)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link

Summary of Changes

Hello @DDShantanuBadmanji, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant architectural enhancement by wrapping ConfigProvider instances in a generic Plugin object. This change standardizes how configuration providers are loaded, configured, and managed throughout Kafka, particularly within Kafka Connect. A key benefit is the enablement of metrics registration for ConfigProvider's, allowing for improved monitoring and operational visibility. The modifications involve updating core configuration handling, plugin loading mechanisms, and associated test cases to ensure robust and observable configuration management.

Highlights

  • Standardized Plugin Management: ConfigProvider instances are now consistently managed using a generic Plugin wrapper, streamlining their lifecycle and integration.
  • Enhanced Observability with Metrics: ConfigProvider's can now register metrics by implementing the Monitorable interface, providing better insights into their operation within Kafka Connect.
  • Refactored Instantiation Logic: The process of instantiating and configuring ConfigProvider's has been updated across AbstractConfig, MirrorMakerConfig, and Worker to leverage the new Plugin wrapper and support metrics.
  • Comprehensive Test Updates: Extensive changes to unit and integration tests ensure the correct functionality and metrics reporting of the new ConfigProvider plugin architecture.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a mechanism to expose metrics from ConfigProvider implementations by wrapping them in a Plugin object. This is a valuable addition for monitoring and observability. The changes are well-implemented, following existing patterns within the Kafka codebase. The refactoring is applied consistently across AbstractConfig, ConfigTransformer, MirrorMakerConfig, and Worker. The Worker class is correctly updated to wire in the metrics registry, enabling this feature in Kafka Connect. The new tests are comprehensive, covering both the metrics registration and the plugin lifecycle, ensuring the correctness of the implementation. I have reviewed the changes and found no issues of medium or higher severity. The code quality is excellent.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (5)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)

273-282: Guard against null newConfigProvider results before inserting into the map

Plugins.newConfigProvider(...) may return null when a provider name lacks a corresponding ".class" config. Avoid inserting null entries and optionally log to aid debugging.

-        for (String name : providerNames) {
-            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
+        for (String name : providerNames) {
+            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
                     this,
                     name,
                     Plugins.ClassLoaderUsage.PLUGINS,
                     null
             );
-            providerPlugins.put(name, configProviderPlugin);
+            if (configProviderPlugin != null) {
+                providerPlugins.put(name, configProviderPlugin);
+            } else {
+                // optionally: log at debug/warn that provider 'name' is not configured
+            }
         }
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (2)

2904-2951: Start/stop Worker to exercise provider creation path and avoid flakiness

Currently the test does not call worker.start(); if provider instantiation moves to start(), this test may stop validating metrics. Start and stop the worker for stability.

-        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
+        worker.start();
         Metrics metrics = worker.metrics().metrics();
         assertMetrics(metrics,
             1,
             expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable")));
         assertMetrics(metrics,
             1,
             expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable2")));
+        worker.stop();

2937-2951: Relax tag equality to tolerate default/global metric context tags

MetricName.tags() may include global tags (e.g., from MetricsConfig). Comparing equality can make the test brittle. Check that expected tags are a subset.

-        for (MetricName metricName : metrics.metrics().keySet()) {
+        for (MetricName metricName : metrics.metrics().keySet()) {
             if (metricName.group().equals("plugins")) {
                 Map<String, String> tags = metricName.tags();
-                if (expectedTags.equals(tags)) {
+                if (tags.entrySet().containsAll(expectedTags.entrySet())) {
                     assertEquals(MonitorableConfigProvider.NAME, metricName.name());
                     assertEquals(MonitorableConfigProvider.DESCRIPTION, metricName.description());
                     found++;
                 }
             }
         }
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (2)

98-105: Minor: simplify/clarify null checks

keysByPath cannot be null here; only providerPlugin needs a null check. Optional tidy-up:

-            Map<String, Set<String>> keysByPath = entry.getValue();
-            if (providerPlugin != null && keysByPath != null) {
+            Map<String, Set<String>> keysByPath = entry.getValue();
+            if (providerPlugin != null) {

95-113: TTL map may collide across providers with identical paths

ttls.put(path, ttl) can overwrite when multiple providers use the same path string. If downstream consumers rely on per-provider TTLs, consider scoping by provider+path.

-        Map<String, Long> ttls = new HashMap<>();
+        Map<String, Long> ttls = new HashMap<>();
         for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) {
             String providerName = entry.getKey();
             Plugin<ConfigProvider> providerPlugin = configProviderPlugins.get(providerName);
             Map<String, Set<String>> keysByPath = entry.getValue();
             if (providerPlugin != null && keysByPath != null) {
                 for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {
                     String path = pathWithKeys.getKey();
                     Set<String> keys = new HashSet<>(pathWithKeys.getValue());
                     ConfigData configData = providerPlugin.get().get(path, keys);
@@
-                    if (ttl != null && ttl >= 0) {
-                        ttls.put(path, ttl);
-                    }
+                    if (ttl != null && ttl >= 0) {
+                        // e.g., use providerName + ":" + path as key if appropriate for consumers of ConfigTransformerResult
+                        ttls.put(path, ttl);
+                    }

Note: apply only if ConfigTransformerResult consumers can handle disambiguated TTL keys.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and 5989a26.

📒 Files selected for processing (13)
  • Vagrantfile (1 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (4 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (3 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (5 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1 hunks)
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (3 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
  • withPluginMetrics (6299-6301)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
  • WorkerConfig (58-463)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
  • Utils (93-1700)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
  • Utils (93-1700)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
  • Plugins (59-696)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
🔇 Additional comments (30)
Vagrantfile (1)

21-21: Comment adds useful guardrail.

Clear warning helps future edits avoid accidental API version changes.

clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (4)

20-20: LGTM! Plugin import added for wrapper support.

The import enables Plugin.wrapInstance usage in test setup, aligning with the plugin-based provider lifecycle management.


49-49: LGTM! Test setup correctly uses Plugin wrapper.

The test provider is now wrapped with Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers"), matching the production code's plugin-based approach. Passing null for metrics is appropriate in tests unless metric behavior is being validated.


54-59: Good addition of TTL assertion.

The assertTrue(ttls.isEmpty()) assertion on line 58 validates that basic variable replacement doesn't produce TTL values. This strengthens test coverage.


63-68: LGTM! Consistent modernization to Map.of.

Replacing Collections.singletonMap with Map.of throughout the test methods improves readability and aligns with modern Java practices.

Also applies to: 72-75, 79-82, 86-89, 93-98, 102-105

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1)

210-223: LGTM! Provider initialization migrated to plugin-based approach.

The initConfigTransformer method correctly:

  • Creates a Map<String, Plugin<ConfigProvider>> for plugin-wrapped providers (line 212)
  • Calls plugins.newConfigProvider with metrics support via metrics.metrics() (lines 214-219)
  • Passes the plugin map to WorkerConfigTransformer (line 222)

This enables proper lifecycle management and metrics wiring for config providers.

clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1)

32-35: LGTM! Clear documentation for metrics support.

The added JavaDoc accurately describes how ConfigProvider implementations can enable metrics by implementing Monitorable, and specifies the automatic tags (config, class, provider) that will be added to all registered metrics.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (4)

21-21: LGTM! Plugin import added for test setup.

Enables Plugin.wrapInstance usage in the test configuration.


69-69: LGTM! Test setup correctly uses Plugin wrapper.

The test provider is wrapped with Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers"), matching the production code changes. Passing null for metrics is appropriate for this test scenario.


75-75: LGTM! Consistent modernization to Map.of.

Test method inputs updated from verbose map constructions to clean Map.of syntax.

Also applies to: 100-100, 115-115, 122-122


150-157: LGTM! TestConfigProvider updated with Map.of.

The mock provider now returns ConfigData instances using Map.of for cleaner test data construction. This aligns with the modernization throughout the test suite.

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (3)

23-23: LGTM! Plugin import added.

Required for the plugin-based provider lifecycle management.


46-52: LGTM! Constructor correctly migrated to plugin-based providers.

The changes properly:

  • Update the field type to Map<String, Plugin<ConfigProvider>> (line 46)
  • Accept plugin-wrapped providers in the constructor (line 48)
  • Store the plugin map (line 50)
  • Pass the plugin map to ConfigTransformer (line 51)

This enables proper lifecycle management and metrics support for config providers.


100-102: LGTM! Plugin cleanup correctly implemented.

The close() method now closes plugin instances instead of raw providers. Since Plugin.close() handles both the wrapped ConfigProvider and any associated PluginMetrics, this ensures proper resource cleanup.

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)

275-280: Confirm classloader usage is intentional

Using Plugins.ClassLoaderUsage.PLUGINS here changes resolution compared to CURRENT_CLASSLOADER. Please confirm this matches MirrorMaker’s expected isolation behavior. If not, switch to CURRENT_CLASSLOADER.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (1)

382-390: Good migration to Plugin-wrapped provider in test setup

Wrapping MockFileConfigProvider with Plugin and stubbing the new signature keeps tests aligned with runtime changes. LGTM.

clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)

60-69: Migration to Plugin looks correct

Constructor and field updates are consistent; usage via providerPlugin.get() is appropriate.

clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)

29-58: LGTM! Well-designed test helper for metrics wiring.

The class correctly implements both ConfigProvider and Monitorable interfaces with appropriate stub methods for testing. The protected visibility of the configured field enables subclass verification patterns (used in PluginsTest.CustomMonitorableConfigProvider).

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3)

22-23: LGTM! Required imports for Plugin-based metrics wiring.


632-632: LGTM! API updated to support Plugin-wrapped providers with metrics.

The signature changes align with the broader refactoring:

  • Return type Plugin<ConfigProvider> enables automatic lifecycle management and metrics wiring
  • providerName parameter is clearer than the previous providerPrefix
  • Metrics parameter enables metrics for Monitorable providers

633-649: LGTM! Correct Plugin wrapping with appropriate tags.

The implementation correctly:

  • Derives providerPrefix from providerName (line 633)
  • Wraps the provider with Plugin.wrapInstance including metrics and proper tags (line 649)
  • Tags will be: {"config": "config.providers", "class": "<ClassName>", "provider": "<providerName>"}
  • Automatic withPluginMetrics invocation for Monitorable providers
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (4)

25-28: LGTM! Required imports for Plugin-based testing.


380-402: LGTM! Test correctly updated for Plugin-wrapped API.

The test properly exercises the new signature:

  • Uses providerName parameter (clearer than providerPrefix)
  • Accesses provider via plugin.get()
  • Validates plugin classloader behavior

404-412: LGTM! Test validates metrics wiring order.

The test correctly verifies that:

  • MonitorableConfigProvider instances can be wrapped with metrics
  • The provider's withPluginMetrics is invoked by the Plugin wrapper

The test implicitly validates the call order through CustomMonitorableConfigProvider.withPluginMetrics(), which asserts that configure() was called first (lines 813-815).


810-816: LGTM! Test helper validates configure-before-metrics contract.

The assertion assertTrue(configured) (line 814) correctly validates that configure() is called before withPluginMetrics(), which is the expected order in the Plugins.newConfigProvider() implementation (configure at lines 646-648, wrap at line 649).

clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (5)

23-23: LGTM: Import added for Plugin wrapper.

The import is necessary to support wrapping ConfigProvider instances with Plugin for lifecycle and metrics management.


550-558: LGTM: Variable transformation adapts to plugin-wrapped providers.

The refactor correctly updates the variable name and type to Map<String, Plugin<ConfigProvider>>, and passes the plugin map to ConfigTransformer. The control flow and logic remain sound.


559-559: LGTM: Closing logic correctly handles plugin lifecycle.

The code properly closes each Plugin<ConfigProvider> instance, which will clean up both the wrapped provider (if AutoCloseable) and any associated plugin metrics.


598-602: LGTM: Method signature updated to return plugin-wrapped providers.

The return type correctly reflects the new pattern of wrapping ConfigProvider instances in Plugin for lifecycle and metrics management.


624-639: Remove metrics verification in AbstractConfig.wrapInstance
Passing null for the Metrics parameter in Plugin.wrapInstance(provider, null, …) is intentional and consistent—metrics are injected at higher layers via Plugins.newConfigProvider. No changes needed.

Likely an incorrect or invalid review comment.

@DDShantanuBadmanji
Copy link
Author

/refacto-bot

@refacto-test
Copy link

refacto-test bot commented Oct 1, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@refacto-test
Copy link

refacto-test bot commented Oct 1, 2025

Multi-Domain Review: Java Compatibility

👍 Well Done
Plugin Metrics Implementation

Added metrics for ConfigProvider plugins improving observability.

Backward Compatibility

Maintained Java 8 compatibility while modernizing code.

📌 Files Processed
  • .asf.yaml
  • LICENSE-binary
  • checkstyle/import-control-storage.xml
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/PrivilegedWriteException.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
  • checkstyle/import-control.xml
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartsAndStops.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/TestableSourceConnector.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
  • checkstyle/suppressions.xml
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
  • coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ListOffsetsIntegrationTest.java
  • coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
  • coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
  • core/src/main/java/kafka/log/remote/RemoteLogManager.java
  • core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java
  • core/src/main/java/kafka/log/remote/RemoteLogReader.java
  • core/src/main/java/kafka/server/QuotaFactory.java
  • core/src/main/java/kafka/server/TierStateMachine.java
  • core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
  • core/src/main/scala/kafka/admin/ConfigCommand.scala
  • core/src/main/scala/kafka/cluster/Partition.scala
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java
  • core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
  • core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
  • core/src/main/scala/kafka/log/LogManager.scala
  • core/src/main/scala/kafka/network/RequestChannel.scala
  • core/src/main/scala/kafka/network/SocketServer.scala
  • core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
  • core/src/main/scala/kafka/raft/RaftManager.scala
  • core/src/main/scala/kafka/server/AclApis.scala
  • core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
  • core/src/main/scala/kafka/server/ApiVersionManager.scala
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
  • core/src/main/scala/kafka/server/AuthHelper.scala
  • core/src/main/scala/kafka/server/BrokerServer.scala
  • core/src/main/scala/kafka/server/ConfigHandler.scala
  • core/src/main/scala/kafka/server/ControllerApis.scala
  • core/src/main/scala/kafka/server/ControllerServer.scala
  • core/src/main/scala/kafka/server/DelayedElectLeader.scala
  • core/src/main/scala/kafka/server/DelegationTokenManager.scala
  • core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
  • core/src/main/scala/kafka/server/DynamicConfig.scala
  • core/src/main/scala/kafka/server/FetchSession.scala
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java
  • core/src/main/scala/kafka/server/ForwardingManager.scala
  • core/src/main/scala/kafka/server/KafkaApis.scala
  • core/src/main/scala/kafka/server/KafkaBroker.scala
  • core/src/main/scala/kafka/server/KafkaConfig.scala
  • core/src/main/scala/kafka/server/KafkaRequestHandler.scala
  • core/src/main/scala/kafka/server/ReplicaManager.scala
  • core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
  • core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
  • core/src/main/scala/kafka/tools/TestRaftServer.scala
  • core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java
  • core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
  • core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
  • core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
  • core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
  • core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
  • core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
  • core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala
  • core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
  • core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
  • core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
  • core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
  • core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
  • core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
  • core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
  • core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
  • core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
  • core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
  • core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
  • core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
  • core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
  • clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
  • core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
  • core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
  • core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
  • core/src/test/scala/unit/kafka/log/LogTestUtils.scala
  • core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
  • core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
  • core/src/test/scala/unit/kafka/network/ProcessorTest.scala
  • core/src/test/scala/unit/kafka/network/SocketServerTest.scala
  • core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
  • core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
  • README.md
  • clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
  • core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
  • core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
  • core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
  • core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
  • core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
  • core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
  • clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
  • core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
  • core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
  • core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
  • core/src/test/scala/unit/kafka/utils/TestUtils.scala
  • docker/README.md
  • docker/native/README.md
  • docker/native/native-image-configs/resource-config.json
  • docs/upgrade.html
  • gradle/dependencies.gradle
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
  • clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
  • clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
  • jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
  • metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
  • metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
  • metadata/src/main/java/org/apache/kafka/metadata/publisher/AclPublisher.java
  • clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
  • metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
  • metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
  • metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
  • metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
  • raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
  • raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java
  • raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
  • server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
  • server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
  • server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
  • clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java
  • server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java
  • server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
  • server/src/main/java/org/apache/kafka/security/authorizer/AuthorizerUtils.java
  • server/src/main/java/org/apache/kafka/server/ApiVersionManager.java
  • server/src/main/java/org/apache/kafka/server/AssignmentsManagerDeadlineFunction.java
  • server/src/main/java/org/apache/kafka/server/DefaultApiVersionManager.java
  • server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java
  • server/src/main/java/org/apache/kafka/server/SimpleApiVersionManager.java
  • server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
  • server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
  • server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
  • server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerUtilsTest.java
  • server/src/test/java/org/apache/kafka/server/MonitorablePluginsIntegrationTest.java
  • server/src/test/java/org/apache/kafka/server/config/DelegationTokenManagerConfigsTest.java
  • storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
  • storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
  • storage/src/main/java/org/apache/kafka/server/log/remote/TopicPartitionLog.java
  • storage/src/main/java/org/apache/kafka/storage/internals/log/CleanerConfig.java
  • storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
  • storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
  • storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
  • storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
  • streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
  • streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
  • streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
  • streams/src/main/java/org/apache/kafka/streams/query/Position.java
  • streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
  • streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
  • test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
  • test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
  • tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
  • tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
  • tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
  • tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
  • tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
  • tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
  • tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
  • tools/src/test/java/org/apache/kafka/tools/AclCommandTest.java
  • tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
  • tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
  • tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
  • trogdor/README.md
  • Vagrantfile
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementMode.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
  • clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java
  • clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
  • clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
  • clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
  • bin/kafka-run-class.sh
  • clients/src/main/java/org/apache/kafka/common/errors/GroupMaxSizeReachedException.java
  • clients/src/main/java/org/apache/kafka/common/errors/ShareSessionLimitReachedException.java
  • clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
  • clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
  • clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
  • clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
  • clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
  • clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
  • clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
  • clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
  • build.gradle
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementModeTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
  • clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
  • clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
  • clients/src/test/java/org/apache/kafka/common/utils/annotation/ApiKeyVersionsProvider.java
  • checkstyle/import-control-clients-integration-tests.xml
  • clients/src/test/java/org/apache/kafka/common/utils/annotation/ApiKeyVersionsProviderTest.java
  • clients/src/test/java/org/apache/kafka/common/utils/annotation/ApiKeyVersionsSource.java
  • connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
  • connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
  • checkstyle/import-control-core.xml
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartRequest.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SessionKey.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibility.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
  • checkstyle/import-control-metadata.xml
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterDetailsImpl.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
  • checkstyle/import-control-server.xml
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java

@Override
public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<>() {
ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java 8 Compatibility

Diamond operator without explicit type parameters is not compatible with Java 8. This syntax requires Java 9+, but the codebase appears to target Java 8 compatibility.

Standards
  • Java 8 Compatibility
  • Language Version Compliance

})
.sorted(Comparator.comparing(docInfo -> docInfo.predicateName))
.toList();
.collect(Collectors.toList());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java 8 Stream API

The toList() method on Stream is a Java 16+ feature. Using this in a codebase targeting Java 8 compatibility will cause compilation failures.

Standards
  • Java 8 Compatibility
  • Stream API Compatibility

public class TransformationDoc {

private record DocInfo(String transformationName, String overview, ConfigDef configDef) {
private static final class DocInfo {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java Record Replacement

Java records were introduced in Java 14, but the codebase appears to target Java 8 compatibility. Using records will cause compilation failures in Java 8 environments.

Standards
  • Java 8 Compatibility
  • Language Feature Compatibility

* the connector.
*/
public record ConnectorTaskId(String connector, int task) implements Serializable, Comparable<ConnectorTaskId> {
public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java Record Usage

Java records were introduced in Java 14, but the codebase appears to target Java 8 compatibility. This will cause compilation failures in Java 8 environments.

Standards
  • Java 8 Compatibility
  • Language Feature Compatibility

}

private record TestDTO(String content) {
private static class TestDTO {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java Record Usage

Java records were introduced in Java 14, but the codebase appears to target Java 8 compatibility. This will cause compilation failures in Java 8 environments.

Standards
  • Java 8 Compatibility
  • Language Feature Compatibility

package org.apache.kafka.connect.integration;

public record StartsAndStops(int starts, int stops) {
public class StartsAndStops {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java Record Usage

Java records were introduced in Java 14, but the codebase appears to target Java 8 compatibility. This will cause compilation failures in Java 8 environments.

Standards
  • Java 8 Compatibility
  • Language Feature Compatibility

Comment on lines 137 to 142
switch (supportLevel) {
case EXACTLY_ONCE_SUPPORTED:
return ExactlyOnceSupport.SUPPORTED;
case EXACTLY_ONCE_UNSUPPORTED:
return ExactlyOnceSupport.UNSUPPORTED;
case EXACTLY_ONCE_FAIL:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switch Expression Usage

Switch expressions were introduced in Java 14, but the codebase appears to target Java 8 compatibility. This will cause compilation failures in Java 8 environments.

Standards
  • Java 8 Compatibility
  • Language Feature Compatibility

Comment on lines 153 to 158
switch (supportLevel) {
case TRANSACTION_BOUNDARIES_SUPPORTED:
return ConnectorTransactionBoundaries.SUPPORTED;
case TRANSACTION_BOUNDARIES_FAIL:
throw new ConnectException("oh no :(");
case TRANSACTION_BOUNDARIES_NULL:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switch Expression Usage

Switch expressions were introduced in Java 14, but the codebase appears to target Java 8 compatibility. This will cause compilation failures in Java 8 environments.

Standards
  • Java 8 Compatibility
  • Language Feature Compatibility

"simulated-task-producer-" + CONNECTOR_NAME + "-" + i,
Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, i)
)).toList();
)).collect(Collectors.toList());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream toList Usage

The toList() method on Stream is a Java 16+ feature. Using this in a codebase targeting Java 8 compatibility will cause compilation failures.

Standards
  • Java 8 Compatibility
  • Stream API Compatibility

private static String formatAssignment(Map<String, Collection<String>> assignment) {
StringBuilder result = new StringBuilder();
for (String worker : assignment.keySet().stream().sorted().toList()) {
for (String worker : assignment.keySet().stream().sorted().collect(Collectors.toList())) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream toList Usage

The toList() method on Stream is a Java 16+ feature. Using this in a codebase targeting Java 8 compatibility will cause compilation failures.

Standards
  • Java 8 Compatibility
  • Stream API Compatibility

.map(ConfigValueInfo::errors)
.flatMap(Collection::stream)
.toList();
.collect(Collectors.toList());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream toList Usage

The toList() method on Stream is a Java 16+ feature. Using this in a codebase targeting Java 8 compatibility will cause compilation failures.

Standards
  • Java 8 Compatibility
  • Stream API Compatibility

Comment on lines 599 to 603
String expectedType;
switch (connectorType) {
case SINK:
expectedType = "sink";
break;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switch Expression Usage

Switch expressions were introduced in Java 14, but the codebase appears to target Java 8 compatibility. This will cause compilation failures in Java 8 environments.

Standards
  • Java 8 Compatibility
  • Language Feature Compatibility

.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.toList();
.collect(Collectors.toList());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream toList Usage

The toList() method on Stream is a Java 16+ feature. Using this in a codebase targeting Java 8 compatibility will cause compilation failures.

Standards
  • Java 8 Compatibility
  • Stream API Compatibility

.filter(path -> !path.toFile().getName().endsWith(".java"))
.filter(path -> !removeRuntimeClasses.test(path.toFile().getName()))
.toList();
.collect(Collectors.toList());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream toList Usage

The toList() method on Stream is a Java 16+ feature. Using this in a codebase targeting Java 8 compatibility will cause compilation failures.

Standards
  • Java 8 Compatibility
  • Stream API Compatibility

"\"config\": {}" +
"}";
StringEntity entity = new StringEntity(jsonBody, StandardCharsets.UTF_8);
StringEntity entity = new StringEntity(jsonBody, StandardCharsets.UTF_8.name());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StringEntity Charset

The StringEntity constructor with Charset parameter was introduced in HttpClient 4.3+. For compatibility with older versions, the charset name string should be used instead.

Standards
  • API Compatibility
  • HttpClient Compatibility

private String executePut(URI serverUrl, String endpoint, String jsonBody) throws IOException {
HttpPut request = new HttpPut(endpoint);
StringEntity entity = new StringEntity(jsonBody, StandardCharsets.UTF_8);
StringEntity entity = new StringEntity(jsonBody, StandardCharsets.UTF_8.name());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StringEntity Charset

The StringEntity constructor with Charset parameter was introduced in HttpClient 4.3+. For compatibility with older versions, the charset name string should be used instead.

Standards
  • API Compatibility
  • HttpClient Compatibility

ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);

assertEquals(ConnectorType.SOURCE, herder.connectorType(config));
assertEquals(herder.connectorType(config), ConnectorType.SOURCE);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assertion Order

The assertEquals method parameters are in the wrong order. The expected value should be the first parameter, and the actual value should be the second parameter. This can lead to confusing error messages when tests fail.

Standards
  • JUnit Best Practices
  • Test Readability


ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);
assertEquals(ConnectorType.SOURCE, herder.connectorType(config));
assertEquals(herder.connectorType(config), ConnectorType.SOURCE);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assertion Order

The assertEquals method parameters are in the wrong order. The expected value should be the first parameter, and the actual value should be the second parameter. This can lead to confusing error messages when tests fail.

Standards
  • JUnit Best Practices
  • Test Readability


ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);
assertEquals(ConnectorType.SOURCE, herder.connectorType(config));
assertEquals(herder.connectorType(config), ConnectorType.SOURCE);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assertion Order

The assertEquals method parameters are in the wrong order. The expected value should be the first parameter, and the actual value should be the second parameter. This can lead to confusing error messages when tests fail.

Standards
  • JUnit Best Practices
  • Test Readability

@refacto-visz
Copy link

refacto-visz bot commented Oct 1, 2025

⚠️ No Active Seat

The user @DDShantanuBadmanji does not have an active seat allocated for code reviews.

Please contact your organization admin to assign a seat or upgrade your subscription.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants