Skip to content

Commit d68ff39

Browse files
committed
TUN-5698: Make ingress rules and warp routing dynamically configurable
1 parent 0571210 commit d68ff39

File tree

21 files changed

+978
-175
lines changed

21 files changed

+978
-175
lines changed

cmd/cloudflared/tunnel/cmd.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/cloudflare/cloudflared/ingress"
3232
"github.com/cloudflare/cloudflared/logger"
3333
"github.com/cloudflare/cloudflared/metrics"
34+
"github.com/cloudflare/cloudflared/orchestration"
3435
"github.com/cloudflare/cloudflared/signal"
3536
"github.com/cloudflare/cloudflared/supervisor"
3637
"github.com/cloudflare/cloudflared/tlsconfig"
@@ -353,7 +354,8 @@ func StartServer(
353354
errC <- metrics.ServeMetrics(metricsListener, ctx.Done(), readinessServer, quickTunnelURL, log)
354355
}()
355356

356-
if err := dynamicConfig.Ingress.StartOrigins(&wg, log, ctx.Done(), errC); err != nil {
357+
orchestrator, err := orchestration.NewOrchestrator(ctx, dynamicConfig, tunnelConfig.Tags, tunnelConfig.Log)
358+
if err != nil {
357359
return err
358360
}
359361

@@ -369,7 +371,7 @@ func StartServer(
369371
wg.Done()
370372
log.Info().Msg("Tunnel server stopped")
371373
}()
372-
errC <- supervisor.StartTunnelDaemon(ctx, tunnelConfig, dynamicConfig, connectedSignal, reconnectCh, graceShutdownC)
374+
errC <- supervisor.StartTunnelDaemon(ctx, tunnelConfig, orchestrator, connectedSignal, reconnectCh, graceShutdownC)
373375
}()
374376

375377
if isUIEnabled {

cmd/cloudflared/tunnel/configuration.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/cloudflare/cloudflared/edgediscovery"
2424
"github.com/cloudflare/cloudflared/h2mux"
2525
"github.com/cloudflare/cloudflared/ingress"
26+
"github.com/cloudflare/cloudflared/orchestration"
2627
"github.com/cloudflare/cloudflared/supervisor"
2728
"github.com/cloudflare/cloudflared/tlsconfig"
2829
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
@@ -153,7 +154,7 @@ func prepareTunnelConfig(
153154
log, logTransport *zerolog.Logger,
154155
observer *connection.Observer,
155156
namedTunnel *connection.NamedTunnelProperties,
156-
) (*supervisor.TunnelConfig, *supervisor.DynamicConfig, error) {
157+
) (*supervisor.TunnelConfig, *orchestration.Config, error) {
157158
isNamedTunnel := namedTunnel != nil
158159

159160
configHostname := c.String("hostname")
@@ -292,7 +293,7 @@ func prepareTunnelConfig(
292293
ProtocolSelector: protocolSelector,
293294
EdgeTLSConfigs: edgeTLSConfigs,
294295
}
295-
dynamicConfig := &supervisor.DynamicConfig{
296+
dynamicConfig := &orchestration.Config{
296297
Ingress: &ingressRules,
297298
WarpRoutingEnabled: warpRoutingEnabled,
298299
}

connection/connection.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ const (
2525

2626
var switchingProtocolText = fmt.Sprintf("%d %s", http.StatusSwitchingProtocols, http.StatusText(http.StatusSwitchingProtocols))
2727

28-
type ConfigManager interface {
29-
Update(version int32, config []byte) *pogs.UpdateConfigurationResponse
30-
GetOriginProxy() OriginProxy
28+
type Orchestrator interface {
29+
UpdateConfig(version int32, config []byte) *pogs.UpdateConfigurationResponse
30+
GetOriginProxy() (OriginProxy, error)
3131
}
3232

3333
type NamedTunnelProperties struct {

connection/connection_test.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,12 @@ import (
66
"io"
77
"math/rand"
88
"net/http"
9-
"net/url"
109
"testing"
1110
"time"
1211

1312
"github.com/rs/zerolog"
1413
"github.com/stretchr/testify/assert"
1514

16-
"github.com/cloudflare/cloudflared/ingress"
1715
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
1816
"github.com/cloudflare/cloudflared/websocket"
1917
)
@@ -24,15 +22,10 @@ const (
2422
)
2523

2624
var (
27-
unusedWarpRoutingService = (*ingress.WarpRoutingService)(nil)
28-
testConfigManager = &mockConfigManager{
25+
testOrchestrator = &mockOrchestrator{
2926
originProxy: &mockOriginProxy{},
3027
}
3128
log = zerolog.Nop()
32-
testOriginURL = &url.URL{
33-
Scheme: "https",
34-
Host: "connectiontest.argotunnel.com",
35-
}
3629
testLargeResp = make([]byte, largeFileSize)
3730
)
3831

@@ -44,18 +37,18 @@ type testRequest struct {
4437
isProxyError bool
4538
}
4639

47-
type mockConfigManager struct {
40+
type mockOrchestrator struct {
4841
originProxy OriginProxy
4942
}
5043

51-
func (*mockConfigManager) Update(version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse {
44+
func (*mockOrchestrator) UpdateConfig(version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse {
5245
return &tunnelpogs.UpdateConfigurationResponse{
5346
LastAppliedVersion: version,
5447
}
5548
}
5649

57-
func (mcr *mockConfigManager) GetOriginProxy() OriginProxy {
58-
return mcr.originProxy
50+
func (mcr *mockOrchestrator) GetOriginProxy() (OriginProxy, error) {
51+
return mcr.originProxy, nil
5952
}
6053

6154
type mockOriginProxy struct{}

connection/h2mux.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ const (
2222
)
2323

2424
type h2muxConnection struct {
25-
configManager ConfigManager
26-
gracePeriod time.Duration
27-
muxerConfig *MuxerConfig
28-
muxer *h2mux.Muxer
25+
orchestrator Orchestrator
26+
gracePeriod time.Duration
27+
muxerConfig *MuxerConfig
28+
muxer *h2mux.Muxer
2929
// connectionID is only used by metrics, and prometheus requires labels to be string
3030
connIndexStr string
3131
connIndex uint8
@@ -61,7 +61,7 @@ func (mc *MuxerConfig) H2MuxerConfig(h h2mux.MuxedStreamHandler, log *zerolog.Lo
6161

6262
// NewTunnelHandler returns a TunnelHandler, origin LAN IP and error
6363
func NewH2muxConnection(
64-
configManager ConfigManager,
64+
orchestrator Orchestrator,
6565
gracePeriod time.Duration,
6666
muxerConfig *MuxerConfig,
6767
edgeConn net.Conn,
@@ -70,7 +70,7 @@ func NewH2muxConnection(
7070
gracefulShutdownC <-chan struct{},
7171
) (*h2muxConnection, error, bool) {
7272
h := &h2muxConnection{
73-
configManager: configManager,
73+
orchestrator: orchestrator,
7474
gracePeriod: gracePeriod,
7575
muxerConfig: muxerConfig,
7676
connIndexStr: uint8ToString(connIndex),
@@ -227,7 +227,13 @@ func (h *h2muxConnection) ServeStream(stream *h2mux.MuxedStream) error {
227227
sourceConnectionType = TypeWebsocket
228228
}
229229

230-
err := h.configManager.GetOriginProxy().ProxyHTTP(respWriter, req, sourceConnectionType == TypeWebsocket)
230+
originProxy, err := h.orchestrator.GetOriginProxy()
231+
if err != nil {
232+
respWriter.WriteErrorResponse()
233+
return err
234+
}
235+
236+
err = originProxy.ProxyHTTP(respWriter, req, sourceConnectionType == TypeWebsocket)
231237
if err != nil {
232238
respWriter.WriteErrorResponse()
233239
}

connection/h2mux_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func newH2MuxConnection(t require.TestingT) (*h2muxConnection, *h2mux.Muxer) {
4848
}()
4949
var connIndex = uint8(0)
5050
testObserver := NewObserver(&log, &log, false)
51-
h2muxConn, err, _ := NewH2muxConnection(testConfigManager, testGracePeriod, testMuxerConfig, originConn, connIndex, testObserver, nil)
51+
h2muxConn, err, _ := NewH2muxConnection(testOrchestrator, testGracePeriod, testMuxerConfig, originConn, connIndex, testObserver, nil)
5252
require.NoError(t, err)
5353
return h2muxConn, <-edgeMuxChan
5454
}

connection/http2.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ var errEdgeConnectionClosed = fmt.Errorf("connection with edge closed")
3030
// HTTP2Connection represents a net.Conn that uses HTTP2 frames to proxy traffic from the edge to cloudflared on the
3131
// origin.
3232
type HTTP2Connection struct {
33-
conn net.Conn
34-
server *http2.Server
35-
configManager ConfigManager
36-
connOptions *tunnelpogs.ConnectionOptions
37-
observer *Observer
38-
connIndex uint8
33+
conn net.Conn
34+
server *http2.Server
35+
orchestrator Orchestrator
36+
connOptions *tunnelpogs.ConnectionOptions
37+
observer *Observer
38+
connIndex uint8
3939
// newRPCClientFunc allows us to mock RPCs during testing
4040
newRPCClientFunc func(context.Context, io.ReadWriteCloser, *zerolog.Logger) NamedTunnelRPCClient
4141

@@ -49,7 +49,7 @@ type HTTP2Connection struct {
4949
// NewHTTP2Connection returns a new instance of HTTP2Connection.
5050
func NewHTTP2Connection(
5151
conn net.Conn,
52-
configManager ConfigManager,
52+
orchestrator Orchestrator,
5353
connOptions *tunnelpogs.ConnectionOptions,
5454
observer *Observer,
5555
connIndex uint8,
@@ -61,7 +61,7 @@ func NewHTTP2Connection(
6161
server: &http2.Server{
6262
MaxConcurrentStreams: MaxConcurrentStreams,
6363
},
64-
configManager: configManager,
64+
orchestrator: orchestrator,
6565
connOptions: connOptions,
6666
observer: observer,
6767
connIndex: connIndex,
@@ -106,6 +106,12 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
106106
return
107107
}
108108

109+
originProxy, err := c.orchestrator.GetOriginProxy()
110+
if err != nil {
111+
c.observer.log.Error().Msg(err.Error())
112+
return
113+
}
114+
109115
switch connType {
110116
case TypeControlStream:
111117
if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions); err != nil {
@@ -116,7 +122,7 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
116122

117123
case TypeWebsocket, TypeHTTP:
118124
stripWebsocketUpgradeHeader(r)
119-
if err := c.configManager.GetOriginProxy().ProxyHTTP(respWriter, r, connType == TypeWebsocket); err != nil {
125+
if err := originProxy.ProxyHTTP(respWriter, r, connType == TypeWebsocket); err != nil {
120126
err := fmt.Errorf("Failed to proxy HTTP: %w", err)
121127
c.log.Error().Err(err)
122128
respWriter.WriteErrorResponse()
@@ -131,7 +137,7 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
131137
}
132138

133139
rws := NewHTTPResponseReadWriterAcker(respWriter, r)
134-
if err := c.configManager.GetOriginProxy().ProxyTCP(r.Context(), rws, &TCPRequest{
140+
if err := originProxy.ProxyTCP(r.Context(), rws, &TCPRequest{
135141
Dest: host,
136142
CFRay: FindCfRayHeader(r),
137143
LBProbe: IsLBProbeRequest(r),

connection/http2_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func newTestHTTP2Connection() (*HTTP2Connection, net.Conn) {
4444
return NewHTTP2Connection(
4545
cfdConn,
4646
// OriginProxy is set in testConfigManager
47-
testConfigManager,
47+
testOrchestrator,
4848
&pogs.ConnectionOptions{},
4949
obs,
5050
connIndex,

connection/quic.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ const (
3636
type QUICConnection struct {
3737
session quic.Session
3838
logger *zerolog.Logger
39-
configManager ConfigManager
39+
orchestrator Orchestrator
4040
sessionManager datagramsession.Manager
4141
controlStreamHandler ControlStreamHandler
4242
connOptions *tunnelpogs.ConnectionOptions
@@ -47,7 +47,7 @@ func NewQUICConnection(
4747
quicConfig *quic.Config,
4848
edgeAddr net.Addr,
4949
tlsConfig *tls.Config,
50-
configManager ConfigManager,
50+
orchestrator Orchestrator,
5151
connOptions *tunnelpogs.ConnectionOptions,
5252
controlStreamHandler ControlStreamHandler,
5353
logger *zerolog.Logger,
@@ -66,7 +66,7 @@ func NewQUICConnection(
6666

6767
return &QUICConnection{
6868
session: session,
69-
configManager: configManager,
69+
orchestrator: orchestrator,
7070
logger: logger,
7171
sessionManager: sessionManager,
7272
controlStreamHandler: controlStreamHandler,
@@ -175,6 +175,10 @@ func (q *QUICConnection) handleDataStream(stream *quicpogs.RequestServerStream)
175175
return err
176176
}
177177

178+
originProxy, err := q.orchestrator.GetOriginProxy()
179+
if err != nil {
180+
return err
181+
}
178182
switch connectRequest.Type {
179183
case quicpogs.ConnectionTypeHTTP, quicpogs.ConnectionTypeWebsocket:
180184
req, err := buildHTTPRequest(connectRequest, stream)
@@ -183,10 +187,10 @@ func (q *QUICConnection) handleDataStream(stream *quicpogs.RequestServerStream)
183187
}
184188

185189
w := newHTTPResponseAdapter(stream)
186-
return q.configManager.GetOriginProxy().ProxyHTTP(w, req, connectRequest.Type == quicpogs.ConnectionTypeWebsocket)
190+
return originProxy.ProxyHTTP(w, req, connectRequest.Type == quicpogs.ConnectionTypeWebsocket)
187191
case quicpogs.ConnectionTypeTCP:
188192
rwa := &streamReadWriteAcker{stream}
189-
return q.configManager.GetOriginProxy().ProxyTCP(context.Background(), rwa, &TCPRequest{Dest: connectRequest.Dest})
193+
return originProxy.ProxyTCP(context.Background(), rwa, &TCPRequest{Dest: connectRequest.Dest})
190194
}
191195
return nil
192196
}

connection/quic_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,7 @@ func testQUICConnection(udpListenerAddr net.Addr, t *testing.T) *QUICConnection
632632
testQUICConfig,
633633
udpListenerAddr,
634634
tlsClientConfig,
635-
&mockConfigManager{originProxy: &mockOriginProxyWithRequest{}},
635+
&mockOrchestrator{originProxy: &mockOriginProxyWithRequest{}},
636636
&tunnelpogs.ConnectionOptions{},
637637
fakeControlStream{},
638638
&log,

0 commit comments

Comments
 (0)