Skip to content

Commit 7501938

Browse files
authored
Support for offsetdelete (#1010)
* support for offsetdelete * gate test for proper kafka version * add test for offsetdelete response * fixed word in comment
1 parent 40898d3 commit 7501938

File tree

4 files changed

+365
-0
lines changed

4 files changed

+365
-0
lines changed

offsetdelete.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"time"
8+
9+
"github.com/segmentio/kafka-go/protocol/offsetdelete"
10+
)
11+
12+
// OffsetDelete deletes the offset for a consumer group on a particular topic
13+
// for a particular partition.
14+
type OffsetDelete struct {
15+
Topic string
16+
Partition int
17+
}
18+
19+
// OffsetDeleteRequest represents a request sent to a kafka broker to delete
20+
// the offsets for a partition on a given topic associated with a consumer group.
21+
type OffsetDeleteRequest struct {
22+
// Address of the kafka broker to send the request to.
23+
Addr net.Addr
24+
25+
// ID of the consumer group to delete the offsets for.
26+
GroupID string
27+
28+
// Set of topic partitions to delete offsets for.
29+
Topics map[string][]int
30+
}
31+
32+
// OffsetDeleteResponse represents a response from a kafka broker to a delete
33+
// offset request.
34+
type OffsetDeleteResponse struct {
35+
// An error that may have occurred while attempting to delete an offset
36+
Error error
37+
38+
// The amount of time that the broker throttled the request.
39+
Throttle time.Duration
40+
41+
// Set of topic partitions that the kafka broker has additional info (error?)
42+
// for.
43+
Topics map[string][]OffsetDeletePartition
44+
}
45+
46+
// OffsetDeletePartition represents the state of a status of a partition in response
47+
// to deleting offsets.
48+
type OffsetDeletePartition struct {
49+
// ID of the partition.
50+
Partition int
51+
52+
// An error that may have occurred while attempting to delete an offset for
53+
// this partition.
54+
Error error
55+
}
56+
57+
// OffsetDelete sends a delete offset request to a kafka broker and returns the
58+
// response.
59+
func (c *Client) OffsetDelete(ctx context.Context, req *OffsetDeleteRequest) (*OffsetDeleteResponse, error) {
60+
topics := make([]offsetdelete.RequestTopic, 0, len(req.Topics))
61+
62+
for topicName, partitionIndexes := range req.Topics {
63+
partitions := make([]offsetdelete.RequestPartition, len(partitionIndexes))
64+
65+
for i, c := range partitionIndexes {
66+
partitions[i] = offsetdelete.RequestPartition{
67+
PartitionIndex: int32(c),
68+
}
69+
}
70+
71+
topics = append(topics, offsetdelete.RequestTopic{
72+
Name: topicName,
73+
Partitions: partitions,
74+
})
75+
}
76+
77+
m, err := c.roundTrip(ctx, req.Addr, &offsetdelete.Request{
78+
GroupID: req.GroupID,
79+
Topics: topics,
80+
})
81+
if err != nil {
82+
return nil, fmt.Errorf("kafka.(*Client).OffsetDelete: %w", err)
83+
}
84+
r := m.(*offsetdelete.Response)
85+
86+
res := &OffsetDeleteResponse{
87+
Error: makeError(r.ErrorCode, ""),
88+
Throttle: makeDuration(r.ThrottleTimeMs),
89+
Topics: make(map[string][]OffsetDeletePartition, len(r.Topics)),
90+
}
91+
92+
for _, topic := range r.Topics {
93+
partitions := make([]OffsetDeletePartition, len(topic.Partitions))
94+
95+
for i, p := range topic.Partitions {
96+
partitions[i] = OffsetDeletePartition{
97+
Partition: int(p.PartitionIndex),
98+
Error: makeError(p.ErrorCode, ""),
99+
}
100+
}
101+
102+
res.Topics[topic.Name] = partitions
103+
}
104+
105+
return res, nil
106+
}

offsetdelete_test.go

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"log"
6+
"os"
7+
"strconv"
8+
"testing"
9+
"time"
10+
11+
ktesting "github.com/segmentio/kafka-go/testing"
12+
)
13+
14+
func TestClientDeleteOffset(t *testing.T) {
15+
if !ktesting.KafkaIsAtLeast("2.4.0") {
16+
return
17+
}
18+
19+
topic := makeTopic()
20+
client, shutdown := newLocalClientWithTopic(topic, 3)
21+
defer shutdown()
22+
now := time.Now()
23+
24+
const N = 10 * 3
25+
records := make([]Record, 0, N)
26+
for i := 0; i < N; i++ {
27+
records = append(records, Record{
28+
Time: now,
29+
Value: NewBytes([]byte("test-message-" + strconv.Itoa(i))),
30+
})
31+
}
32+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
33+
defer cancel()
34+
res, err := client.Produce(ctx, &ProduceRequest{
35+
Topic: topic,
36+
RequiredAcks: RequireAll,
37+
Records: NewRecordReader(records...),
38+
})
39+
if err != nil {
40+
t.Fatal(err)
41+
}
42+
43+
if res.Error != nil {
44+
t.Error(res.Error)
45+
}
46+
47+
for index, err := range res.RecordErrors {
48+
t.Fatalf("record at index %d produced an error: %v", index, err)
49+
}
50+
ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
51+
defer cancel()
52+
groupID := makeGroupID()
53+
54+
group, err := NewConsumerGroup(ConsumerGroupConfig{
55+
ID: groupID,
56+
Topics: []string{topic},
57+
Brokers: []string{"localhost:9092"},
58+
HeartbeatInterval: 2 * time.Second,
59+
RebalanceTimeout: 2 * time.Second,
60+
RetentionTime: time.Hour,
61+
Logger: log.New(os.Stdout, "cg-test: ", 0),
62+
})
63+
if err != nil {
64+
t.Fatal(err)
65+
}
66+
67+
gen, err := group.Next(ctx)
68+
if err != nil {
69+
t.Fatal(err)
70+
}
71+
72+
ocr, err := client.OffsetCommit(ctx, &OffsetCommitRequest{
73+
Addr: nil,
74+
GroupID: groupID,
75+
GenerationID: int(gen.ID),
76+
MemberID: gen.MemberID,
77+
Topics: map[string][]OffsetCommit{
78+
topic: {
79+
{Partition: 0, Offset: 10},
80+
{Partition: 1, Offset: 10},
81+
{Partition: 2, Offset: 10},
82+
},
83+
},
84+
})
85+
if err != nil {
86+
t.Fatal(err)
87+
}
88+
89+
group.Close()
90+
91+
resps := ocr.Topics[topic]
92+
if len(resps) != 3 {
93+
t.Fatalf("expected 3 offsetcommitpartition responses; got %d", len(resps))
94+
}
95+
96+
for _, resp := range resps {
97+
if resp.Error != nil {
98+
t.Fatal(resp.Error)
99+
}
100+
}
101+
102+
ofr, err := client.OffsetFetch(ctx, &OffsetFetchRequest{
103+
GroupID: groupID,
104+
Topics: map[string][]int{topic: {0, 1, 2}},
105+
})
106+
if err != nil {
107+
t.Fatal(err)
108+
}
109+
110+
if ofr.Error != nil {
111+
t.Error(res.Error)
112+
}
113+
114+
fetresps := ofr.Topics[topic]
115+
if len(fetresps) != 3 {
116+
t.Fatalf("expected 3 offsetfetchpartition responses; got %d", len(resps))
117+
}
118+
119+
for _, r := range fetresps {
120+
if r.Error != nil {
121+
t.Fatal(r.Error)
122+
}
123+
124+
if r.CommittedOffset != 10 {
125+
t.Fatalf("expected committed offset to be 10; got: %v for partition: %v", r.CommittedOffset, r.Partition)
126+
}
127+
}
128+
129+
// Remove offsets
130+
odr, err := client.OffsetDelete(ctx, &OffsetDeleteRequest{
131+
GroupID: groupID,
132+
Topics: map[string][]int{topic: {0, 1, 2}},
133+
})
134+
if err != nil {
135+
t.Fatal(err)
136+
}
137+
138+
if odr.Error != nil {
139+
t.Error(odr.Error)
140+
}
141+
142+
// Fetch the offsets again
143+
ofr, err = client.OffsetFetch(ctx, &OffsetFetchRequest{
144+
GroupID: groupID,
145+
Topics: map[string][]int{topic: {0, 1, 2}},
146+
})
147+
if err != nil {
148+
t.Fatal(err)
149+
}
150+
151+
if ofr.Error != nil {
152+
t.Error(res.Error)
153+
}
154+
155+
for _, r := range ofr.Topics[topic] {
156+
if r.CommittedOffset != -1 {
157+
t.Fatalf("expected committed offset to be -1; got: %v for partition: %v", r.CommittedOffset, r.Partition)
158+
}
159+
}
160+
}

protocol/offsetdelete/offsetdelete.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package offsetdelete
2+
3+
import "github.com/segmentio/kafka-go/protocol"
4+
5+
func init() {
6+
protocol.Register(&Request{}, &Response{})
7+
}
8+
9+
type Request struct {
10+
GroupID string `kafka:"min=v0,max=v0"`
11+
Topics []RequestTopic `kafka:"min=v0,max=v0"`
12+
}
13+
14+
func (r *Request) ApiKey() protocol.ApiKey { return protocol.OffsetDelete }
15+
16+
func (r *Request) Group() string { return r.GroupID }
17+
18+
type RequestTopic struct {
19+
Name string `kafka:"min=v0,max=v0"`
20+
Partitions []RequestPartition `kafka:"min=v0,max=v0"`
21+
}
22+
23+
type RequestPartition struct {
24+
PartitionIndex int32 `kafka:"min=v0,max=v0"`
25+
}
26+
27+
var (
28+
_ protocol.GroupMessage = (*Request)(nil)
29+
)
30+
31+
type Response struct {
32+
ErrorCode int16 `kafka:"min=v0,max=v0"`
33+
ThrottleTimeMs int32 `kafka:"min=v0,max=v0"`
34+
Topics []ResponseTopic `kafka:"min=v0,max=v0"`
35+
}
36+
37+
func (r *Response) ApiKey() protocol.ApiKey { return protocol.OffsetDelete }
38+
39+
type ResponseTopic struct {
40+
Name string `kafka:"min=v0,max=v0"`
41+
Partitions []ResponsePartition `kafka:"min=v0,max=v0"`
42+
}
43+
44+
type ResponsePartition struct {
45+
PartitionIndex int32 `kafka:"min=v0,max=v0"`
46+
ErrorCode int16 `kafka:"min=v0,max=v0"`
47+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package offsetdelete_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/segmentio/kafka-go/protocol/offsetdelete"
7+
"github.com/segmentio/kafka-go/protocol/prototest"
8+
)
9+
10+
func TestOffsetDeleteRequest(t *testing.T) {
11+
for _, version := range []int16{0} {
12+
prototest.TestRequest(t, version, &offsetdelete.Request{
13+
GroupID: "group-0",
14+
Topics: []offsetdelete.RequestTopic{
15+
{
16+
Name: "topic-0",
17+
Partitions: []offsetdelete.RequestPartition{
18+
{
19+
PartitionIndex: 0,
20+
},
21+
{
22+
PartitionIndex: 1,
23+
},
24+
},
25+
},
26+
},
27+
})
28+
}
29+
}
30+
31+
func TestOffsetDeleteResponse(t *testing.T) {
32+
for _, version := range []int16{0} {
33+
prototest.TestResponse(t, version, &offsetdelete.Response{
34+
ErrorCode: 0,
35+
Topics: []offsetdelete.ResponseTopic{
36+
{
37+
Name: "topic-0",
38+
Partitions: []offsetdelete.ResponsePartition{
39+
{
40+
PartitionIndex: 0,
41+
ErrorCode: 1,
42+
},
43+
{
44+
PartitionIndex: 1,
45+
ErrorCode: 1,
46+
},
47+
},
48+
},
49+
},
50+
})
51+
}
52+
}

0 commit comments

Comments
 (0)