deadline for batch fetching #1395

Open · wants to merge 2 commits into main
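
This patch adds Reader.FetchMessageBatch, a batching variant of FetchMessage: it blocks until batchSize messages have been read or the given deadline elapses, then returns whatever was gathered. A minimal caller-side sketch (hedged: the broker address, topic, group ID, batch size, and one-second deadline below are illustrative placeholders, not part of the change):

	// Sketch only: NewReader and CommitMessages are existing kafka-go APIs;
	// FetchMessageBatch is the method added by this PR.
	r := NewReader(ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "my-topic",
		GroupID: "my-group",
	})
	defer r.Close()

	for {
		// Wait for up to 100 messages, but at most one second.
		batch, err := r.FetchMessageBatch(ctx, 100, time.Second)
		if err != nil {
			break
		}
		if err := r.CommitMessages(ctx, batch...); err != nil {
			break
		}
		// len(batch) < 100 means the deadline fired before the batch filled.
	}

On deadline expiry the method returns a short (possibly empty) batch with a nil error, so callers distinguish a timeout from a failure by batch length rather than by error value.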
75 changes: 75 additions & 0 deletions client_test.go
@@ -111,6 +111,10 @@ func TestClient(t *testing.T) {
scenario: "retrieve committed offsets for a consumer group and topic",
function: testConsumerGroupFetchOffsets,
},
{
scenario: "retrieve committed offsets for a consumer group and topic with batch processing",
function: testConsumerGroupFetchMessageBatch,
},
}

for _, test := range tests {
@@ -191,6 +195,77 @@ func testConsumerGroupFetchOffsets(t *testing.T, ctx context.Context, client *Cl
}
}

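// testConsumerGroupFetchMessageBatch writes 144 messages round-robin across 12
// partitions, drains them with FetchMessageBatch, commits each batch, and then
// verifies that every partition reports a committed offset of 144/12 = 12.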
func testConsumerGroupFetchMessageBatch(t *testing.T, ctx context.Context, client *Client) {
	const totalMessages = 144
	const partitions = 12
	const msgPerPartition = totalMessages / partitions

	topic := makeTopic()
	if err := clientCreateTopic(client, topic, partitions); err != nil {
		t.Fatal(err)
	}

	groupId := makeGroupID()
	brokers := []string{"localhost:9092"}

	writer := &Writer{
		Addr:      TCP(brokers...),
		Topic:     topic,
		Balancer:  &RoundRobin{},
		BatchSize: 1,
		Transport: client.Transport,
	}
	if err := writer.WriteMessages(ctx, makeTestSequence(totalMessages)...); err != nil {
		t.Fatalf("failed to write messages: %v", err)
	}
	if err := writer.Close(); err != nil {
		t.Fatalf("failed to close writer: %v", err)
	}

	batchSize := 100
	r := NewReader(ReaderConfig{
		Brokers:       brokers,
		Topic:         topic,
		GroupID:       groupId,
		MinBytes:      1,
		MaxBytes:      10e6,
		MaxWait:       100 * time.Millisecond,
		QueueCapacity: batchSize,
	})
	defer r.Close()

	var received int
	for {
		batch, err := r.FetchMessageBatch(ctx, batchSize, time.Second)
		if err != nil {
			t.Fatalf("error fetching message batch: %s", err)
		}
		if err := r.CommitMessages(ctx, batch...); err != nil {
			t.Fatal(err)
		}
		received += len(batch)
		if received == totalMessages {
			break
		}
	}

	offsets, err := client.ConsumerOffsets(ctx, TopicAndGroup{GroupId: groupId, Topic: topic})
	if err != nil {
		t.Fatal(err)
	}

	if len(offsets) != partitions {
		t.Fatalf("expected %d partitions but only received offsets for %d", partitions, len(offsets))
	}

	for i := 0; i < partitions; i++ {
		committedOffset := offsets[i]
		if committedOffset != msgPerPartition {
			t.Errorf("expected partition %d to have committed offset %d but got %d", i, msgPerPartition, committedOffset)
		}
	}
}

func TestClientProduceAndConsume(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
59 changes: 59 additions & 0 deletions reader.go
@@ -912,6 +912,65 @@ func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
}
}

// FetchMessageBatch fetches a batch of up to batchSize messages from the
// reader. It is similar to FetchMessage, except that it blocks until batchSize
// messages have been read or the deadline elapses, whichever comes first; when
// the deadline expires it returns the messages gathered so far with a nil
// error.
func (r *Reader) FetchMessageBatch(ctx context.Context, batchSize int, deadline time.Duration) ([]Message, error) {
	r.activateReadLag()
	msgBatch := make([]Message, 0, batchSize)

	r.mutex.Lock()
	if !r.closed && r.version == 0 {
		r.start(r.getTopicPartitionOffset())
	}
	version := r.version
	r.mutex.Unlock()

	// Arm the deadline once for the whole call. Creating the timer inside
	// the select (e.g. with time.After) would re-arm it after every
	// received message, turning the deadline into an idle timeout.
	timer := time.NewTimer(deadline)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-timer.C:
			return msgBatch, nil
		case m, ok := <-r.msgs:
			if !ok {
				return nil, io.EOF
			}

			if m.version < version {
				continue
			}

			r.mutex.Lock()

			switch {
			case m.error != nil:
			case version == r.version:
				r.offset = m.message.Offset + 1
				r.lag = m.watermark - r.offset
			}

			r.mutex.Unlock()

			if errors.Is(m.error, io.EOF) {
				// io.EOF is used as a marker to indicate that the stream
				// has been closed; if it was received from the inner
				// reader we don't want to confuse the program, so we
				// replace the error with io.ErrUnexpectedEOF.
				m.error = io.ErrUnexpectedEOF
			}
			if m.error != nil {
				return nil, m.error
			}

			msgBatch = append(msgBatch, m.message)

			if len(msgBatch) == batchSize {
				return msgBatch, nil
			}
		}
	}
}

// ReadLag returns the current lag of the reader by fetching the last offset of
// the topic and partition and computing the difference between that value and
// the offset of the last message returned by ReadMessage.