Skip to content

Commit bef2911

Browse files
authored
#726 make kafka reader message ReadBatchTimeout configurable (#989)
* #726 remove reader safetyTimeout * add configuration * update docs and add default * update comment * add read deadline to every itration * rename to ReadBatchTimeout
1 parent ee37c7f commit bef2911

File tree

1 file changed

+44
-40
lines changed

1 file changed

+44
-40
lines changed

reader.go

Lines changed: 44 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,11 @@ type ReaderConfig struct {
400400
// Default: 10s
401401
MaxWait time.Duration
402402

403+
// ReadBatchTimeout amount of time to wait to fetch message from kafka messages batch.
404+
//
405+
// Default: 10s
406+
ReadBatchTimeout time.Duration
407+
403408
// ReadLagInterval sets the frequency at which the reader lag is updated.
404409
// Setting this field to a negative value disables lag reporting.
405410
ReadLagInterval time.Duration
@@ -654,6 +659,10 @@ func NewReader(config ReaderConfig) *Reader {
654659
config.MaxWait = 10 * time.Second
655660
}
656661

662+
if config.ReadBatchTimeout == 0 {
663+
config.ReadBatchTimeout = 10 * time.Second
664+
}
665+
657666
if config.ReadLagInterval == 0 {
658667
config.ReadLagInterval = 1 * time.Minute
659668
}
@@ -1203,22 +1212,23 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
12031212
defer join.Done()
12041213

12051214
(&reader{
1206-
dialer: r.config.Dialer,
1207-
logger: r.config.Logger,
1208-
errorLogger: r.config.ErrorLogger,
1209-
brokers: r.config.Brokers,
1210-
topic: key.topic,
1211-
partition: int(key.partition),
1212-
minBytes: r.config.MinBytes,
1213-
maxBytes: r.config.MaxBytes,
1214-
maxWait: r.config.MaxWait,
1215-
backoffDelayMin: r.config.ReadBackoffMin,
1216-
backoffDelayMax: r.config.ReadBackoffMax,
1217-
version: r.version,
1218-
msgs: r.msgs,
1219-
stats: r.stats,
1220-
isolationLevel: r.config.IsolationLevel,
1221-
maxAttempts: r.config.MaxAttempts,
1215+
dialer: r.config.Dialer,
1216+
logger: r.config.Logger,
1217+
errorLogger: r.config.ErrorLogger,
1218+
brokers: r.config.Brokers,
1219+
topic: key.topic,
1220+
partition: int(key.partition),
1221+
minBytes: r.config.MinBytes,
1222+
maxBytes: r.config.MaxBytes,
1223+
maxWait: r.config.MaxWait,
1224+
readBatchTimeout: r.config.ReadBatchTimeout,
1225+
backoffDelayMin: r.config.ReadBackoffMin,
1226+
backoffDelayMax: r.config.ReadBackoffMax,
1227+
version: r.version,
1228+
msgs: r.msgs,
1229+
stats: r.stats,
1230+
isolationLevel: r.config.IsolationLevel,
1231+
maxAttempts: r.config.MaxAttempts,
12221232

12231233
// backwards-compatibility flags
12241234
offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
@@ -1231,22 +1241,23 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
12311241
// used as an way to asynchronously fetch messages while the main program reads
12321242
// them using the high level reader API.
12331243
type reader struct {
1234-
dialer *Dialer
1235-
logger Logger
1236-
errorLogger Logger
1237-
brokers []string
1238-
topic string
1239-
partition int
1240-
minBytes int
1241-
maxBytes int
1242-
maxWait time.Duration
1243-
backoffDelayMin time.Duration
1244-
backoffDelayMax time.Duration
1245-
version int64
1246-
msgs chan<- readerMessage
1247-
stats *readerStats
1248-
isolationLevel IsolationLevel
1249-
maxAttempts int
1244+
dialer *Dialer
1245+
logger Logger
1246+
errorLogger Logger
1247+
brokers []string
1248+
topic string
1249+
partition int
1250+
minBytes int
1251+
maxBytes int
1252+
maxWait time.Duration
1253+
readBatchTimeout time.Duration
1254+
backoffDelayMin time.Duration
1255+
backoffDelayMax time.Duration
1256+
version int64
1257+
msgs chan<- readerMessage
1258+
stats *readerStats
1259+
isolationLevel IsolationLevel
1260+
maxAttempts int
12501261

12511262
offsetOutOfRangeError bool
12521263
}
@@ -1514,15 +1525,8 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
15141525
var size int64
15151526
var bytes int64
15161527

1517-
const safetyTimeout = 10 * time.Second
1518-
deadline := time.Now().Add(safetyTimeout)
1519-
conn.SetReadDeadline(deadline)
1520-
15211528
for {
1522-
if now := time.Now(); deadline.Sub(now) < (safetyTimeout / 2) {
1523-
deadline = now.Add(safetyTimeout)
1524-
conn.SetReadDeadline(deadline)
1525-
}
1529+
conn.SetReadDeadline(time.Now().Add(r.readBatchTimeout))
15261530

15271531
if msg, err = batch.ReadMessage(); err != nil {
15281532
batch.Close()

0 commit comments

Comments
 (0)