Skip to content

Commit 99d4e48

Browse files
committed
TUN-6016: Push local managed tunnels configuration to the edge
1 parent 0180b6d commit 99d4e48

File tree

20 files changed

+441
-50
lines changed

20 files changed

+441
-50
lines changed

cmd/cloudflared/tunnel/cmd.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -343,13 +343,13 @@ func StartServer(
343343
observer.SendURL(quickTunnelURL)
344344
}
345345

346-
tunnelConfig, dynamicConfig, err := prepareTunnelConfig(c, info, log, logTransport, observer, namedTunnel)
346+
tunnelConfig, orchestratorConfig, err := prepareTunnelConfig(c, info, log, logTransport, observer, namedTunnel)
347347
if err != nil {
348348
log.Err(err).Msg("Couldn't start tunnel")
349349
return err
350350
}
351351

352-
orchestrator, err := orchestration.NewOrchestrator(ctx, dynamicConfig, tunnelConfig.Tags, tunnelConfig.Log)
352+
orchestrator, err := orchestration.NewOrchestrator(ctx, orchestratorConfig, tunnelConfig.Tags, tunnelConfig.Log)
353353
if err != nil {
354354
return err
355355
}
@@ -388,7 +388,7 @@ func StartServer(
388388
info.Version(),
389389
hostname,
390390
metricsListener.Addr().String(),
391-
dynamicConfig.Ingress,
391+
orchestratorConfig.Ingress,
392392
tunnelConfig.HAConnections,
393393
)
394394
app := tunnelUI.Launch(ctx, log, logTransport)

cmd/cloudflared/tunnel/configuration.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ var (
4343

4444
secretFlags = [2]*altsrc.StringFlag{credentialsContentsFlag, tunnelTokenFlag}
4545
defaultFeatures = []string{supervisor.FeatureAllowRemoteConfig, supervisor.FeatureSerializedHeaders}
46+
47+
configFlags = []string{"autoupdate-freq", "no-autoupdate", "retries", "protocol", "loglevel", "transport-loglevel", "origincert", "metrics", "metrics-update-freq"}
4648
)
4749

4850
// returns the first path that contains a cert.pem file. If none of the DefaultConfigSearchDirectories
@@ -348,11 +350,24 @@ func prepareTunnelConfig(
348350
ProtocolSelector: protocolSelector,
349351
EdgeTLSConfigs: edgeTLSConfigs,
350352
}
351-
dynamicConfig := &orchestration.Config{
353+
orchestratorConfig := &orchestration.Config{
352354
Ingress: &ingressRules,
353355
WarpRoutingEnabled: warpRoutingEnabled,
356+
ConfigurationFlags: parseConfigFlags(c),
354357
}
355-
return tunnelConfig, dynamicConfig, nil
358+
return tunnelConfig, orchestratorConfig, nil
359+
}
360+
361+
func parseConfigFlags(c *cli.Context) map[string]string {
362+
result := make(map[string]string)
363+
364+
for _, flag := range configFlags {
365+
if v := c.String(flag); c.IsSet(flag) && v != "" {
366+
result[flag] = v
367+
}
368+
}
369+
370+
return result
356371
}
357372

358373
func gracePeriod(c *cli.Context) (time.Duration, error) {

config/configuration.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,9 @@ func ValidateUrl(c *cli.Context, allowURLFromArgs bool) (*url.URL, error) {
177177
}
178178

179179
type UnvalidatedIngressRule struct {
180-
Hostname string `json:"hostname"`
181-
Path string `json:"path"`
182-
Service string `json:"service"`
180+
Hostname string `json:"hostname,omitempty"`
181+
Path string `json:"path,omitempty"`
182+
Service string `json:"service,omitempty"`
183183
OriginRequest OriginRequestConfig `yaml:"originRequest" json:"originRequest"`
184184
}
185185

@@ -192,41 +192,41 @@ type UnvalidatedIngressRule struct {
192192
// - To specify a time.Duration in json, use int64 of the nanoseconds
193193
type OriginRequestConfig struct {
194194
// HTTP proxy timeout for establishing a new connection
195-
ConnectTimeout *CustomDuration `yaml:"connectTimeout" json:"connectTimeout"`
195+
ConnectTimeout *CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"`
196196
// HTTP proxy timeout for completing a TLS handshake
197-
TLSTimeout *CustomDuration `yaml:"tlsTimeout" json:"tlsTimeout"`
197+
TLSTimeout *CustomDuration `yaml:"tlsTimeout" json:"tlsTimeout,omitempty"`
198198
// HTTP proxy TCP keepalive duration
199-
TCPKeepAlive *CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive"`
199+
TCPKeepAlive *CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"`
200200
// HTTP proxy should disable "happy eyeballs" for IPv4/v6 fallback
201-
NoHappyEyeballs *bool `yaml:"noHappyEyeballs" json:"noHappyEyeballs"`
201+
NoHappyEyeballs *bool `yaml:"noHappyEyeballs" json:"noHappyEyeballs,omitempty"`
202202
// HTTP proxy maximum keepalive connection pool size
203-
KeepAliveConnections *int `yaml:"keepAliveConnections" json:"keepAliveConnections"`
203+
KeepAliveConnections *int `yaml:"keepAliveConnections" json:"keepAliveConnections,omitempty"`
204204
// HTTP proxy timeout for closing an idle connection
205-
KeepAliveTimeout *CustomDuration `yaml:"keepAliveTimeout" json:"keepAliveTimeout"`
205+
KeepAliveTimeout *CustomDuration `yaml:"keepAliveTimeout" json:"keepAliveTimeout,omitempty"`
206206
// Sets the HTTP Host header for the local webserver.
207-
HTTPHostHeader *string `yaml:"httpHostHeader" json:"httpHostHeader"`
207+
HTTPHostHeader *string `yaml:"httpHostHeader" json:"httpHostHeader,omitempty"`
208208
// Hostname on the origin server certificate.
209-
OriginServerName *string `yaml:"originServerName" json:"originServerName"`
209+
OriginServerName *string `yaml:"originServerName" json:"originServerName,omitempty"`
210210
// Path to the CA for the certificate of your origin.
211211
// This option should be used only if your certificate is not signed by Cloudflare.
212-
CAPool *string `yaml:"caPool" json:"caPool"`
212+
CAPool *string `yaml:"caPool" json:"caPool,omitempty"`
213213
// Disables TLS verification of the certificate presented by your origin.
214214
// Will allow any certificate from the origin to be accepted.
215215
// Note: The connection from your machine to Cloudflare's Edge is still encrypted.
216-
NoTLSVerify *bool `yaml:"noTLSVerify" json:"noTLSVerify"`
216+
NoTLSVerify *bool `yaml:"noTLSVerify" json:"noTLSVerify,omitempty"`
217217
// Disables chunked transfer encoding.
218218
// Useful if you are running a WSGI server.
219-
DisableChunkedEncoding *bool `yaml:"disableChunkedEncoding" json:"disableChunkedEncoding"`
219+
DisableChunkedEncoding *bool `yaml:"disableChunkedEncoding" json:"disableChunkedEncoding,omitempty"`
220220
// Runs as jump host
221-
BastionMode *bool `yaml:"bastionMode" json:"bastionMode"`
221+
BastionMode *bool `yaml:"bastionMode" json:"bastionMode,omitempty"`
222222
// Listen address for the proxy.
223-
ProxyAddress *string `yaml:"proxyAddress" json:"proxyAddress"`
223+
ProxyAddress *string `yaml:"proxyAddress" json:"proxyAddress,omitempty"`
224224
// Listen port for the proxy.
225-
ProxyPort *uint `yaml:"proxyPort" json:"proxyPort"`
225+
ProxyPort *uint `yaml:"proxyPort" json:"proxyPort,omitempty"`
226226
// Valid options are 'socks' or empty.
227-
ProxyType *string `yaml:"proxyType" json:"proxyType"`
227+
ProxyType *string `yaml:"proxyType" json:"proxyType,omitempty"`
228228
// IP rules for the proxy service
229-
IPRules []IngressIPRule `yaml:"ipRules" json:"ipRules"`
229+
IPRules []IngressIPRule `yaml:"ipRules" json:"ipRules,omitempty"`
230230
}
231231

232232
type IngressIPRule struct {

connection/connection.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ var switchingProtocolText = fmt.Sprintf("%d %s", http.StatusSwitchingProtocols,
3030

3131
type Orchestrator interface {
3232
UpdateConfig(version int32, config []byte) *pogs.UpdateConfigurationResponse
33+
GetConfigJSON() ([]byte, error)
3334
GetOriginProxy() (OriginProxy, error)
3435
}
3536

connection/connection_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ type mockOrchestrator struct {
4242
originProxy OriginProxy
4343
}
4444

45+
func (mcr *mockOrchestrator) GetConfigJSON() ([]byte, error) {
46+
return nil, fmt.Errorf("not implemented")
47+
}
48+
4549
func (*mockOrchestrator) UpdateConfig(version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse {
4650
return &tunnelpogs.UpdateConfigurationResponse{
4751
LastAppliedVersion: version,

connection/control.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,15 @@ type controlStream struct {
3030
// ControlStreamHandler registers connections with origintunneld and initiates graceful shutdown.
3131
type ControlStreamHandler interface {
3232
// ServeControlStream handles the control plane of the transport in the current goroutine calling this
33-
ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions) error
33+
ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, tunnelConfigGetter TunnelConfigJSONGetter) error
3434
// IsStopped tells whether the method above has finished
3535
IsStopped() bool
3636
}
3737

38+
type TunnelConfigJSONGetter interface {
39+
GetConfigJSON() ([]byte, error)
40+
}
41+
3842
// NewControlStream returns a new instance of ControlStreamHandler
3943
func NewControlStream(
4044
observer *Observer,
@@ -63,15 +67,28 @@ func (c *controlStream) ServeControlStream(
6367
ctx context.Context,
6468
rw io.ReadWriteCloser,
6569
connOptions *tunnelpogs.ConnectionOptions,
70+
tunnelConfigGetter TunnelConfigJSONGetter,
6671
) error {
6772
rpcClient := c.newRPCClientFunc(ctx, rw, c.observer.log)
6873

69-
if err := rpcClient.RegisterConnection(ctx, c.namedTunnelProperties, connOptions, c.connIndex, c.observer); err != nil {
74+
registrationDetails, err := rpcClient.RegisterConnection(ctx, c.namedTunnelProperties, connOptions, c.connIndex, c.observer)
75+
if err != nil {
7076
rpcClient.Close()
7177
return err
7278
}
7379
c.connectedFuse.Connected()
7480

81+
// if conn index is 0 and tunnel is not remotely managed, then send local ingress rules configuration
82+
if c.connIndex == 0 && !registrationDetails.TunnelIsRemotelyManaged {
83+
if tunnelConfig, err := tunnelConfigGetter.GetConfigJSON(); err == nil {
84+
if err := rpcClient.SendLocalConfiguration(ctx, tunnelConfig, c.observer); err != nil {
85+
c.observer.log.Err(err).Msg("unable to send local configuration")
86+
}
87+
} else {
88+
c.observer.log.Err(err).Msg("failed to obtain current configuration")
89+
}
90+
}
91+
7592
c.waitForUnregister(ctx, rpcClient)
7693
return nil
7794
}

connection/http2.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
117117

118118
switch connType {
119119
case TypeControlStream:
120-
if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions); err != nil {
120+
if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions, c.orchestrator); err != nil {
121121
c.controlStreamErr = err
122122
c.log.Error().Err(err)
123123
respWriter.WriteErrorResponse()

connection/http2_test.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"time"
1616

1717
"github.com/gobwas/ws/wsutil"
18+
"github.com/google/uuid"
1819
"github.com/rs/zerolog"
1920
"github.com/stretchr/testify/assert"
2021
"github.com/stretchr/testify/require"
@@ -166,18 +167,26 @@ type mockNamedTunnelRPCClient struct {
166167
unregistered chan struct{}
167168
}
168169

170+
func (mc mockNamedTunnelRPCClient) SendLocalConfiguration(c context.Context, config []byte, observer *Observer) error {
171+
return nil
172+
}
173+
169174
func (mc mockNamedTunnelRPCClient) RegisterConnection(
170175
c context.Context,
171176
properties *NamedTunnelProperties,
172177
options *tunnelpogs.ConnectionOptions,
173178
connIndex uint8,
174179
observer *Observer,
175-
) error {
180+
) (*tunnelpogs.ConnectionDetails, error) {
176181
if mc.shouldFail != nil {
177-
return mc.shouldFail
182+
return nil, mc.shouldFail
178183
}
179184
close(mc.registered)
180-
return nil
185+
return &tunnelpogs.ConnectionDetails{
186+
Location: "LIS",
187+
UUID: uuid.New(),
188+
TunnelIsRemotelyManaged: false,
189+
}, nil
181190
}
182191

183192
func (mc mockNamedTunnelRPCClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) {
@@ -477,7 +486,7 @@ func TestGracefulShutdownHTTP2(t *testing.T) {
477486

478487
select {
479488
case <-rpcClientFactory.registered:
480-
break //ok
489+
break // ok
481490
case <-time.Tick(time.Second):
482491
t.Fatal("timeout out waiting for registration")
483492
}
@@ -487,7 +496,7 @@ func TestGracefulShutdownHTTP2(t *testing.T) {
487496

488497
select {
489498
case <-rpcClientFactory.unregistered:
490-
break //ok
499+
break // ok
491500
case <-time.Tick(time.Second):
492501
t.Fatal("timeout out waiting for unregistered signal")
493502
}

connection/metrics.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const (
1313
MetricsNamespace = "cloudflared"
1414
TunnelSubsystem = "tunnel"
1515
muxerSubsystem = "muxer"
16+
configSubsystem = "config"
1617
)
1718

1819
type muxerMetrics struct {
@@ -36,6 +37,11 @@ type muxerMetrics struct {
3637
compRateAve *prometheus.GaugeVec
3738
}
3839

40+
type localConfigMetrics struct {
41+
pushes prometheus.Counter
42+
pushesErrors prometheus.Counter
43+
}
44+
3945
type tunnelMetrics struct {
4046
timerRetries prometheus.Gauge
4147
serverLocations *prometheus.GaugeVec
@@ -51,6 +57,39 @@ type tunnelMetrics struct {
5157
muxerMetrics *muxerMetrics
5258
tunnelsHA tunnelsForHA
5359
userHostnamesCounts *prometheus.CounterVec
60+
61+
localConfigMetrics *localConfigMetrics
62+
}
63+
64+
func newLocalConfigMetrics() *localConfigMetrics {
65+
66+
pushesMetric := prometheus.NewCounter(
67+
prometheus.CounterOpts{
68+
Namespace: MetricsNamespace,
69+
Subsystem: configSubsystem,
70+
Name: "local_config_pushes",
71+
Help: "Number of local configuration pushes to the edge",
72+
},
73+
)
74+
75+
pushesErrorsMetric := prometheus.NewCounter(
76+
prometheus.CounterOpts{
77+
Namespace: MetricsNamespace,
78+
Subsystem: configSubsystem,
79+
Name: "local_config_pushes_errors",
80+
Help: "Number of errors occurred during local configuration pushes",
81+
},
82+
)
83+
84+
prometheus.MustRegister(
85+
pushesMetric,
86+
pushesErrorsMetric,
87+
)
88+
89+
return &localConfigMetrics{
90+
pushes: pushesMetric,
91+
pushesErrors: pushesErrorsMetric,
92+
}
5493
}
5594

5695
func newMuxerMetrics() *muxerMetrics {
@@ -386,6 +425,7 @@ func initTunnelMetrics() *tunnelMetrics {
386425
regFail: registerFail,
387426
rpcFail: rpcFail,
388427
userHostnamesCounts: userHostnamesCounts,
428+
localConfigMetrics: newLocalConfigMetrics(),
389429
}
390430
}
391431

connection/quic.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (q *QUICConnection) Serve(ctx context.Context) error {
111111

112112
func (q *QUICConnection) serveControlStream(ctx context.Context, controlStream quic.Stream) error {
113113
// This blocks until the control plane is done.
114-
err := q.controlStreamHandler.ServeControlStream(ctx, controlStream, q.connOptions)
114+
err := q.controlStreamHandler.ServeControlStream(ctx, controlStream, q.connOptions, q.orchestrator)
115115
if err != nil {
116116
// Not wrapping error here to be consistent with the http2 message.
117117
return err

0 commit comments

Comments
 (0)