Skip to content

Commit 58f7149

Browse files
committed
feat: add ContextTimeoutEnabled to respect context timeouts and deadlines
1 parent 8319b1e commit 58f7149

File tree

8 files changed

+92
-59
lines changed

8 files changed

+92
-59
lines changed

cluster.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,10 @@ type ClusterOptions struct {
6666
MinRetryBackoff time.Duration
6767
MaxRetryBackoff time.Duration
6868

69-
DialTimeout time.Duration
70-
ReadTimeout time.Duration
71-
WriteTimeout time.Duration
69+
DialTimeout time.Duration
70+
ReadTimeout time.Duration
71+
WriteTimeout time.Duration
72+
ContextTimeoutEnabled bool
7273

7374
// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
7475
PoolFIFO bool
@@ -1259,14 +1260,14 @@ func (c *ClusterClient) _processPipelineNode(
12591260
) {
12601261
_ = node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
12611262
return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
1262-
if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
1263+
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
12631264
return writeCmds(wr, cmds)
12641265
}); err != nil {
12651266
setCmdsErr(cmds, err)
12661267
return err
12671268
}
12681269

1269-
return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
1270+
return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
12701271
return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds)
12711272
})
12721273
})
@@ -1421,14 +1422,14 @@ func (c *ClusterClient) _processTxPipelineNode(
14211422
_ = node.Client.hooks.processTxPipeline(
14221423
ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
14231424
return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
1424-
if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
1425+
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
14251426
return writeCmds(wr, cmds)
14261427
}); err != nil {
14271428
setCmdsErr(cmds, err)
14281429
return err
14291430
}
14301431

1431-
return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
1432+
return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
14321433
statusCmd := cmds[0].(*StatusCmd)
14331434
// Trim multi and exec.
14341435
trimmedCmds := cmds[1 : len(cmds)-1]
@@ -1788,6 +1789,13 @@ func (c *ClusterClient) MasterForKey(ctx context.Context, key string) (*Client,
17881789
return node.Client, err
17891790
}
17901791

1792+
func (c *ClusterClient) context(ctx context.Context) context.Context {
1793+
if c.opt.ContextTimeoutEnabled {
1794+
return ctx
1795+
}
1796+
return context.Background()
1797+
}
1798+
17911799
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
17921800
for _, n := range nodes {
17931801
if n == node {

internal/pool/conn.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ func (cn *Conn) RemoteAddr() net.Addr {
6363
return nil
6464
}
6565

66-
func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
66+
func (cn *Conn) WithReader(
67+
ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
68+
) error {
6769
if timeout >= 0 {
6870
if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
6971
return err

main_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,10 @@ func redisOptions() *redis.Options {
123123
Addr: redisAddr,
124124
DB: 15,
125125

126-
DialTimeout: 10 * time.Second,
127-
ReadTimeout: 30 * time.Second,
128-
WriteTimeout: 30 * time.Second,
126+
DialTimeout: 10 * time.Second,
127+
ReadTimeout: 30 * time.Second,
128+
WriteTimeout: 30 * time.Second,
129+
ContextTimeoutEnabled: true,
129130

130131
MaxRetries: -1,
131132

options.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,14 @@ type Options struct {
8383
// - `-1` - no timeout (block indefinitely).
8484
// - `-2` - disables SetWriteDeadline calls completely.
8585
WriteTimeout time.Duration
86+
// ContextTimeoutEnabled controls whether the client respects context timeouts and deadlines.
87+
// See https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts
88+
ContextTimeoutEnabled bool
8689

8790
// Type of connection pool.
8891
// true for FIFO pool, false for LIFO pool.
89-
// Note that fifo has higher overhead compared to lifo.
92+
// Note that FIFO has slightly higher overhead compared to LIFO,
93+
// but it helps closing idle connections faster reducing the pool size.
9094
PoolFIFO bool
9195
// Maximum number of socket connections.
9296
// Default is 10 connections per every available CPU as reported by runtime.GOMAXPROCS.
@@ -100,22 +104,30 @@ type Options struct {
100104
MinIdleConns int
101105
// Maximum number of idle connections.
102106
MaxIdleConns int
103-
// Amount of time after which client closes idle connections.
107+
// ConnMaxIdleTime is the maximum amount of time a connection may be idle.
104108
// Should be less than server's timeout.
109+
//
110+
// Expired connections may be closed lazily before reuse.
111+
// If d <= 0, connections are not closed due to a connection's idle time.
112+
//
105113
// Default is 5 minutes. -1 disables idle timeout check.
106114
ConnMaxIdleTime time.Duration
107-
// Connection age at which client retires (closes) the connection.
108-
// Default is to not close aged connections.
115+
// ConnMaxLifetime is the maximum amount of time a connection may be reused.
116+
//
117+
// Expired connections may be closed lazily before reuse.
118+
// If <= 0, connections are not closed due to a connection's age.
119+
//
120+
// Default is to not close idle connections.
109121
ConnMaxLifetime time.Duration
110122

111-
// Enables read only queries on slave nodes.
112-
readOnly bool
113-
114-
// TLS Config to use. When set TLS will be negotiated.
123+
// TLS Config to use. When set, TLS will be negotiated.
115124
TLSConfig *tls.Config
116125

117-
// Limiter interface used to implemented circuit breaker or rate limiter.
126+
// Limiter interface used to implement circuit breaker or rate limiter.
118127
Limiter Limiter
128+
129+
// Enables read only queries on slave/follower nodes.
130+
readOnly bool
119131
}
120132

121133
func (opt *Options) init() {

pubsub.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ type PubSub struct {
2424
newConn func(ctx context.Context, channels []string) (*pool.Conn, error)
2525
closeConn func(*pool.Conn) error
2626

27-
mu sync.Mutex
28-
cn *pool.Conn
29-
channels map[string]struct{}
30-
patterns map[string]struct{}
27+
mu sync.Mutex
28+
cn *pool.Conn
29+
channels map[string]struct{}
30+
patterns map[string]struct{}
3131
schannels map[string]struct{}
3232

3333
closed bool
@@ -84,7 +84,7 @@ func (c *PubSub) conn(ctx context.Context, newChannels []string) (*pool.Conn, er
8484
}
8585

8686
func (c *PubSub) writeCmd(ctx context.Context, cn *pool.Conn, cmd Cmder) error {
87-
return cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
87+
return cn.WithWriter(context.Background(), c.opt.WriteTimeout, func(wr *proto.Writer) error {
8888
return writeCmd(wr, cmd)
8989
})
9090
}
@@ -408,7 +408,7 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int
408408
return nil, err
409409
}
410410

411-
err = cn.WithReader(ctx, timeout, func(rd *proto.Reader) error {
411+
err = cn.WithReader(context.Background(), timeout, func(rd *proto.Reader) error {
412412
return c.cmd.readReply(rd)
413413
})
414414

redis.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -316,17 +316,15 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
316316
}
317317

318318
retryTimeout := uint32(0)
319-
err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
320-
err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
319+
if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
320+
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
321321
return writeCmd(wr, cmd)
322-
})
323-
if err != nil {
322+
}); err != nil {
324323
atomic.StoreUint32(&retryTimeout, 1)
325324
return err
326325
}
327326

328-
err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply)
329-
if err != nil {
327+
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), cmd.readReply); err != nil {
330328
if cmd.readTimeout() == nil {
331329
atomic.StoreUint32(&retryTimeout, 1)
332330
} else {
@@ -336,13 +334,12 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
336334
}
337335

338336
return nil
339-
})
340-
if err == nil {
341-
return false, nil
337+
}); err != nil {
338+
retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1)
339+
return retry, err
342340
}
343341

344-
retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1)
345-
return retry, err
342+
return false, nil
346343
}
347344

348345
func (c *baseClient) retryBackoff(attempt int) time.Duration {
@@ -430,14 +427,14 @@ func (c *baseClient) _generalProcessPipeline(
430427
func (c *baseClient) pipelineProcessCmds(
431428
ctx context.Context, cn *pool.Conn, cmds []Cmder,
432429
) (bool, error) {
433-
if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
430+
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
434431
return writeCmds(wr, cmds)
435432
}); err != nil {
436433
setCmdsErr(cmds, err)
437434
return true, err
438435
}
439436

440-
if err := cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
437+
if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
441438
return pipelineReadCmds(rd, cmds)
442439
}); err != nil {
443440
return true, err
@@ -462,14 +459,14 @@ func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
462459
func (c *baseClient) txPipelineProcessCmds(
463460
ctx context.Context, cn *pool.Conn, cmds []Cmder,
464461
) (bool, error) {
465-
if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
462+
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
466463
return writeCmds(wr, cmds)
467464
}); err != nil {
468465
setCmdsErr(cmds, err)
469466
return true, err
470467
}
471468

472-
if err := cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
469+
if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
473470
statusCmd := cmds[0].(*StatusCmd)
474471
// Trim multi and exec.
475472
trimmedCmds := cmds[1 : len(cmds)-1]
@@ -527,6 +524,13 @@ func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder)
527524
return nil
528525
}
529526

527+
func (c *baseClient) context(ctx context.Context) context.Context {
528+
if c.opt.ContextTimeoutEnabled {
529+
return ctx
530+
}
531+
return context.Background()
532+
}
533+
530534
//------------------------------------------------------------------------------
531535

532536
// Client is a Redis client representing a pool of zero or more underlying connections.

sentinel.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,10 @@ type FailoverOptions struct {
5959
MinRetryBackoff time.Duration
6060
MaxRetryBackoff time.Duration
6161

62-
DialTimeout time.Duration
63-
ReadTimeout time.Duration
64-
WriteTimeout time.Duration
62+
DialTimeout time.Duration
63+
ReadTimeout time.Duration
64+
WriteTimeout time.Duration
65+
ContextTimeoutEnabled bool
6566

6667
PoolFIFO bool
6768

@@ -90,9 +91,10 @@ func (opt *FailoverOptions) clientOptions() *Options {
9091
MinRetryBackoff: opt.MinRetryBackoff,
9192
MaxRetryBackoff: opt.MaxRetryBackoff,
9293

93-
DialTimeout: opt.DialTimeout,
94-
ReadTimeout: opt.ReadTimeout,
95-
WriteTimeout: opt.WriteTimeout,
94+
DialTimeout: opt.DialTimeout,
95+
ReadTimeout: opt.ReadTimeout,
96+
WriteTimeout: opt.WriteTimeout,
97+
ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
9698

9799
PoolFIFO: opt.PoolFIFO,
98100
PoolSize: opt.PoolSize,

universal.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@ type UniversalOptions struct {
3232
MinRetryBackoff time.Duration
3333
MaxRetryBackoff time.Duration
3434

35-
DialTimeout time.Duration
36-
ReadTimeout time.Duration
37-
WriteTimeout time.Duration
35+
DialTimeout time.Duration
36+
ReadTimeout time.Duration
37+
WriteTimeout time.Duration
38+
ContextTimeoutEnabled bool
3839

3940
// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
4041
PoolFIFO bool
@@ -84,9 +85,10 @@ func (o *UniversalOptions) Cluster() *ClusterOptions {
8485
MinRetryBackoff: o.MinRetryBackoff,
8586
MaxRetryBackoff: o.MaxRetryBackoff,
8687

87-
DialTimeout: o.DialTimeout,
88-
ReadTimeout: o.ReadTimeout,
89-
WriteTimeout: o.WriteTimeout,
88+
DialTimeout: o.DialTimeout,
89+
ReadTimeout: o.ReadTimeout,
90+
WriteTimeout: o.WriteTimeout,
91+
ContextTimeoutEnabled: o.ContextTimeoutEnabled,
9092

9193
PoolFIFO: o.PoolFIFO,
9294

@@ -124,9 +126,10 @@ func (o *UniversalOptions) Failover() *FailoverOptions {
124126
MinRetryBackoff: o.MinRetryBackoff,
125127
MaxRetryBackoff: o.MaxRetryBackoff,
126128

127-
DialTimeout: o.DialTimeout,
128-
ReadTimeout: o.ReadTimeout,
129-
WriteTimeout: o.WriteTimeout,
129+
DialTimeout: o.DialTimeout,
130+
ReadTimeout: o.ReadTimeout,
131+
WriteTimeout: o.WriteTimeout,
132+
ContextTimeoutEnabled: o.ContextTimeoutEnabled,
130133

131134
PoolFIFO: o.PoolFIFO,
132135
PoolSize: o.PoolSize,
@@ -160,9 +163,10 @@ func (o *UniversalOptions) Simple() *Options {
160163
MinRetryBackoff: o.MinRetryBackoff,
161164
MaxRetryBackoff: o.MaxRetryBackoff,
162165

163-
DialTimeout: o.DialTimeout,
164-
ReadTimeout: o.ReadTimeout,
165-
WriteTimeout: o.WriteTimeout,
166+
DialTimeout: o.DialTimeout,
167+
ReadTimeout: o.ReadTimeout,
168+
WriteTimeout: o.WriteTimeout,
169+
ContextTimeoutEnabled: o.ContextTimeoutEnabled,
166170

167171
PoolFIFO: o.PoolFIFO,
168172
PoolSize: o.PoolSize,

0 commit comments

Comments
 (0)