- 
                Notifications
    You must be signed in to change notification settings 
- Fork 0
KAFKA-14691; Add TopicId to OffsetFetch API #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|  | @@ -1027,6 +1027,8 @@ class KafkaApis(val requestChannel: RequestChannel, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| requireStable: Boolean | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| groupCoordinator.fetchAllOffsets( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| requestContext, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetFetchRequest, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|  | @@ -1040,13 +1042,33 @@ class KafkaApis(val requestChannel: RequestChannel, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetFetchResponse | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Clients are not allowed to see offsets for topics that are not authorized for Describe. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val (authorizedOffsets, _) = authHelper.partitionSeqByAuthorized( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val authorizedNames = authHelper.filterByAuthorized( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| requestContext, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| DESCRIBE, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TOPIC, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetFetchResponse.topics.asScala | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| )(_.name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +1045
     to 
      1050
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Null Check MissingNo null check on offsetFetchResponse.topics before conversion to Scala collection. Could cause NullPointerException if topics is null, enabling denial of service attacks. 
        Suggested change
       
 Standards
 
      Comment on lines
    
      +1045
     to 
      1050
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unnecessary String ComparisonsFiltering by topic names causes unnecessary string comparisons when topic IDs are available. With many topics, this creates O(n) lookups where O(1) is possible using topic IDs directly. 
        Suggested change
       
 Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetFetchResponse.setTopics(authorizedOffsets.asJava) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val topics = new mutable.ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseTopics] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetFetchResponse.topics.forEach { topic => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (authorizedNames.contains(topic.name)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (useTopicIds) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // If the topic is not provided by the group coordinator, we set it | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // using the metadata cache. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (topic.topicId == Uuid.ZERO_UUID) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topic.setTopicId(metadataCache.getTopicId(topic.name)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // If we don't have the topic id at all, we skip the topic because | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // we can not serialize it without it. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (topic.topicId != Uuid.ZERO_UUID) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topics += topic | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topics += topic | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetFetchResponse.setTopics(topics.asJava) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|  | @@ -1056,14 +1078,53 @@ class KafkaApis(val requestChannel: RequestChannel, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Topic ID Support Version Handling: Please verify that the implementation correctly handles both topic IDs and topic names across different API versions, especially around version 10 which introduces topic ID support. The conditional logic paths based on version checks need careful verification to ensure backward compatibility is maintained. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| requireStable: Boolean | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (useTopicIds) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetFetchRequest.topics.forEach { topic => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (topic.topicId != Uuid.ZERO_UUID) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +1084
     to 
      +1088
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Null Check MissingMissing null check on offsetFetchRequest.topics could cause NullPointerException. Attackers could send malformed requests with null topics to trigger server errors. 
        Suggested change
       
 Standards
 
      Comment on lines
    
      +1082
     to 
      +1088
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Topic ID InconsistencyTopic name resolution only happens when topicId is non-zero, but later code at line 1119 checks for empty name regardless of ID value. This creates inconsistent validation where topics with ZERO_UUID but empty names trigger UNKNOWN_TOPIC_ID error incorrectly. Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +1083
     to 
      +1089
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing TopicId ValidationMissing validation when topic ID is present but name lookup fails. This creates inconsistency where topicId exists but name remains empty, causing potential reliability issues. 
        Suggested change
       
 Standards
 
      Comment on lines
    
      +1082
     to 
      +1089
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Redundant Topic NameSetting topic names when using topic IDs creates unnecessary overhead. When using topic IDs (version >= 10), the name isn't required for lookups. This redundant operation adds CPU and memory overhead by populating fields that won't be used in the authorization path. Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Clients are not allowed to see offsets for topics that are not authorized for Describe. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val (authorizedTopics, unauthorizedTopics) = authHelper.partitionSeqByAuthorized( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val authorizedTopicNames = authHelper.filterByAuthorized( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| requestContext, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| DESCRIBE, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TOPIC, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetFetchRequest.topics.asScala | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| )(_.name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val authorizedTopics = new mutable.ArrayBuffer[OffsetFetchRequestData.OffsetFetchRequestTopics] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val errorTopics = new mutable.ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseTopics] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def buildErrorResponse( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topic: OffsetFetchRequestData.OffsetFetchRequestTopics, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| error: Errors | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): OffsetFetchResponseData.OffsetFetchResponseTopics = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .setTopicId(topic.topicId) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .setName(topic.name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topic.partitionIndexes.forEach { partitionIndex => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topicResponse.partitions.add(new OffsetFetchResponseData.OffsetFetchResponsePartitions() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .setPartitionIndex(partitionIndex) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .setCommittedOffset(-1) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .setErrorCode(error.code)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topicResponse | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +1102
     to 
      +1116
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incomplete Error HandlingError response builder doesn't set committed leader epoch. This inconsistency could lead to client-side errors when processing responses with errors, as other code paths set this field. 
        Suggested change
       
 Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetFetchRequest.topics.forEach { topic => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (useTopicIds && topic.name.isEmpty) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| errorTopics += buildErrorResponse(topic, Errors.UNKNOWN_TOPIC_ID) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else if (!authorizedTopicNames.contains(topic.name)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| errorTopics += buildErrorResponse(topic, Errors.TOPIC_AUTHORIZATION_FAILED) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +1118
     to 
      +1122
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incomplete Error HandlingLogic fails to handle case where topic.name is non-empty but invalid/non-existent. Authorization check will fail but with incorrect error code (TOPIC_AUTHORIZATION_FAILED instead of UNKNOWN_TOPIC_ID) when topic ID is provided but resolves to non-existent name. Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| authorizedTopics += topic | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +1099
     to 
      +1126
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inefficient Collection ConversionMultiple collection conversions and intermediate buffers create unnecessary overhead. The code creates temporary buffers and performs multiple iterations over the topic collection. Consider using partition/filter operations to categorize topics in a single pass, reducing memory allocations and CPU cycles. Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| groupCoordinator.fetchOffsets( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| requestContext, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| new OffsetFetchRequestData.OffsetFetchRequestGroup() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|  | @@ -1081,19 +1142,10 @@ class KafkaApis(val requestChannel: RequestChannel, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetFetchResponse | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val topics = new util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics]( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetFetchResponse.topics.size + unauthorizedTopics.size | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetFetchResponse.topics.size + errorTopics.size | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topics.addAll(offsetFetchResponse.topics) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| unauthorizedTopics.foreach { topic => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topic.partitionIndexes.forEach { partitionIndex => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topicResponse.partitions.add(new OffsetFetchResponseData.OffsetFetchResponsePartitions() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .setPartitionIndex(partitionIndex) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .setCommittedOffset(-1) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topics.add(topicResponse) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topics.addAll(errorTopics.asJava) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetFetchResponse.setTopics(topics) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Authorization Logic with Topic IDs: This section modifies authorization logic to handle topic IDs. Please confirm that authorization checks work properly when using topic IDs instead of topic names, as this is security-critical functionality.