Skip to content

Conversation

@joshua2519
Copy link
Contributor

@joshua2519 joshua2519 commented Jan 7, 2026

Migrates NodeToControllerChannelManager and related classes from Scala
(core module) to Java (server module).

New Java Classes (server module) include ControllerInformation,
RaftControllerNodeProvider, NodeToControllerChannelManagerImpl,
NodeToControllerRequestThread and NodeToControllerQueueItem.

In AbstractKafkaConfig, added
addReconfigurable()/removeReconfigurable() abstract methods to
support migration. In KafkaConfig, added override keywords for
reconfigurable methods. Updated imports and usages across core module
classes and tests. Replace ControllerNodeProvider by
Supplier<ControllerInformation>.

Reviewers: Chia-Ping Tsai [email protected]

@github-actions github-actions bot added triage PRs from the community core Kafka Broker labels Jan 7, 2026
Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joshua2519 thanks for this patch!

/**
* Discovers the active controller node and provides connection details for communicating with it.
*/
public interface ControllerNodeProvider {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use Supplier<ControllerInformation> instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. ControllerNodeProvider now extends Supplier for better idiomatic Java usage.

*
* @param reconfigurable the component to register for configuration updates
*/
public void addReconfigurable(Reconfigurable reconfigurable) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we don't set it as abstract method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed and changed. Since all current implementations (KafkaConfig) already provide concrete implementations, making them abstract clarifies the contract and ensures future subclasses don't accidentally rely on no-op behavior.

*
* @param reconfigurable the component to unregister
*/
public void removeReconfigurable(Reconfigurable reconfigurable) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

private final ApiVersions apiVersions = new ApiVersions();
private final NodeToControllerRequestThread requestThread;

@SuppressWarnings("this-escape")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind inlining newRequestThread to get rid of this warning?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

} else if (response.wasDisconnected()) {
updateControllerAddress(null);
try {
requestQueue.putFirst(queueItem);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can substitute addFirst for putFirst, since requestQueue is an unbounded LinkedBlockingDeque

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Since requestQueue is unbounded (LinkedBlockingDeque with no capacity limit), putFirst() will never block and the InterruptedException handling is unnecessary. Changed to addFirst() for simplicity.

@github-actions github-actions bot removed the triage PRs from the community label Jan 11, 2026
/**
* Discovers the active controller node and provides connection details for communicating with it.
*/
public interface ControllerNodeProvider extends Supplier<ControllerInformation> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that the standard Supplier<ControllerInformation> interface could simply replace ControllerNodeProvider

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Replaced ControllerNodeProvider with Supplier throughout the codebase since there's only one production implementation.

}

@Override
public void shutdown() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should throw an exception here to align with the Scala implementation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Updated shutdown() to declare throws InterruptedException allowing the exception to propagate instead of being swallowed.

Optional<Node> controllerAddress = activeControllerAddress();
if (controllerAddress.isPresent()) {
requestIter.remove();
return Collections.singletonList(new RequestAndCompletionHandler(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List.of

}
}

return Collections.emptyList();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List.of

activeControllerAddress().map(Node::idString).orElse("null"));
maybeDisconnectAndUpdateController();
try {
requestQueue.putFirst(queueItem);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addFirst?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

@SuppressWarnings("resource") // RaftClient lifecycle managed by RaftManager
private Optional<Node> idToNode(int id) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this private method? Having two @SuppressWarnings seems to double the noise

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I have inlined to get().

@chia7712 chia7712 merged commit 148be48 into apache:trunk Jan 15, 2026
24 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants