Skip to content

Commit 6351bc0

Browse files
MINOR: Fix response for consumer group describe with empty group id (#20030) (#20036)
ConsumerGroupDescribe with an empty group id returns a response containing `null` groupId in a non-nullable field. Since the response cannot be serialized, this results in UNKNOWN_SERVER_ERROR being returned to the client. This PR sets the group id in the response to an empty string instead and adds request tests for empty group id. Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 15ec053 commit 6351bc0

File tree

10 files changed

+137
-12
lines changed

10 files changed

+137
-12
lines changed

core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,20 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
211211
)
212212

213213
assertEquals(expected, actual)
214+
215+
val unknownGroupResponse = consumerGroupDescribe(
216+
groupIds = List("grp-unknown"),
217+
includeAuthorizedOperations = true,
218+
version = version.toShort,
219+
)
220+
assertEquals(Errors.GROUP_ID_NOT_FOUND.code, unknownGroupResponse.head.errorCode())
221+
222+
val emptyGroupResponse = consumerGroupDescribe(
223+
groupIds = List(""),
224+
includeAuthorizedOperations = true,
225+
version = version.toShort,
226+
)
227+
assertEquals(Errors.INVALID_GROUP_ID.code, emptyGroupResponse.head.errorCode())
214228
}
215229
} finally {
216230
admin.close()

core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,48 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
301301
}
302302
}
303303

304+
@ClusterTest
305+
def testEmptyConsumerGroupId(): Unit = {
306+
val admin = cluster.admin()
307+
308+
// Creates the __consumer_offsets topics because it won't be created automatically
309+
// in this test because it does not use FindCoordinator API.
310+
try {
311+
TestUtils.createOffsetsTopicWithAdmin(
312+
admin = admin,
313+
brokers = cluster.brokers.values().asScala.toSeq,
314+
controllers = cluster.controllers().values().asScala.toSeq
315+
)
316+
317+
// Heartbeat request to join the group. Note that the member subscribes
318+
// to an nonexistent topic.
319+
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
320+
new ConsumerGroupHeartbeatRequestData()
321+
.setGroupId("")
322+
.setMemberId(Uuid.randomUuid().toString)
323+
.setMemberEpoch(0)
324+
.setRebalanceTimeoutMs(5 * 60 * 1000)
325+
.setSubscribedTopicNames(List("foo").asJava)
326+
.setTopicPartitions(List.empty.asJava),
327+
true
328+
).build()
329+
330+
// Send the request until receiving a successful response. There is a delay
331+
// here because the group coordinator is loaded in the background.
332+
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
333+
TestUtils.waitUntilTrue(() => {
334+
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
335+
consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REQUEST.code
336+
}, msg = s"Did not receive the expected error. Last response $consumerGroupHeartbeatResponse.")
337+
338+
// Verify the response.
339+
assertEquals(Errors.INVALID_REQUEST.code, consumerGroupHeartbeatResponse.data.errorCode)
340+
assertEquals("GroupId can't be empty.", consumerGroupHeartbeatResponse.data.errorMessage)
341+
} finally {
342+
admin.close()
343+
}
344+
}
345+
304346
@ClusterTest
305347
def testConsumerGroupHeartbeatWithEmptySubscription(): Unit = {
306348
val admin = cluster.admin()

core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
9999
)
100100

101101
deleteGroups(
102-
groupIds = List("grp-non-empty", "grp"),
103-
expectedErrors = List(Errors.NON_EMPTY_GROUP, Errors.NONE),
102+
groupIds = List("grp-non-empty", "grp", ""),
103+
expectedErrors = List(Errors.NON_EMPTY_GROUP, Errors.NONE, Errors.GROUP_ID_NOT_FOUND),
104104
version = version.toShort
105105
)
106106

core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,15 @@ class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinat
104104
.setGroupId("grp-unknown")
105105
.setGroupState(ClassicGroupState.DEAD.toString) // Return DEAD group when the group does not exist.
106106
.setErrorCode(if (version >= 6) Errors.GROUP_ID_NOT_FOUND.code() else Errors.NONE.code())
107-
.setErrorMessage(if (version >= 6) "Group grp-unknown not found." else null)
107+
.setErrorMessage(if (version >= 6) "Group grp-unknown not found." else null),
108+
new DescribedGroup()
109+
.setGroupId("")
110+
.setGroupState(ClassicGroupState.DEAD.toString) // Return DEAD group when the group does not exist.
111+
.setErrorCode(if (version >= 6) Errors.GROUP_ID_NOT_FOUND.code() else Errors.NONE.code())
112+
.setErrorMessage(if (version >= 6) "Group not found." else null)
108113
),
109114
describeGroups(
110-
groupIds = List("grp-1", "grp-2", "grp-unknown"),
115+
groupIds = List("grp-1", "grp-2", "grp-unknown", ""),
111116
version = version.toShort
112117
)
113118
)

core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,15 @@ class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
190190
expectedError = Errors.UNKNOWN_MEMBER_ID,
191191
version = version.toShort
192192
)
193+
194+
// Heartbeat with empty group id.
195+
heartbeat(
196+
groupId = "",
197+
memberId = leaderMemberId,
198+
generationId = -1,
199+
expectedError = Errors.INVALID_GROUP_ID,
200+
version = version.toShort
201+
)
193202
}
194203
}
195204
}

core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,17 @@ class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
149149
)
150150
)
151151

152+
// Join with an empty group id.
153+
verifyJoinGroupResponseDataEquals(
154+
new JoinGroupResponseData()
155+
.setErrorCode(Errors.INVALID_GROUP_ID.code)
156+
.setProtocolName(if (version >= 7) null else ""),
157+
sendJoinRequest(
158+
groupId = "",
159+
version = version.toShort
160+
)
161+
)
162+
152163
// Join with an inconsistent protocolType.
153164
verifyJoinGroupResponseDataEquals(
154165
new JoinGroupResponseData()

core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,39 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
269269
)
270270
)
271271

272+
// Fetch with empty group id.
273+
assertEquals(
274+
new OffsetFetchResponseData.OffsetFetchResponseGroup()
275+
.setGroupId("")
276+
.setTopics(List(
277+
new OffsetFetchResponseData.OffsetFetchResponseTopics()
278+
.setName("foo")
279+
.setPartitions(List(
280+
new OffsetFetchResponseData.OffsetFetchResponsePartitions()
281+
.setPartitionIndex(0)
282+
.setCommittedOffset(-1L),
283+
new OffsetFetchResponseData.OffsetFetchResponsePartitions()
284+
.setPartitionIndex(1)
285+
.setCommittedOffset(-1L),
286+
new OffsetFetchResponseData.OffsetFetchResponsePartitions()
287+
.setPartitionIndex(5)
288+
.setCommittedOffset(-1L)
289+
).asJava)
290+
).asJava),
291+
fetchOffsets(
292+
groupId = "",
293+
memberId = memberId,
294+
memberEpoch = memberEpoch,
295+
partitions = List(
296+
new TopicPartition("foo", 0),
297+
new TopicPartition("foo", 1),
298+
new TopicPartition("foo", 5) // This one does not exist.
299+
),
300+
requireStable = requireStable,
301+
version = version.toShort
302+
)
303+
)
304+
272305
// Fetch with stale member epoch.
273306
assertEquals(
274307
new OffsetFetchResponseData.OffsetFetchResponseGroup()

core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,17 @@ class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
7676
version = version.toShort
7777
)
7878

79+
// Sync with empty group id.
80+
verifySyncGroupWithOldProtocol(
81+
groupId = "",
82+
memberId = "member-id",
83+
generationId = -1,
84+
expectedProtocolType = null,
85+
expectedProtocolName = null,
86+
expectedError = Errors.INVALID_GROUP_ID,
87+
version = version.toShort
88+
)
89+
7990
val metadata = ConsumerProtocol.serializeSubscription(
8091
new ConsumerPartitionAssignor.Subscription(Collections.singletonList("foo"))
8192
).array

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ public CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>
636636
} else {
637637
futures.add(CompletableFuture.completedFuture(Collections.singletonList(
638638
new ConsumerGroupDescribeResponseData.DescribedGroup()
639-
.setGroupId(null)
639+
.setGroupId("")
640640
.setErrorCode(Errors.INVALID_GROUP_ID.code())
641641
)));
642642
}
@@ -687,7 +687,7 @@ public CompletableFuture<List<DescribedGroup>> shareGroupDescribe(
687687
} else {
688688
futures.add(CompletableFuture.completedFuture(Collections.singletonList(
689689
new ShareGroupDescribeResponseData.DescribedGroup()
690-
.setGroupId(null)
690+
.setGroupId("")
691691
.setErrorCode(Errors.INVALID_GROUP_ID.code())
692692
)));
693693
}
@@ -736,7 +736,7 @@ public CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> descri
736736
if (groupId == null) {
737737
futures.add(CompletableFuture.completedFuture(Collections.singletonList(
738738
new DescribeGroupsResponseData.DescribedGroup()
739-
.setGroupId(null)
739+
.setGroupId("")
740740
.setErrorCode(Errors.INVALID_GROUP_ID.code())
741741
)));
742742
} else {

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1038,7 +1038,7 @@ public void testDescribeGroupsInvalidGroupId() throws Exception {
10381038
.setGroupId("");
10391039
List<DescribeGroupsResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList(
10401040
new DescribeGroupsResponseData.DescribedGroup()
1041-
.setGroupId(null)
1041+
.setGroupId("")
10421042
.setErrorCode(Errors.INVALID_GROUP_ID.code()),
10431043
describedGroup
10441044
);
@@ -1470,11 +1470,11 @@ public void testConsumerGroupDescribeInvalidGroupId() throws ExecutionException,
14701470
service.startup(() -> partitionCount);
14711471

14721472
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup()
1473-
.setGroupId(null)
1473+
.setGroupId("")
14741474
.setErrorCode(Errors.INVALID_GROUP_ID.code());
14751475
List<ConsumerGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList(
14761476
new ConsumerGroupDescribeResponseData.DescribedGroup()
1477-
.setGroupId(null)
1477+
.setGroupId("")
14781478
.setErrorCode(Errors.INVALID_GROUP_ID.code()),
14791479
describedGroup
14801480
);
@@ -2347,11 +2347,11 @@ public void testShareGroupDescribeInvalidGroupId() throws ExecutionException, In
23472347
service.startup(() -> partitionCount);
23482348

23492349
ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
2350-
.setGroupId(null)
2350+
.setGroupId("")
23512351
.setErrorCode(Errors.INVALID_GROUP_ID.code());
23522352
List<ShareGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList(
23532353
new ShareGroupDescribeResponseData.DescribedGroup()
2354-
.setGroupId(null)
2354+
.setGroupId("")
23552355
.setErrorCode(Errors.INVALID_GROUP_ID.code()),
23562356
describedGroup
23572357
);

0 commit comments

Comments
 (0)