Skip to content

Commit b0bb6ce

Browse files
committed
add configurable buffer sizes for Redis connections
1 parent 7bc12bb commit b0bb6ce

File tree

8 files changed

+290
-4
lines changed

8 files changed

+290
-4
lines changed

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,18 @@ func main() {
297297
```
298298
299299
300+
### Buffer Size Configuration
301+
302+
go-redis uses 0.5MiB read and write buffers by default for optimal performance. For high-throughput applications or large pipelines, you can customize buffer sizes:
303+
304+
```go
305+
rdb := redis.NewClient(&redis.Options{
306+
Addr: "localhost:6379",
307+
ReadBufferSize: 1024 * 1024, // 1MiB read buffer
308+
WriteBufferSize: 1024 * 1024, // 1MiB write buffer
309+
})
310+
```
311+
300312
### Advanced Configuration
301313
302314
go-redis supports extending the client identification phase to allow projects to send their own custom client identification.

internal/pool/buffer_size_test.go

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package pool_test
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"net"
7+
"unsafe"
8+
9+
. "github.com/bsm/ginkgo/v2"
10+
. "github.com/bsm/gomega"
11+
12+
"github.com/redis/go-redis/v9/internal/pool"
13+
"github.com/redis/go-redis/v9/internal/proto"
14+
)
15+
16+
var _ = Describe("Buffer Size Configuration", func() {
17+
var connPool *pool.ConnPool
18+
ctx := context.Background()
19+
20+
AfterEach(func() {
21+
if connPool != nil {
22+
connPool.Close()
23+
}
24+
})
25+
26+
It("should use default buffer sizes when not specified", func() {
27+
connPool = pool.NewConnPool(&pool.Options{
28+
Dialer: dummyDialer,
29+
PoolSize: 1,
30+
PoolTimeout: 1000,
31+
})
32+
33+
cn, err := connPool.NewConn(ctx)
34+
Expect(err).NotTo(HaveOccurred())
35+
defer connPool.CloseConn(cn)
36+
37+
// Check that default buffer sizes are used (0.5MiB)
38+
writerBufSize := getWriterBufSizeUnsafe(cn)
39+
readerBufSize := getReaderBufSizeUnsafe(cn)
40+
41+
Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
42+
Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
43+
})
44+
45+
It("should use custom buffer sizes when specified", func() {
46+
customReadSize := 32 * 1024 // 32KB
47+
customWriteSize := 64 * 1024 // 64KB
48+
49+
connPool = pool.NewConnPool(&pool.Options{
50+
Dialer: dummyDialer,
51+
PoolSize: 1,
52+
PoolTimeout: 1000,
53+
ReadBufferSize: customReadSize,
54+
WriteBufferSize: customWriteSize,
55+
})
56+
57+
cn, err := connPool.NewConn(ctx)
58+
Expect(err).NotTo(HaveOccurred())
59+
defer connPool.CloseConn(cn)
60+
61+
// Check that custom buffer sizes are used
62+
writerBufSize := getWriterBufSizeUnsafe(cn)
63+
readerBufSize := getReaderBufSizeUnsafe(cn)
64+
65+
Expect(writerBufSize).To(Equal(customWriteSize))
66+
Expect(readerBufSize).To(Equal(customReadSize))
67+
})
68+
69+
It("should handle zero buffer sizes by using defaults", func() {
70+
connPool = pool.NewConnPool(&pool.Options{
71+
Dialer: dummyDialer,
72+
PoolSize: 1,
73+
PoolTimeout: 1000,
74+
ReadBufferSize: 0, // Should use default
75+
WriteBufferSize: 0, // Should use default
76+
})
77+
78+
cn, err := connPool.NewConn(ctx)
79+
Expect(err).NotTo(HaveOccurred())
80+
defer connPool.CloseConn(cn)
81+
82+
// Check that default buffer sizes are used (0.5MiB)
83+
writerBufSize := getWriterBufSizeUnsafe(cn)
84+
readerBufSize := getReaderBufSizeUnsafe(cn)
85+
86+
Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
87+
Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
88+
})
89+
90+
It("should use 0.5MiB default buffer sizes for standalone NewConn", func() {
91+
// Test that NewConn (without pool) also uses 0.5MiB defaults
92+
netConn := newDummyConn()
93+
cn := pool.NewConn(netConn)
94+
defer cn.Close()
95+
96+
writerBufSize := getWriterBufSizeUnsafe(cn)
97+
readerBufSize := getReaderBufSizeUnsafe(cn)
98+
99+
Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
100+
Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
101+
})
102+
103+
It("should use 0.5MiB defaults even when pool is created directly without buffer sizes", func() {
104+
// Test the scenario where someone creates a pool directly (like in tests)
105+
// without setting ReadBufferSize and WriteBufferSize
106+
connPool = pool.NewConnPool(&pool.Options{
107+
Dialer: dummyDialer,
108+
PoolSize: 1,
109+
PoolTimeout: 1000,
110+
// ReadBufferSize and WriteBufferSize are not set (will be 0)
111+
})
112+
113+
cn, err := connPool.NewConn(ctx)
114+
Expect(err).NotTo(HaveOccurred())
115+
defer connPool.CloseConn(cn)
116+
117+
// Should still get 0.5MiB defaults because NewConnPool sets them
118+
writerBufSize := getWriterBufSizeUnsafe(cn)
119+
readerBufSize := getReaderBufSizeUnsafe(cn)
120+
121+
Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
122+
Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
123+
})
124+
})
125+
126+
// Helper functions to extract buffer sizes using unsafe pointers
127+
func getWriterBufSizeUnsafe(cn *pool.Conn) int {
128+
cnPtr := (*struct {
129+
usedAt int64
130+
netConn net.Conn
131+
rd *proto.Reader
132+
bw *bufio.Writer
133+
wr *proto.Writer
134+
// ... other fields
135+
})(unsafe.Pointer(cn))
136+
137+
if cnPtr.bw == nil {
138+
return -1
139+
}
140+
141+
bwPtr := (*struct {
142+
err error
143+
buf []byte
144+
n int
145+
wr interface{}
146+
})(unsafe.Pointer(cnPtr.bw))
147+
148+
return len(bwPtr.buf)
149+
}
150+
151+
func getReaderBufSizeUnsafe(cn *pool.Conn) int {
152+
cnPtr := (*struct {
153+
usedAt int64
154+
netConn net.Conn
155+
rd *proto.Reader
156+
bw *bufio.Writer
157+
wr *proto.Writer
158+
// ... other fields
159+
})(unsafe.Pointer(cn))
160+
161+
if cnPtr.rd == nil {
162+
return -1
163+
}
164+
165+
rdPtr := (*struct {
166+
rd *bufio.Reader
167+
})(unsafe.Pointer(cnPtr.rd))
168+
169+
if rdPtr.rd == nil {
170+
return -1
171+
}
172+
173+
bufReaderPtr := (*struct {
174+
buf []byte
175+
rd interface{}
176+
r, w int
177+
err error
178+
lastByte int
179+
lastRuneSize int
180+
})(unsafe.Pointer(rdPtr.rd))
181+
182+
return len(bufReaderPtr.buf)
183+
}

internal/pool/conn.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,28 @@ type Conn struct {
2828
}
2929

3030
func NewConn(netConn net.Conn) *Conn {
31+
return NewConnWithBufferSize(netConn, proto.DefaultBufferSize, proto.DefaultBufferSize)
32+
}
33+
34+
func NewConnWithBufferSize(netConn net.Conn, readBufSize, writeBufSize int) *Conn {
3135
cn := &Conn{
3236
netConn: netConn,
3337
createdAt: time.Now(),
3438
}
35-
cn.rd = proto.NewReader(netConn)
36-
cn.bw = bufio.NewWriter(netConn)
39+
40+
// Use specified buffer sizes, or fall back to 0.5MiB defaults if 0
41+
if readBufSize > 0 {
42+
cn.rd = proto.NewReaderSize(netConn, readBufSize)
43+
} else {
44+
cn.rd = proto.NewReader(netConn) // Uses 0.5MiB default
45+
}
46+
47+
if writeBufSize > 0 {
48+
cn.bw = bufio.NewWriterSize(netConn, writeBufSize)
49+
} else {
50+
cn.bw = bufio.NewWriterSize(netConn, proto.DefaultBufferSize)
51+
}
52+
3753
cn.wr = proto.NewWriter(cn.bw)
3854
cn.SetUsedAt(time.Now())
3955
return cn

internal/pool/pool.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ type Options struct {
7171
MaxActiveConns int
7272
ConnMaxIdleTime time.Duration
7373
ConnMaxLifetime time.Duration
74+
75+
ReadBufferSize int
76+
WriteBufferSize int
7477
}
7578

7679
type lastDialErrorWrap struct {
@@ -226,7 +229,7 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
226229
return nil, err
227230
}
228231

229-
cn := NewConn(netConn)
232+
cn := NewConnWithBufferSize(netConn, p.cfg.ReadBufferSize, p.cfg.WriteBufferSize)
230233
cn.pooled = pooled
231234
return cn, nil
232235
}

internal/proto/reader.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ import (
1212
"github.com/redis/go-redis/v9/internal/util"
1313
)
1414

15+
// DefaultBufferSize is the default size for read/write buffers (0.5MiB)
16+
const DefaultBufferSize = 512 * 1024
17+
1518
// redis resp protocol data type.
1619
const (
1720
RespStatus = '+' // +<string>\r\n
@@ -58,7 +61,13 @@ type Reader struct {
5861

5962
func NewReader(rd io.Reader) *Reader {
6063
return &Reader{
61-
rd: bufio.NewReader(rd),
64+
rd: bufio.NewReaderSize(rd, DefaultBufferSize),
65+
}
66+
}
67+
68+
func NewReaderSize(rd io.Reader, size int) *Reader {
69+
return &Reader{
70+
rd: bufio.NewReaderSize(rd, size),
6271
}
6372
}
6473

options.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/redis/go-redis/v9/auth"
1717
"github.com/redis/go-redis/v9/internal/pool"
18+
"github.com/redis/go-redis/v9/internal/proto"
1819
)
1920

2021
// Limiter is the interface of a rate limiter or a circuit breaker.
@@ -130,6 +131,18 @@ type Options struct {
130131
// See https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts
131132
ContextTimeoutEnabled bool
132133

134+
// ReadBufferSize is the size of the bufio.Reader buffer for each connection.
135+
// Larger buffers can improve performance for commands that return large responses.
136+
//
137+
// default: 0.5MiB (524288 bytes)
138+
ReadBufferSize int
139+
140+
// WriteBufferSize is the size of the bufio.Writer buffer for each connection.
141+
// Larger buffers can improve performance for large pipelines and commands with many arguments.
142+
//
143+
// default: 0.5MiB (524288 bytes)
144+
WriteBufferSize int
145+
133146
// PoolFIFO type of connection pool.
134147
//
135148
// - true for FIFO pool
@@ -241,6 +254,12 @@ func (opt *Options) init() {
241254
if opt.PoolSize == 0 {
242255
opt.PoolSize = 10 * runtime.GOMAXPROCS(0)
243256
}
257+
if opt.ReadBufferSize == 0 {
258+
opt.ReadBufferSize = proto.DefaultBufferSize
259+
}
260+
if opt.WriteBufferSize == 0 {
261+
opt.WriteBufferSize = proto.DefaultBufferSize
262+
}
244263
switch opt.ReadTimeout {
245264
case -2:
246265
opt.ReadTimeout = -1
@@ -592,5 +611,7 @@ func newConnPool(
592611
MaxActiveConns: opt.MaxActiveConns,
593612
ConnMaxIdleTime: opt.ConnMaxIdleTime,
594613
ConnMaxLifetime: opt.ConnMaxLifetime,
614+
ReadBufferSize: opt.ReadBufferSize,
615+
WriteBufferSize: opt.WriteBufferSize,
595616
})
596617
}

osscluster.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,18 @@ type ClusterOptions struct {
9292
ConnMaxIdleTime time.Duration
9393
ConnMaxLifetime time.Duration
9494

95+
// ReadBufferSize is the size of the bufio.Reader buffer for each connection.
96+
// Larger buffers can improve performance for commands that return large responses.
97+
//
98+
// default: 0.5MiB (524288 bytes)
99+
ReadBufferSize int
100+
101+
// WriteBufferSize is the size of the bufio.Writer buffer for each connection.
102+
// Larger buffers can improve performance for large pipelines and commands with many arguments.
103+
//
104+
// default: 0.5MiB (524288 bytes)
105+
WriteBufferSize int
106+
95107
TLSConfig *tls.Config
96108

97109
// DisableIndentity - Disable set-lib on connect.
@@ -127,6 +139,12 @@ func (opt *ClusterOptions) init() {
127139
if opt.PoolSize == 0 {
128140
opt.PoolSize = 5 * runtime.GOMAXPROCS(0)
129141
}
142+
if opt.ReadBufferSize == 0 {
143+
opt.ReadBufferSize = proto.DefaultBufferSize
144+
}
145+
if opt.WriteBufferSize == 0 {
146+
opt.WriteBufferSize = proto.DefaultBufferSize
147+
}
130148

131149
switch opt.ReadTimeout {
132150
case -1:
@@ -318,6 +336,8 @@ func (opt *ClusterOptions) clientOptions() *Options {
318336
MaxActiveConns: opt.MaxActiveConns,
319337
ConnMaxIdleTime: opt.ConnMaxIdleTime,
320338
ConnMaxLifetime: opt.ConnMaxLifetime,
339+
ReadBufferSize: opt.ReadBufferSize,
340+
WriteBufferSize: opt.WriteBufferSize,
321341
DisableIdentity: opt.DisableIdentity,
322342
DisableIndentity: opt.DisableIdentity,
323343
IdentitySuffix: opt.IdentitySuffix,

0 commit comments

Comments
 (0)