KAFKA-10357: Refactor makeReady to reduce complexity and support upcoming KIP-698 changes #20326
base: trunk
… to create, so we should return those
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
final Set<String> topicsNotReady,
final Set<String> newTopics) {
    final Set<String> tempUnknownTopics = new HashSet<>();
    final Set<String> validatedTopics = validateTopics(topicsNotReady, topics, tempUnknownTopics);
Doesn't validateTopics return a set of topics it could not validate, and that still need to get created?
The return value represents topics that are configured correctly and that also need to get created.
The method call here iterates over the provided topics and validates three things:
1. The topicsMap provided in the config knows about all of the topics we are trying to validate
2. The number of partitions for each internal topic is defined
3. If the internal topic already exists, we decide not to add it to the return set, but validate that the number of partitions it has matches what the InternalTopicManager has
If checks 1 and 2 pass, we add the topic to the return set, and repeat for all topics passed into the function.
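The three checks described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code under review: the class name `TopicCheckSketch`, the method name `findTopicsToCreate`, the plain-map parameters, and the use of `IllegalStateException` are all assumptions made for the example, and the real method's signature and exception types may differ.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for the validation step; not the InternalTopicManager code.
final class TopicCheckSketch {

    /**
     * Validates the requested topics and returns the ones that still need to be created.
     *
     * @param topicsToCheck      names of the internal topics to check
     * @param expectedPartitions configured partition count per topic (empty if undefined)
     * @param existingPartitions partition count of the topics that already exist
     */
    static Set<String> findTopicsToCreate(final Set<String> topicsToCheck,
                                          final Map<String, Optional<Integer>> expectedPartitions,
                                          final Map<String, Integer> existingPartitions) {
        // Check 1: the configuration must know about every topic we were asked to check.
        if (!expectedPartitions.keySet().containsAll(topicsToCheck)) {
            throw new IllegalStateException("Config does not contain all of " + topicsToCheck);
        }

        final Set<String> topicsToCreate = new HashSet<>();
        for (final String topic : topicsToCheck) {
            // Check 2: the partition count of each internal topic must be defined.
            final int expected = expectedPartitions.get(topic).orElseThrow(
                () -> new IllegalStateException("Partition count undefined for " + topic));

            final Integer existing = existingPartitions.get(topic);
            if (existing == null) {
                // The topic does not exist yet, so there is nothing to compare: it must be created.
                topicsToCreate.add(topic);
            } else if (existing != expected) {
                // Check 3: an existing topic must match the expected partition count.
                throw new IllegalStateException("Topic " + topic + " has " + existing
                    + " partitions, expected " + expected);
            }
        }
        return topicsToCreate;
    }
}
```

In this sketch an existing topic with a matching partition count is simply left out of the result, which is why the returned set ends up containing only topics that do not exist yet.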
> The return value represents topics that are configured correctly, and that also need to get created
Well, the method does validate topics if they exist. That's why the method is called validateTopics -- but topics that do not exist cannot, by definition, be validated; they need to be created. So the returned set of topics, as you also confirmed ("also need to get created"), are non-existing topics, and thus they did not get validated, as there is nothing to be done ("validation" means checking whether an existing topic meets the expected partition count).
Does this make sense?
Yes, I see your point; it may be a good opportunity to rename. If it's not properly validating the topics, I don't think we should name it validateTopics; this is throwing me off.
The purpose of this method is to verify the partition count of the existing topics and return topics that have not yet been created. Maybe findTopicsToCreate, determineTopicsToCreate, or computeMissingTopics? Do any of these sound good to you?
Well, the method does two things: validate existing topics and identify non-existing ones. So I think both names are ok, the current validateTopics or any of the names you proposed. As the method does two things, there will be some side effect. I am fine either way.
Add log line for incomplete topic creation call to broker
Pass deadline parameter into createTopics for proper timeout handling
Add exponential backoff back to createTopics (removed by mistake)
- private void readyTopics(final Set<NewTopic> topicsToCreate,
- final Set<String> topicsNotReady) {
+ private void createTopics(final Set<NewTopic> topicsToCreate,
+ final Set<String> topicsNotReady,
nit: indentation
@@ -578,8 +579,29 @@ private void readyTopics(final Set<NewTopic> topicsToCreate,
}
}
}

if (!topicsNotReady.isEmpty()) {
Should this go after the for-loop? I don't think we need to retry-backoff after verifying a single topic; shouldn't it be sufficient to check once for the whole "batch" of topics?
It seems |
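To make the suggestion concrete, here is a rough sketch of a retry loop that attempts the whole batch first and only then decides whether to back off, bounded by a deadline. It is an illustration under stated assumptions, not the PR's code: `BatchRetrySketch`, `createAll`, `backoffMs`, the backoff constants, and the `Predicate` standing in for the broker call are all hypothetical, and the doubling backoff is a hand-rolled substitute for whatever backoff helper the real code uses.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical illustration of "back off once per batch"; not the InternalTopicManager code.
final class BatchRetrySketch {
    // Backoff bounds chosen only for this example.
    static final long INITIAL_BACKOFF_MS = 10L;
    static final long MAX_BACKOFF_MS = 1_000L;

    /** Backoff for a 0-based attempt: doubles on every attempt, capped at MAX_BACKOFF_MS. */
    static long backoffMs(final int attempt) {
        final long doubled = INITIAL_BACKOFF_MS << Math.min(attempt, 20);
        return Math.min(doubled, MAX_BACKOFF_MS);
    }

    /**
     * Tries every pending topic once per round and backs off at most once per round.
     * Returns the topics that are still not ready when the deadline passes or the
     * thread is interrupted; the result is empty if everything succeeded.
     *
     * @param tryCreate  stand-in for the broker call; returns true once a topic is ready
     * @param deadlineMs absolute deadline in epoch milliseconds
     */
    static Set<String> createAll(final Set<String> topics,
                                 final Predicate<String> tryCreate,
                                 final long deadlineMs) {
        final Set<String> topicsNotReady = new HashSet<>(topics);
        int attempt = 0;
        while (!topicsNotReady.isEmpty()) {
            // Per-topic pass over the batch: topics that became ready are dropped.
            topicsNotReady.removeIf(tryCreate);

            // Single check after the whole batch, rather than after each topic.
            if (!topicsNotReady.isEmpty()) {
                if (System.currentTimeMillis() >= deadlineMs) {
                    return topicsNotReady;
                }
                try {
                    Thread.sleep(backoffMs(attempt++));
                } catch (final InterruptedException e) {
                    // Restore the interrupt flag and give up with what is left.
                    Thread.currentThread().interrupt();
                    return topicsNotReady;
                }
            }
        }
        return topicsNotReady;
    }
}
```

With the check placed after the per-topic pass, a round with many pending topics sleeps once instead of once per topic, which is the behaviour the comment above asks about.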
This PR reduces the cyclomatic complexity of the makeReady method by
extracting parts of its logic into smaller, well-named helper methods.
This structural cleanup makes the code easier to understand and
maintain.
These changes are intended to prepare for upcoming work related to
KIP-698,
which will introduce additional branching logic to makeReady.
Refactoring now ensures that future modifications can be made in a more
modular and testable way.
Reviewer: Matthias J. Sax