@@ -8295,6 +8295,151 @@ class KafkaApisTest extends Logging {
82958295 }
82968296 }
82978297
8298+ @ParameterizedTest
8299+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
8300+ def testHandleOffsetFetchWithUnknownTopicIds(version: Short): Unit = {
8301+ // We only test with topic ids.
8302+ if (version < 10) return
8303+
8304+ val foo = "foo"
8305+ val bar = "bar"
8306+ val fooId = Uuid.randomUuid()
8307+ val barId = Uuid.randomUuid()
8308+ addTopicToMetadataCache(foo, topicId = fooId, numPartitions = 2)
8309+
8310+ def makeRequest(version: Short): RequestChannel.Request = {
8311+ buildRequest(
8312+ new OffsetFetchRequest.Builder(
8313+ new OffsetFetchRequestData()
8314+ .setGroups(List(
8315+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
8316+ .setGroupId("group-1")
8317+ .setTopics(List(
8318+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
8319+ .setName(foo)
8320+ .setTopicId(fooId)
8321+ .setPartitionIndexes(List[Integer](0).asJava),
8322+ // bar does not exist so it must return UNKNOWN_TOPIC_ID.
8323+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
8324+ .setName(bar)
8325+ .setTopicId(barId)
8326+ .setPartitionIndexes(List[Integer](0).asJava)
8327+ ).asJava),
8328+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
8329+ .setGroupId("group-2")
8330+ .setTopics(null)
8331+ ).asJava),
8332+ false
8333+ ).build(version)
8334+ )
8335+ }
8336+
8337+ val requestChannelRequest = makeRequest(version)
8338+
8339+ val group1Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
8340+ when(groupCoordinator.fetchOffsets(
8341+ requestChannelRequest.context,
8342+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
8343+ .setGroupId("group-1")
8344+ .setTopics(List(
8345+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
8346+ .setTopicId(fooId)
8347+ .setName("foo")
8348+ .setPartitionIndexes(List[Integer](0).asJava)).asJava),
8349+ false
8350+ )).thenReturn(group1Future)
8351+
8352+ val group2Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
8353+ when(groupCoordinator.fetchAllOffsets(
8354+ requestChannelRequest.context,
8355+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
8356+ .setGroupId("group-2")
8357+ .setTopics(null),
8358+ false
8359+ )).thenReturn(group2Future)
8360+
8361+ kafkaApis = createKafkaApis()
8362+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
8363+
8364+ val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
8365+ .setGroupId("group-1")
8366+ .setTopics(List(
8367+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
8368+ .setTopicId(fooId)
8369+ .setName(foo)
8370+ .setPartitions(List(
8371+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
8372+ .setPartitionIndex(0)
8373+ .setCommittedOffset(100)
8374+ .setCommittedLeaderEpoch(1)
8375+ ).asJava)
8376+ ).asJava)
8377+
8378+ val group2Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
8379+ .setGroupId("group-2")
8380+ .setTopics(List(
8381+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
8382+ .setName(foo)
8383+ .setPartitions(List(
8384+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
8385+ .setPartitionIndex(0)
8386+ .setCommittedOffset(100)
8387+ .setCommittedLeaderEpoch(1)
8388+ ).asJava),
8389+ // bar does not exist so it must be filtered out.
8390+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
8391+ .setName(bar)
8392+ .setPartitions(List(
8393+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
8394+ .setPartitionIndex(0)
8395+ .setCommittedOffset(100)
8396+ .setCommittedLeaderEpoch(1)
8397+ ).asJava)
8398+ ).asJava)
8399+
8400+ val expectedResponse = new OffsetFetchResponseData()
8401+ .setGroups(List(
8402+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
8403+ .setGroupId("group-1")
8404+ .setTopics(List(
8405+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
8406+ .setTopicId(fooId)
8407+ .setPartitions(List(
8408+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
8409+ .setPartitionIndex(0)
8410+ .setCommittedOffset(100)
8411+ .setCommittedLeaderEpoch(1)
8412+ ).asJava),
8413+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
8414+ .setTopicId(barId)
8415+ .setPartitions(List(
8416+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
8417+ .setPartitionIndex(0)
8418+ .setCommittedOffset(-1)
8419+ .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
8420+ ).asJava)
8421+ ).asJava),
8422+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
8423+ .setGroupId("group-2")
8424+ .setTopics(List(
8425+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
8426+ .setTopicId(fooId)
8427+ .setPartitions(List(
8428+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
8429+ .setPartitionIndex(0)
8430+ .setCommittedOffset(100)
8431+ .setCommittedLeaderEpoch(1)
8432+ ).asJava)
8433+ ).asJava)
8434+ ).asJava)
8435+
8436+ group1Future.complete(group1Response)
8437+ group2Future.complete(group2Response)
8438+
8439+ val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
8440+ assertEquals(expectedResponse, response.data)
8441+ }
8442+
82988443 @ParameterizedTest
82998444 @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
83008445 def testHandleOffsetFetchWithSingleGroup(version: Short): Unit = {
@@ -8663,27 +8808,55 @@ class KafkaApisTest extends Logging {
86638808 assertEquals(expectedOffsetFetchResponse, response.data)
86648809 }
86658810
8666- // TODO Add test for unknown topic id
8667- // TODO Add test for topic id provided by coordinator that should not be overriden.
8811+ @ParameterizedTest
8812+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
8813+ def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(version: Short): Unit = {
8814+ // We don't test the non batched API.
8815+ if (version < 8) return
8816+
8817+ val foo = "foo"
8818+ val bar = "bar"
8819+ val fooId = Uuid.randomUuid()
8820+ val barId = Uuid.randomUuid()
8821+ addTopicToMetadataCache(foo, topicId = fooId, numPartitions = 2)
8822+ addTopicToMetadataCache(bar, topicId = barId, numPartitions = 2)
86688823
8669- // TODO Parameterize it.
8670- @Test
8671- def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(): Unit = {
86728824 def makeRequest(version: Short): RequestChannel.Request = {
8673- val groups = Map(
8674- "group-1" -> List(
8675- new TopicPartition("foo", 0),
8676- new TopicPartition("bar", 0)
8677- ).asJava,
8678- "group-2" -> List(
8679- new TopicPartition("foo", 0),
8680- new TopicPartition("bar", 0)
8681- ).asJava
8682- ).asJava
8683- buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version))
8825+ buildRequest(
8826+ new OffsetFetchRequest.Builder(
8827+ new OffsetFetchRequestData()
8828+ .setGroups(List(
8829+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
8830+ .setGroupId("group-1")
8831+ .setTopics(List(
8832+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
8833+ .setName(foo)
8834+ .setTopicId(fooId)
8835+ .setPartitionIndexes(List[Integer](0).asJava),
8836+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
8837+ .setName(bar)
8838+ .setTopicId(barId)
8839+ .setPartitionIndexes(List[Integer](0).asJava)
8840+ ).asJava),
8841+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
8842+ .setGroupId("group-2")
8843+ .setTopics(List(
8844+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
8845+ .setName(foo)
8846+ .setTopicId(fooId)
8847+ .setPartitionIndexes(List[Integer](0).asJava),
8848+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
8849+ .setName(bar)
8850+ .setTopicId(barId)
8851+ .setPartitionIndexes(List[Integer](0).asJava)
8852+ ).asJava)
8853+ ).asJava),
8854+ false
8855+ ).build(version)
8856+ )
86848857 }
86858858
8686- val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion(false) )
8859+ val requestChannelRequest = makeRequest(version )
86878860
86888861 val authorizer: Authorizer = mock(classOf[Authorizer])
86898862
@@ -8711,7 +8884,8 @@ class KafkaApisTest extends Logging {
87118884 new OffsetFetchRequestData.OffsetFetchRequestGroup()
87128885 .setGroupId("group-1")
87138886 .setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
8714- .setName("bar")
8887+ .setName(bar)
8888+ .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID)
87158889 .setPartitionIndexes(List[Integer](0).asJava)).asJava),
87168890 false
87178891 )).thenReturn(group1Future)
@@ -8722,7 +8896,8 @@ class KafkaApisTest extends Logging {
87228896 new OffsetFetchRequestData.OffsetFetchRequestGroup()
87238897 .setGroupId("group-2")
87248898 .setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
8725- .setName("bar")
8899+ .setName(bar)
8900+ .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID)
87268901 .setPartitionIndexes(List[Integer](0).asJava)).asJava),
87278902 false
87288903 )).thenReturn(group1Future)
0 commit comments