Skip to content

Commit f3475c5

Browse files
committed
Fail attempts to commit offsets when the generationID is not the same.
1 parent 815771e commit f3475c5

File tree

3 files changed

+69
-52
lines changed

3 files changed

+69
-52
lines changed

consumergroup.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,12 @@ func (g *Generation) Start(fn func(ctx context.Context)) {
416416
// consumer group coordinator. This can be used to reset the consumer to
417417
// explicit offsets.
418418
func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
419+
return g.CommitOffsetsForGenID(g.ID, offsets)
420+
}
421+
422+
// CommitOffsetsForGenID commits the provided topic+partition+offset combos to the
423+
// consumer group coordinator specifying the given genID.
424+
func (g *Generation) CommitOffsetsForGenID(genID int32, offsets map[string]map[int]int64) error {
419425
if len(offsets) == 0 {
420426
return nil
421427
}
@@ -434,7 +440,7 @@ func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
434440

435441
request := offsetCommitRequestV2{
436442
GroupID: g.GroupID,
437-
GenerationID: g.ID,
443+
GenerationID: genID,
438444
MemberID: g.MemberID,
439445
RetentionTime: g.retentionMillis,
440446
Topics: topics,
@@ -925,12 +931,12 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
925931
// the leader. Otherwise, GroupMemberAssignments will be nil.
926932
//
927933
// Possible kafka error codes returned:
928-
// * GroupLoadInProgress:
929-
// * GroupCoordinatorNotAvailable:
930-
// * NotCoordinatorForGroup:
931-
// * InconsistentGroupProtocol:
932-
// * InvalidSessionTimeout:
933-
// * GroupAuthorizationFailed:
934+
// - GroupLoadInProgress:
935+
// - GroupCoordinatorNotAvailable:
936+
// - NotCoordinatorForGroup:
937+
// - InconsistentGroupProtocol:
938+
// - InvalidSessionTimeout:
939+
// - GroupAuthorizationFailed:
934940
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
935941
request, err := cg.makeJoinGroupRequestV1(memberID)
936942
if err != nil {
@@ -1073,11 +1079,11 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember
10731079
// Readers subscriptions topic => partitions
10741080
//
10751081
// Possible kafka error codes returned:
1076-
// * GroupCoordinatorNotAvailable:
1077-
// * NotCoordinatorForGroup:
1078-
// * IllegalGeneration:
1079-
// * RebalanceInProgress:
1080-
// * GroupAuthorizationFailed:
1082+
// - GroupCoordinatorNotAvailable:
1083+
// - NotCoordinatorForGroup:
1084+
// - IllegalGeneration:
1085+
// - RebalanceInProgress:
1086+
// - GroupAuthorizationFailed:
10811087
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
10821088
request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
10831089
response, err := conn.syncGroup(request)

reader.go

Lines changed: 43 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -150,54 +150,53 @@ func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash
150150
backoffDelayMax = 5 * time.Second
151151
)
152152

153-
messagesToSend := make(map[string]map[int]int64)
153+
messagesToSendForGeneration := make(map[int32]map[string]map[int]int64)
154154
for topic, partitionsInfo := range offsetStash {
155-
msgsForTopic := make(map[int]int64)
156155
for partition, commitInfo := range partitionsInfo {
157-
// if there is a generation and it is different than the current
158-
// it means there was a rebalance
159-
if commitInfo.generationId != undefinedGenerationId {
160-
if gen.ID == commitInfo.generationId {
161-
msgsForTopic[partition] = commitInfo.offset
162-
} else {
163-
if assignments, ok := gen.Assignments[topic]; ok {
164-
for _, assignment := range assignments {
165-
if assignment.ID == partition && commitInfo.generationId > gen.ID {
166-
msgsForTopic[partition] = commitInfo.offset
167-
}
168-
}
169-
}
170-
r.withErrorLogger(func(l Logger) {
171-
l.Printf("Discarding commint for %s - %d: %d . Current generation is %d, commit generation is %d", topic, partition, commitInfo.offset, gen.ID, commitInfo.generationId)
172-
})
173-
}
174-
} else {
175-
msgsForTopic[partition] = commitInfo.offset
156+
if _, ok := messagesToSendForGeneration[commitInfo.generationID]; !ok {
157+
messagesToSendForGeneration[commitInfo.generationID] = make(map[string]map[int]int64)
176158
}
177-
}
178-
if len(msgsForTopic) > 0 {
179-
messagesToSend[topic] = msgsForTopic
159+
msgsForTopic := messagesToSendForGeneration[commitInfo.generationID]
160+
if _, ok := msgsForTopic[topic]; !ok {
161+
msgsForTopic[topic] = make(map[int]int64)
162+
}
163+
msgsForPartition := msgsForTopic[topic]
164+
msgsForPartition[partition] = commitInfo.offset
180165
}
181166
}
182-
for attempt := 0; attempt < retries; attempt++ {
183-
if attempt != 0 {
184-
if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
185-
return
167+
var illegalGenerationErr error
168+
for generationID, messages := range messagesToSendForGeneration {
169+
for attempt := 0; attempt < retries; attempt++ {
170+
if attempt != 0 {
171+
if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
172+
continue
173+
}
186174
}
187-
}
188175

189-
if err = gen.CommitOffsets(messagesToSend); err == nil {
190-
return
176+
if err = gen.CommitOffsetsForGenID(generationID, messages); err == nil {
177+
continue
178+
}
179+
// IllegalGeneration error is not retriable, but we should attempt to
180+
// perform the remaining commits
181+
if err == IllegalGeneration {
182+
illegalGenerationErr = err
183+
// we prevent useless retries and we will attempt to
184+
// commit the remaining generations.
185+
err = nil
186+
offsetStash.removeGenerationID(generationID)
187+
}
191188
}
192189
}
193-
190+
if illegalGenerationErr != nil {
191+
err = illegalGenerationErr
192+
}
194193
return // err will not be nil
195194
}
196195

197196
// offsetStash holds offsets by topic => partition => offsetEntry.
198197
type offsetEntry struct {
199198
offset int64
200-
generationId int32
199+
generationID int32
201200
}
202201
type offsetStash map[string]map[int]offsetEntry
203202

@@ -213,7 +212,7 @@ func (o offsetStash) merge(commits []commit) {
213212
if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset.offset {
214213
offsetsByPartition[c.partition] = offsetEntry{
215214
offset: c.offset,
216-
generationId: c.generationId,
215+
generationID: c.generationId,
217216
}
218217
}
219218
}
@@ -226,6 +225,16 @@ func (o offsetStash) reset() {
226225
}
227226
}
228227

228+
func (o offsetStash) removeGenerationID(genID int32) {
229+
for _, offsetsForTopic := range o {
230+
for partition, offsetsForPartition := range offsetsForTopic {
231+
if offsetsForPartition.generationID == genID {
232+
delete(offsetsForTopic, partition)
233+
}
234+
}
235+
}
236+
}
237+
229238
// commitLoopImmediate handles each commit synchronously.
230239
func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) {
231240
offsets := offsetStash{}

reader_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -856,11 +856,11 @@ func TestReaderConsumerGroup(t *testing.T) {
856856
partitions: 1,
857857
function: testConsumerGroupSimple,
858858
},
859-
860859
{
861-
scenario: "Do not commit not assigned messages after rebalance",
862-
partitions: 2,
863-
function: testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions,
860+
scenario: "Do not commit not assigned messages after rebalance",
861+
partitions: 2,
862+
function: testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions,
863+
commitInterval: 0,
864864
},
865865
}
866866

@@ -1203,6 +1203,7 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12031203

12041204
// read all messages for the first reader
12051205
msgsForFirstReader := make(map[int][]Message, 0)
1206+
allMessages := make([]Message, 0)
12061207
totalEvents := 0
12071208
for i := 0; i < messageCount; i++ {
12081209
if msg, err := firstReader.FetchMessage(ctx); err != nil {
@@ -1213,6 +1214,7 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12131214
msgs = make([]Message, 0)
12141215
}
12151216
msgs = append(msgs, msg)
1217+
allMessages = append(allMessages, msg)
12161218
msgsForFirstReader[msg.Partition] = msgs
12171219
totalEvents++
12181220
}
@@ -1265,8 +1267,8 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12651267
return false
12661268
}, 5*time.Second, 100*time.Millisecond, "Offsets were never committed")
12671269

1268-
// commit first message for the second reader on the first reader
1269-
require.NoError(t, firstReader.CommitMessages(ctx, msgsForFirstReader[partitionAssignedToSecondConsumer][0]))
1270+
// commit all messages the first reader received
1271+
require.ErrorIs(t, IllegalGeneration, firstReader.CommitMessages(ctx, allMessages...))
12701272
require.Eventually(t, func() bool {
12711273
resp, err := client.OffsetFetch(ctx, &OffsetFetchRequest{
12721274
GroupID: firstReader.config.GroupID,

0 commit comments

Comments
 (0)