Skip to content

Commit 20a09a8

Browse files
committed
Fix test errors.
1 parent 9d57482 commit 20a09a8

File tree

5 files changed

+20
-19
lines changed

5 files changed

+20
-19
lines changed

consumergroup.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -931,12 +931,12 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
931931
// the leader. Otherwise, GroupMemberAssignments will be nil.
932932
//
933933
// Possible kafka error codes returned:
934-
// - GroupLoadInProgress:
935-
// - GroupCoordinatorNotAvailable:
936-
// - NotCoordinatorForGroup:
937-
// - InconsistentGroupProtocol:
938-
// - InvalidSessionTimeout:
939-
// - GroupAuthorizationFailed:
934+
// * GroupLoadInProgress:
935+
// * GroupCoordinatorNotAvailable:
936+
// * NotCoordinatorForGroup:
937+
// * InconsistentGroupProtocol:
938+
// * InvalidSessionTimeout:
939+
// * GroupAuthorizationFailed:
940940
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
941941
request, err := cg.makeJoinGroupRequestV1(memberID)
942942
if err != nil {
@@ -1079,11 +1079,11 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember
10791079
// Readers subscriptions topic => partitions
10801080
//
10811081
// Possible kafka error codes returned:
1082-
// - GroupCoordinatorNotAvailable:
1083-
// - NotCoordinatorForGroup:
1084-
// - IllegalGeneration:
1085-
// - RebalanceInProgress:
1086-
// - GroupAuthorizationFailed:
1082+
// * GroupCoordinatorNotAvailable:
1083+
// * NotCoordinatorForGroup:
1084+
// * IllegalGeneration:
1085+
// * RebalanceInProgress:
1086+
// * GroupAuthorizationFailed:
10871087
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
10881088
request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
10891089
response, err := conn.syncGroup(request)

error.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ const (
3434
NotEnoughReplicas Error = 19
3535
NotEnoughReplicasAfterAppend Error = 20
3636
InvalidRequiredAcks Error = 21
37-
IllegalGenerationErr Error = 22
37+
IllegalGeneration Error = 22
3838
InconsistentGroupProtocol Error = 23
3939
InvalidGroupId Error = 24
4040
UnknownMemberId Error = 25
@@ -217,7 +217,7 @@ func (e Error) Title() string {
217217
return "Not Enough Replicas After Append"
218218
case InvalidRequiredAcks:
219219
return "Invalid Required Acks"
220-
case IllegalGenerationErr:
220+
case IllegalGeneration:
221221
return "Illegal Generation"
222222
case InconsistentGroupProtocol:
223223
return "Inconsistent Group Protocol"
@@ -426,7 +426,7 @@ func (e Error) Description() string {
426426
return "the message was written to the log, but with fewer in-sync replicas than required."
427427
case InvalidRequiredAcks:
428428
return "the requested requiredAcks is invalid (anything other than -1, 1, or 0)"
429-
case IllegalGenerationErr:
429+
case IllegalGeneration:
430430
return "the generation id provided in the request is not the current generation"
431431
case InconsistentGroupProtocol:
432432
return "the member provided a protocol type or set of protocols which is not compatible with the current group"

error_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestError(t *testing.T) {
2828
NotEnoughReplicas,
2929
NotEnoughReplicasAfterAppend,
3030
InvalidRequiredAcks,
31-
IllegalGenerationErr,
31+
IllegalGeneration,
3232
InconsistentGroupProtocol,
3333
InvalidGroupId,
3434
UnknownMemberId,

reader.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,23 +174,24 @@ func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash
174174
}
175175

176176
if err = gen.CommitOffsetsForGenID(generationID, messages); err == nil {
177-
continue
177+
break
178178
}
179179

180180
// IllegalGeneration error is not retriable, but we should attempt to
181181
// perform the remaining commits
182-
if errors.Is(err, IllegalGenerationErr) {
182+
if errors.Is(err, IllegalGeneration) {
183183
r.withErrorLogger(func(l Logger) { l.Printf("%v", err) })
184184
illegalGenerationErr = true
185185
err = nil
186186
offsetStash.removeGenerationID(generationID)
187187
}
188+
return
188189
}
189190
}
190191

191192
// if configured to ignore the error
192193
if illegalGenerationErr && r.config.ErrorOnWrongGenerationCommit {
193-
err = IllegalGenerationErr
194+
err = IllegalGeneration
194195
}
195196
return // err will not be nil
196197
}

reader_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1261,7 +1261,7 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12611261
require.NoError(t, secondReader.CommitMessages(ctx, msgsForSecondReader...))
12621262

12631263
// commit all messages the first reader received, we expect an error
1264-
require.ErrorIs(t, IllegalGenerationErr, firstReader.CommitMessages(ctx, allMessages...))
1264+
require.ErrorIs(t, IllegalGeneration, firstReader.CommitMessages(ctx, allMessages...))
12651265

12661266
// verify that no offsets have been altered
12671267
require.Eventually(t, func() bool {

0 commit comments

Comments
 (0)