Skip to content

Commit bedcf4a

Browse files
authored
Add TxnOffsetCommit to Client (#741)
1 parent e6f0ae9 commit bedcf4a

File tree

4 files changed

+632
-0
lines changed

4 files changed

+632
-0
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package txnoffsetcommit
2+
3+
import "github.com/segmentio/kafka-go/protocol"
4+
5+
func init() {
6+
protocol.Register(&Request{}, &Response{})
7+
}
8+
9+
type Request struct {
10+
// We need at least one tagged field to indicate that this is a "flexible" message
11+
// type.
12+
_ struct{} `kafka:"min=v3,max=v3,tag"`
13+
14+
TransactionalID string `kafka:"min=v0,max=v2|min=v3,max=v3,compact"`
15+
GroupID string `kafka:"min=v0,max=v2|min=v3,max=v3,compact"`
16+
ProducerID int64 `kafka:"min=v0,max=v3"`
17+
ProducerEpoch int16 `kafka:"min=v0,max=v3"`
18+
GenerationID int32 `kafka:"min=v3,max=v3"`
19+
MemberID string `kafka:"min=v3,max=v3,compact"`
20+
GroupInstanceID string `kafka:"min=v3,max=v3,compact,nullable"`
21+
Topics []RequestTopic `kafka:"min=v0,max=v3"`
22+
}
23+
24+
type RequestTopic struct {
25+
// We need at least one tagged field to indicate that this is a "flexible" message
26+
// type.
27+
_ struct{} `kafka:"min=v3,max=v3,tag"`
28+
29+
Name string `kafka:"min=v0,max=v2|min=v3,max=v3,compact"`
30+
Partitions []RequestPartition `kafka:"min=v0,max=v3"`
31+
}
32+
33+
type RequestPartition struct {
34+
// We need at least one tagged field to indicate that this is a "flexible" message
35+
// type.
36+
_ struct{} `kafka:"min=v3,max=v3,tag"`
37+
38+
Partition int32 `kafka:"min=v0,max=v3"`
39+
CommittedOffset int64 `kafka:"min=v0,max=v3"`
40+
CommittedLeaderEpoch int32 `kafka:"min=v2,max=v3"`
41+
CommittedMetadata string `kafka:"min=v0,max=v2|min=v3,max=v3,nullable,compact"`
42+
}
43+
44+
func (r *Request) ApiKey() protocol.ApiKey { return protocol.TxnOffsetCommit }
45+
46+
type Response struct {
47+
// We need at least one tagged field to indicate that this is a "flexible" message
48+
// type.
49+
_ struct{} `kafka:"min=v3,max=v3,tag"`
50+
51+
ThrottleTimeMs int32 `kafka:"min=v0,max=v3"`
52+
Topics []ResponseTopic `kafka:"min=v0,max=v3"`
53+
}
54+
55+
type ResponseTopic struct {
56+
// We need at least one tagged field to indicate that this is a "flexible" message
57+
// type.
58+
_ struct{} `kafka:"min=v3,max=v3,tag"`
59+
60+
Name string `kafka:"min=v0,max=v2|min=v3,max=v3,compact"`
61+
Partitions []ResponsePartition `kafka:"min=v0,max=v3"`
62+
}
63+
64+
type ResponsePartition struct {
65+
// We need at least one tagged field to indicate that this is a "flexible" message
66+
// type.
67+
_ struct{} `kafka:"min=v3,max=v3,tag"`
68+
69+
Partition int32 `kafka:"min=v0,max=v3"`
70+
ErrorCode int16 `kafka:"min=v0,max=v3"`
71+
}
72+
73+
func (r *Response) ApiKey() protocol.ApiKey { return protocol.TxnOffsetCommit }
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package txnoffsetcommit_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/segmentio/kafka-go/protocol/prototest"
7+
"github.com/segmentio/kafka-go/protocol/txnoffsetcommit"
8+
)
9+
10+
func TestTxnOffsetCommitRequest(t *testing.T) {
11+
for _, version := range []int16{0, 1} {
12+
prototest.TestRequest(t, version, &txnoffsetcommit.Request{
13+
TransactionalID: "transactional-id-0",
14+
GroupID: "group-0",
15+
ProducerID: 10,
16+
ProducerEpoch: 100,
17+
Topics: []txnoffsetcommit.RequestTopic{
18+
{
19+
Name: "topic-0",
20+
Partitions: []txnoffsetcommit.RequestPartition{
21+
{
22+
Partition: 0,
23+
CommittedOffset: 10,
24+
CommittedMetadata: "meta-0-0",
25+
},
26+
{
27+
Partition: 1,
28+
CommittedOffset: 10,
29+
CommittedMetadata: "meta-0-1",
30+
},
31+
},
32+
},
33+
{
34+
Name: "topic-1",
35+
Partitions: []txnoffsetcommit.RequestPartition{
36+
{
37+
Partition: 0,
38+
CommittedOffset: 10,
39+
CommittedMetadata: "meta-1-0",
40+
},
41+
{
42+
Partition: 1,
43+
CommittedOffset: 10,
44+
CommittedMetadata: "meta-1-1",
45+
},
46+
},
47+
},
48+
},
49+
})
50+
}
51+
52+
// Version 2 added:
53+
// Topics.RequestTopic.Partitions.CommittedLeaderEpoch
54+
for _, version := range []int16{2} {
55+
prototest.TestRequest(t, version, &txnoffsetcommit.Request{
56+
TransactionalID: "transactional-id-0",
57+
GroupID: "group-0",
58+
ProducerID: 10,
59+
ProducerEpoch: 100,
60+
Topics: []txnoffsetcommit.RequestTopic{
61+
{
62+
Name: "topic-0",
63+
Partitions: []txnoffsetcommit.RequestPartition{
64+
{
65+
Partition: 0,
66+
CommittedOffset: 10,
67+
CommittedLeaderEpoch: 100,
68+
CommittedMetadata: "meta-0-0",
69+
},
70+
{
71+
Partition: 1,
72+
CommittedOffset: 10,
73+
CommittedLeaderEpoch: 100,
74+
CommittedMetadata: "meta-0-1",
75+
},
76+
},
77+
},
78+
{
79+
Name: "topic-1",
80+
Partitions: []txnoffsetcommit.RequestPartition{
81+
{
82+
Partition: 0,
83+
CommittedOffset: 10,
84+
CommittedLeaderEpoch: 100,
85+
CommittedMetadata: "meta-1-0",
86+
},
87+
{
88+
Partition: 1,
89+
CommittedOffset: 10,
90+
CommittedLeaderEpoch: 100,
91+
CommittedMetadata: "meta-1-1",
92+
},
93+
},
94+
},
95+
},
96+
})
97+
}
98+
99+
// Version 3 added:
100+
// GenerationID
101+
// MemberID
102+
// GroupInstanceID
103+
for _, version := range []int16{3} {
104+
prototest.TestRequest(t, version, &txnoffsetcommit.Request{
105+
TransactionalID: "transactional-id-0",
106+
GroupID: "group-0",
107+
ProducerID: 10,
108+
ProducerEpoch: 100,
109+
GenerationID: 2,
110+
MemberID: "member-0",
111+
GroupInstanceID: "group-instance-id-0",
112+
Topics: []txnoffsetcommit.RequestTopic{
113+
{
114+
Name: "topic-0",
115+
Partitions: []txnoffsetcommit.RequestPartition{
116+
{
117+
Partition: 0,
118+
CommittedOffset: 10,
119+
CommittedLeaderEpoch: 100,
120+
CommittedMetadata: "meta-0-0",
121+
},
122+
{
123+
Partition: 1,
124+
CommittedOffset: 10,
125+
CommittedLeaderEpoch: 100,
126+
CommittedMetadata: "meta-0-1",
127+
},
128+
},
129+
},
130+
{
131+
Name: "topic-1",
132+
Partitions: []txnoffsetcommit.RequestPartition{
133+
{
134+
Partition: 0,
135+
CommittedOffset: 10,
136+
CommittedLeaderEpoch: 100,
137+
CommittedMetadata: "meta-1-0",
138+
},
139+
{
140+
Partition: 1,
141+
CommittedOffset: 10,
142+
CommittedLeaderEpoch: 100,
143+
CommittedMetadata: "meta-1-1",
144+
},
145+
},
146+
},
147+
},
148+
})
149+
}
150+
}
151+
152+
func TestTxnOffsetCommitResponse(t *testing.T) {
153+
for _, version := range []int16{0, 1, 2, 3} {
154+
prototest.TestResponse(t, version, &txnoffsetcommit.Response{
155+
ThrottleTimeMs: 10,
156+
Topics: []txnoffsetcommit.ResponseTopic{
157+
{
158+
Name: "topic-0",
159+
Partitions: []txnoffsetcommit.ResponsePartition{
160+
{
161+
Partition: 0,
162+
ErrorCode: 0,
163+
},
164+
{
165+
Partition: 1,
166+
ErrorCode: 10,
167+
},
168+
},
169+
},
170+
{
171+
Name: "topic-1",
172+
Partitions: []txnoffsetcommit.ResponsePartition{
173+
{
174+
Partition: 0,
175+
ErrorCode: 0,
176+
},
177+
{
178+
Partition: 1,
179+
ErrorCode: 10,
180+
},
181+
},
182+
},
183+
},
184+
})
185+
}
186+
}

0 commit comments

Comments
 (0)