From 358937a4be0c3d86e66040aff4651ca1099e5033 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Mon, 6 Oct 2025 11:10:43 +0530 Subject: [PATCH 1/2] replace closures with interfaces to avoid heap allocations --- internal/transport/client_stream.go | 8 +++++++ internal/transport/handler_server.go | 10 +++++++-- internal/transport/http2_client.go | 8 ++----- internal/transport/http2_server.go | 8 ++----- internal/transport/server_stream.go | 8 +++++++ internal/transport/transport.go | 32 ++++++++++++++++++---------- internal/transport/transport_test.go | 31 +++++++++++++++++++++------ 7 files changed, 73 insertions(+), 32 deletions(-) diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index ccc0e017e5e5..31087b5cee14 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -142,3 +142,11 @@ func (s *ClientStream) TrailersOnly() bool { func (s *ClientStream) Status() *status.Status { return s.status } + +func (s *ClientStream) requestRead(n int) { + s.ct.adjustWindow(s, uint32(n)) +} + +func (s *ClientStream) updateWindow(n int) { + s.ct.updateWindow(s, uint32(n)) +} diff --git a/internal/transport/handler_server.go b/internal/transport/handler_server.go index d954a64c38f4..91e40b241016 100644 --- a/internal/transport/handler_server.go +++ b/internal/transport/handler_server.go @@ -387,6 +387,12 @@ func (ht *serverHandlerTransport) writeHeader(s *ServerStream, md metadata.MD) e return err } +func (ht *serverHandlerTransport) adjustWindow(*ServerStream, uint32) { +} + +func (ht *serverHandlerTransport) updateWindow(*ServerStream, uint32) { +} + func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*ServerStream)) { // With this transport type there will be exactly 1 stream: this HTTP request. var cancel context.CancelFunc @@ -414,7 +420,6 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream Stream: &Stream{ id: 0, // irrelevant ctx: ctx, - requestRead: func(int) {}, buf: newRecvBuffer(), method: req.URL.Path, recvCompress: req.Header.Get("grpc-encoding"), @@ -424,9 +429,10 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream st: ht, headerWireLength: 0, // won't have access to header wire length until golang/go#18997. } + s.readRequester = s s.trReader = &transportReader{ reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf}, - windowHandler: func(int) {}, + windowHandler: s, } // readerDone is closed when the Body.Read-ing goroutine exits. diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index e06771c3397f..9dea287a944b 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -493,9 +493,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientSt doneFunc: callHdr.DoneFunc, } s.wq = newWriteQuota(defaultWriteQuota, s.done) - s.requestRead = func(n int) { - t.adjustWindow(s, uint32(n)) - } + s.readRequester = s // The client side stream context should have exactly the same life cycle with the user provided context. // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done. // So we use the original context here instead of creating a copy. @@ -509,9 +507,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientSt s.Close(err) }, }, - windowHandler: func(n int) { - t.updateWindow(s, uint32(n)) - }, + windowHandler: s, } return s } diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 83cee314c8f4..5ac40b04b7ee 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -640,9 +640,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade t.channelz.SocketMetrics.StreamsStarted.Add(1) t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano()) } - s.requestRead = func(n int) { - t.adjustWindow(s, uint32(n)) - } + s.readRequester = s s.ctxDone = s.ctx.Done() s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone) s.trReader = &transportReader{ @@ -651,9 +649,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade ctxDone: s.ctxDone, recv: s.buf, }, - windowHandler: func(n int) { - t.updateWindow(s, uint32(n)) - }, + windowHandler: s, } // Register the stream with loopy. t.controlBuf.put(®isterStream{ diff --git a/internal/transport/server_stream.go b/internal/transport/server_stream.go index cf8da0b52d0a..a84160fc18ca 100644 --- a/internal/transport/server_stream.go +++ b/internal/transport/server_stream.go @@ -178,3 +178,11 @@ func (s *ServerStream) SetTrailer(md metadata.MD) error { s.hdrMu.Unlock() return nil } + +func (s *ServerStream) requestRead(n int) { + s.st.adjustWindow(s, uint32(n)) +} + +func (s *ServerStream) updateWindow(n int) { + s.st.updateWindow(s, uint32(n)) +} diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 7dd53e80a75b..190e1b03b949 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -295,9 +295,7 @@ type Stream struct { fc *inFlow wq *writeQuota - // Callback to state application's intentions to read data. This - // is used to adjust flow control, if needed. - requestRead func(int) + readRequester readRequester state streamState @@ -308,6 +306,12 @@ type Stream struct { trailer metadata.MD // the key-value map of trailer metadata. } +// readRequester is used to state application's intentions to read data. This +// is used to adjust flow control, if needed. +type readRequester interface { + requestRead(int) +} + func (s *Stream) swapState(st streamState) streamState { return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st))) } @@ -355,7 +359,7 @@ func (s *Stream) ReadMessageHeader(header []byte) (err error) { if er := s.trReader.er; er != nil { return er } - s.requestRead(len(header)) + s.readRequester.requestRead(len(header)) for len(header) != 0 { n, err := s.trReader.ReadMessageHeader(header) header = header[n:] @@ -378,7 +382,7 @@ func (s *Stream) read(n int) (data mem.BufferSlice, err error) { if er := s.trReader.er; er != nil { return nil, er } - s.requestRead(n) + s.readRequester.requestRead(n) for n != 0 { buf, err := s.trReader.Read(n) var bufLen int @@ -406,20 +410,24 @@ func (s *Stream) read(n int) (data mem.BufferSlice, err error) { // The error is io.EOF when the stream is done or another non-nil error if // the stream broke. type transportReader struct { - reader *recvBufferReader - // The handler to control the window update procedure for both this - // particular stream and the associated transport. - windowHandler func(int) + reader *recvBufferReader + windowHandler windowHandler er error } +// The handler to control the window update procedure for both this +// particular stream and the associated transport. +type windowHandler interface { + updateWindow(int) +} + func (t *transportReader) ReadMessageHeader(header []byte) (int, error) { n, err := t.reader.ReadMessageHeader(header) if err != nil { t.er = err return 0, err } - t.windowHandler(n) + t.windowHandler.updateWindow(n) return n, nil } @@ -429,7 +437,7 @@ func (t *transportReader) Read(n int) (mem.Buffer, error) { t.er = err return buf, err } - t.windowHandler(buf.Len()) + t.windowHandler.updateWindow(buf.Len()) return buf, nil } @@ -615,6 +623,8 @@ type internalServerTransport interface { write(s *ServerStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error writeStatus(s *ServerStream, st *status.Status) error incrMsgRecv() + adjustWindow(s *ServerStream, n uint32) + updateWindow(s *ServerStream, n uint32) } // connectionErrorf creates an ConnectionError with the specified error description. diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 0b0396b0ab37..ee8ec552a9c4 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -1858,9 +1858,9 @@ func (s) TestReadGivesSameErrorAfterAnyErrorOccurs(t *testing.T) { defer cancel() testRecvBuffer := newRecvBuffer() s := &Stream{ - ctx: ctx, - buf: testRecvBuffer, - requestRead: func(int) {}, + ctx: ctx, + buf: testRecvBuffer, + readRequester: &fakeReadRequester{}, } s.trReader = &transportReader{ reader: &recvBufferReader{ @@ -1868,7 +1868,9 @@ func (s) TestReadGivesSameErrorAfterAnyErrorOccurs(t *testing.T) { ctxDone: s.ctx.Done(), recv: s.buf, }, - windowHandler: func(int) {}, + windowHandler: &mockWindowUpdater{ + f: func(int) {}, + }, } testData := make([]byte, 1) testData[0] = 5 @@ -3076,13 +3078,15 @@ func (s) TestReadMessageHeaderMultipleBuffers(t *testing.T) { recvBuffer.put(recvMsg{buffer: make(mem.SliceBuffer, headerLen-3)}) bytesRead := 0 s := Stream{ - requestRead: func(int) {}, + readRequester: &fakeReadRequester{}, trReader: &transportReader{ reader: &recvBufferReader{ recv: recvBuffer, }, - windowHandler: func(i int) { - bytesRead += i + windowHandler: &mockWindowUpdater{ + f: func(i int) { + bytesRead += i + }, }, }, } @@ -3262,3 +3266,16 @@ func (s) TestClientTransport_Handle1xxHeaders(t *testing.T) { }) } } + +type fakeReadRequester struct { +} + +func (f *fakeReadRequester) requestRead(int) {} + +type mockWindowUpdater struct { + f func(int) +} + +func (m *mockWindowUpdater) updateWindow(n int) { + m.f(n) +} From 9fee0451784712cd8fa20abab2b9c75f8e589c74 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 7 Oct 2025 15:31:12 +0530 Subject: [PATCH 2/2] resolve conflicts --- internal/transport/handler_server.go | 2 +- internal/transport/http2_client.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/transport/handler_server.go b/internal/transport/handler_server.go index aaec4f216c30..80ef2d00fbe7 100644 --- a/internal/transport/handler_server.go +++ b/internal/transport/handler_server.go @@ -428,12 +428,12 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream st: ht, headerWireLength: 0, // won't have access to header wire length until golang/go#18997. } + s.Stream.buf.init() s.readRequester = s s.trReader = transportReader{ reader: recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: &s.buf}, windowHandler: s, } - s.Stream.buf.init() // readerDone is closed when the Body.Read-ing goroutine exits. readerDone := make(chan struct{}) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index ebeffbe7ad39..4a92928f5ff2 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -491,9 +491,9 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientSt headerChan: make(chan struct{}), doneFunc: callHdr.DoneFunc, } - s.readRequester = s s.Stream.buf.init() s.Stream.wq.init(defaultWriteQuota, s.done) + s.readRequester = s // The client side stream context should have exactly the same life cycle with the user provided context. // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done. // So we use the original context here instead of creating a copy.