Skip to content

Commit 873e4ee

Browse files
committed
WIP fix.
1 parent fd3dc94 commit 873e4ee

File tree

6 files changed

+85
-48
lines changed

6 files changed

+85
-48
lines changed

batch.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ func (batch *Batch) ReadMessage() (Message, error) {
232232
msg.HighWaterMark = batch.highWaterMark
233233
msg.Time = makeTime(timestamp)
234234
msg.Headers = headers
235+
msg.GenerationId = batch.conn.generationId
235236

236237
return msg, err
237238
}

commit.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,20 @@ package kafka
33
// A commit represents the instruction of publishing an update of the last
44
// offset read by a program for a topic and partition.
55
type commit struct {
6-
topic string
7-
partition int
8-
offset int64
6+
topic string
7+
partition int
8+
offset int64
9+
generationId int32
910
}
1011

1112
// makeCommit builds a commit value from a message, the resulting commit takes
1213
// its topic, partition, and offset from the message.
1314
func makeCommit(msg Message) commit {
1415
return commit{
15-
topic: msg.Topic,
16-
partition: msg.Partition,
17-
offset: msg.Offset + 1,
16+
topic: msg.Topic,
17+
partition: msg.Partition,
18+
offset: msg.Offset + 1,
19+
generationId: msg.GenerationId,
1820
}
1921
}
2022

conn.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ type Conn struct {
6565
apiVersions atomic.Value // apiVersionMap
6666

6767
transactionalID *string
68+
69+
generationId int32
6870
}
6971

7072
type apiVersionMap map[apiKey]ApiVersion
@@ -182,6 +184,7 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
182184
offset: FirstOffset,
183185
requiredAcks: -1,
184186
transactionalID: emptyToNullable(config.TransactionalID),
187+
generationId: -1,
185188
}
186189

187190
c.wb.w = &c.wbuf
@@ -388,6 +391,7 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error
388391
return joinGroupResponseV1{}, Error(response.ErrorCode)
389392
}
390393

394+
c.generationId = response.GenerationID
391395
return response, nil
392396
}
393397

message.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ type Message struct {
2020
Value []byte
2121
Headers []Header
2222

23+
// If the message has been sent by a consumer group, it contains the
24+
// generation's id. Value is -1 if not using consumer groups.
25+
GenerationId int32
26+
2327
// This field is used to hold arbitrary data you wish to include, so it
2428
// will be available when handle it on the Writer's `Completion` method,
2529
// this support the application can do any post operation on each message.

reader.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,35 +150,49 @@ func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash
150150
backoffDelayMax = 5 * time.Second
151151
)
152152

153+
messagesToSend := make(map[string]map[int]int64)
154+
for topic, partitionsInfo := range offsetStash {
155+
messagesToSend[topic] = make(map[int]int64)
156+
for partition, commitInfo := range partitionsInfo {
157+
messagesToSend[topic][partition] = commitInfo.offset
158+
}
159+
}
153160
for attempt := 0; attempt < retries; attempt++ {
154161
if attempt != 0 {
155162
if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
156163
return
157164
}
158165
}
159166

160-
if err = gen.CommitOffsets(offsetStash); err == nil {
167+
if err = gen.CommitOffsets(messagesToSend); err == nil {
161168
return
162169
}
163170
}
164171

165172
return // err will not be nil
166173
}
167174

168-
// offsetStash holds offsets by topic => partition => offset.
169-
type offsetStash map[string]map[int]int64
175+
// offsetStash holds offsets by topic => partition => offsetEntry.
176+
type offsetEntry struct {
177+
offset int64
178+
generationId int32
179+
}
180+
type offsetStash map[string]map[int]offsetEntry
170181

171182
// merge updates the offsetStash with the offsets from the provided messages.
172183
func (o offsetStash) merge(commits []commit) {
173184
for _, c := range commits {
174185
offsetsByPartition, ok := o[c.topic]
175186
if !ok {
176-
offsetsByPartition = map[int]int64{}
187+
offsetsByPartition = map[int]offsetEntry{}
177188
o[c.topic] = offsetsByPartition
178189
}
179190

180-
if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset {
181-
offsetsByPartition[c.partition] = c.offset
191+
if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset.offset {
192+
offsetsByPartition[c.partition] = offsetEntry{
193+
offset: c.offset,
194+
generationId: c.generationId,
195+
}
182196
}
183197
}
184198
}

reader_test.go

Lines changed: 48 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -897,6 +897,36 @@ func TestReaderConsumerGroup(t *testing.T) {
897897
}
898898
}
899899

900+
func TestReaderConsumerGroup2(t *testing.T) {
901+
// It appears that some of the tests depend on all these tests being
902+
// run concurrently to pass... this is brittle and should be fixed
903+
// at some point.
904+
t.Parallel()
905+
906+
topic := makeTopic()
907+
createTopic(t, topic, 2)
908+
defer deleteTopic(t, topic)
909+
910+
groupID := makeGroupID()
911+
r := NewReader(ReaderConfig{
912+
Brokers: []string{"localhost:9092"},
913+
Topic: topic,
914+
GroupID: groupID,
915+
HeartbeatInterval: 2 * time.Second,
916+
CommitInterval: 0,
917+
RebalanceTimeout: 2 * time.Second,
918+
RetentionTime: time.Hour,
919+
MinBytes: 1,
920+
MaxBytes: 1e6,
921+
})
922+
defer r.Close()
923+
924+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
925+
defer cancel()
926+
927+
testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t, ctx, r)
928+
}
929+
900930
func testReaderConsumerGroupHandshake(t *testing.T, ctx context.Context, r *Reader) {
901931
prepareReader(t, context.Background(), r, makeTestSequence(5)...)
902932

@@ -1228,27 +1258,7 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12281258
}
12291259

12301260
require.NoError(t, secondReader.CommitMessages(ctx, msgsForSecondReader[len(msgsForSecondReader)-1]))
1231-
require.NoError(t, firstReader.CommitMessages(ctx, msgsForSecondReader[0]))
1232-
resp, err := client.OffsetFetch(
1233-
ctx,
1234-
&OffsetFetchRequest{
1235-
GroupID: firstReader.config.GroupID,
1236-
Topics: map[string][]int{firstReader.config.Topic: {0, 1}},
1237-
},
1238-
)
1239-
require.NoError(t, err)
1240-
require.NotNil(t, resp)
1241-
if topics, ok := resp.Topics[firstReader.config.Topic]; !ok {
1242-
require.True(t, ok, "Topic not found")
1243-
} else {
1244-
for _, topic := range topics {
1245-
if offset, ok := topicsToCommit[topic.Partition]; ok {
1246-
assert.Equal(t, offset+1, topic.CommittedOffset, "committed partition %d had committed offset %d instead of %d", topic.Partition, topic.CommittedOffset, offset)
1247-
} else {
1248-
assert.Equal(t, int64(-1), topic.CommittedOffset, "not-committed partition %d had committed offset %d instead of 0", topic.Partition, topic.CommittedOffset)
1249-
}
1250-
}
1251-
}
1261+
require.ErrorIs(t, errInvalidWritePartition, firstReader.CommitMessages(ctx, msgsForSecondReader[0]))
12521262
}
12531263

12541264
func TestOffsetStash(t *testing.T) {
@@ -1272,16 +1282,16 @@ func TestOffsetStash(t *testing.T) {
12721282
Given: offsetStash{},
12731283
Messages: []Message{newMessage(0, 0)},
12741284
Expected: offsetStash{
1275-
topic: {0: 1},
1285+
topic: {0: {1, 1}},
12761286
},
12771287
},
12781288
"ignores earlier offsets": {
12791289
Given: offsetStash{
1280-
topic: {0: 2},
1290+
topic: {0: {2, 1}},
12811291
},
12821292
Messages: []Message{newMessage(0, 0)},
12831293
Expected: offsetStash{
1284-
topic: {0: 2},
1294+
topic: {0: {2, 1}},
12851295
},
12861296
},
12871297
"uses latest offset": {
@@ -1292,7 +1302,7 @@ func TestOffsetStash(t *testing.T) {
12921302
newMessage(0, 1),
12931303
},
12941304
Expected: offsetStash{
1295-
topic: {0: 4},
1305+
topic: {0: {4, 1}},
12961306
},
12971307
},
12981308
"uses latest offset, across multiple topics": {
@@ -1306,8 +1316,8 @@ func TestOffsetStash(t *testing.T) {
13061316
},
13071317
Expected: offsetStash{
13081318
topic: {
1309-
0: 4,
1310-
1: 7,
1319+
0: {4, 1},
1320+
1: {7, 1},
13111321
},
13121322
},
13131323
},
@@ -1359,10 +1369,11 @@ func TestCommitLoopImmediateFlushOnGenerationEnd(t *testing.T) {
13591369
return offsetCommitResponseV2{}, nil
13601370
},
13611371
},
1362-
done: make(chan struct{}),
1363-
log: func(func(Logger)) {},
1364-
logError: func(func(Logger)) {},
1365-
joined: make(chan struct{}),
1372+
done: make(chan struct{}),
1373+
log: func(func(Logger)) {},
1374+
logError: func(func(Logger)) {},
1375+
joined: make(chan struct{}),
1376+
Assignments: map[string][]PartitionAssignment{"topic": {{0, 1}}},
13661377
}
13671378

13681379
// initialize commits so that the commitLoopImmediate select statement blocks
@@ -1396,7 +1407,7 @@ func TestCommitLoopImmediateFlushOnGenerationEnd(t *testing.T) {
13961407
}
13971408

13981409
func TestCommitOffsetsWithRetry(t *testing.T) {
1399-
offsets := offsetStash{"topic": {0: 0}}
1410+
offsets := offsetStash{"topic": {0: {0, 1}}}
14001411

14011412
tests := map[string]struct {
14021413
Fails int
@@ -1430,9 +1441,10 @@ func TestCommitOffsetsWithRetry(t *testing.T) {
14301441
return offsetCommitResponseV2{}, nil
14311442
},
14321443
},
1433-
done: make(chan struct{}),
1434-
log: func(func(Logger)) {},
1435-
logError: func(func(Logger)) {},
1444+
done: make(chan struct{}),
1445+
log: func(func(Logger)) {},
1446+
logError: func(func(Logger)) {},
1447+
Assignments: map[string][]PartitionAssignment{"topic": {{0, 1}}},
14361448
}
14371449

14381450
r := &Reader{stctx: context.Background()}
@@ -1636,7 +1648,7 @@ func TestConsumerGroupWithGroupTopicsSingle(t *testing.T) {
16361648
}
16371649
}
16381650

1639-
func TestConsumerGroupWithGroupTopicsMultple(t *testing.T) {
1651+
func TestConsumerGroupWithGroupTopicsMultiple(t *testing.T) {
16401652
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
16411653
defer cancel()
16421654

0 commit comments

Comments
 (0)