Skip to content

Commit f0d5443

Browse files
authored
add consumer group apis to client (#943)
* add consumer group apis to client JoinGroup,SyncGroup,LeaveGroup
1 parent da91759 commit f0d5443

17 files changed

+1441
-5
lines changed

.golangci.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,8 @@ linters:
1111
disable:
1212
# Temporarily disabling so it can be addressed in a dedicated PR.
1313
- errcheck
14-
- goerr113
14+
- goerr113
15+
16+
linters-settings:
17+
goconst:
18+
ignore-tests: true

error.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,8 @@ func (e Error) Title() string {
327327
return "Unknown Leader Epoch"
328328
case UnsupportedCompressionType:
329329
return "Unsupported Compression Type"
330+
case MemberIDRequired:
331+
return "Member ID Required"
330332
case EligibleLeadersNotAvailable:
331333
return "Eligible Leader Not Available"
332334
case ElectionNotNeeded:
@@ -534,6 +536,8 @@ func (e Error) Description() string {
534536
return "the leader epoch in the request is newer than the epoch on the broker"
535537
case UnsupportedCompressionType:
536538
return "the requesting client does not support the compression type of given partition"
539+
case MemberIDRequired:
540+
return "the group member needs to have a valid member id before actually entering a consumer group"
537541
case EligibleLeadersNotAvailable:
538542
return "eligible topic partition leaders are not available"
539543
case ElectionNotNeeded:

findcoordinator_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"bytes"
66
"context"
77
"errors"
8-
"fmt"
98
"reflect"
109
"strings"
1110
"testing"
@@ -53,7 +52,6 @@ func TestClientFindCoordinator(t *testing.T) {
5352
Key: "TransactionalID-1",
5453
KeyType: CoordinatorKeyTypeTransaction,
5554
})
56-
5755
if err != nil {
5856
t.Fatal(err)
5957
}
@@ -65,7 +63,6 @@ func TestClientFindCoordinator(t *testing.T) {
6563

6664
// WaitForCoordinatorIndefinitely is a blocking call till a coordinator is found.
6765
func waitForCoordinatorIndefinitely(ctx context.Context, c *Client, req *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
68-
fmt.Println("Trying to find Coordinator.")
6966
resp, err := c.FindCoordinator(ctx, req)
7067

7168
for shouldRetryfindingCoordinator(resp, err) && ctx.Err() == nil {

joingroup.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,192 @@ package kafka
33
import (
44
"bufio"
55
"bytes"
6+
"context"
7+
"fmt"
8+
"net"
9+
"time"
10+
11+
"github.com/segmentio/kafka-go/protocol"
12+
"github.com/segmentio/kafka-go/protocol/consumer"
13+
"github.com/segmentio/kafka-go/protocol/joingroup"
614
)
715

16+
// JoinGroupRequest is the request structure for the JoinGroup function.
17+
type JoinGroupRequest struct {
18+
// Address of the kafka broker to send the request to.
19+
Addr net.Addr
20+
21+
// GroupID of the group to join.
22+
GroupID string
23+
24+
// The duration after which the coordinator considers the consumer dead
25+
// if it has not received a heartbeat.
26+
SessionTimeout time.Duration
27+
28+
// The duration the coordination will wait for each member to rejoin when rebalancing the group.
29+
RebalanceTimeout time.Duration
30+
31+
// The ID assigned by the group coordinator.
32+
MemberID string
33+
34+
// The unique identifier for the consumer instance.
35+
GroupInstanceID string
36+
37+
// The name for the class of protocols implemented by the group being joined.
38+
ProtocolType string
39+
40+
// The list of protocols the member supports.
41+
Protocols []GroupProtocol
42+
}
43+
44+
// GroupProtocol represents a consumer group protocol.
45+
type GroupProtocol struct {
46+
// The protocol name.
47+
Name string
48+
49+
// The protocol metadata.
50+
Metadata GroupProtocolSubscription
51+
}
52+
53+
type GroupProtocolSubscription struct {
54+
// The Topics to subscribe to.
55+
Topics []string
56+
57+
// UserData assosiated with the subscription for the given protocol
58+
UserData []byte
59+
60+
// Partitions owned by this consumer.
61+
OwnedPartitions map[string][]int
62+
}
63+
64+
// JoinGroupResponse is the response structure for the JoinGroup function.
65+
type JoinGroupResponse struct {
66+
// An error that may have occurred when attempting to join the group.
67+
//
68+
// The errors contain the kafka error code. Programs may use the standard
69+
// errors.Is function to test the error against kafka error codes.
70+
Error error
71+
72+
// The amount of time that the broker throttled the request.
73+
Throttle time.Duration
74+
75+
// The generation ID of the group.
76+
GenerationID int
77+
78+
// The group protocol selected by the coordinatior.
79+
ProtocolName string
80+
81+
// The group protocol name.
82+
ProtocolType string
83+
84+
// The leader of the group.
85+
LeaderID string
86+
87+
// The group member ID.
88+
MemberID string
89+
90+
// The members of the group.
91+
Members []JoinGroupResponseMember
92+
}
93+
94+
// JoinGroupResponseMember represents a group memmber in a reponse to a JoinGroup request.
95+
type JoinGroupResponseMember struct {
96+
// The group memmber ID.
97+
ID string
98+
99+
// The unique identifier of the consumer instance.
100+
GroupInstanceID string
101+
102+
// The group member metadata.
103+
Metadata GroupProtocolSubscription
104+
}
105+
106+
// JoinGroup sends a join group request to the coordinator and returns the response.
107+
func (c *Client) JoinGroup(ctx context.Context, req *JoinGroupRequest) (*JoinGroupResponse, error) {
108+
joinGroup := joingroup.Request{
109+
GroupID: req.GroupID,
110+
SessionTimeoutMS: int32(req.SessionTimeout.Milliseconds()),
111+
RebalanceTimeoutMS: int32(req.RebalanceTimeout.Milliseconds()),
112+
MemberID: req.MemberID,
113+
GroupInstanceID: req.GroupInstanceID,
114+
ProtocolType: req.ProtocolType,
115+
Protocols: make([]joingroup.RequestProtocol, 0, len(req.Protocols)),
116+
}
117+
118+
for _, proto := range req.Protocols {
119+
protoMeta := consumer.Subscription{
120+
Version: consumer.MaxVersionSupported,
121+
Topics: proto.Metadata.Topics,
122+
UserData: proto.Metadata.UserData,
123+
OwnedPartitions: make([]consumer.TopicPartition, 0, len(proto.Metadata.OwnedPartitions)),
124+
}
125+
for topic, partitions := range proto.Metadata.OwnedPartitions {
126+
tp := consumer.TopicPartition{
127+
Topic: topic,
128+
Partitions: make([]int32, 0, len(partitions)),
129+
}
130+
for _, partition := range partitions {
131+
tp.Partitions = append(tp.Partitions, int32(partition))
132+
}
133+
protoMeta.OwnedPartitions = append(protoMeta.OwnedPartitions, tp)
134+
}
135+
136+
metaBytes, err := protocol.Marshal(consumer.MaxVersionSupported, protoMeta)
137+
if err != nil {
138+
return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
139+
}
140+
141+
joinGroup.Protocols = append(joinGroup.Protocols, joingroup.RequestProtocol{
142+
Name: proto.Name,
143+
Metadata: metaBytes,
144+
})
145+
}
146+
147+
m, err := c.roundTrip(ctx, req.Addr, &joinGroup)
148+
if err != nil {
149+
return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
150+
}
151+
152+
r := m.(*joingroup.Response)
153+
154+
res := &JoinGroupResponse{
155+
Error: makeError(r.ErrorCode, ""),
156+
Throttle: makeDuration(r.ThrottleTimeMS),
157+
GenerationID: int(r.GenerationID),
158+
ProtocolName: r.ProtocolName,
159+
ProtocolType: r.ProtocolType,
160+
LeaderID: r.LeaderID,
161+
MemberID: r.MemberID,
162+
Members: make([]JoinGroupResponseMember, 0, len(r.Members)),
163+
}
164+
165+
for _, member := range r.Members {
166+
var meta consumer.Subscription
167+
err = protocol.Unmarshal(member.Metadata, consumer.MaxVersionSupported, &meta)
168+
if err != nil {
169+
return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
170+
}
171+
subscription := GroupProtocolSubscription{
172+
Topics: meta.Topics,
173+
UserData: meta.UserData,
174+
OwnedPartitions: make(map[string][]int, len(meta.OwnedPartitions)),
175+
}
176+
for _, owned := range meta.OwnedPartitions {
177+
subscription.OwnedPartitions[owned.Topic] = make([]int, 0, len(owned.Partitions))
178+
for _, partition := range owned.Partitions {
179+
subscription.OwnedPartitions[owned.Topic] = append(subscription.OwnedPartitions[owned.Topic], int(partition))
180+
}
181+
}
182+
res.Members = append(res.Members, JoinGroupResponseMember{
183+
ID: member.MemberID,
184+
GroupInstanceID: member.GroupInstanceID,
185+
Metadata: subscription,
186+
})
187+
}
188+
189+
return res, nil
190+
}
191+
8192
type groupMetadata struct {
9193
Version int16
10194
Topics []string

joingroup_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,127 @@ package kafka
33
import (
44
"bufio"
55
"bytes"
6+
"context"
7+
"errors"
68
"reflect"
79
"testing"
10+
"time"
11+
12+
ktesting "github.com/segmentio/kafka-go/testing"
813
)
914

15+
func TestClientJoinGroup(t *testing.T) {
16+
topic := makeTopic()
17+
client, shutdown := newLocalClient()
18+
defer shutdown()
19+
20+
err := clientCreateTopic(client, topic, 3)
21+
if err != nil {
22+
t.Fatal(err)
23+
}
24+
25+
groupID := makeGroupID()
26+
27+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
28+
defer cancel()
29+
respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{
30+
Addr: client.Addr,
31+
Key: groupID,
32+
KeyType: CoordinatorKeyTypeConsumer,
33+
})
34+
if err != nil {
35+
t.Fatal(err)
36+
}
37+
38+
if respc.Error != nil {
39+
t.Fatal(err)
40+
}
41+
42+
groupInstanceID := "group-instance-id"
43+
if !ktesting.KafkaIsAtLeast("2.4.1") {
44+
groupInstanceID = ""
45+
}
46+
const userData = "user-data"
47+
48+
req := &JoinGroupRequest{
49+
GroupID: groupID,
50+
GroupInstanceID: groupInstanceID,
51+
ProtocolType: "consumer",
52+
SessionTimeout: time.Minute,
53+
RebalanceTimeout: time.Minute,
54+
Protocols: []GroupProtocol{
55+
{
56+
Name: RoundRobinGroupBalancer{}.ProtocolName(),
57+
Metadata: GroupProtocolSubscription{
58+
Topics: []string{topic},
59+
UserData: []byte(userData),
60+
OwnedPartitions: map[string][]int{
61+
topic: {0, 1, 2},
62+
},
63+
},
64+
},
65+
},
66+
}
67+
68+
var resp *JoinGroupResponse
69+
70+
for {
71+
resp, err = client.JoinGroup(ctx, req)
72+
if err != nil {
73+
t.Fatal(err)
74+
}
75+
76+
if errors.Is(resp.Error, MemberIDRequired) {
77+
req.MemberID = resp.MemberID
78+
time.Sleep(time.Second)
79+
continue
80+
}
81+
82+
if resp.Error != nil {
83+
t.Fatal(resp.Error)
84+
}
85+
break
86+
}
87+
88+
if resp.GenerationID != 1 {
89+
t.Fatalf("expected generation ID to be 1 but got %v", resp.GenerationID)
90+
}
91+
92+
if resp.MemberID == "" {
93+
t.Fatal("expected a member ID in response")
94+
}
95+
96+
if resp.LeaderID != resp.MemberID {
97+
t.Fatalf("expected to be group leader but got %v", resp.LeaderID)
98+
}
99+
100+
if len(resp.Members) != 1 {
101+
t.Fatalf("expected 1 member got %v", resp.Members)
102+
}
103+
104+
member := resp.Members[0]
105+
106+
if member.ID != resp.MemberID {
107+
t.Fatal("expected to be the only group memmber")
108+
}
109+
110+
if member.GroupInstanceID != groupInstanceID {
111+
t.Fatalf("expected the group instance ID to be %v, got %v", groupInstanceID, member.GroupInstanceID)
112+
}
113+
114+
expectedMetadata := GroupProtocolSubscription{
115+
Topics: []string{topic},
116+
UserData: []byte(userData),
117+
OwnedPartitions: map[string][]int{
118+
topic: {0, 1, 2},
119+
},
120+
}
121+
122+
if !reflect.DeepEqual(member.Metadata, expectedMetadata) {
123+
t.Fatalf("\nexpected assignment to be \n%v\nbut got\n%v", expectedMetadata, member.Metadata)
124+
}
125+
}
126+
10127
func TestSaramaCompatibility(t *testing.T) {
11128
var (
12129
// sample data from github.com/Shopify/sarama

0 commit comments

Comments
 (0)