Skip to content

Commit dd540af

Browse files
committed
TUN-6388: Fix first tunnel connection not retrying
1 parent e921ab3 commit dd540af

File tree

6 files changed

+89
-30
lines changed

6 files changed

+89
-30
lines changed

connection/control.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package connection
33
import (
44
"context"
55
"io"
6+
"net"
67
"time"
78

89
"github.com/rs/zerolog"
@@ -19,6 +20,7 @@ type controlStream struct {
1920
connectedFuse ConnectedFuse
2021
namedTunnelProperties *NamedTunnelProperties
2122
connIndex uint8
23+
edgeAddress net.IP
2224

2325
newRPCClientFunc RPCClientFunc
2426

@@ -45,6 +47,7 @@ func NewControlStream(
4547
connectedFuse ConnectedFuse,
4648
namedTunnelConfig *NamedTunnelProperties,
4749
connIndex uint8,
50+
edgeAddress net.IP,
4851
newRPCClientFunc RPCClientFunc,
4952
gracefulShutdownC <-chan struct{},
5053
gracePeriod time.Duration,
@@ -58,6 +61,7 @@ func NewControlStream(
5861
namedTunnelProperties: namedTunnelConfig,
5962
newRPCClientFunc: newRPCClientFunc,
6063
connIndex: connIndex,
64+
edgeAddress: edgeAddress,
6165
gracefulShutdownC: gracefulShutdownC,
6266
gracePeriod: gracePeriod,
6367
}
@@ -71,7 +75,7 @@ func (c *controlStream) ServeControlStream(
7175
) error {
7276
rpcClient := c.newRPCClientFunc(ctx, rw, c.observer.log)
7377

74-
registrationDetails, err := rpcClient.RegisterConnection(ctx, c.namedTunnelProperties, connOptions, c.connIndex, c.observer)
78+
registrationDetails, err := rpcClient.RegisterConnection(ctx, c.namedTunnelProperties, connOptions, c.connIndex, c.edgeAddress, c.observer)
7579
if err != nil {
7680
rpcClient.Close()
7781
return err

connection/http2_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func newTestHTTP2Connection() (*HTTP2Connection, net.Conn) {
4141
connIndex,
4242
nil,
4343
nil,
44+
nil,
4445
1*time.Second,
4546
)
4647
return NewHTTP2Connection(
@@ -176,6 +177,7 @@ func (mc mockNamedTunnelRPCClient) RegisterConnection(
176177
properties *NamedTunnelProperties,
177178
options *tunnelpogs.ConnectionOptions,
178179
connIndex uint8,
180+
edgeAddress net.IP,
179181
observer *Observer,
180182
) (*tunnelpogs.ConnectionDetails, error) {
181183
if mc.shouldFail != nil {
@@ -360,6 +362,7 @@ func TestServeControlStream(t *testing.T) {
360362
mockConnectedFuse{},
361363
&NamedTunnelProperties{},
362364
1,
365+
nil,
363366
rpcClientFactory.newMockRPCClient,
364367
nil,
365368
1*time.Second,
@@ -410,6 +413,7 @@ func TestFailRegistration(t *testing.T) {
410413
mockConnectedFuse{},
411414
&NamedTunnelProperties{},
412415
http2Conn.connIndex,
416+
nil,
413417
rpcClientFactory.newMockRPCClient,
414418
nil,
415419
1*time.Second,
@@ -456,6 +460,7 @@ func TestGracefulShutdownHTTP2(t *testing.T) {
456460
mockConnectedFuse{},
457461
&NamedTunnelProperties{},
458462
http2Conn.connIndex,
463+
nil,
459464
rpcClientFactory.newMockRPCClient,
460465
shutdownC,
461466
1*time.Second,

connection/rpc.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type NamedTunnelRPCClient interface {
5858
config *NamedTunnelProperties,
5959
options *tunnelpogs.ConnectionOptions,
6060
connIndex uint8,
61+
edgeAddress net.IP,
6162
observer *Observer,
6263
) (*tunnelpogs.ConnectionDetails, error)
6364
SendLocalConfiguration(
@@ -95,6 +96,7 @@ func (rsc *registrationServerClient) RegisterConnection(
9596
properties *NamedTunnelProperties,
9697
options *tunnelpogs.ConnectionOptions,
9798
connIndex uint8,
99+
edgeAddress net.IP,
98100
observer *Observer,
99101
) (*tunnelpogs.ConnectionDetails, error) {
100102
conn, err := rsc.client.RegisterConnection(
@@ -115,7 +117,7 @@ func (rsc *registrationServerClient) RegisterConnection(
115117

116118
observer.metrics.regSuccess.WithLabelValues("registerConnection").Inc()
117119

118-
observer.logServerInfo(connIndex, conn.Location, options.OriginLocalIP, fmt.Sprintf("Connection %s registered", conn.UUID))
120+
observer.logServerInfo(connIndex, conn.Location, edgeAddress, fmt.Sprintf("Connection %s registered", conn.UUID))
119121
observer.sendConnectedEvent(connIndex, conn.Location)
120122

121123
return conn, nil
@@ -291,7 +293,7 @@ func (h *h2muxConnection) registerNamedTunnel(
291293
rpcClient := h.newRPCClientFunc(ctx, stream, h.observer.log)
292294
defer rpcClient.Close()
293295

294-
if _, err = rpcClient.RegisterConnection(ctx, namedTunnel, connOptions, h.connIndex, h.observer); err != nil {
296+
if _, err = rpcClient.RegisterConnection(ctx, namedTunnel, connOptions, h.connIndex, nil, h.observer); err != nil {
295297
return err
296298
}
297299
return nil

edgediscovery/edgediscovery.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package edgediscovery
22

33
import (
4-
"fmt"
54
"sync"
65

76
"github.com/rs/zerolog"
@@ -14,7 +13,13 @@ const (
1413
LogFieldIPAddress = "ip"
1514
)
1615

17-
var ErrNoAddressesLeft = fmt.Errorf("there are no free edge addresses left")
16+
var errNoAddressesLeft = ErrNoAddressesLeft{}
17+
18+
type ErrNoAddressesLeft struct{}
19+
20+
func (e ErrNoAddressesLeft) Error() string {
21+
return "there are no free edge addresses left to resolve to"
22+
}
1823

1924
// Edge finds addresses on the Cloudflare edge and hands them out to connections.
2025
type Edge struct {
@@ -62,7 +67,7 @@ func (ed *Edge) GetAddrForRPC() (*allregions.EdgeAddr, error) {
6267
defer ed.Unlock()
6368
addr := ed.regions.GetAnyAddress()
6469
if addr == nil {
65-
return nil, ErrNoAddressesLeft
70+
return nil, errNoAddressesLeft
6671
}
6772
return addr, nil
6873
}
@@ -83,7 +88,7 @@ func (ed *Edge) GetAddr(connIndex int) (*allregions.EdgeAddr, error) {
8388
addr := ed.regions.GetUnusedAddr(nil, connIndex)
8489
if addr == nil {
8590
log.Debug().Msg("edgediscovery - GetAddr: No addresses left to give proxy connection")
86-
return nil, ErrNoAddressesLeft
91+
return nil, errNoAddressesLeft
8792
}
8893
log = ed.log.With().
8994
Int(LogFieldConnIndex, connIndex).
@@ -107,7 +112,7 @@ func (ed *Edge) GetDifferentAddr(connIndex int, hasConnectivityError bool) (*all
107112
if addr == nil {
108113
log.Debug().Msg("edgediscovery - GetDifferentAddr: No addresses left to give proxy connection")
109114
// note: if oldAddr was not nil, it will become available on the next iteration
110-
return nil, ErrNoAddressesLeft
115+
return nil, errNoAddressesLeft
111116
}
112117
log = ed.log.With().
113118
Int(LogFieldConnIndex, connIndex).

supervisor/supervisor.go

Lines changed: 59 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"strings"
78
"time"
89

910
"github.com/google/uuid"
11+
"github.com/lucas-clemente/quic-go"
1012
"github.com/rs/zerolog"
1113

1214
"github.com/cloudflare/cloudflared/connection"
@@ -37,13 +39,14 @@ const (
3739
// Supervisor manages non-declarative tunnels. Establishes TCP connections with the edge, and
3840
// reconnects them if they disconnect.
3941
type Supervisor struct {
40-
cloudflaredUUID uuid.UUID
41-
config *TunnelConfig
42-
orchestrator *orchestration.Orchestrator
43-
edgeIPs *edgediscovery.Edge
44-
edgeTunnelServer EdgeTunnelServer
45-
tunnelErrors chan tunnelError
46-
tunnelsConnecting map[int]chan struct{}
42+
cloudflaredUUID uuid.UUID
43+
config *TunnelConfig
44+
orchestrator *orchestration.Orchestrator
45+
edgeIPs *edgediscovery.Edge
46+
edgeTunnelServer EdgeTunnelServer
47+
tunnelErrors chan tunnelError
48+
tunnelsConnecting map[int]chan struct{}
49+
tunnelsProtocolFallback map[int]*protocolFallback
4750
// nextConnectedIndex and nextConnectedSignal are used to wait for all
4851
// currently-connecting tunnels to finish connecting so we can reset backoff timer
4952
nextConnectedIndex int
@@ -72,8 +75,10 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato
7275
return nil, fmt.Errorf("failed to generate cloudflared instance ID: %w", err)
7376
}
7477

78+
isStaticEdge := len(config.EdgeAddrs) > 0
79+
7580
var edgeIPs *edgediscovery.Edge
76-
if len(config.EdgeAddrs) > 0 {
81+
if isStaticEdge { // static edge addresses
7782
edgeIPs, err = edgediscovery.StaticEdge(config.Log, config.EdgeAddrs)
7883
} else {
7984
edgeIPs, err = edgediscovery.ResolveEdge(config.Log, config.Region, config.EdgeIPVersion)
@@ -86,7 +91,9 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato
8691
log := NewConnAwareLogger(config.Log, config.Observer)
8792

8893
var edgeAddrHandler EdgeAddrHandler
89-
if config.EdgeIPVersion == allregions.IPv6Only || config.EdgeIPVersion == allregions.Auto {
94+
if isStaticEdge { // static edge addresses
95+
edgeAddrHandler = &IPAddrFallback{}
96+
} else if config.EdgeIPVersion == allregions.IPv6Only || config.EdgeIPVersion == allregions.Auto {
9097
edgeAddrHandler = &IPAddrFallback{}
9198
} else { // IPv4Only
9299
edgeAddrHandler = &DefaultAddrFallback{}
@@ -117,6 +124,7 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato
117124
edgeTunnelServer: edgeTunnelServer,
118125
tunnelErrors: make(chan tunnelError),
119126
tunnelsConnecting: map[int]chan struct{}{},
127+
tunnelsProtocolFallback: map[int]*protocolFallback{},
120128
log: log,
121129
logTransport: config.LogTransport,
122130
reconnectCredentialManager: reconnectCredentialManager,
@@ -178,6 +186,10 @@ func (s *Supervisor) Run(
178186
tunnelsActive++
179187
continue
180188
}
189+
// Make sure we don't continue if there is no more fallback allowed
190+
if _, retry := s.tunnelsProtocolFallback[tunnelError.index].GetMaxBackoffDuration(ctx); !retry {
191+
continue
192+
}
181193
s.log.ConnAwareLogger().Err(tunnelError.err).Int(connection.LogFieldConnIndex, tunnelError.index).Msg("Connection terminated")
182194
tunnelsWaiting = append(tunnelsWaiting, tunnelError.index)
183195
s.waitForNextTunnel(tunnelError.index)
@@ -232,6 +244,11 @@ func (s *Supervisor) initialize(
232244
s.log.Logger().Info().Msgf("You requested %d HA connections but I can give you at most %d.", s.config.HAConnections, availableAddrs)
233245
s.config.HAConnections = availableAddrs
234246
}
247+
s.tunnelsProtocolFallback[0] = &protocolFallback{
248+
retry.BackoffHandler{MaxRetries: s.config.Retries},
249+
s.config.ProtocolSelector.Current(),
250+
false,
251+
}
235252

236253
go s.startFirstTunnel(ctx, connectedSignal)
237254

@@ -249,6 +266,11 @@ func (s *Supervisor) initialize(
249266

250267
// At least one successful connection, so start the rest
251268
for i := 1; i < s.config.HAConnections; i++ {
269+
s.tunnelsProtocolFallback[i] = &protocolFallback{
270+
retry.BackoffHandler{MaxRetries: s.config.Retries},
271+
s.config.ProtocolSelector.Current(),
272+
false,
273+
}
252274
ch := signal.New(make(chan struct{}))
253275
go s.startTunnel(ctx, i, ch)
254276
time.Sleep(registrationInterval)
@@ -266,21 +288,44 @@ func (s *Supervisor) startFirstTunnel(
266288
err error
267289
)
268290
const firstConnIndex = 0
291+
isStaticEdge := len(s.config.EdgeAddrs) > 0
269292
defer func() {
270293
s.tunnelErrors <- tunnelError{index: firstConnIndex, err: err}
271294
}()
272295

273-
err = s.edgeTunnelServer.Serve(ctx, firstConnIndex, connectedSignal)
274-
275296
// If the first tunnel disconnects, keep restarting it.
276-
for s.unusedIPs() {
297+
for {
298+
err = s.edgeTunnelServer.Serve(ctx, firstConnIndex, s.tunnelsProtocolFallback[firstConnIndex], connectedSignal)
277299
if ctx.Err() != nil {
278300
return
279301
}
280302
if err == nil {
281303
return
282304
}
283-
err = s.edgeTunnelServer.Serve(ctx, firstConnIndex, connectedSignal)
305+
// Make sure we don't continue if there is no more fallback allowed
306+
if _, retry := s.tunnelsProtocolFallback[firstConnIndex].GetMaxBackoffDuration(ctx); !retry {
307+
return
308+
}
309+
// Try again for Unauthorized errors because we expect them to be
310+
// transient due to edge propagation lag on new Tunnels.
311+
if strings.Contains(err.Error(), "Unauthorized") {
312+
continue
313+
}
314+
switch err.(type) {
315+
case edgediscovery.ErrNoAddressesLeft:
316+
// If your provided addresses are not available, we will keep trying regardless.
317+
if !isStaticEdge {
318+
return
319+
}
320+
case connection.DupConnRegisterTunnelError,
321+
*quic.IdleTimeoutError,
322+
edgediscovery.DialError,
323+
*connection.EdgeQuicDialError:
324+
// Try again for these types of errors
325+
default:
326+
// Uncaught errors should bail startup
327+
return
328+
}
284329
}
285330
}
286331

@@ -298,7 +343,7 @@ func (s *Supervisor) startTunnel(
298343
s.tunnelErrors <- tunnelError{index: index, err: err}
299344
}()
300345

301-
err = s.edgeTunnelServer.Serve(ctx, uint8(index), connectedSignal)
346+
err = s.edgeTunnelServer.Serve(ctx, uint8(index), s.tunnelsProtocolFallback[index], connectedSignal)
302347
}
303348

304349
func (s *Supervisor) newConnectedTunnelSignal(index int) *signal.Signal {

supervisor/tunnel.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -194,15 +194,10 @@ type EdgeTunnelServer struct {
194194
connAwareLogger *ConnAwareLogger
195195
}
196196

197-
func (e EdgeTunnelServer) Serve(ctx context.Context, connIndex uint8, connectedSignal *signal.Signal) error {
197+
func (e EdgeTunnelServer) Serve(ctx context.Context, connIndex uint8, protocolFallback *protocolFallback, connectedSignal *signal.Signal) error {
198198
haConnections.Inc()
199199
defer haConnections.Dec()
200200

201-
protocolFallback := &protocolFallback{
202-
retry.BackoffHandler{MaxRetries: e.config.Retries},
203-
e.config.ProtocolSelector.Current(),
204-
false,
205-
}
206201
connectedFuse := h2mux.NewBooleanFuse()
207202
go func() {
208203
if connectedFuse.Await() {
@@ -214,7 +209,7 @@ func (e EdgeTunnelServer) Serve(ctx context.Context, connIndex uint8, connectedS
214209

215210
// Fetch the IP address associated with the connection index
216211
addr, err := e.edgeAddrs.GetAddr(int(connIndex))
217-
switch err {
212+
switch err.(type) {
218213
case nil: // no error
219214
case edgediscovery.ErrNoAddressesLeft:
220215
return err
@@ -262,7 +257,9 @@ func (e EdgeTunnelServer) Serve(ctx context.Context, connIndex uint8, connectedS
262257
// establishing a connection to the edge and if so, rotate the IP address.
263258
yes, hasConnectivityError := e.edgeAddrHandler.ShouldGetNewAddress(err)
264259
if yes {
265-
e.edgeAddrs.GetDifferentAddr(int(connIndex), hasConnectivityError)
260+
if _, err := e.edgeAddrs.GetDifferentAddr(int(connIndex), hasConnectivityError); err != nil {
261+
return err
262+
}
266263
}
267264

268265
select {
@@ -461,6 +458,7 @@ func serveTunnel(
461458
connectedFuse,
462459
config.NamedTunnel,
463460
connIndex,
461+
addr.UDP.IP,
464462
nil,
465463
gracefulShutdownC,
466464
config.GracePeriod,

0 commit comments

Comments
 (0)