@@ -1027,6 +1027,8 @@ class KafkaApis(val requestChannel: RequestChannel,
10271027 offsetFetchRequest : OffsetFetchRequestData .OffsetFetchRequestGroup ,
10281028 requireStable : Boolean
10291029 ): CompletableFuture [OffsetFetchResponseData .OffsetFetchResponseGroup ] = {
1030+ val useTopicIds = OffsetFetchRequest .useTopicIds(requestContext.apiVersion)
1031+
10301032 groupCoordinator.fetchAllOffsets(
10311033 requestContext,
10321034 offsetFetchRequest,
@@ -1040,13 +1042,33 @@ class KafkaApis(val requestChannel: RequestChannel,
10401042 offsetFetchResponse
10411043 } else {
10421044 // Clients are not allowed to see offsets for topics that are not authorized for Describe.
1043- val (authorizedOffsets, _) = authHelper.partitionSeqByAuthorized (
1045+ val authorizedNames = authHelper.filterByAuthorized (
10441046 requestContext,
10451047 DESCRIBE ,
10461048 TOPIC ,
10471049 offsetFetchResponse.topics.asScala
10481050 )(_.name)
1049- offsetFetchResponse.setTopics(authorizedOffsets.asJava)
1051+
1052+ val topics = new mutable.ArrayBuffer [OffsetFetchResponseData .OffsetFetchResponseTopics ]
1053+ offsetFetchResponse.topics.forEach { topic =>
1054+ if (authorizedNames.contains(topic.name)) {
1055+ if (useTopicIds) {
1056+ // If the topic is not provided by the group coordinator, we set it
1057+ // using the metadata cache.
1058+ if (topic.topicId == Uuid .ZERO_UUID ) {
1059+ metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name))
1060+ }
1061+ // If we don't have the topic id at all, we skip the topic because
1062+ // we can not serialize it without it.
1063+ if (topic.topicId != Uuid .ZERO_UUID ) {
1064+ topics += topic
1065+ }
1066+ } else {
1067+ topics += topic
1068+ }
1069+ }
1070+ }
1071+ offsetFetchResponse.setTopics(topics.asJava)
10501072 }
10511073 }
10521074 }
@@ -1056,14 +1078,53 @@ class KafkaApis(val requestChannel: RequestChannel,
10561078 offsetFetchRequest : OffsetFetchRequestData .OffsetFetchRequestGroup ,
10571079 requireStable : Boolean
10581080 ): CompletableFuture [OffsetFetchResponseData .OffsetFetchResponseGroup ] = {
1081+ val useTopicIds = OffsetFetchRequest .useTopicIds(requestContext.apiVersion)
1082+
1083+ if (useTopicIds) {
1084+ offsetFetchRequest.topics.forEach { topic =>
1085+ if (topic.topicId != Uuid .ZERO_UUID ) {
1086+ metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name))
1087+ }
1088+ }
1089+ }
1090+
10591091 // Clients are not allowed to see offsets for topics that are not authorized for Describe.
1060- val (authorizedTopics, unauthorizedTopics) = authHelper.partitionSeqByAuthorized (
1092+ val authorizedTopicNames = authHelper.filterByAuthorized (
10611093 requestContext,
10621094 DESCRIBE ,
10631095 TOPIC ,
10641096 offsetFetchRequest.topics.asScala
10651097 )(_.name)
10661098
1099+ val authorizedTopics = new mutable.ArrayBuffer [OffsetFetchRequestData .OffsetFetchRequestTopics ]
1100+ val errorTopics = new mutable.ArrayBuffer [OffsetFetchResponseData .OffsetFetchResponseTopics ]
1101+
1102+ def buildErrorResponse (
1103+ topic : OffsetFetchRequestData .OffsetFetchRequestTopics ,
1104+ error : Errors
1105+ ): OffsetFetchResponseData .OffsetFetchResponseTopics = {
1106+ val topicResponse = new OffsetFetchResponseData .OffsetFetchResponseTopics ()
1107+ .setTopicId(topic.topicId)
1108+ .setName(topic.name)
1109+ topic.partitionIndexes.forEach { partitionIndex =>
1110+ topicResponse.partitions.add(new OffsetFetchResponseData .OffsetFetchResponsePartitions ()
1111+ .setPartitionIndex(partitionIndex)
1112+ .setCommittedOffset(- 1 )
1113+ .setErrorCode(error.code))
1114+ }
1115+ topicResponse
1116+ }
1117+
1118+ offsetFetchRequest.topics.forEach { topic =>
1119+ if (useTopicIds && topic.name.isEmpty) {
1120+ errorTopics += buildErrorResponse(topic, Errors .UNKNOWN_TOPIC_ID )
1121+ } else if (! authorizedTopicNames.contains(topic.name)) {
1122+ errorTopics += buildErrorResponse(topic, Errors .TOPIC_AUTHORIZATION_FAILED )
1123+ } else {
1124+ authorizedTopics += topic
1125+ }
1126+ }
1127+
10671128 groupCoordinator.fetchOffsets(
10681129 requestContext,
10691130 new OffsetFetchRequestData .OffsetFetchRequestGroup ()
@@ -1081,19 +1142,10 @@ class KafkaApis(val requestChannel: RequestChannel,
10811142 offsetFetchResponse
10821143 } else {
10831144 val topics = new util.ArrayList [OffsetFetchResponseData .OffsetFetchResponseTopics ](
1084- offsetFetchResponse.topics.size + unauthorizedTopics .size
1145+ offsetFetchResponse.topics.size + errorTopics .size
10851146 )
10861147 topics.addAll(offsetFetchResponse.topics)
1087- unauthorizedTopics.foreach { topic =>
1088- val topicResponse = new OffsetFetchResponseData .OffsetFetchResponseTopics ().setName(topic.name)
1089- topic.partitionIndexes.forEach { partitionIndex =>
1090- topicResponse.partitions.add(new OffsetFetchResponseData .OffsetFetchResponsePartitions ()
1091- .setPartitionIndex(partitionIndex)
1092- .setCommittedOffset(- 1 )
1093- .setErrorCode(Errors .TOPIC_AUTHORIZATION_FAILED .code))
1094- }
1095- topics.add(topicResponse)
1096- }
1148+ topics.addAll(errorTopics.asJava)
10971149 offsetFetchResponse.setTopics(topics)
10981150 }
10991151 }
0 commit comments