Skip to content

Commit fd3dc94

Browse files
committed
Add a test to showcase the problem.
1 parent 7b9c99d commit fd3dc94

File tree

1 file changed

+77
-0
lines changed

1 file changed

+77
-0
lines changed

reader_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"testing"
1616
"time"
1717

18+
"github.com/stretchr/testify/assert"
1819
"github.com/stretchr/testify/require"
1920
)
2021

@@ -855,6 +856,12 @@ func TestReaderConsumerGroup(t *testing.T) {
855856
partitions: 1,
856857
function: testConsumerGroupSimple,
857858
},
859+
860+
{
861+
scenario: "Do not commit not assigned messages after rebalance",
862+
partitions: 2,
863+
function: testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions,
864+
},
858865
}
859866

860867
for _, test := range tests {
@@ -1174,6 +1181,76 @@ func testReaderConsumerGroupRebalanceAcrossManyPartitionsAndConsumers(t *testing
11741181
}
11751182
}
11761183

1184+
func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.T, ctx context.Context, firstReader *Reader) {
1185+
client, shutdown := newLocalClient()
1186+
defer shutdown()
1187+
1188+
// write messages across both partitions
1189+
writer := &Writer{
1190+
Addr: TCP(firstReader.config.Brokers...),
1191+
Topic: firstReader.config.Topic,
1192+
Balancer: &RoundRobin{},
1193+
BatchSize: 1,
1194+
Transport: client.Transport,
1195+
}
1196+
messageCount := 4
1197+
if err := writer.WriteMessages(ctx, makeTestSequence(messageCount)...); err != nil {
1198+
t.Fatalf("bad write messages: %v", err)
1199+
}
1200+
if err := writer.Close(); err != nil {
1201+
t.Fatalf("bad write err: %v", err)
1202+
}
1203+
1204+
secondReader := NewReader(firstReader.config)
1205+
defer secondReader.Close()
1206+
1207+
require.Eventually(t, func() bool {
1208+
resp, err := client.DescribeGroups(
1209+
ctx,
1210+
&DescribeGroupsRequest{
1211+
GroupIDs: []string{firstReader.config.GroupID},
1212+
},
1213+
)
1214+
assert.NoError(t, err)
1215+
assert.NotNil(t, resp)
1216+
return len(resp.Groups[0].Members) == 2
1217+
}, 10*time.Second, 100*time.Millisecond)
1218+
1219+
topicsToCommit := make(map[int]int64)
1220+
msgsForSecondReader := make([]Message, 0, messageCount)
1221+
for i := 0; i < messageCount/2; i++ {
1222+
if msg, err := secondReader.FetchMessage(ctx); err != nil {
1223+
t.Errorf("reader %v expected to read 1 message", i)
1224+
} else {
1225+
msgsForSecondReader = append(msgsForSecondReader, msg)
1226+
topicsToCommit[msg.Partition] = msg.Offset
1227+
}
1228+
}
1229+
1230+
require.NoError(t, secondReader.CommitMessages(ctx, msgsForSecondReader[len(msgsForSecondReader)-1]))
1231+
require.NoError(t, firstReader.CommitMessages(ctx, msgsForSecondReader[0]))
1232+
resp, err := client.OffsetFetch(
1233+
ctx,
1234+
&OffsetFetchRequest{
1235+
GroupID: firstReader.config.GroupID,
1236+
Topics: map[string][]int{firstReader.config.Topic: {0, 1}},
1237+
},
1238+
)
1239+
require.NoError(t, err)
1240+
require.NotNil(t, resp)
1241+
if topics, ok := resp.Topics[firstReader.config.Topic]; !ok {
1242+
require.True(t, ok, "Topic not found")
1243+
} else {
1244+
for _, topic := range topics {
1245+
if offset, ok := topicsToCommit[topic.Partition]; ok {
1246+
assert.Equal(t, offset+1, topic.CommittedOffset, "committed partition %d had committed offset %d instead of %d", topic.Partition, topic.CommittedOffset, offset)
1247+
} else {
1248+
assert.Equal(t, int64(-1), topic.CommittedOffset, "not-committed partition %d had committed offset %d instead of 0", topic.Partition, topic.CommittedOffset)
1249+
}
1250+
}
1251+
}
1252+
}
1253+
11771254
func TestOffsetStash(t *testing.T) {
11781255
const topic = "topic"
11791256

0 commit comments

Comments
 (0)