
Commit 7bbf8c8

Merge pull request #59 from bashtanov/combine-offset-results
consumer: combine offset map from multiple partially successful replies
2 parents: 3f834e0 + 237efd0
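
The combining behaviour named in the commit message is easier to see in isolation: each attempt may return offsets for only some partitions, and the loop merges whichever partitions answered successfully until every partition has a value. Below is a minimal, self-contained sketch of that pattern; fetchOffsetsOnce and its simulated failures are hypothetical stand-ins for the sharded ListOffsets request in the real code, and the retry interval is shortened from the 2s the diff uses.

// Minimal sketch of the retry-and-combine pattern this change introduces.
// fetchOffsetsOnce is a hypothetical stand-in for one ListOffsets attempt
// that may succeed for only a subset of partitions.
package main

import (
    "errors"
    "fmt"
    "time"
)

type offsetResult struct {
    offset int64
    err    error
}

// fetchOffsetsOnce simulates a partially successful reply: on the first
// attempt, odd-numbered partitions fail and even-numbered ones succeed.
func fetchOffsetsOnce(nPartitions int32, attempt int) []offsetResult {
    results := make([]offsetResult, nPartitions)
    for i := range results {
        if attempt == 0 && i%2 == 1 {
            results[i] = offsetResult{err: errors.New("not leader")}
        } else {
            results[i] = offsetResult{offset: int64(100 + i)}
        }
    }
    return results
}

func getOffsets(nPartitions int32) []int64 {
    combined := make([]int64, nPartitions)
    have := make([]bool, nPartitions)
    for attempt := 0; ; attempt++ {
        seen := int32(0)
        for i, r := range fetchOffsetsOnce(nPartitions, attempt) {
            if r.err == nil {
                combined[i] = r.offset // update even if seen before
                have[i] = true
            }
            if have[i] {
                seen++
            }
        }
        if seen == nPartitions {
            return combined
        }
        time.Sleep(10 * time.Millisecond) // the real code waits 2s between retries
    }
}

func main() {
    fmt.Println(getOffsets(4)) // [100 101 102 103] after the second attempt
}

The key design point, visible in the diff below, is that an offset obtained on an earlier attempt is kept (and refreshed if a later attempt also succeeds), so no single reply ever has to cover all partitions at once.
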

File tree

1 file changed: +46 -33 lines


pkg/worker/verifier/client_helpers.go

Lines changed: 46 additions & 33 deletions
@@ -5,33 +5,53 @@ import (
     "errors"
     "time"
 
+    "github.com/redpanda-data/kgo-verifier/pkg/util"
     log "github.com/sirupsen/logrus"
     "github.com/twmb/franz-go/pkg/kerr"
     "github.com/twmb/franz-go/pkg/kgo"
     "github.com/twmb/franz-go/pkg/kmsg"
     "github.com/vectorizedio/redpanda/src/go/rpk/pkg/kafka"
 )
 
+type OffsetResult struct {
+    offset int64
+    err    error
+}
+
 // Try to get offsets, with a retry loop in case any partitions are not
 // in a position to respond. This is useful to avoid terminating if e.g.
 // the cluster is subject to failure injection while workload runs.
 func GetOffsets(client *kgo.Client, topic string, nPartitions int32, t int64) []int64 {
     wait_t := 2 * time.Second
+    combinedResult := make([]int64, nPartitions)
+    haveResult := make([]bool, nPartitions)
+
+    req := formOffsetsReq(topic, nPartitions, t)
     for {
-        result, err := getOffsetsInner(client, topic, nPartitions, t)
-        if err != nil {
-            log.Warnf("Retrying getOffsets in %v", wait_t)
-            time.Sleep(wait_t)
-        } else {
-            return result
+        result := attemptGetOffsets(client, topic, nPartitions, req)
+        var seenPartitions = int32(0)
+        for i := 0; i < int(nPartitions); i++ {
+            if result[i].err == nil {
+                // update even if seen before
+                combinedResult[i] = result[i].offset
+                haveResult[i] = true
+            }
+            if haveResult[i] {
+                seenPartitions += 1
+            }
         }
-
+        if seenPartitions == nPartitions {
+            return combinedResult
+        }
+        log.Warnf(
+            "Got offsets for %d/%d partitions, retrying attemptGetOffsets in %v",
+            seenPartitions, nPartitions, wait_t)
+        time.Sleep(wait_t)
     }
 }
 
-func getOffsetsInner(client *kgo.Client, topic string, nPartitions int32, t int64) ([]int64, error) {
+func formOffsetsReq(topic string, nPartitions int32, t int64) *kmsg.ListOffsetsRequest {
     log.Infof("Loading offsets for topic %s t=%d...", topic, t)
-    pOffsets := make([]int64, nPartitions)
 
     req := kmsg.NewPtrListOffsetsRequest()
     req.ReplicaID = -1
@@ -45,37 +65,30 @@ func getOffsetsInner(client *kgo.Client, topic string, nPartitions int32, t int6
     }
 
     req.Topics = append(req.Topics, reqTopic)
+    return req
+}
+
+func attemptGetOffsets(client *kgo.Client, topic string, nPartitions int32, req *kmsg.ListOffsetsRequest) []OffsetResult {
+    pOffsets := make([]OffsetResult, nPartitions)
+    for i := range pOffsets {
+        pOffsets[i].err = errors.New("no result")
+    }
 
-    seenPartitions := int32(0)
     shards := client.RequestSharded(context.Background(), req)
-    var r_err error
-    allFailed := kafka.EachShard(req, shards, func(shard kgo.ResponseShard) {
-        if shard.Err != nil {
-            r_err = shard.Err
-            return
-        }
+    kafka.EachShard(req, shards, func(shard kgo.ResponseShard) {
+        util.Chk(shard.Err, "kafka.EachShard called processor fn on an error result")
         resp := shard.Resp.(*kmsg.ListOffsetsResponse)
         for _, partition := range resp.Topics[0].Partitions {
            if partition.ErrorCode != 0 {
-                log.Warnf("error fetching %s/%d metadata: %v", topic, partition.Partition, kerr.ErrorForCode(partition.ErrorCode))
-                r_err = kerr.ErrorForCode(partition.ErrorCode)
+                err := kerr.ErrorForCode(partition.ErrorCode)
+                pOffsets[partition.Partition].err = err
+                log.Warnf("error fetching %s/%d metadata: %v", topic, partition.Partition, err)
+            } else {
+                pOffsets[partition.Partition] = OffsetResult{offset: partition.Offset, err: nil}
+                log.Debugf("Partition %d offset %d", partition.Partition, partition.Offset)
             }
-            pOffsets[partition.Partition] = partition.Offset
-            seenPartitions += 1
-            log.Debugf("Partition %d offset %d", partition.Partition, pOffsets[partition.Partition])
         }
     })
 
-    if allFailed {
-        return nil, errors.New("All offset requests failed")
-    }
-
-    if seenPartitions < nPartitions {
-        // The results may be partial, simply omitting some partitions while not
-        // raising any error. We transform this into an error to avoid wrongly
-        // returning a 0 offset for any missing partitions
-        return nil, errors.New("Didn't get data for all partitions")
-    }
-
-    return pOffsets, r_err
+    return pOffsets
 }
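
For context, a hypothetical caller of the retrying helper might look roughly like the following. The broker address, topic name, partition count, and the verifier import path are assumptions; t=-1 follows the Kafka ListOffsets convention of requesting the latest offset.

// Hypothetical caller of GetOffsets, assuming a reachable broker at
// localhost:9092, a topic "test-topic" with 3 partitions, and that the
// helper lives in the package at pkg/worker/verifier.
package main

import (
    "fmt"
    "log"

    "github.com/redpanda-data/kgo-verifier/pkg/worker/verifier"
    "github.com/twmb/franz-go/pkg/kgo"
)

func main() {
    client, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // t=-1 asks for the latest offsets; GetOffsets keeps retrying until
    // every partition has answered at least once.
    highWatermarks := verifier.GetOffsets(client, "test-topic", 3, -1)
    fmt.Println(highWatermarks)
}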
