Skip to content

Commit 1abd22e

Browse files
TUN-7480: Added a timeout for unregisterUDP.
I deliberately kept this as an unregistertimeout because that was the intent. In the future we could change this to a UDPConnConfig if we want to pass multiple values here. The idea of this PR is simply to add a configurable unregister UDP timeout.
1 parent a3bcf25 commit 1abd22e

File tree

7 files changed

+96
-15
lines changed

7 files changed

+96
-15
lines changed

cmd/cloudflared/tunnel/cmd.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ const (
7979
// hostKeyPath is the path of the dir to save SSH host keys too
8080
hostKeyPath = "host-key-path"
8181

82+
// udpUnregisterSessionTimeout is how long we wait before we stop trying to unregister a UDP session from the edge
83+
udpUnregisterSessionTimeoutFlag = "udp-unregister-session-timeout"
84+
8285
// uiFlag is to enable launching cloudflared in interactive UI mode
8386
uiFlag = "ui"
8487

@@ -683,6 +686,11 @@ func tunnelFlags(shouldHide bool) []cli.Flag {
683686
Value: 4,
684687
Hidden: true,
685688
}),
689+
altsrc.NewDurationFlag(&cli.DurationFlag{
690+
Name: udpUnregisterSessionTimeoutFlag,
691+
Value: 5 * time.Second,
692+
Hidden: true,
693+
}),
686694
altsrc.NewStringFlag(&cli.StringFlag{
687695
Name: connectorLabelFlag,
688696
Usage: "Use this option to give a meaningful label to a specific connector. When a tunnel starts up, a connector id unique to the tunnel is generated. This is a uuid. To make it easier to identify a connector, we will use the hostname of the machine the tunnel is running on along with the connector ID. This option exists if one wants to have more control over what their individual connectors are called.",

cmd/cloudflared/tunnel/configuration.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -231,14 +231,15 @@ func prepareTunnelConfig(
231231
Observer: observer,
232232
ReportedVersion: info.Version(),
233233
// Note TUN-3758 , we use Int because UInt is not supported with altsrc
234-
Retries: uint(c.Int("retries")),
235-
RunFromTerminal: isRunningFromTerminal(),
236-
NamedTunnel: namedTunnel,
237-
ProtocolSelector: protocolSelector,
238-
EdgeTLSConfigs: edgeTLSConfigs,
239-
NeedPQ: needPQ,
240-
PQKexIdx: pqKexIdx,
241-
MaxEdgeAddrRetries: uint8(c.Int("max-edge-addr-retries")),
234+
Retries: uint(c.Int("retries")),
235+
RunFromTerminal: isRunningFromTerminal(),
236+
NamedTunnel: namedTunnel,
237+
ProtocolSelector: protocolSelector,
238+
EdgeTLSConfigs: edgeTLSConfigs,
239+
NeedPQ: needPQ,
240+
PQKexIdx: pqKexIdx,
241+
MaxEdgeAddrRetries: uint8(c.Int("max-edge-addr-retries")),
242+
UDPUnregisterSessionTimeout: c.Duration(udpUnregisterSessionTimeoutFlag),
242243
}
243244
packetConfig, err := newPacketConfig(c, log)
244245
if err != nil {

connection/quic.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ type QUICConnection struct {
6363
controlStreamHandler ControlStreamHandler
6464
connOptions *tunnelpogs.ConnectionOptions
6565
connIndex uint8
66+
67+
udpUnregisterTimeout time.Duration
6668
}
6769

6870
// NewQUICConnection returns a new instance of QUICConnection.
@@ -78,6 +80,7 @@ func NewQUICConnection(
7880
controlStreamHandler ControlStreamHandler,
7981
logger *zerolog.Logger,
8082
packetRouterConfig *ingress.GlobalRouterConfig,
83+
udpUnregisterTimeout time.Duration,
8184
) (*QUICConnection, error) {
8285
udpConn, err := createUDPConnForConnIndex(connIndex, localAddr, logger)
8386
if err != nil {
@@ -112,6 +115,7 @@ func NewQUICConnection(
112115
controlStreamHandler: controlStreamHandler,
113116
connOptions: connOptions,
114117
connIndex: connIndex,
118+
udpUnregisterTimeout: udpUnregisterTimeout,
115119
}, nil
116120
}
117121

@@ -370,7 +374,7 @@ func (q *QUICConnection) closeUDPSession(ctx context.Context, sessionID uuid.UUI
370374

371375
stream := quicpogs.NewSafeStreamCloser(quicStream)
372376
defer stream.Close()
373-
rpcClientStream, err := quicpogs.NewRPCClientStream(ctx, stream, q.logger)
377+
rpcClientStream, err := quicpogs.NewRPCClientStream(ctx, stream, q.udpUnregisterTimeout, q.logger)
374378
if err != nil {
375379
// Log this at debug because this is not an error if session was closed due to lost connection
376380
// with edge

connection/quic_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,7 @@ func testQUICConnection(udpListenerAddr net.Addr, t *testing.T, index uint8) *QU
725725
fakeControlStream{},
726726
&log,
727727
nil,
728+
5*time.Second,
728729
)
729730
require.NoError(t, err)
730731
return qc

quic/quic_protocol.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,9 +229,12 @@ func writeSignature(stream io.Writer, signature ProtocolSignature) error {
229229
type RPCClientStream struct {
230230
client tunnelpogs.CloudflaredServer_PogsClient
231231
transport rpc.Transport
232+
233+
// Time we wait for the server to respond to a request before we close the connection.
234+
rpcUnregisterUDPSessionDeadline time.Duration
232235
}
233236

234-
func NewRPCClientStream(ctx context.Context, stream io.ReadWriteCloser, logger *zerolog.Logger) (*RPCClientStream, error) {
237+
func NewRPCClientStream(ctx context.Context, stream io.ReadWriteCloser, rpcUnregisterUDPSessionDeadline time.Duration, logger *zerolog.Logger) (*RPCClientStream, error) {
235238
n, err := stream.Write(RPCStreamProtocolSignature[:])
236239
if err != nil {
237240
return nil, err
@@ -245,8 +248,9 @@ func NewRPCClientStream(ctx context.Context, stream io.ReadWriteCloser, logger *
245248
tunnelrpc.ConnLog(logger),
246249
)
247250
return &RPCClientStream{
248-
client: tunnelpogs.NewCloudflaredServer_PogsClient(conn.Bootstrap(ctx), conn),
249-
transport: transport,
251+
client: tunnelpogs.NewCloudflaredServer_PogsClient(conn.Bootstrap(ctx), conn),
252+
transport: transport,
253+
rpcUnregisterUDPSessionDeadline: rpcUnregisterUDPSessionDeadline,
250254
}, nil
251255
}
252256

@@ -255,6 +259,8 @@ func (rcs *RPCClientStream) RegisterUdpSession(ctx context.Context, sessionID uu
255259
}
256260

257261
func (rcs *RPCClientStream) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error {
262+
ctx, cancel := context.WithTimeout(ctx, rcs.rpcUnregisterUDPSessionDeadline)
263+
defer cancel()
258264
return rcs.client.UnregisterUdpSession(ctx, sessionID, message)
259265
}
260266

quic/quic_protocol_test.go

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,63 @@ func TestConnectResponseMeta(t *testing.T) {
109109
}
110110
}
111111

112+
func TestUnregisterUdpSession(t *testing.T) {
113+
unregisterMessage := "closed by eyeball"
114+
115+
var tests = []struct {
116+
name string
117+
sessionRPCServer mockSessionRPCServer
118+
timeout time.Duration
119+
}{
120+
121+
{
122+
name: "UnregisterUdpSessionTimesout if the RPC server does not respond",
123+
sessionRPCServer: mockSessionRPCServer{
124+
sessionID: uuid.New(),
125+
dstIP: net.IP{172, 16, 0, 1},
126+
dstPort: 8000,
127+
closeIdleAfter: testCloseIdleAfterHint,
128+
unregisterMessage: unregisterMessage,
129+
traceContext: "1241ce3ecdefc68854e8514e69ba42ca:b38f1bf5eae406f3:0:1",
130+
},
131+
// very very low value so we trigger the timeout every time.
132+
timeout: time.Nanosecond * 1,
133+
},
134+
}
135+
136+
for _, test := range tests {
137+
t.Run(test.name, func(t *testing.T) {
138+
logger := zerolog.Nop()
139+
clientStream, serverStream := newMockRPCStreams()
140+
sessionRegisteredChan := make(chan struct{})
141+
go func() {
142+
protocol, err := DetermineProtocol(serverStream)
143+
assert.NoError(t, err)
144+
rpcServerStream, err := NewRPCServerStream(serverStream, protocol)
145+
assert.NoError(t, err)
146+
err = rpcServerStream.Serve(test.sessionRPCServer, nil, &logger)
147+
assert.NoError(t, err)
148+
149+
serverStream.Close()
150+
close(sessionRegisteredChan)
151+
}()
152+
153+
rpcClientStream, err := NewRPCClientStream(context.Background(), clientStream, test.timeout, &logger)
154+
assert.NoError(t, err)
155+
156+
reg, err := rpcClientStream.RegisterUdpSession(context.Background(), test.sessionRPCServer.sessionID, test.sessionRPCServer.dstIP, test.sessionRPCServer.dstPort, testCloseIdleAfterHint, test.sessionRPCServer.traceContext)
157+
assert.NoError(t, err)
158+
assert.NoError(t, reg.Err)
159+
160+
assert.Error(t, rpcClientStream.UnregisterUdpSession(context.Background(), test.sessionRPCServer.sessionID, unregisterMessage))
161+
162+
rpcClientStream.Close()
163+
<-sessionRegisteredChan
164+
})
165+
}
166+
167+
}
168+
112169
func TestRegisterUdpSession(t *testing.T) {
113170
unregisterMessage := "closed by eyeball"
114171

@@ -157,7 +214,7 @@ func TestRegisterUdpSession(t *testing.T) {
157214
close(sessionRegisteredChan)
158215
}()
159216

160-
rpcClientStream, err := NewRPCClientStream(context.Background(), clientStream, &logger)
217+
rpcClientStream, err := NewRPCClientStream(context.Background(), clientStream, 5*time.Second, &logger)
161218
assert.NoError(t, err)
162219

163220
reg, err := rpcClientStream.RegisterUdpSession(context.Background(), test.sessionRPCServer.sessionID, test.sessionRPCServer.dstIP, test.sessionRPCServer.dstPort, testCloseIdleAfterHint, test.sessionRPCServer.traceContext)
@@ -208,7 +265,7 @@ func TestManageConfiguration(t *testing.T) {
208265

209266
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
210267
defer cancel()
211-
rpcClientStream, err := NewRPCClientStream(ctx, clientStream, &logger)
268+
rpcClientStream, err := NewRPCClientStream(ctx, clientStream, 5*time.Second, &logger)
212269
assert.NoError(t, err)
213270

214271
result, err := rpcClientStream.UpdateConfiguration(ctx, version, config)

supervisor/tunnel.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ type TunnelConfig struct {
6868
ProtocolSelector connection.ProtocolSelector
6969
EdgeTLSConfigs map[connection.Protocol]*tls.Config
7070
PacketConfig *ingress.GlobalRouterConfig
71+
72+
UDPUnregisterSessionTimeout time.Duration
7173
}
7274

7375
func (c *TunnelConfig) registrationOptions(connectionID uint8, OriginLocalIP string, uuid uuid.UUID) *tunnelpogs.RegistrationOptions {
@@ -615,7 +617,9 @@ func (e *EdgeTunnelServer) serveQUIC(
615617
connOptions,
616618
controlStreamHandler,
617619
connLogger.Logger(),
618-
e.config.PacketConfig)
620+
e.config.PacketConfig,
621+
e.config.UDPUnregisterSessionTimeout,
622+
)
619623
if err != nil {
620624
if e.config.NeedPQ {
621625
handlePQTunnelError(err, e.config)

0 commit comments

Comments
 (0)