Skip to content

Commit e22422a

Browse files
committed
TUN-5749: Refactor cloudflared to pave way for reconfigurable ingress
- Split origin into supervisor and proxy packages - Create configManager to handle dynamic config
1 parent ff4cfed commit e22422a

33 files changed

+318
-221
lines changed

cmd/cloudflared/tunnel/cmd.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ import (
3131
"github.com/cloudflare/cloudflared/ingress"
3232
"github.com/cloudflare/cloudflared/logger"
3333
"github.com/cloudflare/cloudflared/metrics"
34-
"github.com/cloudflare/cloudflared/origin"
3534
"github.com/cloudflare/cloudflared/signal"
35+
"github.com/cloudflare/cloudflared/supervisor"
3636
"github.com/cloudflare/cloudflared/tlsconfig"
3737
"github.com/cloudflare/cloudflared/tunneldns"
3838
)
@@ -223,7 +223,7 @@ func routeFromFlag(c *cli.Context) (route cfapi.HostnameRoute, ok bool) {
223223
func StartServer(
224224
c *cli.Context,
225225
info *cliutil.BuildInfo,
226-
namedTunnel *connection.NamedTunnelConfig,
226+
namedTunnel *connection.NamedTunnelProperties,
227227
log *zerolog.Logger,
228228
isUIEnabled bool,
229229
) error {
@@ -333,7 +333,7 @@ func StartServer(
333333
observer.SendURL(quickTunnelURL)
334334
}
335335

336-
tunnelConfig, ingressRules, err := prepareTunnelConfig(c, info, log, logTransport, observer, namedTunnel)
336+
tunnelConfig, dynamicConfig, err := prepareTunnelConfig(c, info, log, logTransport, observer, namedTunnel)
337337
if err != nil {
338338
log.Err(err).Msg("Couldn't start tunnel")
339339
return err
@@ -353,11 +353,11 @@ func StartServer(
353353
errC <- metrics.ServeMetrics(metricsListener, ctx.Done(), readinessServer, quickTunnelURL, log)
354354
}()
355355

356-
if err := ingressRules.StartOrigins(&wg, log, ctx.Done(), errC); err != nil {
356+
if err := dynamicConfig.Ingress.StartOrigins(&wg, log, ctx.Done(), errC); err != nil {
357357
return err
358358
}
359359

360-
reconnectCh := make(chan origin.ReconnectSignal, 1)
360+
reconnectCh := make(chan supervisor.ReconnectSignal, 1)
361361
if c.IsSet("stdin-control") {
362362
log.Info().Msg("Enabling control through stdin")
363363
go stdinControl(reconnectCh, log)
@@ -369,15 +369,15 @@ func StartServer(
369369
wg.Done()
370370
log.Info().Msg("Tunnel server stopped")
371371
}()
372-
errC <- origin.StartTunnelDaemon(ctx, tunnelConfig, connectedSignal, reconnectCh, graceShutdownC)
372+
errC <- supervisor.StartTunnelDaemon(ctx, tunnelConfig, dynamicConfig, connectedSignal, reconnectCh, graceShutdownC)
373373
}()
374374

375375
if isUIEnabled {
376376
tunnelUI := ui.NewUIModel(
377377
info.Version(),
378378
hostname,
379379
metricsListener.Addr().String(),
380-
&ingressRules,
380+
dynamicConfig.Ingress,
381381
tunnelConfig.HAConnections,
382382
)
383383
app := tunnelUI.Launch(ctx, log, logTransport)
@@ -998,7 +998,7 @@ func configureProxyDNSFlags(shouldHide bool) []cli.Flag {
998998
}
999999
}
10001000

1001-
func stdinControl(reconnectCh chan origin.ReconnectSignal, log *zerolog.Logger) {
1001+
func stdinControl(reconnectCh chan supervisor.ReconnectSignal, log *zerolog.Logger) {
10021002
for {
10031003
scanner := bufio.NewScanner(os.Stdin)
10041004
for scanner.Scan() {
@@ -1009,7 +1009,7 @@ func stdinControl(reconnectCh chan origin.ReconnectSignal, log *zerolog.Logger)
10091009
case "":
10101010
break
10111011
case "reconnect":
1012-
var reconnect origin.ReconnectSignal
1012+
var reconnect supervisor.ReconnectSignal
10131013
if len(parts) > 1 {
10141014
var err error
10151015
if reconnect.Delay, err = time.ParseDuration(parts[1]); err != nil {

cmd/cloudflared/tunnel/configuration.go

Lines changed: 42 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
"github.com/cloudflare/cloudflared/edgediscovery"
2424
"github.com/cloudflare/cloudflared/h2mux"
2525
"github.com/cloudflare/cloudflared/ingress"
26-
"github.com/cloudflare/cloudflared/origin"
26+
"github.com/cloudflare/cloudflared/supervisor"
2727
"github.com/cloudflare/cloudflared/tlsconfig"
2828
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
2929
"github.com/cloudflare/cloudflared/validation"
@@ -87,7 +87,7 @@ func logClientOptions(c *cli.Context, log *zerolog.Logger) {
8787
}
8888
}
8989

90-
func dnsProxyStandAlone(c *cli.Context, namedTunnel *connection.NamedTunnelConfig) bool {
90+
func dnsProxyStandAlone(c *cli.Context, namedTunnel *connection.NamedTunnelProperties) bool {
9191
return c.IsSet("proxy-dns") && (!c.IsSet("hostname") && !c.IsSet("tag") && !c.IsSet("hello-world") && namedTunnel == nil)
9292
}
9393

@@ -152,44 +152,44 @@ func prepareTunnelConfig(
152152
info *cliutil.BuildInfo,
153153
log, logTransport *zerolog.Logger,
154154
observer *connection.Observer,
155-
namedTunnel *connection.NamedTunnelConfig,
156-
) (*origin.TunnelConfig, ingress.Ingress, error) {
155+
namedTunnel *connection.NamedTunnelProperties,
156+
) (*supervisor.TunnelConfig, *supervisor.DynamicConfig, error) {
157157
isNamedTunnel := namedTunnel != nil
158158

159159
configHostname := c.String("hostname")
160160
hostname, err := validation.ValidateHostname(configHostname)
161161
if err != nil {
162162
log.Err(err).Str(LogFieldHostname, configHostname).Msg("Invalid hostname")
163-
return nil, ingress.Ingress{}, errors.Wrap(err, "Invalid hostname")
163+
return nil, nil, errors.Wrap(err, "Invalid hostname")
164164
}
165165
clientID := c.String("id")
166166
if !c.IsSet("id") {
167167
clientID, err = generateRandomClientID(log)
168168
if err != nil {
169-
return nil, ingress.Ingress{}, err
169+
return nil, nil, err
170170
}
171171
}
172172

173173
tags, err := NewTagSliceFromCLI(c.StringSlice("tag"))
174174
if err != nil {
175175
log.Err(err).Msg("Tag parse failure")
176-
return nil, ingress.Ingress{}, errors.Wrap(err, "Tag parse failure")
176+
return nil, nil, errors.Wrap(err, "Tag parse failure")
177177
}
178178

179179
tags = append(tags, tunnelpogs.Tag{Name: "ID", Value: clientID})
180180

181181
var (
182182
ingressRules ingress.Ingress
183-
classicTunnel *connection.ClassicTunnelConfig
183+
classicTunnel *connection.ClassicTunnelProperties
184184
)
185185
cfg := config.GetConfiguration()
186186
if isNamedTunnel {
187187
clientUUID, err := uuid.NewRandom()
188188
if err != nil {
189-
return nil, ingress.Ingress{}, errors.Wrap(err, "can't generate connector UUID")
189+
return nil, nil, errors.Wrap(err, "can't generate connector UUID")
190190
}
191191
log.Info().Msgf("Generated Connector ID: %s", clientUUID)
192-
features := append(c.StringSlice("features"), origin.FeatureSerializedHeaders)
192+
features := append(c.StringSlice("features"), supervisor.FeatureSerializedHeaders)
193193
namedTunnel.Client = tunnelpogs.ClientInfo{
194194
ClientID: clientUUID[:],
195195
Features: dedup(features),
@@ -198,10 +198,10 @@ func prepareTunnelConfig(
198198
}
199199
ingressRules, err = ingress.ParseIngress(cfg)
200200
if err != nil && err != ingress.ErrNoIngressRules {
201-
return nil, ingress.Ingress{}, err
201+
return nil, nil, err
202202
}
203203
if !ingressRules.IsEmpty() && c.IsSet("url") {
204-
return nil, ingress.Ingress{}, ingress.ErrURLIncompatibleWithIngress
204+
return nil, nil, ingress.ErrURLIncompatibleWithIngress
205205
}
206206
} else {
207207

@@ -212,10 +212,10 @@ func prepareTunnelConfig(
212212

213213
originCert, err := getOriginCert(originCertPath, &originCertLog)
214214
if err != nil {
215-
return nil, ingress.Ingress{}, errors.Wrap(err, "Error getting origin cert")
215+
return nil, nil, errors.Wrap(err, "Error getting origin cert")
216216
}
217217

218-
classicTunnel = &connection.ClassicTunnelConfig{
218+
classicTunnel = &connection.ClassicTunnelProperties{
219219
Hostname: hostname,
220220
OriginCert: originCert,
221221
// turn off use of reconnect token and auth refresh when using named tunnels
@@ -227,48 +227,36 @@ func prepareTunnelConfig(
227227
if ingressRules.IsEmpty() {
228228
ingressRules, err = ingress.NewSingleOrigin(c, !isNamedTunnel)
229229
if err != nil {
230-
return nil, ingress.Ingress{}, err
230+
return nil, nil, err
231231
}
232232
}
233233

234-
var warpRoutingService *ingress.WarpRoutingService
235234
warpRoutingEnabled := isWarpRoutingEnabled(cfg.WarpRouting, isNamedTunnel)
236-
if warpRoutingEnabled {
237-
warpRoutingService = ingress.NewWarpRoutingService()
238-
log.Info().Msgf("Warp-routing is enabled")
239-
}
240-
241-
protocolSelector, err := connection.NewProtocolSelector(c.String("protocol"), warpRoutingEnabled, namedTunnel, edgediscovery.ProtocolPercentage, origin.ResolveTTL, log)
235+
protocolSelector, err := connection.NewProtocolSelector(c.String("protocol"), warpRoutingEnabled, namedTunnel, edgediscovery.ProtocolPercentage, supervisor.ResolveTTL, log)
242236
if err != nil {
243-
return nil, ingress.Ingress{}, err
237+
return nil, nil, err
244238
}
245239
log.Info().Msgf("Initial protocol %s", protocolSelector.Current())
246240

247241
edgeTLSConfigs := make(map[connection.Protocol]*tls.Config, len(connection.ProtocolList))
248242
for _, p := range connection.ProtocolList {
249243
tlsSettings := p.TLSSettings()
250244
if tlsSettings == nil {
251-
return nil, ingress.Ingress{}, fmt.Errorf("%s has unknown TLS settings", p)
245+
return nil, nil, fmt.Errorf("%s has unknown TLS settings", p)
252246
}
253247
edgeTLSConfig, err := tlsconfig.CreateTunnelConfig(c, tlsSettings.ServerName)
254248
if err != nil {
255-
return nil, ingress.Ingress{}, errors.Wrap(err, "unable to create TLS config to connect with edge")
249+
return nil, nil, errors.Wrap(err, "unable to create TLS config to connect with edge")
256250
}
257251
if len(tlsSettings.NextProtos) > 0 {
258252
edgeTLSConfig.NextProtos = tlsSettings.NextProtos
259253
}
260254
edgeTLSConfigs[p] = edgeTLSConfig
261255
}
262256

263-
originProxy := origin.NewOriginProxy(ingressRules, warpRoutingService, tags, log)
264257
gracePeriod, err := gracePeriod(c)
265258
if err != nil {
266-
return nil, ingress.Ingress{}, err
267-
}
268-
connectionConfig := &connection.Config{
269-
OriginProxy: originProxy,
270-
GracePeriod: gracePeriod,
271-
ReplaceExisting: c.Bool("force"),
259+
return nil, nil, err
272260
}
273261
muxerConfig := &connection.MuxerConfig{
274262
HeartbeatInterval: c.Duration("heartbeat-interval"),
@@ -279,21 +267,22 @@ func prepareTunnelConfig(
279267
MetricsUpdateFreq: c.Duration("metrics-update-freq"),
280268
}
281269

282-
return &origin.TunnelConfig{
283-
ConnectionConfig: connectionConfig,
284-
OSArch: info.OSArch(),
285-
ClientID: clientID,
286-
EdgeAddrs: c.StringSlice("edge"),
287-
Region: c.String("region"),
288-
HAConnections: c.Int("ha-connections"),
289-
IncidentLookup: origin.NewIncidentLookup(),
290-
IsAutoupdated: c.Bool("is-autoupdated"),
291-
LBPool: c.String("lb-pool"),
292-
Tags: tags,
293-
Log: log,
294-
LogTransport: logTransport,
295-
Observer: observer,
296-
ReportedVersion: info.Version(),
270+
tunnelConfig := &supervisor.TunnelConfig{
271+
GracePeriod: gracePeriod,
272+
ReplaceExisting: c.Bool("force"),
273+
OSArch: info.OSArch(),
274+
ClientID: clientID,
275+
EdgeAddrs: c.StringSlice("edge"),
276+
Region: c.String("region"),
277+
HAConnections: c.Int("ha-connections"),
278+
IncidentLookup: supervisor.NewIncidentLookup(),
279+
IsAutoupdated: c.Bool("is-autoupdated"),
280+
LBPool: c.String("lb-pool"),
281+
Tags: tags,
282+
Log: log,
283+
LogTransport: logTransport,
284+
Observer: observer,
285+
ReportedVersion: info.Version(),
297286
// Note TUN-3758 , we use Int because UInt is not supported with altsrc
298287
Retries: uint(c.Int("retries")),
299288
RunFromTerminal: isRunningFromTerminal(),
@@ -302,7 +291,12 @@ func prepareTunnelConfig(
302291
MuxerConfig: muxerConfig,
303292
ProtocolSelector: protocolSelector,
304293
EdgeTLSConfigs: edgeTLSConfigs,
305-
}, ingressRules, nil
294+
}
295+
dynamicConfig := &supervisor.DynamicConfig{
296+
Ingress: &ingressRules,
297+
WarpRoutingEnabled: warpRoutingEnabled,
298+
}
299+
return tunnelConfig, dynamicConfig, nil
306300
}
307301

308302
func gracePeriod(c *cli.Context) (time.Duration, error) {

cmd/cloudflared/tunnel/quick_tunnel.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func RunQuickTunnel(sc *subcommandContext) error {
7777
return StartServer(
7878
sc.c,
7979
buildInfo,
80-
&connection.NamedTunnelConfig{Credentials: credentials, QuickTunnelUrl: data.Result.Hostname},
80+
&connection.NamedTunnelProperties{Credentials: credentials, QuickTunnelUrl: data.Result.Hostname},
8181
sc.log,
8282
sc.isUIEnabled,
8383
)

cmd/cloudflared/tunnel/subcommand_context.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ func (sc *subcommandContext) run(tunnelID uuid.UUID) error {
304304
return StartServer(
305305
sc.c,
306306
buildInfo,
307-
&connection.NamedTunnelConfig{Credentials: credentials},
307+
&connection.NamedTunnelProperties{Credentials: credentials},
308308
sc.log,
309309
sc.isUIEnabled,
310310
)

connection/connection.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,12 @@ const (
2525

2626
var switchingProtocolText = fmt.Sprintf("%d %s", http.StatusSwitchingProtocols, http.StatusText(http.StatusSwitchingProtocols))
2727

28-
type Config struct {
29-
OriginProxy OriginProxy
30-
GracePeriod time.Duration
31-
ReplaceExisting bool
28+
type ConfigManager interface {
29+
Update(version int32, config []byte) *pogs.UpdateConfigurationResponse
30+
GetOriginProxy() OriginProxy
3231
}
3332

34-
type NamedTunnelConfig struct {
33+
type NamedTunnelProperties struct {
3534
Credentials Credentials
3635
Client pogs.ClientInfo
3736
QuickTunnelUrl string
@@ -52,7 +51,7 @@ func (c *Credentials) Auth() pogs.TunnelAuth {
5251
}
5352
}
5453

55-
type ClassicTunnelConfig struct {
54+
type ClassicTunnelProperties struct {
5655
Hostname string
5756
OriginCert []byte
5857
// feature-flag to use new edge reconnect tokens

connection/connection_test.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,19 @@ import (
1414
"github.com/stretchr/testify/assert"
1515

1616
"github.com/cloudflare/cloudflared/ingress"
17+
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
1718
"github.com/cloudflare/cloudflared/websocket"
1819
)
1920

2021
const (
21-
largeFileSize = 2 * 1024 * 1024
22+
largeFileSize = 2 * 1024 * 1024
23+
testGracePeriod = time.Millisecond * 100
2224
)
2325

2426
var (
2527
unusedWarpRoutingService = (*ingress.WarpRoutingService)(nil)
26-
testConfig = &Config{
27-
OriginProxy: &mockOriginProxy{},
28-
GracePeriod: time.Millisecond * 100,
28+
testConfigManager = &mockConfigManager{
29+
originProxy: &mockOriginProxy{},
2930
}
3031
log = zerolog.Nop()
3132
testOriginURL = &url.URL{
@@ -43,6 +44,20 @@ type testRequest struct {
4344
isProxyError bool
4445
}
4546

47+
type mockConfigManager struct {
48+
originProxy OriginProxy
49+
}
50+
51+
func (*mockConfigManager) Update(version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse {
52+
return &tunnelpogs.UpdateConfigurationResponse{
53+
LastAppliedVersion: version,
54+
}
55+
}
56+
57+
func (mcr *mockConfigManager) GetOriginProxy() OriginProxy {
58+
return mcr.originProxy
59+
}
60+
4661
type mockOriginProxy struct{}
4762

4863
func (moc *mockOriginProxy) ProxyHTTP(

0 commit comments

Comments
 (0)