Skip to content

Commit 5a19469

Browse files
authored
Add "human readable" description for offset in reader logs (#893)
When the reader logs an offset value, if the value is a special value, i.e. FirstOffset / LastOffset, it will log something like setting the offset ... from -2 to 20 With this change, it'll say "first offset" or "last offset", respectively, for example: setting the offset ... from first offset to 20 It uses a new `fmt.Formatter` type, `humanOffset`, which seems more expressive (and performant) that simply converting to a string. Instead of type-casting offset directly, I added a helper `toHumanOffset` to avoid other types, e.g. `humanOffset(uint64(1))`. Co-authored-by: Wade Carpenter <[email protected]>
1 parent 0b5e1f6 commit 5a19469

File tree

1 file changed

+29
-11
lines changed

1 file changed

+29
-11
lines changed

reader.go

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -996,7 +996,7 @@ func (r *Reader) Offset() int64 {
996996
offset := r.offset
997997
r.mutex.Unlock()
998998
r.withLogger(func(log Logger) {
999-
log.Printf("looking up offset of kafka reader for partition %d of %s: %d", r.config.Partition, r.config.Topic, offset)
999+
log.Printf("looking up offset of kafka reader for partition %d of %s: %s", r.config.Partition, r.config.Topic, toHumanOffset(offset))
10001000
})
10011001
return offset
10021002
}
@@ -1034,8 +1034,8 @@ func (r *Reader) SetOffset(offset int64) error {
10341034
err = io.ErrClosedPipe
10351035
} else if offset != r.offset {
10361036
r.withLogger(func(log Logger) {
1037-
log.Printf("setting the offset of the kafka reader for partition %d of %s from %d to %d",
1038-
r.config.Partition, r.config.Topic, r.offset, offset)
1037+
log.Printf("setting the offset of the kafka reader for partition %d of %s from %s to %s",
1038+
r.config.Partition, r.config.Topic, toHumanOffset(r.offset), toHumanOffset(offset))
10391039
})
10401040
r.offset = offset
10411041

@@ -1260,7 +1260,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
12601260
}
12611261

12621262
r.withLogger(func(log Logger) {
1263-
log.Printf("initializing kafka reader for partition %d of %s starting at offset %d", r.partition, r.topic, offset)
1263+
log.Printf("initializing kafka reader for partition %d of %s starting at offset %d", r.partition, r.topic, toHumanOffset(offset))
12641264
})
12651265

12661266
conn, start, err := r.initialize(ctx, offset)
@@ -1320,7 +1320,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
13201320
continue
13211321
case UnknownTopicOrPartition:
13221322
r.withErrorLogger(func(log Logger) {
1323-
log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, offset, r.brokers)
1323+
log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, toHumanOffset(offset), r.brokers)
13241324
})
13251325

13261326
conn.Close()
@@ -1331,7 +1331,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
13311331
break readLoop
13321332
case NotLeaderForPartition:
13331333
r.withErrorLogger(func(log Logger) {
1334-
log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, offset)
1334+
log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, toHumanOffset(offset))
13351335
})
13361336

13371337
conn.Close()
@@ -1345,7 +1345,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
13451345
// Timeout on the kafka side, this can be safely retried.
13461346
errcount = 0
13471347
r.withLogger(func(log Logger) {
1348-
log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, offset)
1348+
log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, toHumanOffset(offset))
13491349
})
13501350
r.stats.timeouts.observe(1)
13511351
continue
@@ -1363,7 +1363,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
13631363
switch {
13641364
case offset < first:
13651365
r.withErrorLogger(func(log Logger) {
1366-
log.Printf("the kafka reader is reading before the first offset for partition %d of %s, skipping from offset %d to %d (%d messages)", r.partition, r.topic, offset, first, first-offset)
1366+
log.Printf("the kafka reader is reading before the first offset for partition %d of %s, skipping from offset %d to %d (%d messages)", r.partition, r.topic, toHumanOffset(offset), first, first-offset)
13671367
})
13681368
offset, errcount = first, 0
13691369
continue // retry immediately so we don't keep falling behind due to the backoff
@@ -1375,7 +1375,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
13751375
default:
13761376
// We may be reading past the last offset, will retry later.
13771377
r.withErrorLogger(func(log Logger) {
1378-
log.Printf("the kafka reader is reading passed the last offset for partition %d of %s at offset %d", r.partition, r.topic, offset)
1378+
log.Printf("the kafka reader is reading passed the last offset for partition %d of %s at offset %d", r.partition, r.topic, toHumanOffset(offset))
13791379
})
13801380
}
13811381

@@ -1396,7 +1396,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
13961396
r.sendError(ctx, err)
13971397
} else {
13981398
r.withErrorLogger(func(log Logger) {
1399-
log.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d: %s", r.partition, r.topic, offset, err)
1399+
log.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d: %s", r.partition, r.topic, toHumanOffset(offset), err)
14001400
})
14011401
r.stats.errors.observe(1)
14021402
conn.Close()
@@ -1442,7 +1442,7 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star
14421442
}
14431443

14441444
r.withLogger(func(log Logger) {
1445-
log.Printf("the kafka reader for partition %d of %s is seeking to offset %d", r.partition, r.topic, offset)
1445+
log.Printf("the kafka reader for partition %d of %s is seeking to offset %d", r.partition, r.topic, toHumanOffset(offset))
14461446
})
14471447

14481448
if start, err = conn.Seek(offset, SeekAbsolute); err != nil {
@@ -1578,3 +1578,21 @@ func extractTopics(members []GroupMember) []string {
15781578

15791579
return topics
15801580
}
1581+
1582+
type humanOffset int64
1583+
1584+
func toHumanOffset(v int64) humanOffset {
1585+
return humanOffset(v)
1586+
}
1587+
1588+
func (offset humanOffset) Format(w fmt.State, _ rune) {
1589+
v := int64(offset)
1590+
switch v {
1591+
case FirstOffset:
1592+
fmt.Fprint(w, "first offset")
1593+
case LastOffset:
1594+
fmt.Fprintf(w, "last offset")
1595+
default:
1596+
fmt.Fprintf(w, strconv.FormatInt(v, 10))
1597+
}
1598+
}

0 commit comments

Comments
 (0)