Skip to content

Commit d83a475

Browse files
authored
feat: gRPC supports reusing write buffer for each connection (#1918)
1 parent 850f33d commit d83a475

12 files changed

Lines changed: 533 additions & 66 deletions

File tree

client/option.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,28 @@ func WithGRPCKeepaliveParams(kp grpc.ClientKeepalive) Option {
547547
}}
548548
}
549549

550+
// WithGRPCReuseWriteBuffer enables write buffer reusing across connection lifecycle.
551+
// When enabled, a fixed-size buffer allocation (default 64KB) is no longer assigned to each connection.
552+
// Instead, buffers are allocated on-demand from buffer pool and put back to buffer pool after each flush.
553+
// Trading CPU overhead (malloc/free) for reduced memory usage.
554+
//
555+
// Use cases:
556+
// - Scenarios with thousands of idle/low-traffic connections
557+
// - Memory-constrained environments
558+
//
559+
// Trade-offs:
560+
// - Reduces memory: ~2*WriteBufferSize per connection
561+
// - Increases CPU: buffer pool malloc/free overhead on each write=>flush cycle
562+
// - Slightly higher GC pressure
563+
//
564+
// This feature is disabled by default. (optimized for throughput)
565+
func WithGRPCReuseWriteBuffer(cfg grpc.ReuseWriteBufferConfig) Option {
566+
return Option{F: func(o *client.Options, di *utils.Slice) {
567+
di.Push(fmt.Sprintf("WithGRPCReuseWriteBuffer(%+v)", cfg))
568+
o.GRPCConnectOpts.ReuseWriteBufferConfig = cfg
569+
}}
570+
}
571+
550572
// WithWarmingUp forces the client to do some warm-ups at the end of the initialization.
551573
func WithWarmingUp(wuo *warmup.ClientOption) Option {
552574
return Option{F: func(o *client.Options, di *utils.Slice) {

client/option_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,17 @@ func TestWithGRPCTLSConfig(t *testing.T) {
755755
test.Assert(t, opts.GRPCConnectOpts != nil)
756756
}
757757

758+
func TestWithGRPCReuseWriteBuffer(t *testing.T) {
759+
opts := client.NewOptions([]client.Option{})
760+
test.Assert(t, !opts.GRPCConnectOpts.ReuseWriteBufferConfig.Enable, opts.GRPCConnectOpts)
761+
opts = client.NewOptions([]client.Option{WithGRPCReuseWriteBuffer(grpc.ReuseWriteBufferConfig{Enable: true})})
762+
test.Assert(t, opts.GRPCConnectOpts.ReuseWriteBufferConfig.Enable, opts.GRPCConnectOpts)
763+
opts = client.NewOptions([]client.Option{WithGRPCReuseWriteBuffer(grpc.ReuseWriteBufferConfig{Enable: false})})
764+
test.Assert(t, !opts.GRPCConnectOpts.ReuseWriteBufferConfig.Enable, opts.GRPCConnectOpts)
765+
opts = client.NewOptions([]client.Option{WithGRPCReuseWriteBuffer(grpc.ReuseWriteBufferConfig{})})
766+
test.Assert(t, !opts.GRPCConnectOpts.ReuseWriteBufferConfig.Enable, opts.GRPCConnectOpts)
767+
}
768+
758769
func TestTailOption(t *testing.T) {
759770
ctrl := gomock.NewController(t)
760771
defer ctrl.Finish()

pkg/remote/trans/nphttp2/grpc/controlbuf.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,7 @@ func (l *loopyWriter) run(remoteAddr string) (err error) {
610610
}
611611
if gosched {
612612
gosched = false
613-
if l.framer.writer.offset < minBatchSize {
613+
if l.framer.writer.GetOffset() < minBatchSize {
614614
runtime.Gosched()
615615
continue hasdata
616616
}

pkg/remote/trans/nphttp2/grpc/framer.go

Lines changed: 88 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"net"
2222

2323
"github.com/bytedance/gopkg/lang/dirtmake"
24+
"github.com/bytedance/gopkg/lang/mcache"
2425
"github.com/cloudwego/netpoll"
2526
"golang.org/x/net/http2/hpack"
2627

@@ -30,17 +31,17 @@ import (
3031
type framer struct {
3132
*grpcframe.Framer
3233
reader netpoll.Reader
33-
writer *bufWriter
34+
writer bufWriter
3435
}
3536

36-
func newFramer(conn net.Conn, writeBufferSize, readBufferSize, maxHeaderListSize uint32) *framer {
37+
func newFramer(conn net.Conn, writeBufferSize, readBufferSize, maxHeaderListSize uint32, reuseCfg ReuseWriteBufferConfig) *framer {
3738
var r netpoll.Reader
3839
if npConn, ok := conn.(interface{ Reader() netpoll.Reader }); ok {
3940
r = npConn.Reader()
4041
} else {
4142
r = netpoll.NewReader(conn)
4243
}
43-
w := newBufWriter(conn, int(writeBufferSize))
44+
w := newBufWriter(conn, int(writeBufferSize), reuseCfg)
4445
fr := &framer{
4546
reader: r,
4647
writer: w,
@@ -55,23 +56,41 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize, maxHeaderListSize
5556
return fr
5657
}
5758

58-
type bufWriter struct {
59-
writer io.Writer
60-
buf []byte
61-
offset int
62-
batchSize int
63-
err error
59+
type ReuseWriteBufferConfig struct {
60+
Enable bool
61+
}
62+
63+
type bufWriter interface {
64+
Write(b []byte) (n int, err error)
65+
Flush() error
66+
GetOffset() int
6467
}
6568

66-
func newBufWriter(writer io.Writer, batchSize int) *bufWriter {
67-
return &bufWriter{
68-
buf: dirtmake.Bytes(batchSize*2, batchSize*2),
69+
func newBufWriter(writer io.Writer, batchSize int, reuseCfg ReuseWriteBufferConfig) bufWriter {
70+
w := &keepBufWriter{
6971
batchSize: batchSize,
7072
writer: writer,
7173
}
74+
75+
if !reuseCfg.Enable {
76+
// Using pre-allocated memory dedicated to each connection
77+
w.buf = dirtmake.Bytes(batchSize*2, batchSize*2)
78+
return w
79+
}
80+
81+
return &reuseBufWriter{w}
7282
}
7383

74-
func (w *bufWriter) Write(b []byte) (n int, err error) {
84+
// keepBufWriter pre-allocates batchSize * 2 buf and keeps it resident throughout the connection lifecycle.
85+
type keepBufWriter struct {
86+
writer io.Writer
87+
buf []byte
88+
offset int
89+
batchSize int
90+
err error
91+
}
92+
93+
func (w *keepBufWriter) Write(b []byte) (n int, err error) {
7594
if w.err != nil {
7695
return 0, w.err
7796
}
@@ -84,13 +103,23 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
84103
w.offset += nn
85104
n += nn
86105
if w.offset >= w.batchSize {
87-
err = w.Flush()
106+
if err = w.flushAllocatedBuffer(); err != nil {
107+
return n, err
108+
}
88109
}
89110
}
90111
return n, err
91112
}
92113

93-
func (w *bufWriter) Flush() error {
114+
func (w *keepBufWriter) Flush() error {
115+
return w.flushAllocatedBuffer()
116+
}
117+
118+
func (w *keepBufWriter) GetOffset() int {
119+
return w.offset
120+
}
121+
122+
func (w *keepBufWriter) flushAllocatedBuffer() error {
94123
if w.err != nil {
95124
return w.err
96125
}
@@ -101,3 +130,47 @@ func (w *bufWriter) Flush() error {
101130
w.offset = 0
102131
return w.err
103132
}
133+
134+
// During the Write=>Write=> … =>Write=>Flush cycle,
135+
// the first Write allocates batchSize * 2 buf from mcache, and the final Flush returns it.
136+
type reuseBufWriter struct {
137+
*keepBufWriter
138+
}
139+
140+
func (w *reuseBufWriter) Write(b []byte) (n int, err error) {
141+
if w.err != nil {
142+
return 0, w.err
143+
}
144+
if w.batchSize == 0 { // buffer has been disabled.
145+
return w.writer.Write(b)
146+
}
147+
if w.buf == nil {
148+
w.buf = mcache.Malloc(w.batchSize*2, w.batchSize*2)
149+
}
150+
for len(b) > 0 {
151+
nn := copy(w.buf[w.offset:], b)
152+
b = b[nn:]
153+
w.offset += nn
154+
n += nn
155+
if w.offset >= w.batchSize {
156+
if err = w.flushAllocatedBuffer(); err != nil {
157+
return n, err
158+
}
159+
}
160+
}
161+
return n, err
162+
}
163+
164+
func (w *reuseBufWriter) Flush() error {
165+
err := w.flushAllocatedBuffer()
166+
// Reuse only when err is nil, to prevent the underlying connection from still holding buf after a Write failure.
167+
if err == nil && w.buf != nil {
168+
mcache.Free(w.buf)
169+
w.buf = nil
170+
}
171+
return err
172+
}
173+
174+
func (w *reuseBufWriter) GetOffset() int {
175+
return w.offset
176+
}

pkg/remote/trans/nphttp2/grpc/http2_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func newHTTP2Client(ctx context.Context, conn net.Conn, opts ConnectOptions,
186186
readerDone: make(chan struct{}),
187187
writerDone: make(chan struct{}),
188188
goAway: make(chan struct{}),
189-
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
189+
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize, opts.ReuseWriteBufferConfig),
190190
fc: &trInFlow{limit: icwz},
191191
activeStreams: make(map[uint32]*Stream),
192192
kp: kp,

pkg/remote/trans/nphttp2/grpc/http2_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func newHTTP2Server(ctx context.Context, conn net.Conn, config *ServerConfig) (_
148148
maxHeaderListSize = *config.MaxHeaderListSize
149149
}
150150

151-
framer := newFramer(conn, config.WriteBufferSize, config.ReadBufferSize, maxHeaderListSize)
151+
framer := newFramer(conn, config.WriteBufferSize, config.ReadBufferSize, maxHeaderListSize, config.ReuseWriteBufferConfig)
152152
// Send initial settings as connection preface to client.
153153
isettings := []http2.Setting{{
154154
ID: http2.SettingMaxFrameSize,

0 commit comments

Comments
 (0)