Skip to content

Commit 1086d5e

Browse files
committed
TUN-5204: Unregister QUIC transports on disconnect
This adds various bug fixes when investigating why QUIC transports were not being unregistered when they should (and only when the graceful shutdown started). Most of these bug fixes are making the QUIC transport implementation closer to its HTTP2 counterpart: - ServeControlStream is now a blocking function (it's up to the transport to handle that) - QUIC transport then handles the control plane as part of its Serve, thus waiting for it on shutdown - QUIC transport now returns "non recoverable" for connections with similar semantics to HTTP2 and H2mux - QUIC transport no longer has a loop around its Serve logic that retries connections on its own (that logic is upstream)
1 parent c314d58 commit 1086d5e

File tree

5 files changed

+66
-67
lines changed

5 files changed

+66
-67
lines changed

connection/control.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ type controlStream struct {
2929

3030
// ControlStreamHandler registers connections with origintunneld and initiates graceful shutdown.
3131
type ControlStreamHandler interface {
32-
ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, shouldWaitForUnregister bool) error
32+
// ServeControlStream handles the control plane of the transport in the current goroutine calling this
33+
ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions) error
34+
// IsStopped tells whether the method above has finished
3335
IsStopped() bool
3436
}
3537

@@ -61,7 +63,6 @@ func (c *controlStream) ServeControlStream(
6163
ctx context.Context,
6264
rw io.ReadWriteCloser,
6365
connOptions *tunnelpogs.ConnectionOptions,
64-
shouldWaitForUnregister bool,
6566
) error {
6667
rpcClient := c.newRPCClientFunc(ctx, rw, c.observer.log)
6768

@@ -71,12 +72,7 @@ func (c *controlStream) ServeControlStream(
7172
}
7273
c.connectedFuse.Connected()
7374

74-
if shouldWaitForUnregister {
75-
c.waitForUnregister(ctx, rpcClient)
76-
} else {
77-
go c.waitForUnregister(ctx, rpcClient)
78-
}
79-
75+
c.waitForUnregister(ctx, rpcClient)
8076
return nil
8177
}
8278

connection/http2.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
108108

109109
switch connType {
110110
case TypeControlStream:
111-
if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions, true); err != nil {
111+
if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions); err != nil {
112112
c.controlStreamErr = err
113113
c.log.Error().Err(err)
114114
respWriter.WriteErrorResponse()

connection/quic.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ type QUICConnection struct {
3939
httpProxy OriginProxy
4040
sessionManager datagramsession.Manager
4141
controlStreamHandler ControlStreamHandler
42+
connOptions *tunnelpogs.ConnectionOptions
4243
}
4344

4445
// NewQUICConnection returns a new instance of QUICConnection.
4546
func NewQUICConnection(
46-
ctx context.Context,
4747
quicConfig *quic.Config,
4848
edgeAddr net.Addr,
4949
tlsConfig *tls.Config,
@@ -57,17 +57,6 @@ func NewQUICConnection(
5757
return nil, fmt.Errorf("failed to dial to edge: %w", err)
5858
}
5959

60-
registrationStream, err := session.OpenStream()
61-
if err != nil {
62-
return nil, fmt.Errorf("failed to open a registration stream: %w", err)
63-
}
64-
65-
err = controlStreamHandler.ServeControlStream(ctx, registrationStream, connOptions, false)
66-
if err != nil {
67-
// Not wrapping error here to be consistent with the http2 message.
68-
return nil, err
69-
}
70-
7160
datagramMuxer, err := quicpogs.NewDatagramMuxer(session)
7261
if err != nil {
7362
return nil, err
@@ -81,18 +70,32 @@ func NewQUICConnection(
8170
logger: logger,
8271
sessionManager: sessionManager,
8372
controlStreamHandler: controlStreamHandler,
73+
connOptions: connOptions,
8474
}, nil
8575
}
8676

8777
// Serve starts a QUIC session that begins accepting streams.
8878
func (q *QUICConnection) Serve(ctx context.Context) error {
79+
// origintunneld assumes the first stream is used for the control plane
80+
controlStream, err := q.session.OpenStream()
81+
if err != nil {
82+
return fmt.Errorf("failed to open a registration control stream: %w", err)
83+
}
84+
8985
// If either goroutine returns nil error, we rely on this cancellation to make sure the other goroutine exits
9086
// as fast as possible as well. Nil error means we want to exit for good (caller code won't retry serving this
9187
// connection).
9288
// If either goroutine returns a non nil error, then the error group cancels the context, thus also canceling the
9389
// other goroutine as fast as possible.
9490
ctx, cancel := context.WithCancel(ctx)
9591
errGroup, ctx := errgroup.WithContext(ctx)
92+
93+
// In the future, if cloudflared can autonomously push traffic to the edge, we have to make sure the control
94+
// stream is already fully registered before the other goroutines can proceed.
95+
errGroup.Go(func() error {
96+
defer cancel()
97+
return q.serveControlStream(ctx, controlStream)
98+
})
9699
errGroup.Go(func() error {
97100
defer cancel()
98101
return q.acceptStream(ctx)
@@ -101,9 +104,21 @@ func (q *QUICConnection) Serve(ctx context.Context) error {
101104
defer cancel()
102105
return q.sessionManager.Serve(ctx)
103106
})
107+
104108
return errGroup.Wait()
105109
}
106110

111+
func (q *QUICConnection) serveControlStream(ctx context.Context, controlStream quic.Stream) error {
112+
// This blocks until the control plane is done.
113+
err := q.controlStreamHandler.ServeControlStream(ctx, controlStream, q.connOptions)
114+
if err != nil {
115+
// Not wrapping error here to be consistent with the http2 message.
116+
return err
117+
}
118+
119+
return nil
120+
}
121+
107122
func (q *QUICConnection) acceptStream(ctx context.Context) error {
108123
defer q.Close()
109124
for {

connection/quic_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func TestQUICServer(t *testing.T) {
154154
)
155155
}()
156156

157-
qc := testQUICConnection(ctx, udpListener.LocalAddr(), t)
157+
qc := testQUICConnection(udpListener.LocalAddr(), t)
158158
go qc.Serve(ctx)
159159

160160
wg.Wait()
@@ -167,7 +167,8 @@ type fakeControlStream struct {
167167
ControlStreamHandler
168168
}
169169

170-
func (fakeControlStream) ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, shouldWaitForUnregister bool) error {
170+
func (fakeControlStream) ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions) error {
171+
<-ctx.Done()
171172
return nil
172173
}
173174
func (fakeControlStream) IsStopped() bool {
@@ -532,7 +533,7 @@ func TestServeUDPSession(t *testing.T) {
532533
edgeQUICSessionChan <- edgeQUICSession
533534
}()
534535

535-
qc := testQUICConnection(ctx, udpListener.LocalAddr(), t)
536+
qc := testQUICConnection(udpListener.LocalAddr(), t)
536537
go qc.Serve(ctx)
537538

538539
edgeQUICSession := <-edgeQUICSessionChan
@@ -645,7 +646,7 @@ func (s mockSessionRPCServer) UnregisterUdpSession(ctx context.Context, sessionI
645646
return nil
646647
}
647648

648-
func testQUICConnection(ctx context.Context, udpListenerAddr net.Addr, t *testing.T) *QUICConnection {
649+
func testQUICConnection(udpListenerAddr net.Addr, t *testing.T) *QUICConnection {
649650
tlsClientConfig := &tls.Config{
650651
InsecureSkipVerify: true,
651652
NextProtos: []string{"argotunnel"},
@@ -654,7 +655,6 @@ func testQUICConnection(ctx context.Context, udpListenerAddr net.Addr, t *testin
654655
originProxy := &mockOriginProxyWithRequest{}
655656
log := zerolog.New(os.Stdout)
656657
qc, err := NewQUICConnection(
657-
ctx,
658658
testQUICConfig,
659659
udpListenerAddr,
660660
tlsClientConfig,

origin/tunnel.go

Lines changed: 29 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,6 @@ const (
3535
quicMaxIdleTimeout = 15 * time.Second
3636
)
3737

38-
type rpcName string
39-
40-
const (
41-
reconnect rpcName = "reconnect"
42-
authenticate rpcName = " authenticate"
43-
)
44-
4538
type TunnelConfig struct {
4639
ConnectionConfig *connection.Config
4740
OSArch string
@@ -535,44 +528,39 @@ func ServeQUIC(
535528
EnableDatagrams: true,
536529
Tracer: quicpogs.NewClientTracer(connLogger.Logger(), connIndex),
537530
}
538-
for {
539-
select {
540-
case <-ctx.Done():
541-
return
542-
default:
543-
quicConn, err := connection.NewQUICConnection(
544-
ctx,
545-
quicConfig,
546-
edgeAddr,
547-
tlsConfig,
548-
config.ConnectionConfig.OriginProxy,
549-
connOptions,
550-
controlStreamHandler,
551-
connLogger.Logger())
552-
if err != nil {
553-
connLogger.ConnAwareLogger().Err(err).Msgf("Failed to create new quic connection")
554-
return err, true
555-
}
556531

557-
errGroup, serveCtx := errgroup.WithContext(ctx)
558-
errGroup.Go(func() error {
559-
err := quicConn.Serve(serveCtx)
560-
if err != nil {
561-
connLogger.ConnAwareLogger().Err(err).Msg("Failed to serve quic connection")
562-
}
563-
return err
564-
})
532+
quicConn, err := connection.NewQUICConnection(
533+
quicConfig,
534+
edgeAddr,
535+
tlsConfig,
536+
config.ConnectionConfig.OriginProxy,
537+
connOptions,
538+
controlStreamHandler,
539+
connLogger.Logger())
540+
if err != nil {
541+
connLogger.ConnAwareLogger().Err(err).Msgf("Failed to create new quic connection")
542+
return err, true
543+
}
565544

566-
errGroup.Go(func() error {
567-
return listenReconnect(serveCtx, reconnectCh, gracefulShutdownC)
568-
})
545+
errGroup, serveCtx := errgroup.WithContext(ctx)
546+
errGroup.Go(func() error {
547+
err := quicConn.Serve(serveCtx)
548+
if err != nil {
549+
connLogger.ConnAwareLogger().Err(err).Msg("Failed to serve quic connection")
550+
}
551+
return err
552+
})
569553

570-
err = errGroup.Wait()
571-
if err == nil {
572-
return nil, false
573-
}
554+
errGroup.Go(func() error {
555+
err := listenReconnect(serveCtx, reconnectCh, gracefulShutdownC)
556+
if err != nil {
557+
// forcefully break the connection (this is only used for testing)
558+
quicConn.Close()
574559
}
575-
}
560+
return err
561+
})
562+
563+
return errGroup.Wait(), false
576564
}
577565

578566
func listenReconnect(ctx context.Context, reconnectCh <-chan ReconnectSignal, gracefulShutdownCh <-chan struct{}) error {

0 commit comments

Comments
 (0)