Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions internal/transport/client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,11 @@ func (s *ClientStream) TrailersOnly() bool {
func (s *ClientStream) Status() *status.Status {
return s.status
}

func (s *ClientStream) requestRead(n int) {
s.ct.adjustWindow(s, uint32(n))
}

func (s *ClientStream) updateWindow(n int) {
s.ct.updateWindow(s, uint32(n))
}
10 changes: 8 additions & 2 deletions internal/transport/handler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,12 @@ func (ht *serverHandlerTransport) writeHeader(s *ServerStream, md metadata.MD) e
return err
}

func (ht *serverHandlerTransport) adjustWindow(*ServerStream, uint32) {
}

func (ht *serverHandlerTransport) updateWindow(*ServerStream, uint32) {
}

func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*ServerStream)) {
// With this transport type there will be exactly 1 stream: this HTTP request.
var cancel context.CancelFunc
Expand Down Expand Up @@ -414,7 +420,6 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
Stream: Stream{
id: 0, // irrelevant
ctx: ctx,
requestRead: func(int) {},
method: req.URL.Path,
recvCompress: req.Header.Get("grpc-encoding"),
contentSubtype: ht.contentSubtype,
Expand All @@ -424,9 +429,10 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
headerWireLength: 0, // won't have access to header wire length until golang/go#18997.
}
s.Stream.buf.init()
s.readRequester = s
s.trReader = transportReader{
reader: recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: &s.buf},
windowHandler: func(int) {},
windowHandler: s,
}

// readerDone is closed when the Body.Read-ing goroutine exits.
Expand Down
8 changes: 2 additions & 6 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,9 +493,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientSt
}
s.Stream.buf.init()
s.Stream.wq.init(defaultWriteQuota, s.done)
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
}
s.readRequester = s
// The client side stream context should have exactly the same life cycle with the user provided context.
// That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
// So we use the original context here instead of creating a copy.
Expand All @@ -509,9 +507,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientSt
s.Close(err)
},
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
},
windowHandler: s,
}
return s
}
Expand Down
8 changes: 2 additions & 6 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,9 +639,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
t.channelz.SocketMetrics.StreamsStarted.Add(1)
t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())
}
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
}
s.readRequester = s
s.ctxDone = s.ctx.Done()
s.Stream.wq.init(defaultWriteQuota, s.ctxDone)
s.trReader = transportReader{
Expand All @@ -650,9 +648,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
ctxDone: s.ctxDone,
recv: &s.buf,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
},
windowHandler: s,
}
// Register the stream with loopy.
t.controlBuf.put(&registerStream{
Expand Down
8 changes: 8 additions & 0 deletions internal/transport/server_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,11 @@ func (s *ServerStream) SetTrailer(md metadata.MD) error {
s.hdrMu.Unlock()
return nil
}

func (s *ServerStream) requestRead(n int) {
s.st.adjustWindow(s, uint32(n))
}

func (s *ServerStream) updateWindow(n int) {
s.st.updateWindow(s, uint32(n))
}
28 changes: 20 additions & 8 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,7 @@ type Stream struct {
recvCompress string
sendCompress string

// Callback to state application's intentions to read data. This
// is used to adjust flow control, if needed.
requestRead func(int)
readRequester readRequester
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to embed some of these things instead so that we don't have to implement the trampoline methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The updateWindow method on the new windowHandler interface takes a single parameter, while the corresponding methods on http2Client and http2Server expect a second parameter for the stream itself. The intermediate methods on ServerStream and ClientStream supply this extra parameter before calling the underlying http2 methods.

func (s *ServerStream) updateWindow(n int) {
	// The receiver 's' is passed as the required stream parameter.
	s.st.updateWindow(s, uint32(n))
}

I couldn't find a simple way to remove these one-line wrapper methods without changing the windowHandler interface to accept the extra *Stream parameter. That change would require callers like transportReader to hold a reference to the stream just for this call, which feels like a worse design.


// contentSubtype is the content-subtype for requests.
// this must be lowercase or the behavior is undefined.
Expand All @@ -310,6 +308,12 @@ type Stream struct {
wq writeQuota
}

// readRequester is used to state application's intentions to read data. This
// is used to adjust flow control, if needed.
type readRequester interface {
requestRead(int)
}

func (s *Stream) swapState(st streamState) streamState {
return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
}
Expand Down Expand Up @@ -357,7 +361,7 @@ func (s *Stream) ReadMessageHeader(header []byte) (err error) {
if er := s.trReader.er; er != nil {
return er
}
s.requestRead(len(header))
s.readRequester.requestRead(len(header))
for len(header) != 0 {
n, err := s.trReader.ReadMessageHeader(header)
header = header[n:]
Expand All @@ -380,7 +384,7 @@ func (s *Stream) read(n int) (data mem.BufferSlice, err error) {
if er := s.trReader.er; er != nil {
return nil, er
}
s.requestRead(n)
s.readRequester.requestRead(n)
for n != 0 {
buf, err := s.trReader.Read(n)
var bufLen int
Expand Down Expand Up @@ -422,18 +426,24 @@ type transportReader struct {
_ noCopy
// The handler to control the window update procedure for both this
// particular stream and the associated transport.
windowHandler func(int)
windowHandler windowHandler
er error
reader recvBufferReader
}

// The handler to control the window update procedure for both this
// particular stream and the associated transport.
type windowHandler interface {
updateWindow(int)
}

func (t *transportReader) ReadMessageHeader(header []byte) (int, error) {
n, err := t.reader.ReadMessageHeader(header)
if err != nil {
t.er = err
return 0, err
}
t.windowHandler(n)
t.windowHandler.updateWindow(n)
return n, nil
}

Expand All @@ -443,7 +453,7 @@ func (t *transportReader) Read(n int) (mem.Buffer, error) {
t.er = err
return buf, err
}
t.windowHandler(buf.Len())
t.windowHandler.updateWindow(buf.Len())
return buf, nil
}

Expand Down Expand Up @@ -629,6 +639,8 @@ type internalServerTransport interface {
write(s *ServerStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error
writeStatus(s *ServerStream, st *status.Status) error
incrMsgRecv()
adjustWindow(s *ServerStream, n uint32)
updateWindow(s *ServerStream, n uint32)
}

// connectionErrorf creates an ConnectionError with the specified error description.
Expand Down
29 changes: 23 additions & 6 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1854,8 +1854,8 @@ func (s) TestReadGivesSameErrorAfterAnyErrorOccurs(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
s := &Stream{
ctx: ctx,
requestRead: func(int) {},
ctx: ctx,
readRequester: &fakeReadRequester{},
}
s.buf.init()
s.trReader = transportReader{
Expand All @@ -1864,7 +1864,9 @@ func (s) TestReadGivesSameErrorAfterAnyErrorOccurs(t *testing.T) {
ctxDone: s.ctx.Done(),
recv: &s.buf,
},
windowHandler: func(int) {},
windowHandler: &mockWindowUpdater{
f: func(int) {},
},
}
testData := make([]byte, 1)
testData[0] = 5
Expand Down Expand Up @@ -3069,16 +3071,18 @@ func (s) TestReadMessageHeaderMultipleBuffers(t *testing.T) {
headerLen := 5
bytesRead := 0
s := Stream{
requestRead: func(int) {},
readRequester: &fakeReadRequester{},
}
s.buf.init()
recvBuffer := &s.buf
s.trReader = transportReader{
reader: recvBufferReader{
recv: recvBuffer,
},
windowHandler: func(i int) {
bytesRead += i
windowHandler: &mockWindowUpdater{
f: func(i int) {
bytesRead += i
},
},
}

Expand Down Expand Up @@ -3260,3 +3264,16 @@ func (s) TestClientTransport_Handle1xxHeaders(t *testing.T) {
})
}
}

type fakeReadRequester struct {
}

func (f *fakeReadRequester) requestRead(int) {}

type mockWindowUpdater struct {
f func(int)
}

func (m *mockWindowUpdater) updateWindow(n int) {
m.f(n)
}