Skip to content

Commit 80b9abe

Browse files
committed
KAFKA-19444: Add back JoinGroup v0 & v1 (#20116)
This fixes librdkafka older than the recently released 2.11.0 with Kerberos authentication and Apache Kafka 4.x. Even though this is a bug in librdkafka, a key goal of KIP-896 is not to break the popular client libraries listed in it. Adding back JoinGroup v0 & v1 is a very small change and worth it from that perspective. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 8433ac4 commit 80b9abe

File tree

4 files changed

+29
-6
lines changed

4 files changed

+29
-6
lines changed

clients/src/main/resources/common/message/JoinGroupRequest.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
"type": "request",
1919
"listeners": ["broker"],
2020
"name": "JoinGroupRequest",
21-
// Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline.
22-
//
2321
// Version 1 adds RebalanceTimeoutMs. Version 2 and 3 are the same as version 1.
2422
//
2523
// Starting from version 4, the client needs to issue a second request to join group
@@ -34,7 +32,7 @@
3432
// Version 8 adds the Reason field (KIP-800).
3533
//
3634
// Version 9 is the same as version 8.
37-
"validVersions": "2-9",
35+
"validVersions": "0-9",
3836
"flexibleVersions": "6+",
3937
"fields": [
4038
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",

clients/src/main/resources/common/message/JoinGroupResponse.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
"apiKey": 11,
1818
"type": "response",
1919
"name": "JoinGroupResponse",
20-
// Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline.
21-
//
2220
// Version 1 is the same as version 0.
2321
//
2422
// Version 2 adds throttle time.
@@ -37,7 +35,7 @@
3735
// Version 8 is the same as version 7.
3836
//
3937
// Version 9 adds the SkipAssignment field.
40-
"validVersions": "2-9",
38+
"validVersions": "0-9",
4139
"flexibleVersions": "6+",
4240
"fields": [
4341
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,

clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@
1919
import org.apache.kafka.common.errors.InvalidConfigurationException;
2020
import org.apache.kafka.common.errors.UnsupportedVersionException;
2121
import org.apache.kafka.common.message.JoinGroupRequestData;
22+
import org.apache.kafka.common.protocol.MessageUtil;
2223
import org.apache.kafka.test.TestUtils;
2324

2425
import org.junit.jupiter.api.Test;
2526

27+
import java.nio.ByteBuffer;
2628
import java.util.Arrays;
2729

30+
import static org.junit.jupiter.api.Assertions.assertEquals;
2831
import static org.junit.jupiter.api.Assertions.assertThrows;
2932
import static org.junit.jupiter.api.Assertions.fail;
3033

@@ -65,4 +68,20 @@ public void testRequestVersionCompatibilityFailBuild() {
6568
.setProtocolType("consumer")
6669
).build((short) 4));
6770
}
71+
72+
@Test
73+
public void testRebalanceTimeoutDefaultsToSessionTimeoutV0() {
74+
int sessionTimeoutMs = 30000;
75+
short version = 0;
76+
77+
ByteBuffer buffer = MessageUtil.toByteBuffer(new JoinGroupRequestData()
78+
.setGroupId("groupId")
79+
.setMemberId("consumerId")
80+
.setProtocolType("consumer")
81+
.setSessionTimeoutMs(sessionTimeoutMs), version);
82+
83+
JoinGroupRequest request = JoinGroupRequest.parse(buffer, version);
84+
assertEquals(sessionTimeoutMs, request.data().sessionTimeoutMs());
85+
assertEquals(sessionTimeoutMs, request.data().rebalanceTimeoutMs());
86+
}
6887
}

clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,14 @@ public void testFetchRequestWithMetadata() {
665665
assertEquals(request.isolationLevel(), deserialized.isolationLevel());
666666
}
667667

668+
@Test
669+
public void testJoinGroupRequestV0RebalanceTimeout() {
670+
final short version = 0;
671+
JoinGroupRequest jgr = createJoinGroupRequest(version);
672+
JoinGroupRequest jgr2 = JoinGroupRequest.parse(jgr.serialize(), version);
673+
assertEquals(jgr2.data().rebalanceTimeoutMs(), jgr.data().rebalanceTimeoutMs());
674+
}
675+
668676
@Test
669677
public void testSerializeWithHeader() {
670678
CreatableTopicCollection topicsToCreate = new CreatableTopicCollection(1);

0 commit comments

Comments
 (0)