Skip to content

Commit 8eeb452

Browse files
committed
TUN-3268: Each connection has its own event digest to reconnect
1 parent 9323844 commit 8eeb452

File tree

5 files changed

+230
-263
lines changed

5 files changed

+230
-263
lines changed

origin/metrics.go

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,9 @@ type TunnelMetrics struct {
5858
// oldServerLocations stores the last server the tunnel was connected to
5959
oldServerLocations map[string]string
6060

61-
regSuccess *prometheus.CounterVec
62-
regFail *prometheus.CounterVec
63-
authSuccess prometheus.Counter
64-
authFail *prometheus.CounterVec
65-
rpcFail *prometheus.CounterVec
61+
regSuccess *prometheus.CounterVec
62+
regFail *prometheus.CounterVec
63+
rpcFail *prometheus.CounterVec
6664

6765
muxerMetrics *muxerMetrics
6866
tunnelsHA tunnelsForHA
@@ -456,27 +454,6 @@ func NewTunnelMetrics() *TunnelMetrics {
456454
)
457455
prometheus.MustRegister(registerSuccess)
458456

459-
authSuccess := prometheus.NewCounter(
460-
prometheus.CounterOpts{
461-
Namespace: metricsNamespace,
462-
Subsystem: tunnelSubsystem,
463-
Name: "tunnel_authenticate_success",
464-
Help: "Count of successful tunnel authenticate",
465-
},
466-
)
467-
prometheus.MustRegister(authSuccess)
468-
469-
authFail := prometheus.NewCounterVec(
470-
prometheus.CounterOpts{
471-
Namespace: metricsNamespace,
472-
Subsystem: tunnelSubsystem,
473-
Name: "tunnel_authenticate_fail",
474-
Help: "Count of tunnel authenticate errors by type",
475-
},
476-
[]string{"error"},
477-
)
478-
prometheus.MustRegister(authFail)
479-
480457
return &TunnelMetrics{
481458
haConnections: haConnections,
482459
activeStreams: activeStreams,
@@ -497,8 +474,6 @@ func NewTunnelMetrics() *TunnelMetrics {
497474
regFail: registerFail,
498475
rpcFail: rpcFail,
499476
userHostnamesCounts: userHostnamesCounts,
500-
authSuccess: authSuccess,
501-
authFail: authFail,
502477
}
503478
}
504479

origin/reconnect.go

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package origin
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"sync"
8+
"time"
9+
10+
"github.com/cloudflare/cloudflared/connection"
11+
"github.com/cloudflare/cloudflared/h2mux"
12+
"github.com/cloudflare/cloudflared/logger"
13+
"github.com/cloudflare/cloudflared/tunnelrpc"
14+
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
15+
"github.com/google/uuid"
16+
"github.com/prometheus/client_golang/prometheus"
17+
)
18+
19+
var (
20+
errJWTUnset = errors.New("JWT unset")
21+
)
22+
23+
// reconnectTunnelCredentialManager is invoked by functions in tunnel.go to
24+
// get/set parameters for ReconnectTunnel RPC calls.
25+
type reconnectCredentialManager struct {
26+
mu sync.RWMutex
27+
jwt []byte
28+
eventDigest map[uint8][]byte
29+
connDigest map[uint8][]byte
30+
authSuccess prometheus.Counter
31+
authFail *prometheus.CounterVec
32+
}
33+
34+
func newReconnectCredentialManager(namespace, subsystem string, haConnections int) *reconnectCredentialManager {
35+
authSuccess := prometheus.NewCounter(
36+
prometheus.CounterOpts{
37+
Namespace: namespace,
38+
Subsystem: subsystem,
39+
Name: "tunnel_authenticate_success",
40+
Help: "Count of successful tunnel authenticate",
41+
},
42+
)
43+
authFail := prometheus.NewCounterVec(
44+
prometheus.CounterOpts{
45+
Namespace: namespace,
46+
Subsystem: subsystem,
47+
Name: "tunnel_authenticate_fail",
48+
Help: "Count of tunnel authenticate errors by type",
49+
},
50+
[]string{"error"},
51+
)
52+
prometheus.MustRegister(authSuccess, authFail)
53+
return &reconnectCredentialManager{
54+
eventDigest: make(map[uint8][]byte, haConnections),
55+
connDigest: make(map[uint8][]byte, haConnections),
56+
authSuccess: authSuccess,
57+
authFail: authFail,
58+
}
59+
}
60+
61+
func (cm *reconnectCredentialManager) ReconnectToken() ([]byte, error) {
62+
cm.mu.RLock()
63+
defer cm.mu.RUnlock()
64+
if cm.jwt == nil {
65+
return nil, errJWTUnset
66+
}
67+
return cm.jwt, nil
68+
}
69+
70+
func (cm *reconnectCredentialManager) SetReconnectToken(jwt []byte) {
71+
cm.mu.Lock()
72+
defer cm.mu.Unlock()
73+
cm.jwt = jwt
74+
}
75+
76+
func (cm *reconnectCredentialManager) EventDigest(connID uint8) ([]byte, error) {
77+
cm.mu.RLock()
78+
defer cm.mu.RUnlock()
79+
digest, ok := cm.eventDigest[connID]
80+
if !ok {
81+
return nil, fmt.Errorf("no event digest for connection %v", connID)
82+
}
83+
return digest, nil
84+
}
85+
86+
func (cm *reconnectCredentialManager) SetEventDigest(connID uint8, digest []byte) {
87+
cm.mu.Lock()
88+
defer cm.mu.Unlock()
89+
cm.eventDigest[connID] = digest
90+
}
91+
92+
func (cm *reconnectCredentialManager) ConnDigest(connID uint8) ([]byte, error) {
93+
cm.mu.RLock()
94+
defer cm.mu.RUnlock()
95+
digest, ok := cm.connDigest[connID]
96+
if !ok {
97+
return nil, fmt.Errorf("no conneciton digest for connection %v", connID)
98+
}
99+
return digest, nil
100+
}
101+
102+
func (cm *reconnectCredentialManager) SetConnDigest(connID uint8, digest []byte) {
103+
cm.mu.Lock()
104+
defer cm.mu.Unlock()
105+
cm.connDigest[connID] = digest
106+
}
107+
108+
func (cm *reconnectCredentialManager) RefreshAuth(
109+
ctx context.Context,
110+
backoff *BackoffHandler,
111+
authenticate func(ctx context.Context, numPreviousAttempts int) (tunnelpogs.AuthOutcome, error),
112+
) (retryTimer <-chan time.Time, err error) {
113+
authOutcome, err := authenticate(ctx, backoff.Retries())
114+
if err != nil {
115+
cm.authFail.WithLabelValues(err.Error()).Inc()
116+
if _, ok := backoff.GetBackoffDuration(ctx); ok {
117+
return backoff.BackoffTimer(), nil
118+
}
119+
return nil, err
120+
}
121+
// clear backoff timer
122+
backoff.SetGracePeriod()
123+
124+
switch outcome := authOutcome.(type) {
125+
case tunnelpogs.AuthSuccess:
126+
cm.SetReconnectToken(outcome.JWT())
127+
cm.authSuccess.Inc()
128+
return timeAfter(outcome.RefreshAfter()), nil
129+
case tunnelpogs.AuthUnknown:
130+
duration := outcome.RefreshAfter()
131+
cm.authFail.WithLabelValues(outcome.Error()).Inc()
132+
return timeAfter(duration), nil
133+
case tunnelpogs.AuthFail:
134+
cm.authFail.WithLabelValues(outcome.Error()).Inc()
135+
return nil, outcome
136+
default:
137+
err := fmt.Errorf("refresh_auth: Unexpected outcome type %T", authOutcome)
138+
cm.authFail.WithLabelValues(err.Error()).Inc()
139+
return nil, err
140+
}
141+
}
142+
143+
func ReconnectTunnel(
144+
ctx context.Context,
145+
muxer *h2mux.Muxer,
146+
config *TunnelConfig,
147+
logger logger.Service,
148+
connectionID uint8,
149+
originLocalAddr string,
150+
uuid uuid.UUID,
151+
credentialManager *reconnectCredentialManager,
152+
) error {
153+
token, err := credentialManager.ReconnectToken()
154+
if err != nil {
155+
return err
156+
}
157+
eventDigest, err := credentialManager.EventDigest(connectionID)
158+
if err != nil {
159+
return err
160+
}
161+
connDigest, err := credentialManager.ConnDigest(connectionID)
162+
if err != nil {
163+
return err
164+
}
165+
166+
config.TransportLogger.Debug("initiating RPC stream to reconnect")
167+
tunnelServer, err := connection.NewRPCClient(ctx, muxer, config.TransportLogger, openStreamTimeout)
168+
if err != nil {
169+
// RPC stream open error
170+
return newClientRegisterTunnelError(err, config.Metrics.rpcFail, reconnect)
171+
}
172+
defer tunnelServer.Close()
173+
// Request server info without blocking tunnel registration; must use capnp library directly.
174+
serverInfoPromise := tunnelrpc.TunnelServer{Client: tunnelServer.Client}.GetServerInfo(ctx, func(tunnelrpc.TunnelServer_getServerInfo_Params) error {
175+
return nil
176+
})
177+
LogServerInfo(serverInfoPromise.Result(), connectionID, config.Metrics, logger)
178+
registration := tunnelServer.ReconnectTunnel(
179+
ctx,
180+
token,
181+
eventDigest,
182+
connDigest,
183+
config.Hostname,
184+
config.RegistrationOptions(connectionID, originLocalAddr, uuid),
185+
)
186+
if registrationErr := registration.DeserializeError(); registrationErr != nil {
187+
// ReconnectTunnel RPC failure
188+
return processRegisterTunnelError(registrationErr, config.Metrics, reconnect)
189+
}
190+
return processRegistrationSuccess(config, logger, connectionID, registration, reconnect, credentialManager)
191+
}

0 commit comments

Comments
 (0)