Skip to content

Commit 55455d7

Browse files
committed
allow stream cancellation when blocked on flow control
1 parent 31a6f26 commit 55455d7

File tree

2 files changed

+81
-4
lines changed

2 files changed

+81
-4
lines changed

internal/transport/http2_server.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1353,10 +1353,7 @@ func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCo
13531353
// called to interrupt the potential blocking on other goroutines.
13541354
s.cancel()
13551355

1356-
oldState := s.swapState(streamDone)
1357-
if oldState == streamDone {
1358-
return
1359-
}
1356+
s.swapState(streamDone)
13601357
t.deleteStream(s, eosReceived)
13611358

13621359
t.controlBuf.put(&cleanupStream{

test/transport_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"net"
2525
"sync"
2626
"testing"
27+
"time"
2728

2829
"golang.org/x/net/http2"
2930
"google.golang.org/grpc"
@@ -229,3 +230,82 @@ func (s) TestRSTDuringMessageRead(t *testing.T) {
229230
t.Fatalf("client.EmptyCall() returned %v; want status with code %v", err, codes.Canceled)
230231
}
231232
}
233+
234+
// Test verifies that a client-side cancellation correctly frees up resources on
235+
// the server. The test setup is designed to simulate a scenario where a server
236+
// is blocked from sending a large message due to a full client-side flow
237+
// control window. The client-side cancellation of this blocked RPC then frees
238+
// up the max concurrent streams quota on the server, allowing a new RPC to be
239+
// created successfully.
240+
func (s) TestCancelWhileServerWaitingForFlowControl(t *testing.T) {
241+
serverDoneCh := make(chan struct{}, 2)
242+
const flowControlWindowSize = 65535
243+
ss := &stubserver.StubServer{
244+
StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
245+
// Send a large message to exhaust the client's flow control window.
246+
stream.Send(&testpb.StreamingOutputCallResponse{
247+
Payload: &testpb.Payload{
248+
Body: make([]byte, flowControlWindowSize+1),
249+
},
250+
})
251+
serverDoneCh <- struct{}{}
252+
return nil
253+
},
254+
}
255+
256+
// Create a server that allows only 1 stream at a time.
257+
ss = stubserver.StartTestService(t, ss, grpc.MaxConcurrentStreams(1))
258+
defer ss.Stop()
259+
// Use a static flow control window.
260+
if err := ss.StartClient(grpc.WithStaticStreamWindowSize(flowControlWindowSize)); err != nil {
261+
t.Fatalf("Error while start test service client: %v", err)
262+
}
263+
client := ss.Client
264+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
265+
defer cancel()
266+
267+
streamCtx, streamCancel := context.WithCancel(ctx)
268+
defer streamCancel()
269+
270+
if _, err := client.StreamingOutputCall(streamCtx, &testpb.StreamingOutputCallRequest{}); err != nil {
271+
t.Fatalf("Failed to create server streaming RPC: %v", err)
272+
}
273+
274+
// Wait for the server handler to return. This should cause the trailers to
275+
// be buffered on the server, waiting for flow control quota to first send
276+
// the data frame.
277+
select {
278+
case <-ctx.Done():
279+
t.Fatal("Context timed out waiting for server handler to return.")
280+
case <-serverDoneCh:
281+
}
282+
283+
// Attempt to create a stream. It should fail since the previous stream is
284+
// still blocked.
285+
shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
286+
defer shortCancel()
287+
_, err := client.StreamingOutputCall(shortCtx, &testpb.StreamingOutputCallRequest{})
288+
if status.Code(err) != codes.DeadlineExceeded {
289+
t.Fatalf("Server stream creation returned error with unexpected status code: %v, want code: %v", err, codes.DeadlineExceeded)
290+
}
291+
292+
// Cancel the RPC, this should free up concurrent stream quota on the
293+
// server.
294+
streamCancel()
295+
296+
// Attempt to create another stream.
297+
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
298+
stream, err := client.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
299+
if err != nil {
300+
t.Fatalf("Failed to create server streaming RPC: %v", err)
301+
}
302+
_, err = stream.Recv()
303+
if err == nil {
304+
return
305+
}
306+
}
307+
308+
if ctx.Err() != nil {
309+
t.Fatal("Context timed out waiting for stream quota to replenish.")
310+
}
311+
}

0 commit comments

Comments
 (0)