Skip to content

Commit 628545d

Browse files
committed
TUN-5600: Close QUIC transports as soon as possible while respecting graceful shutdown
This does a few fixes to make sure that the QUICConnection returns from Serve when the context is cancelled. QUIC transport now behaves like other transports: closes as soon as there is no traffic, or at most by grace-period. Note that we do not wait for UDP traffic since that's connectionless by design.
1 parent ead93e9 commit 628545d

File tree

4 files changed

+33
-17
lines changed

4 files changed

+33
-17
lines changed

connection/quic.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@ const (
3434

3535
// QUICConnection represents the type that facilitates Proxying via QUIC streams.
3636
type QUICConnection struct {
37-
session quic.Session
38-
logger *zerolog.Logger
39-
httpProxy OriginProxy
40-
sessionManager datagramsession.Manager
37+
session quic.Session
38+
logger *zerolog.Logger
39+
httpProxy OriginProxy
40+
sessionManager datagramsession.Manager
41+
controlStreamHandler ControlStreamHandler
4142
}
4243

4344
// NewQUICConnection returns a new instance of QUICConnection.
@@ -49,7 +50,7 @@ func NewQUICConnection(
4950
httpProxy OriginProxy,
5051
connOptions *tunnelpogs.ConnectionOptions,
5152
controlStreamHandler ControlStreamHandler,
52-
observer *Observer,
53+
logger *zerolog.Logger,
5354
) (*QUICConnection, error) {
5455
session, err := quic.DialAddr(edgeAddr.String(), tlsConfig, quicConfig)
5556
if err != nil {
@@ -72,34 +73,44 @@ func NewQUICConnection(
7273
return nil, err
7374
}
7475

75-
sessionManager := datagramsession.NewManager(datagramMuxer, observer.log)
76+
sessionManager := datagramsession.NewManager(datagramMuxer, logger)
7677

7778
return &QUICConnection{
78-
session: session,
79-
httpProxy: httpProxy,
80-
logger: observer.log,
81-
sessionManager: sessionManager,
79+
session: session,
80+
httpProxy: httpProxy,
81+
logger: logger,
82+
sessionManager: sessionManager,
83+
controlStreamHandler: controlStreamHandler,
8284
}, nil
8385
}
8486

8587
// Serve starts a QUIC session that begins accepting streams.
8688
func (q *QUICConnection) Serve(ctx context.Context) error {
89+
// If either goroutine returns nil error, we rely on this cancellation to make sure the other goroutine exits
90+
// as fast as possible as well. Nil error means we want to exit for good (caller code won't retry serving this
91+
// connection).
92+
// If either goroutine returns a non nil error, then the error group cancels the context, thus also canceling the
93+
// other goroutine as fast as possible.
94+
ctx, cancel := context.WithCancel(ctx)
8795
errGroup, ctx := errgroup.WithContext(ctx)
8896
errGroup.Go(func() error {
97+
defer cancel()
8998
return q.acceptStream(ctx)
9099
})
91100
errGroup.Go(func() error {
101+
defer cancel()
92102
return q.sessionManager.Serve(ctx)
93103
})
94104
return errGroup.Wait()
95105
}
96106

97107
func (q *QUICConnection) acceptStream(ctx context.Context) error {
108+
defer q.Close()
98109
for {
99110
stream, err := q.session.AcceptStream(ctx)
100111
if err != nil {
101112
// context.Canceled is usually a user ctrl+c. We don't want to log an error here as it's intentional.
102-
if errors.Is(err, context.Canceled) {
113+
if errors.Is(err, context.Canceled) || q.controlStreamHandler.IsStopped() {
103114
return nil
104115
}
105116
return fmt.Errorf("failed to accept QUIC stream: %w", err)

connection/quic_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ func testQUICConnection(ctx context.Context, udpListenerAddr net.Addr, t *testin
661661
originProxy,
662662
&tunnelpogs.ConnectionOptions{},
663663
fakeControlStream{},
664-
NewObserver(&log, &log, false),
664+
&log,
665665
)
666666
require.NoError(t, err)
667667
return qc

datagramsession/manager.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"io"
66

77
"github.com/google/uuid"
8+
"github.com/lucas-clemente/quic-go"
89
"github.com/rs/zerolog"
910
"golang.org/x/sync/errgroup"
1011
)
@@ -50,7 +51,11 @@ func (m *manager) Serve(ctx context.Context) error {
5051
for {
5152
sessionID, payload, err := m.transport.ReceiveFrom()
5253
if err != nil {
53-
return err
54+
if aerr, ok := err.(*quic.ApplicationError); ok && uint64(aerr.ErrorCode) == uint64(quic.NoError) {
55+
return nil
56+
} else {
57+
return err
58+
}
5459
}
5560
datagram := &newDatagram{
5661
sessionID: sessionID,
@@ -69,7 +74,7 @@ func (m *manager) Serve(ctx context.Context) error {
6974
for {
7075
select {
7176
case <-ctx.Done():
72-
return ctx.Err()
77+
return nil
7378
case datagram := <-m.datagramChan:
7479
m.sendToSession(datagram)
7580
case registration := <-m.registrationChan:

origin/tunnel.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -548,19 +548,19 @@ func ServeQUIC(
548548
config.ConnectionConfig.OriginProxy,
549549
connOptions,
550550
controlStreamHandler,
551-
config.Observer)
551+
connLogger.Logger())
552552
if err != nil {
553553
connLogger.ConnAwareLogger().Err(err).Msgf("Failed to create new quic connection")
554554
return err, true
555555
}
556556

557557
errGroup, serveCtx := errgroup.WithContext(ctx)
558558
errGroup.Go(func() error {
559-
err := quicConn.Serve(ctx)
559+
err := quicConn.Serve(serveCtx)
560560
if err != nil {
561561
connLogger.ConnAwareLogger().Err(err).Msg("Failed to serve quic connection")
562562
}
563-
return fmt.Errorf("Connection with edge closed")
563+
return err
564564
})
565565

566566
errGroup.Go(func() error {

0 commit comments

Comments
 (0)