From 17cc32fb7649b494dc0ea7b9cca7d52e5c907cf2 Mon Sep 17 00:00:00 2001
From: Nikhil Soni
Date: Sun, 8 Jun 2025 13:20:05 +0530
Subject: [PATCH 1/2] Add method to fetch messages in batch

Since FetchMessage already reads messages from a fetched batch, this new
method simply holds the messages until batchSize messages have been read.

Fixes #123
---
 reader.go | 62 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 62 insertions(+)

diff --git a/reader.go b/reader.go
index 04d90f355..879062666 100644
--- a/reader.go
+++ b/reader.go
@@ -912,6 +912,68 @@ func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
 	}
 }
 
+// FetchMessageBatch fetches a batch of messages from the reader. It is similar to
+// FetchMessage, except it blocks until the number of messages read reaches batchSize.
+func (r *Reader) FetchMessageBatch(ctx context.Context, batchSize int) ([]Message, error) {
+	r.activateReadLag()
+	msgBatch := make([]Message, 0, batchSize)
+
+	var i int
+	for i < batchSize {
+		r.mutex.Lock()
+
+		if !r.closed && r.version == 0 {
+			r.start(r.getTopicPartitionOffset())
+		}
+
+		version := r.version
+		r.mutex.Unlock()
+
+		select {
+		case <-ctx.Done():
+			return []Message{}, ctx.Err()
+
+		case err := <-r.runError:
+			return []Message{}, err
+
+		case m, ok := <-r.msgs:
+			if !ok {
+				return []Message{}, io.EOF
+			}
+
+			if m.version < version {
+				continue
+			}
+
+			r.mutex.Lock()
+
+			switch {
+			case m.error != nil:
+			case version == r.version:
+				r.offset = m.message.Offset + 1
+				r.lag = m.watermark - r.offset
+			}
+
+			r.mutex.Unlock()
+
+			if errors.Is(m.error, io.EOF) {
+				// io.EOF is used as a marker to indicate that the stream
+				// has been closed, in case it was received from the inner
+				// reader we don't want to confuse the program and replace
+				// the error with io.ErrUnexpectedEOF.
+				m.error = io.ErrUnexpectedEOF
+			}
+			if m.error != nil {
+				return nil, m.error
+			}
+
+			msgBatch = append(msgBatch, m.message)
+		}
+		i++
+	}
+	return msgBatch, nil
+}
+
 // ReadLag returns the current lag of the reader by fetching the last offset of
 // the topic and partition and computing the difference between that value and
 // the offset of the last message returned by ReadMessage.
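For illustration, here is a minimal sketch of how a caller might use the FetchMessageBatch method added by this patch, assuming the Reader is the segmentio/kafka-go Reader; the broker address, topic name, and group ID below are placeholders, not values taken from the patch.

package main

import (
	"context"
	"log"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// NOTE: broker address, topic, and group ID are placeholders.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "example-topic",
		GroupID: "example-group",
	})
	defer r.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Blocks until 100 messages have been read, or until ctx ends.
	msgs, err := r.FetchMessageBatch(ctx, 100)
	if err != nil {
		log.Fatal(err)
	}
	for _, m := range msgs {
		log.Printf("partition %d offset %d: %s", m.Partition, m.Offset, m.Value)
	}
	if err := r.CommitMessages(ctx, msgs...); err != nil {
		log.Fatal(err)
	}
}

Since the call only returns once batchSize messages have been read (or the context ends, in which case any partially collected batch is discarded), callers that cannot wait indefinitely should pass a context with a timeout, as done here.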
From 4c6275619c2b533b880ec17dc91e8626d7efa34b Mon Sep 17 00:00:00 2001
From: Maksim Falaleev
Date: Sat, 12 Jul 2025 13:28:24 +0800
Subject: [PATCH 2/2] Support timeout for FetchMessageBatch, add a new test scenario

---
 client_test.go | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++
 reader.go      | 31 ++++++++++-----------
 2 files changed, 89 insertions(+), 17 deletions(-)

diff --git a/client_test.go b/client_test.go
index 62153b234..79e3060eb 100644
--- a/client_test.go
+++ b/client_test.go
@@ -111,6 +111,10 @@ func TestClient(t *testing.T) {
 			scenario: "retrieve committed offsets for a consumer group and topic",
 			function: testConsumerGroupFetchOffsets,
 		},
+		{
+			scenario: "retrieve committed offsets for a consumer group and topic with batch processing",
+			function: testConsumerGroupFetchMessageBatch,
+		},
 	}
 
 	for _, test := range tests {
@@ -191,6 +195,77 @@ func testConsumerGroupFetchOffsets(t *testing.T, ctx context.Context, client *Cl
 	}
 }
 
+func testConsumerGroupFetchMessageBatch(t *testing.T, ctx context.Context, client *Client) {
+	const totalMessages = 144
+	const partitions = 12
+	const msgPerPartition = totalMessages / partitions
+
+	topic := makeTopic()
+	if err := clientCreateTopic(client, topic, partitions); err != nil {
+		t.Fatal(err)
+	}
+
+	groupId := makeGroupID()
+	brokers := []string{"localhost:9092"}
+
+	writer := &Writer{
+		Addr:      TCP(brokers...),
+		Topic:     topic,
+		Balancer:  &RoundRobin{},
+		BatchSize: 1,
+		Transport: client.Transport,
+	}
+	if err := writer.WriteMessages(ctx, makeTestSequence(totalMessages)...); err != nil {
+		t.Fatalf("bad write messages: %v", err)
+	}
+	if err := writer.Close(); err != nil {
+		t.Fatalf("bad write err: %v", err)
+	}
+
+	batchSize := 100
+	r := NewReader(ReaderConfig{
+		Brokers:       brokers,
+		Topic:         topic,
+		GroupID:       groupId,
+		MinBytes:      1,
+		MaxBytes:      10e6,
+		MaxWait:       100 * time.Millisecond,
+		QueueCapacity: batchSize,
+	})
+	defer r.Close()
+
+	var expected int
+	for {
+		m, err := r.FetchMessageBatch(ctx, batchSize, time.Second)
+		if err != nil {
+			t.Fatalf("error fetching message: %s", err)
+		}
+		if err := r.CommitMessages(context.Background(), m...); err != nil {
+			t.Fatal(err)
+		}
+		expected += len(m)
+		if expected == totalMessages {
+			break
+		}
+	}
+
+	offsets, err := client.ConsumerOffsets(ctx, TopicAndGroup{GroupId: groupId, Topic: topic})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(offsets) != partitions {
+		t.Fatalf("expected %d partitions but only received offsets for %d", partitions, len(offsets))
+	}
+
+	for i := 0; i < partitions; i++ {
+		committedOffset := offsets[i]
+		if committedOffset != msgPerPartition {
+			t.Errorf("expected partition %d with committed offset of %d but received %d", i, msgPerPartition, committedOffset)
+		}
+	}
+}
+
 func TestClientProduceAndConsume(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
diff --git a/reader.go b/reader.go
index 879062666..ed6cdbfdd 100644
--- a/reader.go
+++ b/reader.go
@@ -914,28 +914,23 @@ func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
 
 // FetchMessageBatch fetches a batch of messages from the reader. It is similar to
 // FetchMessage, except it blocks until the number of messages read reaches batchSize.
-func (r *Reader) FetchMessageBatch(ctx context.Context, batchSize int) ([]Message, error) {
+func (r *Reader) FetchMessageBatch(ctx context.Context, batchSize int, deadline time.Duration) ([]Message, error) {
 	r.activateReadLag()
 	msgBatch := make([]Message, 0, batchSize)
 
-	var i int
-	for i < batchSize {
-		r.mutex.Lock()
-
-		if !r.closed && r.version == 0 {
-			r.start(r.getTopicPartitionOffset())
-		}
-
-		version := r.version
-		r.mutex.Unlock()
+	r.mutex.Lock()
+	if !r.closed && r.version == 0 {
+		r.start(r.getTopicPartitionOffset())
+	}
+	version := r.version
+	r.mutex.Unlock()
 
+	for {
 		select {
 		case <-ctx.Done():
 			return []Message{}, ctx.Err()
-
-		case err := <-r.runError:
-			return []Message{}, err
-
+		case <-time.After(deadline):
+			return msgBatch, nil
 		case m, ok := <-r.msgs:
 			if !ok {
 				return []Message{}, io.EOF
@@ -968,10 +963,12 @@ func (r *Reader) FetchMessageBatch(ctx context.Context, batchSize int) ([]Messag
 			}
 
 			msgBatch = append(msgBatch, m.message)
+
+			if len(msgBatch) == batchSize {
+				return msgBatch, nil
+			}
 		}
-		i++
 	}
-	return msgBatch, nil
 }
 
 // ReadLag returns the current lag of the reader by fetching the last offset of
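For illustration, here is a sketch of the same call after this patch, which adds a per-call deadline parameter; the broker address, topic name, and group ID are again placeholders. Because time.After(deadline) is re-evaluated on each pass through the receive loop, the deadline bounds the wait for the next message rather than for the whole batch, and the returned slice may contain fewer than batchSize messages (possibly zero), so callers should check its length.

package main

import (
	"context"
	"errors"
	"io"
	"log"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// NOTE: broker address, topic, and group ID are placeholders.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "example-topic",
		GroupID: "example-group",
	})
	defer r.Close()

	ctx := context.Background()

	for {
		// Collect up to 100 messages; if no further message arrives within
		// one second, FetchMessageBatch returns whatever was read so far.
		msgs, err := r.FetchMessageBatch(ctx, 100, time.Second)
		if errors.Is(err, io.EOF) {
			return // the reader was closed
		}
		if err != nil {
			log.Fatal(err)
		}
		if len(msgs) == 0 {
			continue // deadline expired before any message arrived
		}
		if err := r.CommitMessages(ctx, msgs...); err != nil {
			log.Fatal(err)
		}
		log.Printf("committed %d messages", len(msgs))
	}
}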