- 
                Notifications
    You must be signed in to change notification settings 
- Fork 0
KAFKA-14691; Add TopicId to OffsetFetch API #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
| Warning Rate limit exceeded@arvi18 has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 17 minutes and 41 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the  We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. 📒 Files selected for processing (7)
 Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit: 
 Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
 Other keywords and placeholders
 CodeRabbit Configuration File ( | 
| /codehelper | 
    
      
        1 similar comment
      
    
  
    | /codehelper | 
| @ParameterizedTest | ||
| @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) | ||
| def testHandleOffsetFetchWithMultipleGroups(version: Short): Unit = { | ||
| val foo = "foo" | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused variable in test method: The variable bar is declared but not used in this test method. Consider removing it or using it in the test to avoid confusion.
|  | ||
| private def fetchOffsetsForGroup( | ||
| requestContext: RequestContext, | ||
| offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup, | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Topic ID Support Version Handling: Please verify that the implementation correctly handles both topic IDs and topic names across different API versions, especially around version 10 which introduces topic ID support. The conditional logic paths based on version checks need careful verification to ensure backward compatibility is maintained.
| .setGroupId(offsetFetchRequest.groupId) | ||
| .setErrorCode(Errors.forException(exception).code) | ||
| } else if (offsetFetchResponse.errorCode() != Errors.NONE.code) { | ||
| offsetFetchResponse | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Authorization Logic with Topic IDs: This section modifies authorization logic to handle topic IDs. Please confirm that authorization checks work properly when using topic IDs instead of topic names, as this is security-critical functionality.
| .setErrorCode(Errors.INVALID_GROUP_ID.code) | ||
|  | ||
| val group4Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() | ||
| .setGroupId("group-4") | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test Coverage Suggestion: Consider adding specific tests for version compatibility edge cases. The PR adds version-specific behavior for TopicId support (version >= 10), but test coverage for version boundary transitions could be strengthened.
| /refacto-test | 
| Refacto is reviewing this PR. Please wait for the review comments to be posted. | 
| Code Review: Kafka OffsetFetch API Topic ID Support👍 Well Done
 📌 Files Processed
 📝 Additional Comments
 | 
| offsetFetchRequest.topics.forEach { topic => | ||
| if (topic.topicId != Uuid.ZERO_UUID) { | ||
| metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name)) | ||
| } | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Null Check Missing
Missing null check on offsetFetchRequest.topics could cause NullPointerException. Attackers could send malformed requests with null topics to trigger server errors.
| offsetFetchRequest.topics.forEach { topic => | |
| if (topic.topicId != Uuid.ZERO_UUID) { | |
| metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name)) | |
| } | |
| } | |
| if (useTopicIds) { | |
| if (offsetFetchRequest.topics != null) { | |
| offsetFetchRequest.topics.forEach { topic => | |
| if (topic.topicId != Uuid.ZERO_UUID) { | |
| metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name)) | |
| } | |
| } | |
| } | |
| } | 
Standards
- CWE-476
- OWASP-A06
| val authorizedNames = authHelper.filterByAuthorized( | ||
| requestContext, | ||
| DESCRIBE, | ||
| TOPIC, | ||
| offsetFetchResponse.topics.asScala | ||
| )(_.name) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Null Check Missing
No null check on offsetFetchResponse.topics before conversion to Scala collection. Could cause NullPointerException if topics is null, enabling denial of service attacks.
| val authorizedNames = authHelper.filterByAuthorized( | |
| requestContext, | |
| DESCRIBE, | |
| TOPIC, | |
| offsetFetchResponse.topics.asScala | |
| )(_.name) | |
| val authorizedNames = if (offsetFetchResponse.topics == null) { | |
| Set.empty[String] | |
| } else { | |
| authHelper.filterByAuthorized( | |
| requestContext, | |
| DESCRIBE, | |
| TOPIC, | |
| offsetFetchResponse.topics.asScala | |
| )(_.name) | |
| } | 
Standards
- CWE-476
- OWASP-A06
| if (useTopicIds) { | ||
| offsetFetchRequest.topics.forEach { topic => | ||
| if (topic.topicId != Uuid.ZERO_UUID) { | ||
| metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name)) | ||
| } | ||
| } | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing TopicId Validation
Missing validation when topic ID is present but name lookup fails. This creates inconsistency where topicId exists but name remains empty, causing potential reliability issues.
| if (useTopicIds) { | |
| offsetFetchRequest.topics.forEach { topic => | |
| if (topic.topicId != Uuid.ZERO_UUID) { | |
| metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name)) | |
| } | |
| } | |
| } | |
| if (useTopicIds) { | |
| offsetFetchRequest.topics.forEach { topic => | |
| if (topic.topicId != Uuid.ZERO_UUID) { | |
| val topicNameOpt = metadataCache.getTopicName(topic.topicId) | |
| if (topicNameOpt.isPresent) { | |
| topic.setName(topicNameOpt.get) | |
| } else { | |
| // If we can't resolve the topic ID, mark it with empty name | |
| // It will be handled later with UNKNOWN_TOPIC_ID error | |
| topic.setName("") | |
| } | |
| } | |
| } | |
| } | 
Standards
- ISO-IEC-25010-Reliability-Fault-Tolerance
- ISO-IEC-25010-Functional-Correctness-Appropriateness
- DbC-Precondition-Validation
- SRE-Error-Handling
| def buildErrorResponse( | ||
| topic: OffsetFetchRequestData.OffsetFetchRequestTopics, | ||
| error: Errors | ||
| ): OffsetFetchResponseData.OffsetFetchResponseTopics = { | ||
| val topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics() | ||
| .setTopicId(topic.topicId) | ||
| .setName(topic.name) | ||
| topic.partitionIndexes.forEach { partitionIndex => | ||
| topicResponse.partitions.add(new OffsetFetchResponseData.OffsetFetchResponsePartitions() | ||
| .setPartitionIndex(partitionIndex) | ||
| .setCommittedOffset(-1) | ||
| .setErrorCode(error.code)) | ||
| } | ||
| topicResponse | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incomplete Error Handling
Error response builder doesn't set committed leader epoch. This inconsistency could lead to client-side errors when processing responses with errors, as other code paths set this field.
| def buildErrorResponse( | |
| topic: OffsetFetchRequestData.OffsetFetchRequestTopics, | |
| error: Errors | |
| ): OffsetFetchResponseData.OffsetFetchResponseTopics = { | |
| val topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics() | |
| .setTopicId(topic.topicId) | |
| .setName(topic.name) | |
| topic.partitionIndexes.forEach { partitionIndex => | |
| topicResponse.partitions.add(new OffsetFetchResponseData.OffsetFetchResponsePartitions() | |
| .setPartitionIndex(partitionIndex) | |
| .setCommittedOffset(-1) | |
| .setErrorCode(error.code)) | |
| } | |
| topicResponse | |
| } | |
| def buildErrorResponse( | |
| topic: OffsetFetchRequestData.OffsetFetchRequestTopics, | |
| error: Errors | |
| ): OffsetFetchResponseData.OffsetFetchResponseTopics = { | |
| val topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics() | |
| .setTopicId(topic.topicId) | |
| .setName(topic.name) | |
| topic.partitionIndexes.forEach { partitionIndex => | |
| topicResponse.partitions.add(new OffsetFetchResponseData.OffsetFetchResponsePartitions() | |
| .setPartitionIndex(partitionIndex) | |
| .setCommittedOffset(-1) | |
| .setCommittedLeaderEpoch(-1) | |
| .setErrorCode(error.code)) | |
| } | |
| topicResponse | |
| } | 
Standards
- ISO-IEC-25010-Reliability-Maturity
- ISO-IEC-25010-Functional-Correctness-Completeness
- DbC-Postcondition-Consistency
- SRE-Error-Handling
| new OffsetFetchResponseData.OffsetFetchResponseTopics() | ||
| .setTopicId(topic.topicId()) | ||
| .setName(topic.name()); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent Name Handling
Topic name is set inconsistently in the response. When using topic IDs, the response must include both ID and name for backward compatibility and proper client-side handling.
| new OffsetFetchResponseData.OffsetFetchResponseTopics() | |
| .setTopicId(topic.topicId()) | |
| .setName(topic.name()); | |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() | |
| .setTopicId(topic.topicId()) | |
| .setName(topic.name()) | 
Standards
- ISO-IEC-25010-Reliability-Maturity
- ISO-IEC-25010-Functional-Correctness-Appropriateness
- DbC-Contract-Consistency
- SRE-Backward-Compatibility
| val authorizedNames = authHelper.filterByAuthorized( | ||
| requestContext, | ||
| DESCRIBE, | ||
| TOPIC, | ||
| offsetFetchResponse.topics.asScala | ||
| )(_.name) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unnecessary String Comparisons
Filtering by topic names causes unnecessary string comparisons when topic IDs are available. With many topics, this creates O(n) lookups where O(1) is possible using topic IDs directly.
| val authorizedNames = authHelper.filterByAuthorized( | |
| requestContext, | |
| DESCRIBE, | |
| TOPIC, | |
| offsetFetchResponse.topics.asScala | |
| )(_.name) | |
| val authorizedIdentifiers = if (useTopicIds) { | |
| authHelper.filterByAuthorized(requestContext, DESCRIBE, TOPIC, offsetFetchResponse.topics.asScala)(topic => metadataCache.getTopicName(topic.topicId).orElse(topic.name)) | |
| } else { | |
| authHelper.filterByAuthorized(requestContext, DESCRIBE, TOPIC, offsetFetchResponse.topics.asScala)(_.name) | |
| } | 
Standards
- ISO-IEC-25010-Performance-Time-Behaviour
- Algorithm-Opt-Hash-Map
- Netflix-Hot-Path-Optimization
| Reopening this in a new PR | 
| /refacto-test | 
| Refacto is reviewing this PR. Please wait for the review comments to be posted. | 
| /refacto-test | 
| /refacto-test-arvi | 
| Refacto is reviewing this PR. Please wait for the review comments to be posted. | 
| Code Review: OffsetFetch API Topic ID Support👍 Well Done
 📌 Files Processed
 📝 Additional Comments
 | 
|  | ||
| if (useTopicIds) { | ||
| offsetFetchRequest.topics.forEach { topic => | ||
| if (topic.topicId != Uuid.ZERO_UUID) { | ||
| metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name)) | ||
| } | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Topic ID Inconsistency
Topic name resolution only happens when topicId is non-zero, but later code at line 1119 checks for empty name regardless of ID value. This creates inconsistent validation where topics with ZERO_UUID but empty names trigger UNKNOWN_TOPIC_ID error incorrectly.
Standards
- Logic-Verification-Consistency
- Business-Rule-Validation
- Algorithm-Correctness-Conditional
|  | ||
| if (useTopicIds) { | ||
| offsetFetchRequest.topics.forEach { topic => | ||
| if (topic.topicId != Uuid.ZERO_UUID) { | ||
| metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name)) | ||
| } | ||
| } | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redundant Topic Name
Setting topic names when using topic IDs creates unnecessary overhead. When using topic IDs (version >= 10), the name isn't required for lookups. This redundant operation adds CPU and memory overhead by populating fields that won't be used in the authorization path.
Standards
- ISO-IEC-25010-Performance-Resource-Utilization
- Algorithm-Opt-Redundant-Operation
| offsetFetchRequest.topics.forEach { topic => | ||
| if (useTopicIds && topic.name.isEmpty) { | ||
| errorTopics += buildErrorResponse(topic, Errors.UNKNOWN_TOPIC_ID) | ||
| } else if (!authorizedTopicNames.contains(topic.name)) { | ||
| errorTopics += buildErrorResponse(topic, Errors.TOPIC_AUTHORIZATION_FAILED) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incomplete Error Handling
Logic fails to handle case where topic.name is non-empty but invalid/non-existent. Authorization check will fail but with incorrect error code (TOPIC_AUTHORIZATION_FAILED instead of UNKNOWN_TOPIC_ID) when topic ID is provided but resolves to non-existent name.
Standards
- Business-Rule-Validation
- Logic-Verification-Completeness
- Algorithm-Correctness-ErrorHandling
| val expectedGroup1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() | ||
| .setGroupId("group-1") | ||
| .setTopics(List( | ||
| new OffsetFetchResponseData.OffsetFetchResponseTopics() | ||
| .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID) | ||
| .setName(if (version < 10) foo else "") | ||
| .setPartitions(List( | ||
| new OffsetFetchResponseData.OffsetFetchResponsePartitions() | ||
| .setPartitionIndex(0) | ||
| .setCommittedOffset(100) | ||
| .setCommittedLeaderEpoch(1), | ||
| new OffsetFetchResponseData.OffsetFetchResponsePartitions() | ||
| .setPartitionIndex(1) | ||
| .setCommittedOffset(200) | ||
| .setCommittedLeaderEpoch(2) | ||
| ).asJava) | ||
| ).asJava) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicated Response Mapping
Duplicated response object construction pattern appears multiple times in the test. This creates maintenance burden when response structure changes. Extract helper method to build response objects with version-specific fields.
Standards
- Clean-Code-DRY
- Design-Pattern-Factory
| .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID) | ||
| .setName(if (version < 10) foo else "") | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Conditional Field Setting
Conditional logic for version-specific field values is repeated across multiple test cases. This creates maintenance overhead when version logic changes. Extract version-specific field setting into helper methods.
Standards
- Clean-Code-DRY
- Design-Pattern-Strategy
| val authorizedTopics = new mutable.ArrayBuffer[OffsetFetchRequestData.OffsetFetchRequestTopics] | ||
| val errorTopics = new mutable.ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseTopics] | ||
|  | ||
| def buildErrorResponse( | ||
| topic: OffsetFetchRequestData.OffsetFetchRequestTopics, | ||
| error: Errors | ||
| ): OffsetFetchResponseData.OffsetFetchResponseTopics = { | ||
| val topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics() | ||
| .setTopicId(topic.topicId) | ||
| .setName(topic.name) | ||
| topic.partitionIndexes.forEach { partitionIndex => | ||
| topicResponse.partitions.add(new OffsetFetchResponseData.OffsetFetchResponsePartitions() | ||
| .setPartitionIndex(partitionIndex) | ||
| .setCommittedOffset(-1) | ||
| .setErrorCode(error.code)) | ||
| } | ||
| topicResponse | ||
| } | ||
|  | ||
| offsetFetchRequest.topics.forEach { topic => | ||
| if (useTopicIds && topic.name.isEmpty) { | ||
| errorTopics += buildErrorResponse(topic, Errors.UNKNOWN_TOPIC_ID) | ||
| } else if (!authorizedTopicNames.contains(topic.name)) { | ||
| errorTopics += buildErrorResponse(topic, Errors.TOPIC_AUTHORIZATION_FAILED) | ||
| } else { | ||
| authorizedTopics += topic | ||
| } | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inefficient Collection Conversion
Multiple collection conversions and intermediate buffers create unnecessary overhead. The code creates temporary buffers and performs multiple iterations over the topic collection. Consider using partition/filter operations to categorize topics in a single pass, reducing memory allocations and CPU cycles.
Standards
- ISO-IEC-25010-Performance-Resource-Utilization
- Algorithm-Opt-Collection-Processing
WIP