Skip to content

Commit 5744d1d

Browse files
committed
Add a fix.
1 parent 873e4ee commit 5744d1d

File tree

3 files changed

+129
-46
lines changed

3 files changed

+129
-46
lines changed

batch.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,9 @@ func (batch *Batch) ReadMessage() (Message, error) {
232232
msg.HighWaterMark = batch.highWaterMark
233233
msg.Time = makeTime(timestamp)
234234
msg.Headers = headers
235-
msg.GenerationId = batch.conn.generationId
235+
if batch.conn != nil {
236+
msg.GenerationId = batch.conn.generationId
237+
}
236238

237239
return msg, err
238240
}

reader.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (r *Reader) unsubscribe() {
121121
// another consumer to avoid such a race.
122122
}
123123

124-
func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
124+
func (r *Reader) subscribe(generationId int32, allAssignments map[string][]PartitionAssignment) {
125125
offsets := make(map[topicPartition]int64)
126126
for topic, assignments := range allAssignments {
127127
for _, assignment := range assignments {
@@ -134,7 +134,7 @@ func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
134134
}
135135

136136
r.mutex.Lock()
137-
r.start(offsets)
137+
r.start(generationId, offsets)
138138
r.mutex.Unlock()
139139

140140
r.withLogger(func(l Logger) {
@@ -152,9 +152,31 @@ func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash
152152

153153
messagesToSend := make(map[string]map[int]int64)
154154
for topic, partitionsInfo := range offsetStash {
155-
messagesToSend[topic] = make(map[int]int64)
155+
msgsForTopic := make(map[int]int64)
156156
for partition, commitInfo := range partitionsInfo {
157-
messagesToSend[topic][partition] = commitInfo.offset
157+
// if there is a generation and it is different than the current
158+
// it means there was a rebalance
159+
if commitInfo.generationId != -1 {
160+
if gen.ID == commitInfo.generationId {
161+
msgsForTopic[partition] = commitInfo.offset
162+
} else {
163+
if assignments, ok := gen.Assignments[topic]; ok {
164+
for _, assignment := range assignments {
165+
if assignment.ID == partition && commitInfo.generationId > gen.ID {
166+
msgsForTopic[partition] = commitInfo.offset
167+
}
168+
}
169+
}
170+
r.withErrorLogger(func(l Logger) {
171+
l.Printf("Discarding commint for %s - %d: %d . Current generation is %d, commit generation is %d", topic, partition, commitInfo.offset, gen.ID, commitInfo.generationId)
172+
})
173+
}
174+
} else {
175+
msgsForTopic[partition] = commitInfo.offset
176+
}
177+
}
178+
if len(msgsForTopic) > 0 {
179+
messagesToSend[topic] = msgsForTopic
158180
}
159181
}
160182
for attempt := 0; attempt < retries; attempt++ {
@@ -343,7 +365,7 @@ func (r *Reader) run(cg *ConsumerGroup) {
343365

344366
r.stats.rebalances.observe(1)
345367

346-
r.subscribe(gen.Assignments)
368+
r.subscribe(gen.ID, gen.Assignments)
347369

348370
gen.Start(func(ctx context.Context) {
349371
r.commitLoop(ctx, gen)
@@ -833,7 +855,7 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
833855
r.mutex.Lock()
834856

835857
if !r.closed && r.version == 0 {
836-
r.start(r.getTopicPartitionOffset())
858+
r.start(-1, r.getTopicPartitionOffset())
837859
}
838860

839861
version := r.version
@@ -1054,7 +1076,7 @@ func (r *Reader) SetOffset(offset int64) error {
10541076
r.offset = offset
10551077

10561078
if r.version != 0 {
1057-
r.start(r.getTopicPartitionOffset())
1079+
r.start(-1, r.getTopicPartitionOffset())
10581080
}
10591081

10601082
r.activateReadLag()
@@ -1192,7 +1214,7 @@ func (r *Reader) readLag(ctx context.Context) {
11921214
}
11931215
}
11941216

1195-
func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
1217+
func (r *Reader) start(generationId int32, offsetsByPartition map[topicPartition]int64) {
11961218
if r.closed {
11971219
// don't start child reader if parent Reader is closed
11981220
return
@@ -1230,7 +1252,7 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
12301252

12311253
// backwards-compatibility flags
12321254
offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
1233-
}).run(ctx, offset)
1255+
}).run(ctx, generationId, offset)
12341256
}(ctx, key, offset, &r.join)
12351257
}
12361258
}
@@ -1267,7 +1289,7 @@ type readerMessage struct {
12671289
error error
12681290
}
12691291

1270-
func (r *reader) run(ctx context.Context, offset int64) {
1292+
func (r *reader) run(ctx context.Context, generationId int32, offset int64) {
12711293
// This is the reader's main loop, it only ends if the context is canceled
12721294
// and will keep attempting to reader messages otherwise.
12731295
//
@@ -1320,6 +1342,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
13201342
}
13211343
continue
13221344
}
1345+
conn.generationId = generationId
13231346

13241347
// Resetting the attempt counter ensures that if a failure occurs after
13251348
// a successful initialization we don't keep increasing the backoff

reader_test.go

Lines changed: 93 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -897,36 +897,6 @@ func TestReaderConsumerGroup(t *testing.T) {
897897
}
898898
}
899899

900-
func TestReaderConsumerGroup2(t *testing.T) {
901-
// It appears that some of the tests depend on all these tests being
902-
// run concurrently to pass... this is brittle and should be fixed
903-
// at some point.
904-
t.Parallel()
905-
906-
topic := makeTopic()
907-
createTopic(t, topic, 2)
908-
defer deleteTopic(t, topic)
909-
910-
groupID := makeGroupID()
911-
r := NewReader(ReaderConfig{
912-
Brokers: []string{"localhost:9092"},
913-
Topic: topic,
914-
GroupID: groupID,
915-
HeartbeatInterval: 2 * time.Second,
916-
CommitInterval: 0,
917-
RebalanceTimeout: 2 * time.Second,
918-
RetentionTime: time.Hour,
919-
MinBytes: 1,
920-
MaxBytes: 1e6,
921-
})
922-
defer r.Close()
923-
924-
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
925-
defer cancel()
926-
927-
testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t, ctx, r)
928-
}
929-
930900
func testReaderConsumerGroupHandshake(t *testing.T, ctx context.Context, r *Reader) {
931901
prepareReader(t, context.Background(), r, makeTestSequence(5)...)
932902

@@ -1231,6 +1201,23 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12311201
t.Fatalf("bad write err: %v", err)
12321202
}
12331203

1204+
// read all messages for the first reader
1205+
msgsForFirstReader := make(map[int][]Message, 0)
1206+
totalEvents := 0
1207+
for i := 0; i < messageCount; i++ {
1208+
if msg, err := firstReader.FetchMessage(ctx); err != nil {
1209+
t.Errorf("reader %v expected to read 1 message", i)
1210+
} else {
1211+
msgs, ok := msgsForFirstReader[msg.Partition]
1212+
if !ok {
1213+
msgs = make([]Message, 0)
1214+
}
1215+
msgs = append(msgs, msg)
1216+
msgsForFirstReader[msg.Partition] = msgs
1217+
totalEvents++
1218+
}
1219+
}
1220+
require.Equal(t, messageCount, totalEvents)
12341221
secondReader := NewReader(firstReader.config)
12351222
defer secondReader.Close()
12361223

@@ -1244,21 +1231,92 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12441231
assert.NoError(t, err)
12451232
assert.NotNil(t, resp)
12461233
return len(resp.Groups[0].Members) == 2
1247-
}, 10*time.Second, 100*time.Millisecond)
1234+
}, 10*time.Second, 100*time.Millisecond, "Group does not have 2 members")
12481235

1249-
topicsToCommit := make(map[int]int64)
1236+
var partitionAssignedToSecondConsumer int
12501237
msgsForSecondReader := make([]Message, 0, messageCount)
12511238
for i := 0; i < messageCount/2; i++ {
12521239
if msg, err := secondReader.FetchMessage(ctx); err != nil {
12531240
t.Errorf("reader %v expected to read 1 message", i)
12541241
} else {
12551242
msgsForSecondReader = append(msgsForSecondReader, msg)
1256-
topicsToCommit[msg.Partition] = msg.Offset
1243+
partitionAssignedToSecondConsumer = msg.Partition
1244+
}
1245+
}
1246+
partitionAssignedToFirstConsumer := (partitionAssignedToSecondConsumer + 1) % 2
1247+
1248+
// commit all messages for the second reader and wait until commits reach the server
1249+
require.NoError(t, secondReader.CommitMessages(ctx, msgsForSecondReader...))
1250+
require.Eventually(t, func() bool {
1251+
resp, err := client.OffsetFetch(ctx, &OffsetFetchRequest{
1252+
GroupID: firstReader.config.GroupID,
1253+
Topics: map[string][]int{firstReader.config.Topic: []int{partitionAssignedToSecondConsumer}},
1254+
})
1255+
require.NoError(t, err)
1256+
require.NotNil(t, resp)
1257+
1258+
for _, topicOffsets := range resp.Topics {
1259+
for _, offsetPartition := range topicOffsets {
1260+
if offsetPartition.Partition == partitionAssignedToSecondConsumer {
1261+
return msgsForSecondReader[len(msgsForSecondReader)-1].Offset+1 == offsetPartition.CommittedOffset
1262+
}
1263+
}
1264+
}
1265+
return false
1266+
}, 5*time.Second, 100*time.Millisecond, "Offsets were never committed")
1267+
1268+
// commit first message for the second reader on the first reader
1269+
require.NoError(t, firstReader.CommitMessages(ctx, msgsForFirstReader[partitionAssignedToSecondConsumer][0]))
1270+
require.Eventually(t, func() bool {
1271+
resp, err := client.OffsetFetch(ctx, &OffsetFetchRequest{
1272+
GroupID: firstReader.config.GroupID,
1273+
Topics: map[string][]int{firstReader.config.Topic: []int{partitionAssignedToSecondConsumer}},
1274+
})
1275+
require.NoError(t, err)
1276+
require.NotNil(t, resp)
1277+
1278+
for _, topicOffsets := range resp.Topics {
1279+
for _, offsetPartition := range topicOffsets {
1280+
if offsetPartition.Partition == partitionAssignedToSecondConsumer {
1281+
return msgsForSecondReader[len(msgsForSecondReader)-1].Offset+1 == offsetPartition.CommittedOffset
1282+
}
1283+
}
1284+
}
1285+
return false
1286+
}, 5*time.Second, 100*time.Millisecond, "Offsets were altered")
1287+
1288+
// commit the messages it can actually commit and verify it works
1289+
for i := 0; i < len(msgsForFirstReader); i++ {
1290+
if msg, err := firstReader.FetchMessage(ctx); err != nil {
1291+
t.Errorf("reader %v expected to read 1 message", i)
1292+
} else {
1293+
msgs, ok := msgsForFirstReader[msg.Partition]
1294+
if !ok {
1295+
msgs = make([]Message, 0)
1296+
}
1297+
msgs = append(msgs, msg)
1298+
msgsForFirstReader[msg.Partition] = msgs
1299+
totalEvents++
12571300
}
12581301
}
1302+
require.NoError(t, firstReader.CommitMessages(ctx, msgsForFirstReader[partitionAssignedToFirstConsumer][len(msgsForFirstReader[partitionAssignedToFirstConsumer])-1]))
1303+
require.Eventually(t, func() bool {
1304+
resp, err := client.OffsetFetch(ctx, &OffsetFetchRequest{
1305+
GroupID: firstReader.config.GroupID,
1306+
Topics: map[string][]int{firstReader.config.Topic: []int{partitionAssignedToFirstConsumer}},
1307+
})
1308+
require.NoError(t, err)
1309+
require.NotNil(t, resp)
12591310

1260-
require.NoError(t, secondReader.CommitMessages(ctx, msgsForSecondReader[len(msgsForSecondReader)-1]))
1261-
require.ErrorIs(t, errInvalidWritePartition, firstReader.CommitMessages(ctx, msgsForSecondReader[0]))
1311+
for _, topicOffsets := range resp.Topics {
1312+
for _, offsetPartition := range topicOffsets {
1313+
if offsetPartition.Partition == partitionAssignedToFirstConsumer {
1314+
return msgsForFirstReader[partitionAssignedToFirstConsumer][len(msgsForSecondReader)-1].Offset+1 == offsetPartition.CommittedOffset
1315+
}
1316+
}
1317+
}
1318+
return false
1319+
}, 5*time.Second, 100*time.Millisecond, "Could not commit the new element")
12621320
}
12631321

12641322
func TestOffsetStash(t *testing.T) {

0 commit comments

Comments
 (0)