Skip to content

Commit fd14bf4

Browse files
TUN-5118: Quic connection now detects duplicate connections similar to http2
1 parent e2b1836 commit fd14bf4

File tree

3 files changed

+110
-70
lines changed

3 files changed

+110
-70
lines changed

connection/quic.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ func NewQUICConnection(
5757
return nil, errors.Wrap(err, "failed to open a registration stream")
5858
}
5959

60-
go controlStreamHandler.ServeControlStream(ctx, registrationStream, connOptions)
60+
err = controlStreamHandler.ServeControlStream(ctx, registrationStream, connOptions)
61+
if err != nil {
62+
// Not wrapping error here to be consistent with the http2 message.
63+
return nil, err
64+
}
6165

6266
return &QUICConnection{
6367
session: session,
@@ -74,6 +78,10 @@ func (q *QUICConnection) Serve(ctx context.Context) error {
7478
for {
7579
stream, err := q.session.AcceptStream(ctx)
7680
if err != nil {
81+
// context.Canceled is usually a user ctrl+c. We don't want to log an error here as it's intentional.
82+
if errors.Is(err, context.Canceled) {
83+
return nil
84+
}
7785
return errors.Wrap(err, "failed to accept QUIC stream")
7886
}
7987
go func() {

connection/quic_test.go

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"os"
1717
"sync"
1818
"testing"
19-
"time"
2019

2120
"github.com/gobwas/ws/wsutil"
2221
"github.com/lucas-clemente/quic-go"
@@ -27,6 +26,7 @@ import (
2726

2827
quicpogs "github.com/cloudflare/cloudflared/quic"
2928
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
29+
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
3030
)
3131

3232
// TestQUICServer tests if a quic server accepts and responds to a quic client with the acceptance protocol.
@@ -158,21 +158,7 @@ func TestQUICServer(t *testing.T) {
158158
)
159159
}()
160160

161-
rpcClientFactory := mockRPCClientFactory{
162-
registered: make(chan struct{}),
163-
unregistered: make(chan struct{}),
164-
}
165-
166-
obs := NewObserver(&log, &log, false)
167-
controlStream := NewControlStream(
168-
obs,
169-
mockConnectedFuse{},
170-
&NamedTunnelConfig{},
171-
1,
172-
rpcClientFactory.newMockRPCClient,
173-
nil,
174-
1*time.Second,
175-
)
161+
controlStream := fakeControlStream{}
176162

177163
qC, err := NewQUICConnection(
178164
ctx,
@@ -188,17 +174,20 @@ func TestQUICServer(t *testing.T) {
188174
go qC.Serve(ctx)
189175

190176
wg.Wait()
191-
192-
select {
193-
case <-rpcClientFactory.registered:
194-
break //ok
195-
case <-time.Tick(time.Second):
196-
t.Fatal("timeout out waiting for registration")
197-
}
198177
cancel()
199178
})
200179
}
180+
}
201181

182+
type fakeControlStream struct {
183+
ControlStreamHandler
184+
}
185+
186+
func (fakeControlStream) ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions) error {
187+
return nil
188+
}
189+
func (fakeControlStream) IsStopped() bool {
190+
return true
202191
}
203192

204193
func quicServer(

origin/tunnel.go

Lines changed: 89 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ const (
3030
dialTimeout = 15 * time.Second
3131
FeatureSerializedHeaders = "serialized_headers"
3232
FeatureQuickReconnects = "quick_reconnects"
33+
quicHandshakeIdleTimeout = 5 * time.Second
34+
quicMaxIdleTimeout = 15 * time.Second
3335
)
3436

3537
type rpcName string
@@ -271,12 +273,73 @@ func ServeTunnel(
271273
}()
272274

273275
defer config.Observer.SendDisconnect(connIndex)
276+
err, recoverable = serveTunnel(
277+
ctx,
278+
connLog,
279+
credentialManager,
280+
config,
281+
addr,
282+
connIndex,
283+
fuse,
284+
backoff,
285+
cloudflaredUUID,
286+
reconnectCh,
287+
protocol,
288+
gracefulShutdownC,
289+
)
290+
291+
if err != nil {
292+
switch err := err.(type) {
293+
case connection.DupConnRegisterTunnelError:
294+
connLog.Err(err).Msg("Unable to establish connection.")
295+
// don't retry this connection anymore, let supervisor pick a new address
296+
return err, false
297+
case connection.ServerRegisterTunnelError:
298+
connLog.Err(err).Msg("Register tunnel error from server side")
299+
// Don't send registration error return from server to Sentry. They are
300+
// logged on server side
301+
if incidents := config.IncidentLookup.ActiveIncidents(); len(incidents) > 0 {
302+
connLog.Error().Msg(activeIncidentsMsg(incidents))
303+
}
304+
return err.Cause, !err.Permanent
305+
case ReconnectSignal:
306+
connLog.Info().
307+
Uint8(connection.LogFieldConnIndex, connIndex).
308+
Msgf("Restarting connection due to reconnect signal in %s", err.Delay)
309+
err.DelayBeforeReconnect()
310+
return err, true
311+
default:
312+
if err == context.Canceled {
313+
connLog.Debug().Err(err).Msgf("Serve tunnel error")
314+
return err, false
315+
}
316+
connLog.Err(err).Msgf("Serve tunnel error")
317+
_, permanent := err.(unrecoverableError)
318+
return err, !permanent
319+
}
320+
}
321+
return nil, false
322+
}
323+
324+
func serveTunnel(
325+
ctx context.Context,
326+
connLog *zerolog.Logger,
327+
credentialManager *reconnectCredentialManager,
328+
config *TunnelConfig,
329+
addr *allregions.EdgeAddr,
330+
connIndex uint8,
331+
fuse *h2mux.BooleanFuse,
332+
backoff *protocolFallback,
333+
cloudflaredUUID uuid.UUID,
334+
reconnectCh chan ReconnectSignal,
335+
protocol connection.Protocol,
336+
gracefulShutdownC <-chan struct{},
337+
) (err error, recoverable bool) {
274338

275339
connectedFuse := &connectedFuse{
276340
fuse: fuse,
277341
backoff: backoff,
278342
}
279-
280343
controlStream := connection.NewControlStream(
281344
config.Observer,
282345
connectedFuse,
@@ -287,7 +350,8 @@ func ServeTunnel(
287350
config.ConnectionConfig.GracePeriod,
288351
)
289352

290-
if protocol == connection.QUIC {
353+
switch protocol {
354+
case connection.QUIC:
291355
connOptions := config.ConnectionOptions(addr.UDP.String(), uint8(backoff.Retries()))
292356
return ServeQUIC(ctx,
293357
addr.UDP,
@@ -297,17 +361,16 @@ func ServeTunnel(
297361
connectedFuse,
298362
reconnectCh,
299363
gracefulShutdownC)
300-
}
301364

302-
edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, config.EdgeTLSConfigs[protocol], addr.TCP)
303-
if err != nil {
304-
connLog.Err(err).Msg("Unable to establish connection with Cloudflare edge")
305-
return err, true
306-
}
365+
case connection.HTTP2:
366+
edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, config.EdgeTLSConfigs[protocol], addr.TCP)
367+
if err != nil {
368+
connLog.Err(err).Msg("Unable to establish connection with Cloudflare edge")
369+
return err, true
370+
}
307371

308-
if protocol == connection.HTTP2 {
309372
connOptions := config.ConnectionOptions(edgeConn.LocalAddr().String(), uint8(backoff.Retries()))
310-
err = ServeHTTP2(
373+
if err := ServeHTTP2(
311374
ctx,
312375
connLog,
313376
config,
@@ -317,9 +380,18 @@ func ServeTunnel(
317380
connIndex,
318381
gracefulShutdownC,
319382
reconnectCh,
320-
)
321-
} else {
322-
err = ServeH2mux(
383+
); err != nil {
384+
return err, false
385+
}
386+
387+
default:
388+
edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, config.EdgeTLSConfigs[protocol], addr.TCP)
389+
if err != nil {
390+
connLog.Err(err).Msg("Unable to establish connection with Cloudflare edge")
391+
return err, true
392+
}
393+
394+
if err := ServeH2mux(
323395
ctx,
324396
connLog,
325397
credentialManager,
@@ -330,40 +402,11 @@ func ServeTunnel(
330402
cloudflaredUUID,
331403
reconnectCh,
332404
gracefulShutdownC,
333-
)
334-
}
335-
336-
if err != nil {
337-
switch err := err.(type) {
338-
case connection.DupConnRegisterTunnelError:
339-
connLog.Err(err).Msg("Unable to establish connection.")
340-
// don't retry this connection anymore, let supervisor pick a new address
405+
); err != nil {
341406
return err, false
342-
case connection.ServerRegisterTunnelError:
343-
connLog.Err(err).Msg("Register tunnel error from server side")
344-
// Don't send registration error return from server to Sentry. They are
345-
// logged on server side
346-
if incidents := config.IncidentLookup.ActiveIncidents(); len(incidents) > 0 {
347-
connLog.Error().Msg(activeIncidentsMsg(incidents))
348-
}
349-
return err.Cause, !err.Permanent
350-
case ReconnectSignal:
351-
connLog.Info().
352-
Uint8(connection.LogFieldConnIndex, connIndex).
353-
Msgf("Restarting connection due to reconnect signal in %s", err.Delay)
354-
err.DelayBeforeReconnect()
355-
return err, true
356-
default:
357-
if err == context.Canceled {
358-
connLog.Debug().Err(err).Msgf("Serve tunnel error")
359-
return err, false
360-
}
361-
connLog.Err(err).Msgf("Serve tunnel error")
362-
_, permanent := err.(unrecoverableError)
363-
return err, !permanent
364407
}
365408
}
366-
return nil, false
409+
return
367410
}
368411

369412
type unrecoverableError struct {
@@ -472,7 +515,8 @@ func ServeQUIC(
472515
) (err error, recoverable bool) {
473516
tlsConfig := config.EdgeTLSConfigs[connection.QUIC]
474517
quicConfig := &quic.Config{
475-
HandshakeIdleTimeout: time.Second * 10,
518+
HandshakeIdleTimeout: quicHandshakeIdleTimeout,
519+
MaxIdleTimeout: quicMaxIdleTimeout,
476520
KeepAlive: true,
477521
}
478522
for {
@@ -511,7 +555,6 @@ func ServeQUIC(
511555
if err == nil {
512556
return nil, false
513557
}
514-
config.Log.Info().Msg("Reconnecting with the same udp conn")
515558
}
516559
}
517560
}

0 commit comments

Comments
 (0)