Skip to content

Commit 41dffd7

Browse files
committed
CUSTESC-53681: Correct QUIC connection management for datagram handlers
Corrects the pattern of using errgroup's and context cancellation to simplify the logic for canceling extra routines for the QUIC connection. This is because the extra context cancellation is redundant with the fact that the errgroup also cancels it's own provided context when a routine returns (error or not). For the datagram handler specifically, since it can respond faster to a context cancellation from the QUIC connection, we wrap the error before surfacing it outside of the QUIC connection scope to the supervisor. Additionally, the supervisor will look for this error type to check if it should retry the QUIC connection. These two operations are required because the supervisor does not look for a context canceled error when deciding to retry a connection. If a context canceled from the datagram handler were to be returned up to the supervisor on the initial connection, the cloudflared application would exit. We want to ensure that cloudflared maintains connection attempts even if any of the services on-top of a QUIC connection fail (datagram handler in this case). Additional logging is also introduced along these paths to help with understanding the error conditions from the specific handlers on-top of a QUIC connection. Related CUSTESC-53681 Closes TUN-9610
1 parent 8825cee commit 41dffd7

File tree

9 files changed

+70
-63
lines changed

9 files changed

+70
-63
lines changed

connection/control.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func (c *controlStream) ServeControlStream(
8282
tunnelConfigGetter TunnelConfigJSONGetter,
8383
) error {
8484
registrationClient := c.registerClientFunc(ctx, rw, c.registerTimeout)
85-
85+
c.observer.logConnecting(c.connIndex, c.edgeAddress, c.protocol)
8686
registrationDetails, err := registrationClient.RegisterConnection(
8787
ctx,
8888
c.tunnelProperties.Credentials.Auth(),

connection/errors.go

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package connection
22

33
import (
4-
"github.com/cloudflare/cloudflared/edgediscovery"
54
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
65
)
76

@@ -53,26 +52,26 @@ func serverRegistrationErrorFromRPC(err error) ServerRegisterTunnelError {
5352
}
5453
}
5554

56-
type muxerShutdownError struct{}
55+
type ControlStreamError struct{}
5756

58-
func (e muxerShutdownError) Error() string {
59-
return "muxer shutdown"
57+
var _ error = &ControlStreamError{}
58+
59+
func (e *ControlStreamError) Error() string {
60+
return "control stream encountered a failure while serving"
6061
}
6162

62-
var errMuxerStopped = muxerShutdownError{}
63+
type StreamListenerError struct{}
6364

64-
func isHandshakeErrRecoverable(err error, connIndex uint8, observer *Observer) bool {
65-
log := observer.log.With().
66-
Uint8(LogFieldConnIndex, connIndex).
67-
Err(err).
68-
Logger()
65+
var _ error = &StreamListenerError{}
6966

70-
switch err.(type) {
71-
case edgediscovery.DialError:
72-
log.Error().Msg("Connection unable to dial edge")
73-
default:
74-
log.Error().Msg("Connection failed")
75-
return false
76-
}
77-
return true
67+
func (e *StreamListenerError) Error() string {
68+
return "accept stream listener encountered a failure while serving"
69+
}
70+
71+
type DatagramManagerError struct{}
72+
73+
var _ error = &DatagramManagerError{}
74+
75+
func (e *DatagramManagerError) Error() string {
76+
return "datagram manager encountered a failure while serving"
7877
}

connection/observer.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ func (o *Observer) RegisterSink(sink EventSink) {
4646
o.addSinkChan <- sink
4747
}
4848

49+
func (o *Observer) logConnecting(connIndex uint8, address net.IP, protocol Protocol) {
50+
o.log.Debug().
51+
Int(management.EventTypeKey, int(management.Cloudflared)).
52+
Uint8(LogFieldConnIndex, connIndex).
53+
IPAddr(LogFieldIPAddress, address).
54+
Str(LogFieldProtocol, protocol.String()).
55+
Msg("Registering tunnel connection")
56+
}
57+
4958
func (o *Observer) logConnected(connectionID uuid.UUID, connIndex uint8, location string, address net.IP, protocol Protocol) {
5059
o.log.Info().
5160
Int(management.EventTypeKey, int(management.Cloudflared)).

connection/quic_connection.go

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package connection
33
import (
44
"bufio"
55
"context"
6+
"errors"
67
"fmt"
78
"io"
89
"net"
@@ -12,7 +13,6 @@ import (
1213
"sync/atomic"
1314
"time"
1415

15-
"github.com/pkg/errors"
1616
"github.com/quic-go/quic-go"
1717
"github.com/rs/zerolog"
1818
"golang.org/x/sync/errgroup"
@@ -65,7 +65,7 @@ func NewTunnelConnection(
6565
streamWriteTimeout time.Duration,
6666
gracePeriod time.Duration,
6767
logger *zerolog.Logger,
68-
) (TunnelConnection, error) {
68+
) TunnelConnection {
6969
return &quicConnection{
7070
conn: conn,
7171
logger: logger,
@@ -77,27 +77,28 @@ func NewTunnelConnection(
7777
rpcTimeout: rpcTimeout,
7878
streamWriteTimeout: streamWriteTimeout,
7979
gracePeriod: gracePeriod,
80-
}, nil
80+
}
8181
}
8282

8383
// Serve starts a QUIC connection that begins accepting streams.
84+
// Returning a nil error means cloudflared will exit for good and will not attempt to reconnect.
8485
func (q *quicConnection) Serve(ctx context.Context) error {
8586
// The edge assumes the first stream is used for the control plane
8687
controlStream, err := q.conn.OpenStream()
8788
if err != nil {
8889
return fmt.Errorf("failed to open a registration control stream: %w", err)
8990
}
9091

91-
// If either goroutine returns nil error, we rely on this cancellation to make sure the other goroutine exits
92-
// as fast as possible as well. Nil error means we want to exit for good (caller code won't retry serving this
93-
// connection).
9492
// If either goroutine returns a non nil error, then the error group cancels the context, thus also canceling the
95-
// other goroutine as fast as possible.
96-
ctx, cancel := context.WithCancel(ctx)
93+
// other goroutines. We enforce returning a not-nil error for each function started in the errgroup by logging
94+
// the error returned and returning a custom error type instead.
9795
errGroup, ctx := errgroup.WithContext(ctx)
9896

99-
// In the future, if cloudflared can autonomously push traffic to the edge, we have to make sure the control
100-
// stream is already fully registered before the other goroutines can proceed.
97+
// Close the quic connection if any of the following routines return from the errgroup (regardless of their error)
98+
// because they are no longer processing requests for the connection.
99+
defer q.Close()
100+
101+
// Start the control stream routine
101102
errGroup.Go(func() error {
102103
// err is equal to nil if we exit due to unregistration. If that happens we want to wait the full
103104
// amount of the grace period, allowing requests to finish before we cancel the context, which will
@@ -114,16 +115,26 @@ func (q *quicConnection) Serve(ctx context.Context) error {
114115
}
115116
}
116117
}
117-
cancel()
118-
return err
118+
if err != nil {
119+
q.logger.Error().Err(err).Msg("failed to serve the control stream")
120+
}
121+
return &ControlStreamError{}
119122
})
123+
// Start the accept stream loop routine
120124
errGroup.Go(func() error {
121-
defer cancel()
122-
return q.acceptStream(ctx)
125+
err := q.acceptStream(ctx)
126+
if err != nil {
127+
q.logger.Error().Err(err).Msg("failed to accept incoming stream requests")
128+
}
129+
return &StreamListenerError{}
123130
})
131+
// Start the datagram handler routine
124132
errGroup.Go(func() error {
125-
defer cancel()
126-
return q.datagramHandler.Serve(ctx)
133+
err := q.datagramHandler.Serve(ctx)
134+
if err != nil {
135+
q.logger.Error().Err(err).Msg("failed to run the datagram handler")
136+
}
137+
return &DatagramManagerError{}
127138
})
128139

129140
return errGroup.Wait()
@@ -140,7 +151,6 @@ func (q *quicConnection) Close() {
140151
}
141152

142153
func (q *quicConnection) acceptStream(ctx context.Context) error {
143-
defer q.Close()
144154
for {
145155
quicStream, err := q.conn.AcceptStream(ctx)
146156
if err != nil {
@@ -230,7 +240,7 @@ func (q *quicConnection) dispatchRequest(ctx context.Context, stream *rpcquic.Re
230240
ConnIndex: q.connIndex,
231241
}), rwa.connectResponseSent
232242
default:
233-
return errors.Errorf("unsupported error type: %s", request.Type), false
243+
return fmt.Errorf("unsupported error type: %s", request.Type), false
234244
}
235245
}
236246

connection/quic_connection_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -847,7 +847,7 @@ func testTunnelConnection(t *testing.T, serverAddr netip.AddrPort, index uint8)
847847
&log,
848848
}
849849

850-
tunnelConn, err := NewTunnelConnection(
850+
tunnelConn := NewTunnelConnection(
851851
ctx,
852852
conn,
853853
index,
@@ -860,7 +860,6 @@ func testTunnelConnection(t *testing.T, serverAddr netip.AddrPort, index uint8)
860860
0*time.Second,
861861
&log,
862862
)
863-
require.NoError(t, err)
864863
return tunnelConn, datagramConn
865864
}
866865

connection/quic_datagram_v2.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,24 +98,17 @@ func NewDatagramV2Connection(ctx context.Context,
9898
}
9999

100100
func (d *datagramV2Connection) Serve(ctx context.Context) error {
101-
// If either goroutine returns nil error, we rely on this cancellation to make sure the other goroutine exits
102-
// as fast as possible as well. Nil error means we want to exit for good (caller code won't retry serving this
103-
// connection).
104-
// If either goroutine returns a non nil error, then the error group cancels the context, thus also canceling the
105-
// other goroutine as fast as possible.
106-
ctx, cancel := context.WithCancel(ctx)
101+
// If either goroutine from the errgroup returns at all (error or nil), we rely on its cancellation to make sure
102+
// the other goroutines as well.
107103
errGroup, ctx := errgroup.WithContext(ctx)
108104

109105
errGroup.Go(func() error {
110-
defer cancel()
111106
return d.sessionManager.Serve(ctx)
112107
})
113108
errGroup.Go(func() error {
114-
defer cancel()
115109
return d.datagramMuxer.ServeReceive(ctx)
116110
})
117111
errGroup.Go(func() error {
118-
defer cancel()
119112
return d.packetRouter.Serve(ctx)
120113
})
121114

quic/v3/muxer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func (c *datagramConn) Serve(ctx context.Context) error {
175175
// Monitor the context of cloudflared
176176
case <-ctx.Done():
177177
return ctx.Err()
178-
// Monitor the context of the underlying connection
178+
// Monitor the context of the underlying quic connection
179179
case <-connCtx.Done():
180180
return connCtx.Err()
181181
// Monitor for any hard errors from reading the connection

supervisor/supervisor.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ func (s *Supervisor) Run(
132132
if err == errEarlyShutdown {
133133
return nil
134134
}
135+
s.log.Logger().Error().Err(err).Msg("initial tunnel connection failed")
135136
return err
136137
}
137138
var tunnelsWaiting []int
@@ -154,6 +155,7 @@ func (s *Supervisor) Run(
154155
// (note that this may also be caused by context cancellation)
155156
case tunnelError := <-s.tunnelErrors:
156157
tunnelsActive--
158+
s.log.ConnAwareLogger().Err(tunnelError.err).Int(connection.LogFieldConnIndex, tunnelError.index).Msg("Connection terminated")
157159
if tunnelError.err != nil && !shuttingDown {
158160
switch tunnelError.err.(type) {
159161
case ReconnectSignal:
@@ -166,7 +168,6 @@ func (s *Supervisor) Run(
166168
if _, retry := s.tunnelsProtocolFallback[tunnelError.index].GetMaxBackoffDuration(ctx); !retry {
167169
continue
168170
}
169-
s.log.ConnAwareLogger().Err(tunnelError.err).Int(connection.LogFieldConnIndex, tunnelError.index).Msg("Connection terminated")
170171
tunnelsWaiting = append(tunnelsWaiting, tunnelError.index)
171172
s.waitForNextTunnel(tunnelError.index)
172173

@@ -285,7 +286,10 @@ func (s *Supervisor) startFirstTunnel(
285286
*quic.IdleTimeoutError,
286287
*quic.ApplicationError,
287288
edgediscovery.DialError,
288-
*connection.EdgeQuicDialError:
289+
*connection.EdgeQuicDialError,
290+
*connection.ControlStreamError,
291+
*connection.StreamListenerError,
292+
*connection.DatagramManagerError:
289293
// Try again for these types of errors
290294
default:
291295
// Uncaught errors should bail startup
@@ -301,13 +305,9 @@ func (s *Supervisor) startTunnel(
301305
index int,
302306
connectedSignal *signal.Signal,
303307
) {
304-
var err error
305-
defer func() {
306-
s.tunnelErrors <- tunnelError{index: index, err: err}
307-
}()
308-
309308
// nolint: gosec
310-
err = s.edgeTunnelServer.Serve(ctx, uint8(index), s.tunnelsProtocolFallback[index], connectedSignal)
309+
err := s.edgeTunnelServer.Serve(ctx, uint8(index), s.tunnelsProtocolFallback[index], connectedSignal)
310+
s.tunnelErrors <- tunnelError{index: index, err: err}
311311
}
312312

313313
func (s *Supervisor) newConnectedTunnelSignal(index int) *signal.Signal {

supervisor/tunnel.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,7 @@ func (e *EdgeTunnelServer) serveQUIC(
556556
pqMode := connOptions.FeatureSnapshot.PostQuantum
557557
curvePref, err := curvePreference(pqMode, fips.IsFipsEnabled(), tlsConfig.CurvePreferences)
558558
if err != nil {
559+
connLogger.ConnAwareLogger().Err(err).Msgf("failed to get curve preferences")
559560
return err, true
560561
}
561562

@@ -627,7 +628,7 @@ func (e *EdgeTunnelServer) serveQUIC(
627628
}
628629

629630
// Wrap the [quic.Connection] as a TunnelConnection
630-
tunnelConn, err := connection.NewTunnelConnection(
631+
tunnelConn := connection.NewTunnelConnection(
631632
ctx,
632633
conn,
633634
connIndex,
@@ -640,17 +641,13 @@ func (e *EdgeTunnelServer) serveQUIC(
640641
e.config.GracePeriod,
641642
connLogger.Logger(),
642643
)
643-
if err != nil {
644-
connLogger.ConnAwareLogger().Err(err).Msgf("Failed to create new tunnel connection")
645-
return err, true
646-
}
647644

648645
// Serve the TunnelConnection
649646
errGroup, serveCtx := errgroup.WithContext(ctx)
650647
errGroup.Go(func() error {
651648
err := tunnelConn.Serve(serveCtx)
652649
if err != nil {
653-
connLogger.ConnAwareLogger().Err(err).Msg("Failed to serve tunnel connection")
650+
connLogger.ConnAwareLogger().Err(err).Msg("failed to serve tunnel connection")
654651
}
655652
return err
656653
})

0 commit comments

Comments
 (0)