Skip to content

Conversation

@DDShantanuBadmanji
Copy link

@DDShantanuBadmanji DDShantanuBadmanji commented Oct 1, 2025

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.

Summary by CodeRabbit

  • New Features

    • Config providers are now loaded as plugins and can expose metrics with standard tags, improving observability across Kafka Connect and MirrorMaker.
    • Safer lifecycle management when resolving and closing providers.
  • Documentation

    • Clarified guidance for implementing metrics in config providers.
  • Tests

    • Added tests for monitorable config providers and plugin metrics integration.
  • Refactor

    • Migrated internals to plugin-based provider handling across components.
  • Chores

    • Minor formatting/comment update in development environment configuration.

@refacto-test
Copy link

refacto-test bot commented Oct 1, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@coderabbitai
Copy link

coderabbitai bot commented Oct 1, 2025

Walkthrough

The change replaces direct usage of ConfigProvider instances with Plugin-wrapped providers across core config resolution and Connect runtime. Constructors, fields, and call sites now accept Map<String, Plugin>. Plugins.newConfigProvider wraps providers with metrics and tags. Tests updated accordingly; a monitorable test provider class and related tests were added. Minor Vagrantfile comment tweak.

Changes

Cohort / File(s) Summary
Dev Env
Vagrantfile
Inserted a blank line after require and added a cautionary API/syntax comment.
Core Config Pluginization
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java, clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java, clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
Switched from Map<String, ConfigProvider> to Map<String, Plugin> for provider instantiation, wiring, and closing; updated constructor and usage in ConfigTransformer; added Javadoc about Monitorable metrics to ConfigProvider.
Connect Runtime Pluginization
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java, connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
Updated APIs and internals to use Plugin; Plugins.newConfigProvider now returns Plugin and accepts Metrics; Worker and WorkerConfigTransformer store and pass plugin-wrapped providers; cleanup adjusted to close plugins.
MirrorMaker Integration
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
Replaced direct providers with Plugin map; constructed ConfigTransformer with plugins; updated close semantics.
Tests: Core
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
Wrapped test provider with Plugin.wrapInstance and switched to Map.of(...) usage.
Tests: Connect Runtime
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
Switched map constructions to Map.of(...); minor return constructions updated accordingly.
Tests: Monitorable Provider
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java, connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java, connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
Added a Monitorable test provider; added tests validating metrics wrapping/tags and Plugins.newConfigProvider behavior with Metrics and plugin class loader; introduced a CustomMonitorableConfigProvider for assertions.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Admin as Admin/Worker
  participant Plugins as Plugins
  participant PluginWrap as Plugin<ConfigProvider>
  participant CT as ConfigTransformer
  participant CP as ConfigProvider

  Admin->>Plugins: newConfigProvider(config, providerName, usage, metrics)
  Plugins->>Plugins: Load provider class and configure
  Plugins-->>PluginWrap: wrapInstance(provider, metrics, "config.providers", tags)
  Admin->>CT: new ConfigTransformer(Map<name, Plugin<ConfigProvider>>)
  Admin->>CT: transform(props, configProviders, secretRefs)
  CT->>PluginWrap: get()
  PluginWrap-->>CT: CP instance
  CT->>CP: get(path[, keys])
  CP-->>CT: ConfigData
  CT-->>Admin: Transformed configs
  Admin->>PluginWrap: close (via Utils.closeQuietly)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I nibbled through configs, byte by byte,
Wrapped the providers snug and tight.
Plugins hopped in, with metrics tags,
Worker and Mirror shook out the lags.
Now secrets bloom where carrots hide—
Transform, resolve, then bound with pride. 🥕✨

Pre-merge checks and finishing touches

❌ Failed checks (3 warnings)
Check name Status Explanation Resolution
Title Check ⚠️ Warning The title “Clone kafka 18894” does not describe the actual changes in this pull request and instead references a generic issue or task number, making it unclear to reviewers what the PR accomplishes and which modules or APIs are affected. Please revise the title to a concise, descriptive summary of the primary change, for example “Refactor ConfigProvider usage to Plugin across core and connect modules,” so that reviewers immediately understand the scope and purpose of the pull request.
Description Check ⚠️ Warning The pull request description remains the unchanged placeholder template text and does not provide any details about the actual code modifications, objectives, or testing strategy, leaving required sections such as change summary and test plan entirely blank. Please replace the placeholder text with a detailed description that outlines the refactoring of ConfigProvider to use Plugin, enumerates the affected files and modules, and summarizes the testing approach including any new or updated unit tests introduced for plugin-based configuration providers.
Docstring Coverage ⚠️ Warning Docstring coverage is 8.89% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch clone-KAFKA-18894

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ast-grep (0.39.5)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link

Summary of Changes

Hello @DDShantanuBadmanji, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant refactoring of how ConfigProvider instances are managed and instantiated within Kafka, particularly within the Connect framework. By wrapping ConfigProvider instances in a new Plugin object, the changes lay the groundwork for improved observability and lifecycle management. A key benefit is the integration of metrics for ConfigProvider implementations, allowing them to be Monitorable and automatically tagged for easier monitoring. This change impacts core configuration handling and various Connect components, ensuring that ConfigProviders can be properly monitored.

Highlights

  • Plugin Wrapper for Config Providers: ConfigProvider instances are now wrapped in a new Plugin object, enabling enhanced management and functionality, particularly for metrics integration.
  • Metrics Integration for Config Providers: The ConfigProvider interface now supports implementing the Monitorable interface, allowing for the exposure of metrics with automatic tagging (e.g., config, class, provider) for better observability.
  • Refactoring of Config Provider Instantiation: The internal instantiation and handling of ConfigProvider instances have been updated across AbstractConfig, ConfigTransformer, and Kafka Connect's Worker and Plugins classes to accommodate the new Plugin<ConfigProvider> wrapper.
  • Test Suite Enhancements: Existing tests for ConfigTransformer and WorkerConfigTransformer have been updated to reflect the new Plugin usage, and a new MonitorableConfigProvider test class was added to validate the metrics integration.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@refacto-test
Copy link

refacto-test bot commented Oct 1, 2025

Clone kafka 18894

TL;DR: Backports Java 8 compatibility changes by removing Java record classes, pattern matching, and other Java 11+ features.


Refacto PR Summary

This PR reverts Java 11+ language features to maintain Java 8 compatibility across the codebase.
The changes focus on replacing Java records with traditional classes, removing pattern matching switch expressions, and fixing other Java 11+ features. The PR primarily affects the Connect runtime module and core components, replacing records with equivalent classes that have explicit constructors, getters, equals/hashCode implementations, and proper serialization support. Additional changes include moving the RemoteLogManager from server to core package, fixing imports, and adjusting method signatures to use Short instead of int for API versions.

Change Highlights

Click to expand
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/RestClientTest.java: Replaced TestDTO record with equivalent class
  • connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java: Converted record to class with explicit equals/hashCode
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartsAndStops.java: Replaced record with standard class
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java: Replaced switch pattern matching with traditional switch
  • core/src/main/java/kafka/log/remote/RemoteLogManager.java: Added file that was previously in server package
  • coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java: Changed apiVersion from int to short
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java: Fixed generic type parameters

Sequence Diagram

sequenceDiagram
    participant Developer as Developer
    participant BuildSystem as Build System
    participant JavaCompiler as Java Compiler
    participant ConnectRuntime as Connect Runtime
    participant RemoteLogManager as RemoteLogManager
    
    Developer->>BuildSystem: Submit code with Java 8 compatible changes
    BuildSystem->>JavaCompiler: Compile with Java 8 target
    JavaCompiler-->>BuildSystem: Verify compatibility
    
    Note over ConnectRuntime: Replace records with classes
    Note over ConnectRuntime: Replace pattern matching
    Note over RemoteLogManager: Move from server to core package
    
    BuildSystem->>ConnectRuntime: Load Connect runtime components
    ConnectRuntime->>RemoteLogManager: Initialize remote log functionality
    RemoteLogManager-->>ConnectRuntime: Provide backward compatible implementation
    ConnectRuntime-->>BuildSystem: All components compatible with Java 8
    BuildSystem-->>Developer: Build successful
Loading

Testing Guide

Click to expand
  1. Verify the build works with JDK 8 by running: ./gradlew clean build -Djdk=1.8
  2. Test Connect runtime functionality by starting a Connect worker and verifying it can load connectors
  3. Test RemoteLogManager functionality by configuring a broker with remote storage and verifying log segments are properly copied and managed
  4. Verify API compatibility by testing client applications that use the modified APIs (particularly those using the coordinator-common components)
  5. Run integration tests for Connect to ensure the refactored classes maintain the same behavior as the original record implementations

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a mechanism to expose metrics from ConfigProvider implementations by wrapping them in a new Plugin<T> class. This allows providers that implement the Monitorable interface to register their own metrics. The changes are well-implemented and consistent across the codebase. The Plugin wrapper is cleanly integrated, and relevant classes like AbstractConfig, ConfigTransformer, and Worker have been updated to use it. The lifecycle management, including closing providers and cleaning up metrics, is handled correctly. The addition of new tests to verify metrics registration and the lifecycle of Monitorable config providers is a great enhancement. Overall, this is a solid contribution that improves the observability of config providers, and I have no suggestions for improvement.

@DDShantanuBadmanji
Copy link
Author

/refacto-bot

@refacto-test
Copy link

refacto-test bot commented Oct 1, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@refacto-test
Copy link

refacto-test bot commented Oct 1, 2025

Multi-Domain Review: Connect

👍 Well Done
Plugin Architecture Implementation

Improved config provider loading as plugins with metrics support enhances observability.

Backward Compatibility

Careful refactoring maintains API compatibility while modernizing the codebase.

📌 Files Processed
  • .asf.yaml
  • LICENSE-binary
  • checkstyle/import-control-storage.xml
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/PrivilegedWriteException.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
  • checkstyle/import-control.xml
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartsAndStops.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/integration/TestableSourceConnector.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
  • checkstyle/suppressions.xml
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
  • coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ListOffsetsIntegrationTest.java
  • coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
  • coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
  • core/src/main/java/kafka/log/remote/RemoteLogManager.java
  • core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java
  • core/src/main/java/kafka/log/remote/RemoteLogReader.java
  • core/src/main/java/kafka/server/QuotaFactory.java
  • core/src/main/java/kafka/server/TierStateMachine.java
  • core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
  • core/src/main/scala/kafka/admin/ConfigCommand.scala
  • core/src/main/scala/kafka/cluster/Partition.scala
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java
  • core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
  • core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
  • core/src/main/scala/kafka/log/LogManager.scala
  • core/src/main/scala/kafka/network/RequestChannel.scala
  • core/src/main/scala/kafka/network/SocketServer.scala
  • core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
  • core/src/main/scala/kafka/raft/RaftManager.scala
  • core/src/main/scala/kafka/server/AclApis.scala
  • core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
  • core/src/main/scala/kafka/server/ApiVersionManager.scala
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
  • core/src/main/scala/kafka/server/AuthHelper.scala
  • core/src/main/scala/kafka/server/BrokerServer.scala
  • core/src/main/scala/kafka/server/ConfigHandler.scala
  • core/src/main/scala/kafka/server/ControllerApis.scala
  • core/src/main/scala/kafka/server/ControllerServer.scala
  • core/src/main/scala/kafka/server/DelayedElectLeader.scala
  • core/src/main/scala/kafka/server/DelegationTokenManager.scala
  • core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
  • core/src/main/scala/kafka/server/DynamicConfig.scala
  • core/src/main/scala/kafka/server/FetchSession.scala
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java
  • core/src/main/scala/kafka/server/ForwardingManager.scala
  • core/src/main/scala/kafka/server/KafkaApis.scala
  • core/src/main/scala/kafka/server/KafkaBroker.scala
  • core/src/main/scala/kafka/server/KafkaConfig.scala
  • core/src/main/scala/kafka/server/KafkaRequestHandler.scala
  • core/src/main/scala/kafka/server/ReplicaManager.scala
  • core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
  • core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
  • core/src/main/scala/kafka/tools/TestRaftServer.scala
  • core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java
  • core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
  • core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
  • core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
  • core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
  • core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
  • core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
  • core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala
  • core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
  • core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
  • core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
  • core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
  • core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
  • core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
  • core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
  • core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
  • core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
  • core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
  • core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
  • core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
  • core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
  • clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
  • core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
  • core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
  • core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
  • core/src/test/scala/unit/kafka/log/LogTestUtils.scala
  • core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
  • core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
  • core/src/test/scala/unit/kafka/network/ProcessorTest.scala
  • core/src/test/scala/unit/kafka/network/SocketServerTest.scala
  • core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
  • core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
  • README.md
  • clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
  • core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
  • core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
  • core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
  • core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
  • core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
  • core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
  • clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
  • core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
  • core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
  • core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
  • core/src/test/scala/unit/kafka/utils/TestUtils.scala
  • docker/README.md
  • docker/native/README.md
  • docker/native/native-image-configs/resource-config.json
  • docs/upgrade.html
  • gradle/dependencies.gradle
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
  • clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
  • clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
  • jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
  • metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
  • metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
  • metadata/src/main/java/org/apache/kafka/metadata/publisher/AclPublisher.java
  • clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
  • metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
  • metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
  • metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
  • metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
  • raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
  • raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java
  • raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
  • server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
  • server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
  • server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
  • clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java
  • server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java
  • server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
  • server/src/main/java/org/apache/kafka/security/authorizer/AuthorizerUtils.java
  • server/src/main/java/org/apache/kafka/server/ApiVersionManager.java
  • server/src/main/java/org/apache/kafka/server/AssignmentsManagerDeadlineFunction.java
  • server/src/main/java/org/apache/kafka/server/DefaultApiVersionManager.java
  • server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java
  • server/src/main/java/org/apache/kafka/server/SimpleApiVersionManager.java
  • server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
  • server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
  • server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
  • server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerUtilsTest.java
  • server/src/test/java/org/apache/kafka/server/MonitorablePluginsIntegrationTest.java
  • server/src/test/java/org/apache/kafka/server/config/DelegationTokenManagerConfigsTest.java
  • storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
  • storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
  • storage/src/main/java/org/apache/kafka/server/log/remote/TopicPartitionLog.java
  • storage/src/main/java/org/apache/kafka/storage/internals/log/CleanerConfig.java
  • storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
  • storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
  • storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
  • storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
  • streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
  • streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
  • streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
  • streams/src/main/java/org/apache/kafka/streams/query/Position.java
  • streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
  • streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
  • test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
  • test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
  • tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
  • tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
  • tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
  • tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
  • tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
  • tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
  • tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
  • tools/src/test/java/org/apache/kafka/tools/AclCommandTest.java
  • tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
  • tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
  • tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
  • tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
  • trogdor/README.md
  • Vagrantfile
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementMode.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
  • clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java
  • clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
  • clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
  • clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
  • bin/kafka-run-class.sh
  • clients/src/main/java/org/apache/kafka/common/errors/GroupMaxSizeReachedException.java
  • clients/src/main/java/org/apache/kafka/common/errors/ShareSessionLimitReachedException.java
  • clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
  • clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
  • clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
  • clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
  • clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
  • clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
  • clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
  • clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
  • build.gradle
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementModeTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
  • clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
  • clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
  • clients/src/test/java/org/apache/kafka/common/utils/annotation/ApiKeyVersionsProvider.java
  • checkstyle/import-control-clients-integration-tests.xml
  • clients/src/test/java/org/apache/kafka/common/utils/annotation/ApiKeyVersionsProviderTest.java
  • clients/src/test/java/org/apache/kafka/common/utils/annotation/ApiKeyVersionsSource.java
  • connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
  • connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
  • checkstyle/import-control-core.xml
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartRequest.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SessionKey.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibility.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
  • checkstyle/import-control-metadata.xml
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterDetailsImpl.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
  • checkstyle/import-control-server.xml
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java

public class TransformationDoc {

private record DocInfo(String transformationName, String overview, ConfigDef configDef) {
private static final class DocInfo {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Record to Class Conversion

Converting a Java record to a class loses the automatic generation of equals, hashCode, and toString methods. These methods should be implemented to maintain proper behavior in collections and debugging.

Standards
  • Clean Code
  • Java Best Practices
  • Object Contract

@@ -22,6 +22,9 @@
* Used when a write that requires {@link ConfigBackingStore#claimWritePrivileges() special privileges} fails
*/
public class PrivilegedWriteException extends ConnectException {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Error Handling

The class is missing a constructor that takes only a message parameter. This limits error handling flexibility when detailed cause information isn't available or needed.

Standards
  • Complete Exception Hierarchy
  • Java Exception Best Practices

@Override
public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<>() {
ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Diamond Operator Missing

The diamond operator is not being used correctly. In older Java versions, type parameters need to be explicitly specified in the constructor when using anonymous classes.

Standards
  • Java Backward Compatibility
  • Type Safety

Comment on lines 25 to 26
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging Missing Import

Logger imports were added but the class doesn't use them. Missing logger field initialization and usage leads to inconsistent logging across the codebase.

Standards
  • Consistent Logging
  • Resource Management

TestPlugins.noOpLoaderSwap());
TransformationChain<T, R> realTransformationChainRetriableException = new TransformationChain<>(List.of(stage), toleranceOperator);
return Mockito.spy(realTransformationChainRetriableException);
TransformationChain<T, R> realTransformationChainRetriableException = new TransformationChain(List.of(stage), toleranceOperator);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant Type Parameters

The code uses raw types in the constructor but maintains generic types in the variable declaration. This inconsistency can lead to type safety issues and compiler warnings.

Standards
  • Type Safety
  • Consistent Generic Usage

case EXACTLY_ONCE_FAIL -> throw new ConnectException("oops");
default -> null;
};
switch (supportLevel) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switch Expression Refactoring

Switch expressions are a Java 14+ feature. For compatibility with Java 8, traditional switch statements should be used with explicit return statements.

Standards
  • Java 8 Compatibility
  • Control Flow Clarity

case TRANSACTION_BOUNDARIES_NULL -> null;
default -> ConnectorTransactionBoundaries.UNSUPPORTED;
};
switch (supportLevel) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Second Switch Expression

Another instance of switch expressions which are a Java 14+ feature. For compatibility with Java 8, traditional switch statements should be used with explicit return statements.

Standards
  • Java 8 Compatibility
  • Control Flow Clarity

case "awaitStop" -> true;
default -> null;
};
switch (invocation.getMethod().getName()) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worker Switch Expression

Another instance of switch expressions which are a Java 14+ feature. For compatibility with Java 8, traditional switch statements should be used with explicit return statements.

Standards
  • Java 8 Compatibility
  • Control Flow Clarity

package org.apache.kafka.connect.integration;

public record StartsAndStops(int starts, int stops) {
public class StartsAndStops {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java Record Conversion

Java records were introduced in Java 16. For compatibility with Java 8, a regular class with getters should be used instead.

Standards
  • Java 8 Compatibility
  • API Design

}

private record TestDTO(String content) {
private static class TestDTO {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestDTO Record Conversion

Java records were introduced in Java 16. For compatibility with Java 8, a regular class with getters, equals, and hashCode methods should be used instead.

Standards
  • Java 8 Compatibility
  • API Design
  • Object Contract

private static String formatAssignment(Map<String, Collection<String>> assignment) {
StringBuilder result = new StringBuilder();
for (String worker : assignment.keySet().stream().sorted().toList()) {
for (String worker : assignment.keySet().stream().sorted().collect(Collectors.toList())) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream toList() Usage

Stream.toList() is a Java 16+ feature. For compatibility with Java 8, collect(Collectors.toList()) should be used instead.

Standards
  • Java 8 Compatibility
  • Stream API Best Practices

"simulated-task-producer-" + CONNECTOR_NAME + "-" + i,
Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, i)
)).toList();
)).collect(Collectors.toList());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream toList() in ExactlyOnce

Stream.toList() is a Java 16+ feature. For compatibility with Java 8, collect(Collectors.toList()) should be used instead.

Standards
  • Java 8 Compatibility
  • Stream API Best Practices

.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.toList();
.collect(Collectors.toList());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream toList() in TestPlugins

Stream.toList() is a Java 16+ feature. For compatibility with Java 8, collect(Collectors.toList()) should be used instead.

Standards
  • Java 8 Compatibility
  • Stream API Best Practices

.filter(path -> !path.toFile().getName().endsWith(".java"))
.filter(path -> !removeRuntimeClasses.test(path.toFile().getName()))
.toList();
.collect(Collectors.toList());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream toList() in TestPlugins

Stream.toList() is a Java 16+ feature. For compatibility with Java 8, collect(Collectors.toList()) should be used instead.

Standards
  • Java 8 Compatibility
  • Stream API Best Practices

private String executePut(URI serverUrl, String endpoint, String jsonBody) throws IOException {
HttpPut request = new HttpPut(endpoint);
StringEntity entity = new StringEntity(jsonBody, StandardCharsets.UTF_8);
StringEntity entity = new StringEntity(jsonBody, StandardCharsets.UTF_8.name());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Charset Name String

StringEntity constructor with Charset parameter is not available in older versions of HttpClient. Using the charset name as a string ensures compatibility.

Standards
  • API Compatibility
  • Backward Compatibility

ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);

assertEquals(ConnectorType.SOURCE, herder.connectorType(config));
assertEquals(herder.connectorType(config), ConnectorType.SOURCE);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assertion Parameter Order

The assertEquals parameters are in the wrong order. The expected value should be first, followed by the actual value. This makes error messages clearer when assertions fail.

Standards
  • JUnit Best Practices
  • Test Readability


ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);
assertEquals(ConnectorType.SOURCE, herder.connectorType(config));
assertEquals(herder.connectorType(config), ConnectorType.SOURCE);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assertion Parameter Order

The assertEquals parameters are in the wrong order. The expected value should be first, followed by the actual value. This makes error messages clearer when assertions fail.

Standards
  • JUnit Best Practices
  • Test Readability


ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);
assertEquals(ConnectorType.SOURCE, herder.connectorType(config));
assertEquals(herder.connectorType(config), ConnectorType.SOURCE);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assertion Parameter Order

The assertEquals parameters are in the wrong order. The expected value should be first, followed by the actual value. This makes error messages clearer when assertions fail.

Standards
  • JUnit Best Practices
  • Test Readability

.thenReturn(SERIALIZED_RECORD);
assertEquals(SERIALIZED_KEY, keyConverter.fromConnectData(topic, headers, KEY_SCHEMA, KEY));
assertEquals(SERIALIZED_RECORD, valueConverter.fromConnectData(topic, headers, RECORD_SCHEMA, RECORD));
assertEquals(keyConverter.fromConnectData(topic, headers, KEY_SCHEMA, KEY), SERIALIZED_KEY);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assertion Parameter Order

The assertEquals parameters are in the wrong order. The expected value should be first, followed by the actual value. This makes error messages clearer when assertions fail.

Standards
  • JUnit Best Practices
  • Test Readability

ArgumentCaptor<NotFoundException> exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class);
verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull());
assertEquals("Connector " + CONNECTOR_NAME + " not found", exceptionCaptor.getValue().getMessage());
assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assertion Parameter Order

The assertEquals parameters are in the wrong order. The expected value should be first, followed by the actual value. This makes error messages clearer when assertions fail.

Standards
  • JUnit Best Practices
  • Test Readability

Comment on lines 235 to 238
assertEquals(configuration.dlqTopicName(), DLQ_TOPIC);

configuration = config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "7"));
assertEquals(7, configuration.dlqTopicReplicationFactor());
assertEquals(configuration.dlqTopicReplicationFactor(), 7);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assertion Parameter Order

The assertEquals parameters are in the wrong order. The expected value should be first, followed by the actual value. This makes error messages clearer when assertions fail.

Standards
  • JUnit Best Practices
  • Test Readability

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (1)

598-641: Trim provider names and close on configure failure

  • Trim entries from config.providers to avoid accidental blanks.
  • If configure(...) fails, close the instance before rethrowing. Mirrors existing getConfiguredInstance failure cleanup.
-        for (String provider : configProviders.split(",")) {
+        for (String raw : configProviders.split(",")) {
+            String provider = raw.trim();
+            if (provider.isEmpty()) continue;
             String providerClass = providerClassProperty(provider);
             if (indirectConfigs.containsKey(providerClass)) {
                 String providerClassName = indirectConfigs.get(providerClass);
                 if (classNameFilter.test(providerClassName)) {
                     providerMap.put(provider, providerClassName);
                 } else {
                     throw new ConfigException(providerClassName + " is not allowed. Update System property '"
                             + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow " + providerClassName);
                 }
             }
         }
         // Instantiate Config Providers
         Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
         for (Map.Entry<String, String> entry : providerMap.entrySet()) {
             try {
                 String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
                 Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
                 ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
-                provider.configure(configProperties);
-                Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
+                try {
+                    provider.configure(configProperties);
+                } catch (Throwable t) {
+                    Utils.maybeCloseQuietly(provider, "config provider");
+                    throw t;
+                }
+                Plugin<ConfigProvider> providerPlugin =
+                    Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
                 configProviderPluginInstances.put(entry.getKey(), providerPlugin);
             } catch (ClassNotFoundException e) {
                 log.error("Could not load config provider class {}", entry.getValue(), e);
                 throw new ConfigException(providerClassProperty(entry.getKey()), entry.getValue(), "Could not load config provider class or one of its dependencies");
             }
         }
🧹 Nitpick comments (12)
Vagrantfile (1)

33-33: Optional: Modernize Vagrant defaults (EOL base box, legacy AWS bits).

While here, consider follow-ups:

  • base_box ubuntu/trusty64 is long EOL; bump to a supported Ubuntu LTS.
  • m3.medium/spot price/old AMI likely deprecated; refresh to current types/AMI.
  • ifconfig parsing is brittle; prefer ip route/addr or cloud-init metadata for private IP.

Also applies to: 45-47, 115-119, 138-141

clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1)

32-35: Clarify metrics tag semantics in Javadoc.

“provider” tag isn’t universally automatic; it’s added when the runtime wraps providers with extra tags (e.g., Connect’s Plugins.newConfigProvider). Direct wraps without extra tags only include “config” and “class”, and no metrics are registered if metrics is null. Suggest tightening wording.

Apply this doc tweak:

- * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
- * The following tags are automatically added to all metrics registered: <code>config</code> set to
- * <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name,
- * and <code>provider</code> set to the provider name.
+ * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
+ * When constructed by the Kafka Connect runtime, metrics registered by the provider will be tagged with:
+ * <code>config</code>=<code>config.providers</code>, <code>class</code>=the provider’s simple class name,
+ * and <code>provider</code>=the configured provider name. Custom wrappers may omit the <code>provider</code> tag,
+ * and no metrics are registered if no {@code Metrics} instance is supplied.
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)

79-82: Add a test for unknown provider behavior.

Cover the case where a placeholder references a provider not present in the map; the value should remain unchanged. This guards the null-plugin path.

You can add:

+    @Test
+    public void testUnknownProviderLeavesValueUnchanged() {
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${unknown:testPath:testKey}"));
+        assertEquals("${unknown:testPath:testKey}", result.data().get(MY_KEY));
+    }
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (2)

67-69: Defensively copy provider map in constructor.

Avoid external mutation by copying the incoming map.

-    public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
-        this.configProviderPlugins = configProviderPlugins;
-    }
+    public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
+        this.configProviderPlugins = Map.copyOf(configProviderPlugins);
+    }

98-105: Minor: avoid repeated plugin.get() per path.

Cache the provider once per provider iteration for readability and micro-efficiency.

-            if (providerPlugin != null && keysByPath != null) {
+            if (providerPlugin != null && keysByPath != null) {
+                ConfigProvider provider = providerPlugin.get();
                 for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {
                     String path = pathWithKeys.getKey();
                     Set<String> keys = new HashSet<>(pathWithKeys.getValue());
-                    ConfigData configData = providerPlugin.get().get(path, keys);
+                    ConfigData configData = provider.get(path, keys);
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (1)

2904-2951: Stabilize the test and reduce brittleness

  • Start/stop the worker to ensure provider initialization regardless of constructor timing.
  • In assertMetrics, match expected tags as a subset instead of exact-equality to avoid future default-tag drift.
 @Test
 public void testMonitorableConfigProvider() {
-    setup(false);
+    setup(false);
     Map<String, String> props = new HashMap<>(this.workerProps);
     props.put("config.providers", "monitorable,monitorable2");
     config = new StandaloneConfig(props);
     mockKafkaClusterId();
@@
-    worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
+    worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
+    worker.start();
     Metrics metrics = worker.metrics().metrics();
@@
-    assertMetrics(metrics,
+    assertMetrics(metrics,
         1,
         expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable")));
@@
-    assertMetrics(metrics,
+    assertMetrics(metrics,
         1,
         expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable2")));
+    worker.stop();
 }
@@
-    private void assertMetrics(Metrics metrics, int expected, Map<String, String> expectedTags) {
+    private void assertMetrics(Metrics metrics, int expected, Map<String, String> expectedTags) {
         int found = 0;
         for (MetricName metricName : metrics.metrics().keySet()) {
             if (metricName.group().equals("plugins")) {
                 Map<String, String> tags = metricName.tags();
-                if (expectedTags.equals(tags)) {
+                boolean matches = expectedTags.entrySet().stream()
+                    .allMatch(e -> e.getValue().equals(tags.get(e.getKey())));
+                if (matches) {
                     assertEquals(MonitorableConfigProvider.NAME, metricName.name());
                     assertEquals(MonitorableConfigProvider.DESCRIPTION, metricName.description());
                     found++;
                 }
             }
         }
         assertEquals(expected, found);
     }
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (1)

48-52: Defensively copy provider map

Prevent accidental external mutation by copying to an unmodifiable map before storing.

-    public WorkerConfigTransformer(Worker worker, Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
+    public WorkerConfigTransformer(Worker worker, Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
         this.worker = worker;
-        this.configProviderPlugins = configProviderPlugins;
+        this.configProviderPlugins = Map.copyOf(configProviderPlugins);
         this.configTransformer = new ConfigTransformer(configProviderPlugins);
     }
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)

150-158: Prefer returning empty ConfigData

Minor: returning an empty ConfigData instead of Map.of() on every branch keeps behavior explicit and avoids accidental nulls. Current code is fine though.

clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)

41-48: Return empty ConfigData instead of null

Avoid potential NPEs if a test invokes get(...).

-    public ConfigData get(String path) {
-        return null;
-    }
+    public ConfigData get(String path) {
+        return new ConfigData(Map.of());
+    }
@@
-    public ConfigData get(String path, Set<String> keys) {
-        return null;
-    }
+    public ConfigData get(String path, Set<String> keys) {
+        return new ConfigData(Map.of());
+    }
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (1)

404-413: Tidy up resources (Metrics and plugin)

Close Metrics and the returned Plugin to avoid thread/resource leaks in tests.

-    Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics());
-    assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());
+    Metrics metrics = new Metrics();
+    try {
+        Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, metrics);
+        assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());
+        plugin.close();
+    } finally {
+        metrics.close();
+    }
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1)

270-286: Close transformer before stopping metrics

Close workerConfigTransformer (which now owns plugin-wrapped providers with plugin metrics) before metrics.stop() so plugins can unregister cleanly.

         globalOffsetBackingStore.stop();
-        metrics.stop();
+        workerConfigTransformer.close();
+        metrics.stop();
         log.info("Worker stopped");
         workerMetricsGroup.close();
         if (connectorStatusMetricsGroup != null) {
             connectorStatusMetricsGroup.close();
         }
-        workerConfigTransformer.close();
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (1)

598-622: Optional: warn when provider is listed but its .class is missing

Today such providers are silently ignored. Consider logging a warning to aid misconfiguration diagnosis.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and 5989a26.

📒 Files selected for processing (13)
  • Vagrantfile (1 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (4 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (3 hunks)
  • clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (5 hunks)
  • clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1 hunks)
  • connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (3 hunks)
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (5 hunks)
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
  • Plugins (59-696)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
  • withPluginMetrics (6299-6301)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
  • Utils (93-1700)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
  • WorkerConfig (58-463)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
  • MonitorableConfigProvider (29-58)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
  • Metrics (70-696)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (33-92)
🔇 Additional comments (10)
Vagrantfile (1)

20-21: No-op formatting; fine to merge.

Blank line + guard comment improve readability; no functional change.

clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)

49-50: Test setup aligns with new Plugin-wrapped providers.

Using Plugin.wrapInstance with null metrics is appropriate for these unit tests. LGTM.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (1)

382-389: Good migration to Plugin-wrapped providers

Mock returns a Plugin with provider tag; aligns with new Plugins.newConfigProvider signature. LGTM.

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (1)

100-102: Close semantics LGTM

Closing Plugin wrappers via Utils.closeQuietly is correct and ensures both provider and attached PluginMetrics are released.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)

69-70: Updated setup matches new API

Wrapping TestConfigProvider in Plugin and passing to WorkerConfigTransformer is correct. Tests remain valid.

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (2)

391-402: LGTM on Plugin usage

Switch to Plugin wrapper and accessing plugin.get() is correct.


810-816: Nice assertion of configure-before-metrics

Overriding withPluginMetrics to assert configured=true is a clean way to validate ordering.

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)

632-639: Clarify/guard null return contract

This method returns null when a provider is listed but .class is absent. Ensure all call sites handle null to avoid NPEs during transformer initialization. Consider logging a warning here or throwing for stricter config validation.

clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (1)

23-24: LGTM on adopting Plugin wrapper in common config

Import and usage align with the new plugin-wrapped provider model.

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1)

210-223: Ignore WorkerConfigTransformer null-safety concerns WorkerConfigTransformer never calls get() on the plugin map—it only passes it to ConfigTransformer and iterates values() in close(), both of which handle an empty or missing entry map without NPEs.

Likely an incorrect or invalid review comment.

Comment on lines +550 to 561
Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);

if (!providers.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providers);
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));

return new ResolvingMap<>(resolvedOriginals, originals);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Always close provider plugins even if transform throws

Wrap transform(...) in try/finally so provider plugins are closed on all paths. Prevents leaking provider resources and plugin metrics.

-        Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
-
-        if (!providerPlugins.isEmpty()) {
-            ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
-            ConfigTransformerResult result = configTransformer.transform(indirectVariables);
-            if (!result.data().isEmpty()) {
-                resolvedOriginals.putAll(result.data());
-            }
-        }
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        Map<String, Plugin<ConfigProvider>> providerPlugins =
+            instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
+        try {
+            if (!providerPlugins.isEmpty()) {
+                ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
+                ConfigTransformerResult result = configTransformer.transform(indirectVariables);
+                if (!result.data().isEmpty()) {
+                    resolvedOriginals.putAll(result.data());
+                }
+            }
+        } finally {
+            providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
if (!providers.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providers);
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
return new ResolvingMap<>(resolvedOriginals, originals);
Map<String, Plugin<ConfigProvider>> providerPlugins =
instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
try {
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
} finally {
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
}
return new ResolvingMap<>(resolvedOriginals, originals);
🤖 Prompt for AI Agents
In clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
around lines 550 to 561, the call to configTransformer.transform(...) is not
wrapped in a try/finally, so if transform throws the provider plugins may not be
closed; change the code to instantiate the ConfigTransformer and then call
transform inside a try block, applying any result.data() to resolvedOriginals in
the try, and move providerPlugins.values().forEach(x -> Utils.closeQuietly(x,
"config provider plugin")); into a finally block so provider plugins are always
closed even when transform throws.

Comment on lines 270 to 287
Map<String, String> transform(Map<String, String> props) {
// transform worker config according to config.providers
List<String> providerNames = configProviders();
Map<String, ConfigProvider> providers = new HashMap<>();
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
for (String name : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
CONFIG_PROVIDERS_CONFIG + "." + name,
Plugins.ClassLoaderUsage.PLUGINS
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
providers.put(name, configProvider);
providerPlugins.put(name, configProviderPlugin);
}
ConfigTransformer transformer = new ConfigTransformer(providers);
ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
Map<String, String> transformed = transformer.transform(props).data();
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
return transformed;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Ensure plugins are always closed; skip null plugins.

If transform throws, provider plugins aren’t closed, leaking resources/classloaders. Also, newConfigProvider may return null; avoid putting nulls and guard close.

Apply:

-        Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
-        for (String name : providerNames) {
-            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
-                    this,
-                    name,
-                    Plugins.ClassLoaderUsage.PLUGINS,
-                    null
-            );
-            providerPlugins.put(name, configProviderPlugin);
-        }
-        ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
-        Map<String, String> transformed = transformer.transform(props).data();
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
-        return transformed;
+        Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
+        for (String name : providerNames) {
+            Plugin<ConfigProvider> plugin = plugins.newConfigProvider(
+                    this,
+                    name,
+                    Plugins.ClassLoaderUsage.PLUGINS,
+                    null
+            );
+            if (plugin != null) {
+                providerPlugins.put(name, plugin);
+            }
+        }
+        ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
+        try {
+            return transformer.transform(props).data();
+        } finally {
+            providerPlugins.values().forEach(p -> Utils.closeQuietly(p, "config provider plugin"));
+        }

If you decide not to filter nulls, please confirm Utils.closeQuietly tolerates nulls.


🏁 Script executed:

#!/bin/bash
set -e
echo "=== newConfigProvider definitions and usage ==="
rg -nR "newConfigProvider" -n -C5 --type java
echo ""
echo "=== Utils.closeQuietly implementations ==="
rg -nR "closeQuietly" -n -C5 --type java

Length of output: 452


🏁 Script executed:

#!/bin/bash
echo "=== Searching for newConfigProvider definitions ==="
rg -n "newConfigProvider" -g '*.java' -C3
echo "=== Searching for Utils.closeQuietly ==="
rg -n "closeQuietly" -g '*.java' -C3

Length of output: 168790


🏁 Script executed:

#!/bin/bash
sed -n '600,670p' connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java

Length of output: 3899


Always null-check and close config provider plugins in a finally block

  • Skip any null returned by plugins.newConfigProvider before putting into providerPlugins.
  • Wrap the call to transformer.transform(props) in a try/finally so Utils.closeQuietly runs even if transform throws.
  • (Note: Utils.closeQuietly(null, …) is a no-op, but filtering out null plugins avoids invalid entries.)
🤖 Prompt for AI Agents
In
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
around lines 270 to 287, the code currently puts possible null config-provider
plugins into the providerPlugins map and only closes plugins after
transformer.transform(props) completes; change it to skip any null returned by
plugins.newConfigProvider before inserting into providerPlugins, call
transformer.transform(props) inside a try block and move the
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider
plugin")) into a finally block so plugins are closed even if transform throws,
ensuring no null entries are stored and resources are always released.

Comment on lines 646 to 650
try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
plugin.configure(configProviderConfig);
}
return plugin;
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Close provider if configure fails to prevent leaks

If configure(...) throws, the created provider instance is never closed. Close it on failure before wrapping to avoid resource leaks.

-        try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
-            plugin.configure(configProviderConfig);
-        }
-        return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
+        try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
+            plugin.configure(configProviderConfig);
+        } catch (Throwable t) {
+            // Ensure we don't leak a partially configured provider
+            Utils.maybeCloseQuietly(plugin, "config provider");
+            throw t;
+        }
+        return Plugin.wrapInstance(
+            plugin,
+            metrics,
+            WorkerConfig.CONFIG_PROVIDERS_CONFIG,
+            Map.of("provider", providerName)
+        );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
plugin.configure(configProviderConfig);
}
return plugin;
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
}
try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
plugin.configure(configProviderConfig);
} catch (Throwable t) {
// Ensure we don't leak a partially configured provider
Utils.maybeCloseQuietly(plugin, "config provider");
throw t;
}
return Plugin.wrapInstance(
plugin,
metrics,
WorkerConfig.CONFIG_PROVIDERS_CONFIG,
Map.of("provider", providerName)
);
🤖 Prompt for AI Agents
In
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
around lines 646-650, the code opens a provider instance and calls
plugin.configure(...) but does not close the provider if configure throws,
leaking resources; wrap the configure call in a try/catch so that on any
Throwable you attempt to close the provider (if it implements
AutoCloseable/Closeable cast and call close()), and if close() itself throws add
that exception as suppressed to the original exception before rethrowing; only
call Plugin.wrapInstance(...) after configure completes successfully.

Comment on lines +214 to 221
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
providerMap.put(providerName, configProvider);
providerPluginMap.put(providerName, configProviderPlugin);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Guard against null providers to avoid NPEs in transformer

plugins.newConfigProvider(...) can return null. Currently, null is inserted into the map, risking NPEs downstream. Skip nulls and warn.

-            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
+            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
                 config,
                 providerName,
                 ClassLoaderUsage.PLUGINS,
                 metrics.metrics()
             );
-            providerPluginMap.put(providerName, configProviderPlugin);
+            if (configProviderPlugin != null) {
+                providerPluginMap.put(providerName, configProviderPlugin);
+            } else {
+                log.warn("Skipping config provider '{}' as no class configured under {}.{}.class",
+                        providerName, WorkerConfig.CONFIG_PROVIDERS_CONFIG, providerName);
+            }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
providerMap.put(providerName, configProvider);
providerPluginMap.put(providerName, configProviderPlugin);
}
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
if (configProviderPlugin != null) {
providerPluginMap.put(providerName, configProviderPlugin);
} else {
log.warn("Skipping config provider '{}' as no class configured under {}.{}.class",
providerName, WorkerConfig.CONFIG_PROVIDERS_CONFIG, providerName);
}
🤖 Prompt for AI Agents
In connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
around lines 214 to 221, plugins.newConfigProvider(...) may return null; change
the logic to check the returned Plugin<ConfigProvider> for null before putting
it into providerPluginMap — if the plugin is null, do not insert it and emit a
warning-level log that includes the providerName (and any contextual config if
useful) so downstream transformers won't encounter an NPE; otherwise proceed to
put the non-null plugin into the map as before.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants