Skip to content

Commit f2339a7

Browse files
committed
TUN-6380: Enforce connect and keep-alive timeouts for TCP connections in both WARP routing and websocket based TCP proxy.
For WARP routing the defaults for these new settings are 5 seconds for connect timeout and 30 seconds for keep-alive timeout. These values can be configured either remotely or locally. Local config lives under "warp-routing" section in config.yaml. For websocket-based proxy, the defaults come from originConfig settings (either global or per-service) and use the same defaults as HTTP proxying.
1 parent 978e01f commit f2339a7

File tree

15 files changed

+144
-88
lines changed

15 files changed

+144
-88
lines changed

cmd/cloudflared/tunnel/configuration.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ func prepareTunnelConfig(
358358
}
359359
orchestratorConfig := &orchestration.Config{
360360
Ingress: &ingressRules,
361-
WarpRoutingEnabled: warpRoutingEnabled,
361+
WarpRouting: ingress.NewWarpRoutingConfig(&cfg.WarpRouting),
362362
ConfigurationFlags: parseConfigFlags(c),
363363
}
364364
return tunnelConfig, orchestratorConfig, nil

config/configuration.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,9 @@ type Configuration struct {
244244
}
245245

246246
type WarpRoutingConfig struct {
247-
Enabled bool `yaml:"enabled" json:"enabled"`
247+
Enabled bool `yaml:"enabled" json:"enabled"`
248+
ConnectTimeout *CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"`
249+
TCPKeepAlive *CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"`
248250
}
249251

250252
type configFileSettings struct {

config/configuration_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ func TestConfigFileSettings(t *testing.T) {
2323
Service: "https://localhost:8001",
2424
}
2525
warpRouting = WarpRoutingConfig{
26-
Enabled: true,
26+
Enabled: true,
27+
ConnectTimeout: &CustomDuration{Duration: 2 * time.Second},
28+
TCPKeepAlive: &CustomDuration{Duration: 10 * time.Second},
2729
}
2830
)
2931
rawYAML := `
@@ -48,6 +50,9 @@ ingress:
4850
service: https://localhost:8001
4951
warp-routing:
5052
enabled: true
53+
connectTimeout: 2s
54+
tcpKeepAlive: 10s
55+
5156
retries: 5
5257
grace-period: 30s
5358
percentage: 3.14

ingress/config.go

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ import (
1212
)
1313

1414
var (
15-
defaultConnectTimeout = config.CustomDuration{Duration: 30 * time.Second}
16-
defaultTLSTimeout = config.CustomDuration{Duration: 10 * time.Second}
17-
defaultTCPKeepAlive = config.CustomDuration{Duration: 30 * time.Second}
18-
defaultKeepAliveTimeout = config.CustomDuration{Duration: 90 * time.Second}
15+
defaultHTTPConnectTimeout = config.CustomDuration{Duration: 30 * time.Second}
16+
defaultWarpRoutingConnectTimeout = config.CustomDuration{Duration: 5 * time.Second}
17+
defaultTLSTimeout = config.CustomDuration{Duration: 10 * time.Second}
18+
defaultTCPKeepAlive = config.CustomDuration{Duration: 30 * time.Second}
19+
defaultKeepAliveTimeout = config.CustomDuration{Duration: 90 * time.Second}
1920
)
2021

2122
const (
@@ -41,10 +42,44 @@ const (
4142
socksProxy = "socks"
4243
)
4344

45+
type WarpRoutingConfig struct {
46+
Enabled bool `yaml:"enabled" json:"enabled"`
47+
ConnectTimeout config.CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"`
48+
TCPKeepAlive config.CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"`
49+
}
50+
51+
func NewWarpRoutingConfig(raw *config.WarpRoutingConfig) WarpRoutingConfig {
52+
cfg := WarpRoutingConfig{
53+
Enabled: raw.Enabled,
54+
ConnectTimeout: defaultWarpRoutingConnectTimeout,
55+
TCPKeepAlive: defaultTCPKeepAlive,
56+
}
57+
if raw.ConnectTimeout != nil {
58+
cfg.ConnectTimeout = *raw.ConnectTimeout
59+
}
60+
if raw.TCPKeepAlive != nil {
61+
cfg.TCPKeepAlive = *raw.TCPKeepAlive
62+
}
63+
return cfg
64+
}
65+
66+
func (c *WarpRoutingConfig) RawConfig() config.WarpRoutingConfig {
67+
raw := config.WarpRoutingConfig{
68+
Enabled: c.Enabled,
69+
}
70+
if c.ConnectTimeout.Duration != defaultWarpRoutingConnectTimeout.Duration {
71+
raw.ConnectTimeout = &c.ConnectTimeout
72+
}
73+
if c.TCPKeepAlive.Duration != defaultTCPKeepAlive.Duration {
74+
raw.TCPKeepAlive = &c.TCPKeepAlive
75+
}
76+
return raw
77+
}
78+
4479
// RemoteConfig models ingress settings that can be managed remotely, for example through the dashboard.
4580
type RemoteConfig struct {
4681
Ingress Ingress
47-
WarpRouting config.WarpRoutingConfig
82+
WarpRouting WarpRoutingConfig
4883
}
4984

5085
type RemoteConfigJSON struct {
@@ -72,18 +107,18 @@ func (rc *RemoteConfig) UnmarshalJSON(b []byte) error {
72107
}
73108

74109
rc.Ingress = ingress
75-
rc.WarpRouting = rawConfig.WarpRouting
110+
rc.WarpRouting = NewWarpRoutingConfig(&rawConfig.WarpRouting)
76111

77112
return nil
78113
}
79114

80115
func originRequestFromSingeRule(c *cli.Context) OriginRequestConfig {
81-
var connectTimeout config.CustomDuration = defaultConnectTimeout
82-
var tlsTimeout config.CustomDuration = defaultTLSTimeout
83-
var tcpKeepAlive config.CustomDuration = defaultTCPKeepAlive
116+
var connectTimeout = defaultHTTPConnectTimeout
117+
var tlsTimeout = defaultTLSTimeout
118+
var tcpKeepAlive = defaultTCPKeepAlive
84119
var noHappyEyeballs bool
85-
var keepAliveConnections int = defaultKeepAliveConnections
86-
var keepAliveTimeout config.CustomDuration = defaultKeepAliveTimeout
120+
var keepAliveConnections = defaultKeepAliveConnections
121+
var keepAliveTimeout = defaultKeepAliveTimeout
87122
var httpHostHeader string
88123
var originServerName string
89124
var caPool string
@@ -160,7 +195,7 @@ func originRequestFromSingeRule(c *cli.Context) OriginRequestConfig {
160195

161196
func originRequestFromConfig(c config.OriginRequestConfig) OriginRequestConfig {
162197
out := OriginRequestConfig{
163-
ConnectTimeout: defaultConnectTimeout,
198+
ConnectTimeout: defaultHTTPConnectTimeout,
164199
TLSTimeout: defaultTLSTimeout,
165200
TCPKeepAlive: defaultTCPKeepAlive,
166201
KeepAliveConnections: defaultKeepAliveConnections,
@@ -404,7 +439,7 @@ func ConvertToRawOriginConfig(c OriginRequestConfig) config.OriginRequestConfig
404439
var keepAliveTimeout *config.CustomDuration
405440
var proxyAddress *string
406441

407-
if c.ConnectTimeout != defaultConnectTimeout {
442+
if c.ConnectTimeout != defaultHTTPConnectTimeout {
408443
connectTimeout = &c.ConnectTimeout
409444
}
410445
if c.TLSTimeout != defaultTLSTimeout {

ingress/config_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ func TestOriginRequestConfigDefaults(t *testing.T) {
274274
// Rule 0 didn't override anything, so it inherits the cloudflared defaults
275275
actual0 := ing.Rules[0].Config
276276
expected0 := OriginRequestConfig{
277-
ConnectTimeout: defaultConnectTimeout,
277+
ConnectTimeout: defaultHTTPConnectTimeout,
278278
TLSTimeout: defaultTLSTimeout,
279279
TCPKeepAlive: defaultTCPKeepAlive,
280280
KeepAliveConnections: defaultKeepAliveConnections,
@@ -404,7 +404,7 @@ func TestDefaultConfigFromCLI(t *testing.T) {
404404
c := cli.NewContext(nil, set, nil)
405405

406406
expected := OriginRequestConfig{
407-
ConnectTimeout: defaultConnectTimeout,
407+
ConnectTimeout: defaultHTTPConnectTimeout,
408408
TLSTimeout: defaultTLSTimeout,
409409
TCPKeepAlive: defaultTCPKeepAlive,
410410
KeepAliveConnections: defaultKeepAliveConnections,

ingress/ingress.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,16 @@ type WarpRoutingService struct {
9797
Proxy StreamBasedOriginProxy
9898
}
9999

100-
func NewWarpRoutingService() *WarpRoutingService {
101-
return &WarpRoutingService{Proxy: &rawTCPService{name: ServiceWarpRouting}}
100+
func NewWarpRoutingService(config WarpRoutingConfig) *WarpRoutingService {
101+
svc := &rawTCPService{
102+
name: ServiceWarpRouting,
103+
dialer: net.Dialer{
104+
Timeout: config.ConnectTimeout.Duration,
105+
KeepAlive: config.TCPKeepAlive.Duration,
106+
},
107+
}
108+
109+
return &WarpRoutingService{Proxy: svc}
102110
}
103111

104112
// Get a single origin service from the CLI/config.

ingress/origin_proxy.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,20 @@
11
package ingress
22

33
import (
4+
"context"
45
"fmt"
5-
"net"
66
"net/http"
7-
8-
"github.com/pkg/errors"
9-
)
10-
11-
var (
12-
errUnsupportedConnectionType = errors.New("internal error: unsupported connection type")
137
)
148

159
// HTTPOriginProxy can be implemented by origin services that want to proxy http requests.
1610
type HTTPOriginProxy interface {
17-
// RoundTrip is how cloudflared proxies eyeball requests to the actual origin services
11+
// RoundTripper is how cloudflared proxies eyeball requests to the actual origin services
1812
http.RoundTripper
1913
}
2014

2115
// StreamBasedOriginProxy can be implemented by origin services that want to proxy ws/TCP.
2216
type StreamBasedOriginProxy interface {
23-
EstablishConnection(dest string) (OriginConnection, error)
17+
EstablishConnection(ctx context.Context, dest string) (OriginConnection, error)
2418
}
2519

2620
func (o *unixSocketPath) RoundTrip(req *http.Request) (*http.Response, error) {
@@ -59,8 +53,8 @@ func (o *statusCode) RoundTrip(_ *http.Request) (*http.Response, error) {
5953
return resp, nil
6054
}
6155

62-
func (o *rawTCPService) EstablishConnection(dest string) (OriginConnection, error) {
63-
conn, err := net.Dial("tcp", dest)
56+
func (o *rawTCPService) EstablishConnection(ctx context.Context, dest string) (OriginConnection, error) {
57+
conn, err := o.dialer.DialContext(ctx, "tcp", dest)
6458
if err != nil {
6559
return nil, err
6660
}
@@ -71,13 +65,13 @@ func (o *rawTCPService) EstablishConnection(dest string) (OriginConnection, erro
7165
return originConn, nil
7266
}
7367

74-
func (o *tcpOverWSService) EstablishConnection(dest string) (OriginConnection, error) {
68+
func (o *tcpOverWSService) EstablishConnection(ctx context.Context, dest string) (OriginConnection, error) {
7569
var err error
7670
if !o.isBastion {
7771
dest = o.dest
7872
}
7973

80-
conn, err := net.Dial("tcp", dest)
74+
conn, err := o.dialer.DialContext(ctx, "tcp", dest)
8175
if err != nil {
8276
return nil, err
8377
}
@@ -89,6 +83,6 @@ func (o *tcpOverWSService) EstablishConnection(dest string) (OriginConnection, e
8983

9084
}
9185

92-
func (o *socksProxyOverWSService) EstablishConnection(dest string) (OriginConnection, error) {
86+
func (o *socksProxyOverWSService) EstablishConnection(_ctx context.Context, _dest string) (OriginConnection, error) {
9387
return o.conn, nil
9488
}

ingress/origin_proxy_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func TestRawTCPServiceEstablishConnection(t *testing.T) {
3636
require.NoError(t, err)
3737

3838
// Origin not listening for new connection, should return an error
39-
_, err = rawTCPService.EstablishConnection(req.URL.String())
39+
_, err = rawTCPService.EstablishConnection(context.Background(), req.URL.String())
4040
require.Error(t, err)
4141
}
4242

@@ -87,7 +87,7 @@ func TestTCPOverWSServiceEstablishConnection(t *testing.T) {
8787
t.Run(test.testCase, func(t *testing.T) {
8888
if test.expectErr {
8989
bastionHost, _ := carrier.ResolveBastionDest(test.req)
90-
_, err := test.service.EstablishConnection(bastionHost)
90+
_, err := test.service.EstablishConnection(context.Background(), bastionHost)
9191
assert.Error(t, err)
9292
}
9393
})
@@ -99,7 +99,7 @@ func TestTCPOverWSServiceEstablishConnection(t *testing.T) {
9999
for _, service := range []*tcpOverWSService{newTCPOverWSService(originURL), newBastionService()} {
100100
// Origin not listening for new connection, should return an error
101101
bastionHost, _ := carrier.ResolveBastionDest(bastionReq)
102-
_, err := service.EstablishConnection(bastionHost)
102+
_, err := service.EstablishConnection(context.Background(), bastionHost)
103103
assert.Error(t, err)
104104
}
105105
}

ingress/origin_service.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ func (o httpService) MarshalJSON() ([]byte, error) {
9191
// rawTCPService dials TCP to the destination specified by the client
9292
// It's used by warp routing
9393
type rawTCPService struct {
94-
name string
94+
name string
95+
dialer net.Dialer
9596
}
9697

9798
func (o *rawTCPService) String() string {
@@ -113,6 +114,7 @@ type tcpOverWSService struct {
113114
dest string
114115
isBastion bool
115116
streamHandler streamHandlerFunc
117+
dialer net.Dialer
116118
}
117119

118120
type socksProxyOverWSService struct {
@@ -176,6 +178,8 @@ func (o *tcpOverWSService) start(log *zerolog.Logger, _ <-chan struct{}, cfg Ori
176178
} else {
177179
o.streamHandler = DefaultStreamHandler
178180
}
181+
o.dialer.Timeout = cfg.ConnectTimeout.Duration
182+
o.dialer.KeepAlive = cfg.TCPKeepAlive.Duration
179183
return nil
180184
}
181185

orchestration/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ type newLocalConfig struct {
1919

2020
// Config is the original config as read and parsed by cloudflared.
2121
type Config struct {
22-
Ingress *ingress.Ingress
23-
WarpRoutingEnabled bool
22+
Ingress *ingress.Ingress
23+
WarpRouting ingress.WarpRoutingConfig
2424

2525
// Extra settings used to configure this instance but that are not eligible for remotely management
2626
// ie. (--protocol, --loglevel, ...)
@@ -37,7 +37,7 @@ func (rc *newLocalConfig) MarshalJSON() ([]byte, error) {
3737
// UI doesn't support top level configs, so we reconcile to individual ingress configs.
3838
GlobalOriginRequest: nil,
3939
IngressRules: convertToUnvalidatedIngressRules(rc.RemoteConfig.Ingress),
40-
WarpRouting: rc.RemoteConfig.WarpRouting,
40+
WarpRouting: rc.RemoteConfig.WarpRouting.RawConfig(),
4141
},
4242
}
4343

0 commit comments

Comments
 (0)