diff --git a/client_test.go b/client_test.go index 62153b234..79e3060eb 100644 --- a/client_test.go +++ b/client_test.go @@ -111,6 +111,10 @@ func TestClient(t *testing.T) { scenario: "retrieve committed offsets for a consumer group and topic", function: testConsumerGroupFetchOffsets, }, + { + scenario: "retrieve committed offsets for a consumer group and topic with batch processing", + function: testConsumerGroupFetchMessageBatch, + }, } for _, test := range tests { @@ -191,6 +195,77 @@ func testConsumerGroupFetchOffsets(t *testing.T, ctx context.Context, client *Cl } } +func testConsumerGroupFetchMessageBatch(t *testing.T, ctx context.Context, client *Client) { + const totalMessages = 144 + const partitions = 12 + const msgPerPartition = totalMessages / partitions + + topic := makeTopic() + if err := clientCreateTopic(client, topic, partitions); err != nil { + t.Fatal(err) + } + + groupId := makeGroupID() + brokers := []string{"localhost:9092"} + + writer := &Writer{ + Addr: TCP(brokers...), + Topic: topic, + Balancer: &RoundRobin{}, + BatchSize: 1, + Transport: client.Transport, + } + if err := writer.WriteMessages(ctx, makeTestSequence(totalMessages)...); err != nil { + t.Fatalf("bad write messages: %v", err) + } + if err := writer.Close(); err != nil { + t.Fatalf("bad write err: %v", err) + } + + batchSize := 100 + r := NewReader(ReaderConfig{ + Brokers: brokers, + Topic: topic, + GroupID: groupId, + MinBytes: 1, + MaxBytes: 10e6, + MaxWait: 100 * time.Millisecond, + QueueCapacity: batchSize, + }) + defer r.Close() + + var expected int + for { + m, err := r.FetchMessageBatch(ctx, batchSize, time.Second) + if err != nil { + t.Fatalf("error fetching message: %s", err) + } + if err := r.CommitMessages(context.Background(), m...); err != nil { + t.Fatal(err) + } + expected += len(m) + if expected == totalMessages { + break + } + } + + offsets, err := client.ConsumerOffsets(ctx, TopicAndGroup{GroupId: groupId, Topic: topic}) + if err != nil { + t.Fatal(err) + } + + if len(offsets) != partitions { + t.Fatalf("expected %d partitions but only received offsets for %d", partitions, len(offsets)) + } + + for i := 0; i < partitions; i++ { + committedOffset := offsets[i] + if committedOffset != msgPerPartition { + t.Errorf("expected partition %d with committed offset of %d but received %d", i, msgPerPartition, committedOffset) + } + } +} + func TestClientProduceAndConsume(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() diff --git a/reader.go b/reader.go index 04d90f355..ed6cdbfdd 100644 --- a/reader.go +++ b/reader.go @@ -912,6 +912,65 @@ func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error { } } +// FetchMessageBatch fetches a batch of messages from the reader. It is similar to +// FetchMessage, except it blocks until no. of messages read reaches batchSize. +func (r *Reader) FetchMessageBatch(ctx context.Context, batchSize int, deadline time.Duration) ([]Message, error) { + r.activateReadLag() + msgBatch := make([]Message, 0, batchSize) + + r.mutex.Lock() + if !r.closed && r.version == 0 { + r.start(r.getTopicPartitionOffset()) + } + version := r.version + r.mutex.Unlock() + + for { + select { + case <-ctx.Done(): + return []Message{}, ctx.Err() + case <-time.After(deadline): + return msgBatch, nil + case m, ok := <-r.msgs: + if !ok { + return []Message{}, io.EOF + } + + if m.version < version { + continue + } + + r.mutex.Lock() + + switch { + case m.error != nil: + case version == r.version: + r.offset = m.message.Offset + 1 + r.lag = m.watermark - r.offset + } + + r.mutex.Unlock() + + if errors.Is(m.error, io.EOF) { + // io.EOF is used as a marker to indicate that the stream + // has been closed, in case it was received from the inner + // reader we don't want to confuse the program and replace + // the error with io.ErrUnexpectedEOF. + m.error = io.ErrUnexpectedEOF + } + if m.error != nil { + return nil, m.error + } + + msgBatch = append(msgBatch, m.message) + + if len(msgBatch) == batchSize { + return msgBatch, nil + } + } + } +} + // ReadLag returns the current lag of the reader by fetching the last offset of // the topic and partition and computing the difference between that value and // the offset of the last message returned by ReadMessage.