Skip to content

Commit c67e4d7

Browse files
committed
Make erroring optional and simplify the test.
1 parent f3475c5 commit c67e4d7

File tree

2 files changed

+46
-55
lines changed

2 files changed

+46
-55
lines changed

reader.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash
179179
// IllegalGeneration error is not retriable, but we should attempt to
180180
// perform the remaining commits
181181
if err == IllegalGeneration {
182+
r.withErrorLogger(func(l Logger) { l.Printf("%v", err) })
182183
illegalGenerationErr = err
183184
// we prevent useless retries and we will attempt to
184185
// commit the remaining generations.
@@ -187,7 +188,9 @@ func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash
187188
}
188189
}
189190
}
190-
if illegalGenerationErr != nil {
191+
192+
// if configured to ignore the error
193+
if illegalGenerationErr != nil && r.config.ErrorOnWrongGenerationCommit {
191194
err = illegalGenerationErr
192195
}
193196
return // err will not be nil
@@ -567,6 +570,10 @@ type ReaderConfig struct {
567570
// This flag is being added to retain backwards-compatibility, so it will be
568571
// removed in a future version of kafka-go.
569572
OffsetOutOfRangeError bool
573+
574+
// ErrorOnWrongGenerationCommit indicates that we should return an error when
575+
// attempting to commit a message to a generation different than the current one.
576+
ErrorOnWrongGenerationCommit bool
570577
}
571578

572579
// Validate method validates ReaderConfig properties.

reader_test.go

Lines changed: 38 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -792,10 +792,11 @@ func TestExtractTopics(t *testing.T) {
792792

793793
func TestReaderConsumerGroup(t *testing.T) {
794794
tests := []struct {
795-
scenario string
796-
partitions int
797-
commitInterval time.Duration
798-
function func(*testing.T, context.Context, *Reader)
795+
scenario string
796+
partitions int
797+
commitInterval time.Duration
798+
errorOnWrongGeneration bool
799+
function func(*testing.T, context.Context, *Reader)
799800
}{
800801
{
801802
scenario: "basic handshake",
@@ -856,11 +857,13 @@ func TestReaderConsumerGroup(t *testing.T) {
856857
partitions: 1,
857858
function: testConsumerGroupSimple,
858859
},
860+
859861
{
860-
scenario: "Do not commit not assigned messages after rebalance",
861-
partitions: 2,
862-
function: testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions,
863-
commitInterval: 0,
862+
scenario: "Do not commit not assigned messages after rebalance",
863+
partitions: 2,
864+
function: testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions,
865+
commitInterval: 0,
866+
errorOnWrongGeneration: true,
864867
},
865868
}
866869

@@ -877,15 +880,16 @@ func TestReaderConsumerGroup(t *testing.T) {
877880

878881
groupID := makeGroupID()
879882
r := NewReader(ReaderConfig{
880-
Brokers: []string{"localhost:9092"},
881-
Topic: topic,
882-
GroupID: groupID,
883-
HeartbeatInterval: 2 * time.Second,
884-
CommitInterval: test.commitInterval,
885-
RebalanceTimeout: 2 * time.Second,
886-
RetentionTime: time.Hour,
887-
MinBytes: 1,
888-
MaxBytes: 1e6,
883+
Brokers: []string{"localhost:9092"},
884+
Topic: topic,
885+
GroupID: groupID,
886+
HeartbeatInterval: 2 * time.Second,
887+
CommitInterval: test.commitInterval,
888+
RebalanceTimeout: 2 * time.Second,
889+
RetentionTime: time.Hour,
890+
MinBytes: 1,
891+
MaxBytes: 1e6,
892+
ErrorOnWrongGenerationCommit: test.errorOnWrongGeneration,
889893
})
890894
defer r.Close()
891895

@@ -1193,6 +1197,8 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
11931197
BatchSize: 1,
11941198
Transport: client.Transport,
11951199
}
1200+
1201+
// Write 4 messages and ensure that they go the each one of the partitions
11961202
messageCount := 4
11971203
if err := writer.WriteMessages(ctx, makeTestSequence(messageCount)...); err != nil {
11981204
t.Fatalf("bad write messages: %v", err)
@@ -1220,9 +1226,12 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12201226
}
12211227
}
12221228
require.Equal(t, messageCount, totalEvents)
1229+
1230+
// create a second reader
12231231
secondReader := NewReader(firstReader.config)
12241232
defer secondReader.Close()
12251233

1234+
// wait until the group has 2 members
12261235
require.Eventually(t, func() bool {
12271236
resp, err := client.DescribeGroups(
12281237
ctx,
@@ -1247,28 +1256,14 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12471256
}
12481257
partitionAssignedToFirstConsumer := (partitionAssignedToSecondConsumer + 1) % 2
12491258

1250-
// commit all messages for the second reader and wait until commits reach the server
1259+
// commit all messages for the second reader (no need to wait until commits reach the server
1260+
// because CommitInterval is set to 0)
12511261
require.NoError(t, secondReader.CommitMessages(ctx, msgsForSecondReader...))
1252-
require.Eventually(t, func() bool {
1253-
resp, err := client.OffsetFetch(ctx, &OffsetFetchRequest{
1254-
GroupID: firstReader.config.GroupID,
1255-
Topics: map[string][]int{firstReader.config.Topic: {partitionAssignedToSecondConsumer}},
1256-
})
1257-
require.NoError(t, err)
1258-
require.NotNil(t, resp)
12591262

1260-
for _, topicOffsets := range resp.Topics {
1261-
for _, offsetPartition := range topicOffsets {
1262-
if offsetPartition.Partition == partitionAssignedToSecondConsumer {
1263-
return msgsForSecondReader[len(msgsForSecondReader)-1].Offset+1 == offsetPartition.CommittedOffset
1264-
}
1265-
}
1266-
}
1267-
return false
1268-
}, 5*time.Second, 100*time.Millisecond, "Offsets were never committed")
1269-
1270-
// commit all messages the first reader received
1263+
// commit all messages the first reader received, we expect an error
12711264
require.ErrorIs(t, IllegalGeneration, firstReader.CommitMessages(ctx, allMessages...))
1265+
1266+
// verify that no offsets have been altered
12721267
require.Eventually(t, func() bool {
12731268
resp, err := client.OffsetFetch(ctx, &OffsetFetchRequest{
12741269
GroupID: firstReader.config.GroupID,
@@ -1287,7 +1282,9 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
12871282
return false
12881283
}, 5*time.Second, 100*time.Millisecond, "Offsets were altered")
12891284

1290-
// commit the messages it can actually commit and verify it works
1285+
// we can read the messages again because generation changes
1286+
// cause uncommitted offsets to be lost
1287+
totalEvents = 0
12911288
for i := 0; i < len(msgsForFirstReader); i++ {
12921289
if msg, err := firstReader.FetchMessage(ctx); err != nil {
12931290
t.Errorf("reader %v expected to read 1 message", i)
@@ -1301,24 +1298,11 @@ func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.
13011298
totalEvents++
13021299
}
13031300
}
1304-
require.NoError(t, firstReader.CommitMessages(ctx, msgsForFirstReader[partitionAssignedToFirstConsumer][len(msgsForFirstReader[partitionAssignedToFirstConsumer])-1]))
1305-
require.Eventually(t, func() bool {
1306-
resp, err := client.OffsetFetch(ctx, &OffsetFetchRequest{
1307-
GroupID: firstReader.config.GroupID,
1308-
Topics: map[string][]int{firstReader.config.Topic: {partitionAssignedToFirstConsumer}},
1309-
})
1310-
require.NoError(t, err)
1311-
require.NotNil(t, resp)
1301+
require.Equal(t, 2, totalEvents)
13121302

1313-
for _, topicOffsets := range resp.Topics {
1314-
for _, offsetPartition := range topicOffsets {
1315-
if offsetPartition.Partition == partitionAssignedToFirstConsumer {
1316-
return msgsForFirstReader[partitionAssignedToFirstConsumer][len(msgsForSecondReader)-1].Offset+1 == offsetPartition.CommittedOffset
1317-
}
1318-
}
1319-
}
1320-
return false
1321-
}, 5*time.Second, 100*time.Millisecond, "Could not commit the new element")
1303+
// commit the messages it can actually commit and verify it works
1304+
// no need to wait because CommitInterval is 0
1305+
require.NoError(t, firstReader.CommitMessages(ctx, msgsForFirstReader[partitionAssignedToFirstConsumer][len(msgsForFirstReader[partitionAssignedToFirstConsumer])-1]))
13221306
}
13231307

13241308
func TestOffsetStash(t *testing.T) {

0 commit comments

Comments
 (0)