Skip to content

Commit 27e1277

Browse files
TUN-5142: Add asynchronous servecontrolstream for QUIC
ServeControlStream accidentally became non-blocking in the last quic change causing stream to not be returned until a SIGTERM was received. This change makes ServeControlStream be non-blocking for QUIC streams.
1 parent 6238fd9 commit 27e1277

File tree

4 files changed

+15
-5
lines changed

4 files changed

+15
-5
lines changed

connection/control.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type controlStream struct {
2929

3030
// ControlStreamHandler registers connections with origintunneld and initiates graceful shutdown.
3131
type ControlStreamHandler interface {
32-
ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions) error
32+
ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, shouldWaitForUnregister bool) error
3333
IsStopped() bool
3434
}
3535

@@ -61,6 +61,7 @@ func (c *controlStream) ServeControlStream(
6161
ctx context.Context,
6262
rw io.ReadWriteCloser,
6363
connOptions *tunnelpogs.ConnectionOptions,
64+
shouldWaitForUnregister bool,
6465
) error {
6566
rpcClient := c.newRPCClientFunc(ctx, rw, c.observer.log)
6667
defer rpcClient.Close()
@@ -70,6 +71,16 @@ func (c *controlStream) ServeControlStream(
7071
}
7172
c.connectedFuse.Connected()
7273

74+
if shouldWaitForUnregister {
75+
c.waitForUnregister(ctx, rpcClient)
76+
} else {
77+
go c.waitForUnregister(ctx, rpcClient)
78+
}
79+
80+
return nil
81+
}
82+
83+
func (c *controlStream) waitForUnregister(ctx context.Context, rpcClient NamedTunnelRPCClient) {
7384
// wait for connection termination or start of graceful shutdown
7485
select {
7586
case <-ctx.Done():
@@ -81,7 +92,6 @@ func (c *controlStream) ServeControlStream(
8192
c.observer.sendUnregisteringEvent(c.connIndex)
8293
rpcClient.GracefulShutdown(ctx, c.gracePeriod)
8394
c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Unregistered tunnel connection")
84-
return nil
8595
}
8696

8797
func (c *controlStream) IsStopped() bool {

connection/http2.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
109109

110110
switch connType {
111111
case TypeControlStream:
112-
if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions); err != nil {
112+
if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions, true); err != nil {
113113
c.controlStreamErr = err
114114
c.log.Error().Err(err)
115115
respWriter.WriteErrorResponse()

connection/quic.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func NewQUICConnection(
5757
return nil, errors.Wrap(err, "failed to open a registration stream")
5858
}
5959

60-
err = controlStreamHandler.ServeControlStream(ctx, registrationStream, connOptions)
60+
err = controlStreamHandler.ServeControlStream(ctx, registrationStream, connOptions, false)
6161
if err != nil {
6262
// Not wrapping error here to be consistent with the http2 message.
6363
return nil, err

connection/quic_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ type fakeControlStream struct {
183183
ControlStreamHandler
184184
}
185185

186-
func (fakeControlStream) ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions) error {
186+
func (fakeControlStream) ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, shouldWaitForUnregister bool) error {
187187
return nil
188188
}
189189
func (fakeControlStream) IsStopped() bool {

0 commit comments

Comments
 (0)