Skip to content

Commit 74b65af

Browse files
authored
transport: Replace closures with interfaces to avoid heap allocations (#8630)
In Go, creating a closure results in a heap allocation if the compiler determines the closure might outlive the function in which it was created. This change removes two such closures, replacing them with interfaces that are implemented by the `ClientStream` and `ServerStream` structs. While this pattern may slightly reduce readability, the performance benefit is worthwhile, as this transport code is executed for every new stream. This reduces allocs/unary RPC by 2.5%. ## Testing ```sh # test command go run benchmark/benchmain/main.go -benchtime=60s -workloads=unary \ -compression=off -maxConcurrentCalls=500 -trace=off \ -reqSizeBytes=100 -respSizeBytes=100 -networkMode=Local -resultFile="${RUN_NAME}" -recvBufferPool=simple # results go run benchmark/benchresult/main.go unary-before unary-after Title Before After Percentage TotalOps 7593738 7708364 1.51% SendOps 0 0 NaN% RecvOps 0 0 NaN% Bytes/op 10218.45 10185.84 -0.32% Allocs/op 164.85 160.84 -2.43% ReqT/op 101249840.00 102778186.67 1.51% RespT/op 101249840.00 102778186.67 1.51% 50th-Lat 3.617561ms 3.568623ms -1.35% 90th-Lat 5.218682ms 5.131828ms -1.66% 99th-Lat 6.052632ms 5.950261ms -1.69% Avg-Lat 3.948414ms 3.889006ms -1.50% GoVersion go1.24.4 go1.24.4 GrpcVersion 1.77.0-dev 1.77.0-dev ``` RELEASE NOTES: N/A
1 parent 0e9ac5d commit 74b65af

File tree

7 files changed

+71
-28
lines changed

7 files changed

+71
-28
lines changed

internal/transport/client_stream.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,11 @@ func (s *ClientStream) TrailersOnly() bool {
144144
func (s *ClientStream) Status() *status.Status {
145145
return s.status
146146
}
147+
148+
func (s *ClientStream) requestRead(n int) {
149+
s.ct.adjustWindow(s, uint32(n))
150+
}
151+
152+
func (s *ClientStream) updateWindow(n int) {
153+
s.ct.updateWindow(s, uint32(n))
154+
}

internal/transport/handler_server.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,12 @@ func (ht *serverHandlerTransport) writeHeader(s *ServerStream, md metadata.MD) e
387387
return err
388388
}
389389

390+
func (ht *serverHandlerTransport) adjustWindow(*ServerStream, uint32) {
391+
}
392+
393+
func (ht *serverHandlerTransport) updateWindow(*ServerStream, uint32) {
394+
}
395+
390396
func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*ServerStream)) {
391397
// With this transport type there will be exactly 1 stream: this HTTP request.
392398
var cancel context.CancelFunc
@@ -414,7 +420,6 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
414420
Stream: Stream{
415421
id: 0, // irrelevant
416422
ctx: ctx,
417-
requestRead: func(int) {},
418423
method: req.URL.Path,
419424
recvCompress: req.Header.Get("grpc-encoding"),
420425
contentSubtype: ht.contentSubtype,
@@ -424,9 +429,10 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
424429
headerWireLength: 0, // won't have access to header wire length until golang/go#18997.
425430
}
426431
s.Stream.buf.init()
432+
s.readRequester = s
427433
s.trReader = transportReader{
428434
reader: recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: &s.buf},
429-
windowHandler: func(int) {},
435+
windowHandler: s,
430436
}
431437

432438
// readerDone is closed when the Body.Read-ing goroutine exits.

internal/transport/http2_client.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -493,9 +493,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientSt
493493
}
494494
s.Stream.buf.init()
495495
s.Stream.wq.init(defaultWriteQuota, s.done)
496-
s.requestRead = func(n int) {
497-
t.adjustWindow(s, uint32(n))
498-
}
496+
s.readRequester = s
499497
// The client side stream context should have exactly the same life cycle with the user provided context.
500498
// That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
501499
// So we use the original context here instead of creating a copy.
@@ -509,9 +507,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientSt
509507
s.Close(err)
510508
},
511509
},
512-
windowHandler: func(n int) {
513-
t.updateWindow(s, uint32(n))
514-
},
510+
windowHandler: s,
515511
}
516512
return s
517513
}

internal/transport/http2_server.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -640,9 +640,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
640640
t.channelz.SocketMetrics.StreamsStarted.Add(1)
641641
t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())
642642
}
643-
s.requestRead = func(n int) {
644-
t.adjustWindow(s, uint32(n))
645-
}
643+
s.readRequester = s
646644
s.ctxDone = s.ctx.Done()
647645
s.Stream.wq.init(defaultWriteQuota, s.ctxDone)
648646
s.trReader = transportReader{
@@ -651,9 +649,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
651649
ctxDone: s.ctxDone,
652650
recv: &s.buf,
653651
},
654-
windowHandler: func(n int) {
655-
t.updateWindow(s, uint32(n))
656-
},
652+
windowHandler: s,
657653
}
658654
// Register the stream with loopy.
659655
t.controlBuf.put(&registerStream{

internal/transport/server_stream.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,3 +179,11 @@ func (s *ServerStream) SetTrailer(md metadata.MD) error {
179179
s.hdrMu.Unlock()
180180
return nil
181181
}
182+
183+
func (s *ServerStream) requestRead(n int) {
184+
s.st.adjustWindow(s, uint32(n))
185+
}
186+
187+
func (s *ServerStream) updateWindow(n int) {
188+
s.st.updateWindow(s, uint32(n))
189+
}

internal/transport/transport.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -291,9 +291,7 @@ type Stream struct {
291291
recvCompress string
292292
sendCompress string
293293

294-
// Callback to state application's intentions to read data. This
295-
// is used to adjust flow control, if needed.
296-
requestRead func(int)
294+
readRequester readRequester
297295

298296
// contentSubtype is the content-subtype for requests.
299297
// this must be lowercase or the behavior is undefined.
@@ -310,6 +308,12 @@ type Stream struct {
310308
wq writeQuota
311309
}
312310

311+
// readRequester is used to state application's intentions to read data. This
312+
// is used to adjust flow control, if needed.
313+
type readRequester interface {
314+
requestRead(int)
315+
}
316+
313317
func (s *Stream) swapState(st streamState) streamState {
314318
return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
315319
}
@@ -357,7 +361,7 @@ func (s *Stream) ReadMessageHeader(header []byte) (err error) {
357361
if er := s.trReader.er; er != nil {
358362
return er
359363
}
360-
s.requestRead(len(header))
364+
s.readRequester.requestRead(len(header))
361365
for len(header) != 0 {
362366
n, err := s.trReader.ReadMessageHeader(header)
363367
header = header[n:]
@@ -380,7 +384,7 @@ func (s *Stream) read(n int) (data mem.BufferSlice, err error) {
380384
if er := s.trReader.er; er != nil {
381385
return nil, er
382386
}
383-
s.requestRead(n)
387+
s.readRequester.requestRead(n)
384388
for n != 0 {
385389
buf, err := s.trReader.Read(n)
386390
var bufLen int
@@ -422,18 +426,24 @@ type transportReader struct {
422426
_ noCopy
423427
// The handler to control the window update procedure for both this
424428
// particular stream and the associated transport.
425-
windowHandler func(int)
429+
windowHandler windowHandler
426430
er error
427431
reader recvBufferReader
428432
}
429433

434+
// The handler to control the window update procedure for both this
435+
// particular stream and the associated transport.
436+
type windowHandler interface {
437+
updateWindow(int)
438+
}
439+
430440
func (t *transportReader) ReadMessageHeader(header []byte) (int, error) {
431441
n, err := t.reader.ReadMessageHeader(header)
432442
if err != nil {
433443
t.er = err
434444
return 0, err
435445
}
436-
t.windowHandler(n)
446+
t.windowHandler.updateWindow(n)
437447
return n, nil
438448
}
439449

@@ -443,7 +453,7 @@ func (t *transportReader) Read(n int) (mem.Buffer, error) {
443453
t.er = err
444454
return buf, err
445455
}
446-
t.windowHandler(buf.Len())
456+
t.windowHandler.updateWindow(buf.Len())
447457
return buf, nil
448458
}
449459

@@ -629,6 +639,8 @@ type internalServerTransport interface {
629639
write(s *ServerStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error
630640
writeStatus(s *ServerStream, st *status.Status) error
631641
incrMsgRecv()
642+
adjustWindow(s *ServerStream, n uint32)
643+
updateWindow(s *ServerStream, n uint32)
632644
}
633645

634646
// connectionErrorf creates an ConnectionError with the specified error description.

internal/transport/transport_test.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1856,8 +1856,8 @@ func (s) TestReadGivesSameErrorAfterAnyErrorOccurs(t *testing.T) {
18561856
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
18571857
defer cancel()
18581858
s := &Stream{
1859-
ctx: ctx,
1860-
requestRead: func(int) {},
1859+
ctx: ctx,
1860+
readRequester: &fakeReadRequester{},
18611861
}
18621862
s.buf.init()
18631863
s.trReader = transportReader{
@@ -1866,7 +1866,9 @@ func (s) TestReadGivesSameErrorAfterAnyErrorOccurs(t *testing.T) {
18661866
ctxDone: s.ctx.Done(),
18671867
recv: &s.buf,
18681868
},
1869-
windowHandler: func(int) {},
1869+
windowHandler: &mockWindowUpdater{
1870+
f: func(int) {},
1871+
},
18701872
}
18711873
testData := make([]byte, 1)
18721874
testData[0] = 5
@@ -3163,16 +3165,18 @@ func (s) TestReadMessageHeaderMultipleBuffers(t *testing.T) {
31633165
headerLen := 5
31643166
bytesRead := 0
31653167
s := Stream{
3166-
requestRead: func(int) {},
3168+
readRequester: &fakeReadRequester{},
31673169
}
31683170
s.buf.init()
31693171
recvBuffer := &s.buf
31703172
s.trReader = transportReader{
31713173
reader: recvBufferReader{
31723174
recv: recvBuffer,
31733175
},
3174-
windowHandler: func(i int) {
3175-
bytesRead += i
3176+
windowHandler: &mockWindowUpdater{
3177+
f: func(i int) {
3178+
bytesRead += i
3179+
},
31763180
},
31773181
}
31783182

@@ -3476,3 +3480,16 @@ func (s) TestDeleteStreamMetricsIncrementedOnlyOnce(t *testing.T) {
34763480
})
34773481
}
34783482
}
3483+
3484+
type fakeReadRequester struct {
3485+
}
3486+
3487+
func (f *fakeReadRequester) requestRead(int) {}
3488+
3489+
type mockWindowUpdater struct {
3490+
f func(int)
3491+
}
3492+
3493+
func (m *mockWindowUpdater) updateWindow(n int) {
3494+
m.f(n)
3495+
}

0 commit comments

Comments
 (0)