Skip to content

Commit 12ad264

Browse files
TUN-4866: Add Control Stream for QUIC
This commit adds support to Register and Unregister Connections via RPC on the QUIC transport protocol
1 parent 1082ac1 commit 12ad264

File tree

8 files changed

+390
-96
lines changed

8 files changed

+390
-96
lines changed

cmd/cloudflared/tunnel/configuration.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,10 +248,17 @@ func prepareTunnelConfig(
248248

249249
edgeTLSConfigs := make(map[connection.Protocol]*tls.Config, len(connection.ProtocolList))
250250
for _, p := range connection.ProtocolList {
251-
edgeTLSConfig, err := tlsconfig.CreateTunnelConfig(c, p.ServerName())
251+
tlsSettings := p.TLSSettings()
252+
if tlsSettings == nil {
253+
return nil, ingress.Ingress{}, fmt.Errorf("%s has unknown TLS settings", p)
254+
}
255+
edgeTLSConfig, err := tlsconfig.CreateTunnelConfig(c, tlsSettings.ServerName)
252256
if err != nil {
253257
return nil, ingress.Ingress{}, errors.Wrap(err, "unable to create TLS config to connect with edge")
254258
}
259+
if len(edgeTLSConfig.NextProtos) > 0 {
260+
edgeTLSConfig.NextProtos = tlsSettings.NextProtos
261+
}
255262
edgeTLSConfigs[p] = edgeTLSConfig
256263
}
257264

connection/control.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package connection
2+
3+
import (
4+
"context"
5+
"io"
6+
"time"
7+
8+
"github.com/rs/zerolog"
9+
10+
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
11+
)
12+
13+
// RPCClientFunc derives a named tunnel rpc client that can then be used to register and unregister connections.
14+
type RPCClientFunc func(context.Context, io.ReadWriteCloser, *zerolog.Logger) NamedTunnelRPCClient
15+
16+
type controlStream struct {
17+
observer *Observer
18+
19+
connectedFuse ConnectedFuse
20+
namedTunnelConfig *NamedTunnelConfig
21+
connIndex uint8
22+
23+
newRPCClientFunc RPCClientFunc
24+
25+
gracefulShutdownC <-chan struct{}
26+
gracePeriod time.Duration
27+
stoppedGracefully bool
28+
}
29+
30+
// ControlStreamHandler registers connections with origintunneld and initiates graceful shutdown.
31+
type ControlStreamHandler interface {
32+
ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions) error
33+
IsStopped() bool
34+
}
35+
36+
// NewControlStream returns a new instance of ControlStreamHandler
37+
func NewControlStream(
38+
observer *Observer,
39+
connectedFuse ConnectedFuse,
40+
namedTunnelConfig *NamedTunnelConfig,
41+
connIndex uint8,
42+
newRPCClientFunc RPCClientFunc,
43+
gracefulShutdownC <-chan struct{},
44+
gracePeriod time.Duration,
45+
) ControlStreamHandler {
46+
if newRPCClientFunc == nil {
47+
newRPCClientFunc = newRegistrationRPCClient
48+
}
49+
return &controlStream{
50+
observer: observer,
51+
connectedFuse: connectedFuse,
52+
namedTunnelConfig: namedTunnelConfig,
53+
newRPCClientFunc: newRPCClientFunc,
54+
connIndex: connIndex,
55+
gracefulShutdownC: gracefulShutdownC,
56+
gracePeriod: gracePeriod,
57+
}
58+
}
59+
60+
func (c *controlStream) ServeControlStream(
61+
ctx context.Context,
62+
rw io.ReadWriteCloser,
63+
connOptions *tunnelpogs.ConnectionOptions,
64+
) error {
65+
rpcClient := c.newRPCClientFunc(ctx, rw, c.observer.log)
66+
defer rpcClient.Close()
67+
68+
if err := rpcClient.RegisterConnection(ctx, c.namedTunnelConfig, connOptions, c.connIndex, c.observer); err != nil {
69+
return err
70+
}
71+
c.connectedFuse.Connected()
72+
73+
// wait for connection termination or start of graceful shutdown
74+
select {
75+
case <-ctx.Done():
76+
break
77+
case <-c.gracefulShutdownC:
78+
c.stoppedGracefully = true
79+
}
80+
81+
c.observer.sendUnregisteringEvent(c.connIndex)
82+
rpcClient.GracefulShutdown(ctx, c.gracePeriod)
83+
c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Unregistered tunnel connection")
84+
return nil
85+
}
86+
87+
func (c *controlStream) IsStopped() bool {
88+
return c.stoppedGracefully
89+
}

connection/http2.go

Lines changed: 21 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -30,52 +30,44 @@ var errEdgeConnectionClosed = fmt.Errorf("connection with edge closed")
3030
// HTTP2Connection represents a net.Conn that uses HTTP2 frames to proxy traffic from the edge to cloudflared on the
3131
// origin.
3232
type HTTP2Connection struct {
33-
conn net.Conn
34-
server *http2.Server
35-
config *Config
36-
namedTunnel *NamedTunnelConfig
37-
connOptions *tunnelpogs.ConnectionOptions
38-
observer *Observer
39-
connIndexStr string
40-
connIndex uint8
33+
conn net.Conn
34+
server *http2.Server
35+
config *Config
36+
connOptions *tunnelpogs.ConnectionOptions
37+
observer *Observer
38+
connIndex uint8
4139
// newRPCClientFunc allows us to mock RPCs during testing
4240
newRPCClientFunc func(context.Context, io.ReadWriteCloser, *zerolog.Logger) NamedTunnelRPCClient
4341

44-
log *zerolog.Logger
45-
activeRequestsWG sync.WaitGroup
46-
connectedFuse ConnectedFuse
47-
gracefulShutdownC <-chan struct{}
48-
stoppedGracefully bool
49-
controlStreamErr error // result of running control stream handler
42+
log *zerolog.Logger
43+
activeRequestsWG sync.WaitGroup
44+
controlStreamHandler ControlStreamHandler
45+
stoppedGracefully bool
46+
controlStreamErr error // result of running control stream handler
5047
}
5148

5249
// NewHTTP2Connection returns a new instance of HTTP2Connection.
5350
func NewHTTP2Connection(
5451
conn net.Conn,
5552
config *Config,
56-
namedTunnelConfig *NamedTunnelConfig,
5753
connOptions *tunnelpogs.ConnectionOptions,
5854
observer *Observer,
5955
connIndex uint8,
60-
connectedFuse ConnectedFuse,
56+
controlStreamHandler ControlStreamHandler,
6157
log *zerolog.Logger,
62-
gracefulShutdownC <-chan struct{},
6358
) *HTTP2Connection {
6459
return &HTTP2Connection{
6560
conn: conn,
6661
server: &http2.Server{
6762
MaxConcurrentStreams: math.MaxUint32,
6863
},
69-
config: config,
70-
namedTunnel: namedTunnelConfig,
71-
connOptions: connOptions,
72-
observer: observer,
73-
connIndexStr: uint8ToString(connIndex),
74-
connIndex: connIndex,
75-
newRPCClientFunc: newRegistrationRPCClient,
76-
connectedFuse: connectedFuse,
77-
log: log,
78-
gracefulShutdownC: gracefulShutdownC,
64+
config: config,
65+
connOptions: connOptions,
66+
observer: observer,
67+
connIndex: connIndex,
68+
newRPCClientFunc: newRegistrationRPCClient,
69+
controlStreamHandler: controlStreamHandler,
70+
log: log,
7971
}
8072
}
8173

@@ -91,7 +83,7 @@ func (c *HTTP2Connection) Serve(ctx context.Context) error {
9183
})
9284

9385
switch {
94-
case c.stoppedGracefully:
86+
case c.controlStreamHandler.IsStopped():
9587
return nil
9688
case c.controlStreamErr != nil:
9789
return c.controlStreamErr
@@ -116,7 +108,7 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
116108

117109
switch connType {
118110
case TypeControlStream:
119-
if err := c.serveControlStream(r.Context(), respWriter); err != nil {
111+
if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions); err != nil {
120112
c.controlStreamErr = err
121113
c.log.Error().Err(err)
122114
respWriter.WriteErrorResponse()
@@ -154,29 +146,6 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
154146
}
155147
}
156148

157-
func (c *HTTP2Connection) serveControlStream(ctx context.Context, respWriter *http2RespWriter) error {
158-
rpcClient := c.newRPCClientFunc(ctx, respWriter, c.observer.log)
159-
defer rpcClient.Close()
160-
161-
if err := rpcClient.RegisterConnection(ctx, c.namedTunnel, c.connOptions, c.connIndex, c.observer); err != nil {
162-
return err
163-
}
164-
c.connectedFuse.Connected()
165-
166-
// wait for connection termination or start of graceful shutdown
167-
select {
168-
case <-ctx.Done():
169-
break
170-
case <-c.gracefulShutdownC:
171-
c.stoppedGracefully = true
172-
}
173-
174-
c.observer.sendUnregisteringEvent(c.connIndex)
175-
rpcClient.GracefulShutdown(ctx, c.config.GracePeriod)
176-
c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Unregistered tunnel connection")
177-
return nil
178-
}
179-
180149
func (c *HTTP2Connection) close() {
181150
// Wait for all serve HTTP handlers to return
182151
c.activeRequestsWG.Wait()

connection/http2_test.go

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,24 @@ func newTestHTTP2Connection() (*HTTP2Connection, net.Conn) {
3030
edgeConn, originConn := net.Pipe()
3131
var connIndex = uint8(0)
3232
log := zerolog.Nop()
33+
obs := NewObserver(&log, &log, false)
34+
controlStream := NewControlStream(
35+
obs,
36+
mockConnectedFuse{},
37+
&NamedTunnelConfig{},
38+
connIndex,
39+
nil,
40+
nil,
41+
1*time.Second,
42+
)
3343
return NewHTTP2Connection(
3444
originConn,
3545
testConfig,
36-
&NamedTunnelConfig{},
3746
&pogs.ConnectionOptions{},
38-
NewObserver(&log, &log, false),
47+
obs,
3948
connIndex,
40-
mockConnectedFuse{},
49+
controlStream,
4150
&log,
42-
nil,
4351
), edgeConn
4452
}
4553

@@ -225,7 +233,18 @@ func TestServeControlStream(t *testing.T) {
225233
registered: make(chan struct{}),
226234
unregistered: make(chan struct{}),
227235
}
228-
http2Conn.newRPCClientFunc = rpcClientFactory.newMockRPCClient
236+
237+
obs := NewObserver(&log, &log, false)
238+
controlStream := NewControlStream(
239+
obs,
240+
mockConnectedFuse{},
241+
&NamedTunnelConfig{},
242+
1,
243+
rpcClientFactory.newMockRPCClient,
244+
nil,
245+
1*time.Second,
246+
)
247+
http2Conn.controlStreamHandler = controlStream
229248

230249
ctx, cancel := context.WithCancel(context.Background())
231250
var wg sync.WaitGroup
@@ -264,7 +283,18 @@ func TestFailRegistration(t *testing.T) {
264283
registered: make(chan struct{}),
265284
unregistered: make(chan struct{}),
266285
}
267-
http2Conn.newRPCClientFunc = rpcClientFactory.newMockRPCClient
286+
287+
obs := NewObserver(&log, &log, false)
288+
controlStream := NewControlStream(
289+
obs,
290+
mockConnectedFuse{},
291+
&NamedTunnelConfig{},
292+
http2Conn.connIndex,
293+
rpcClientFactory.newMockRPCClient,
294+
nil,
295+
1*time.Second,
296+
)
297+
http2Conn.controlStreamHandler = controlStream
268298

269299
ctx, cancel := context.WithCancel(context.Background())
270300
var wg sync.WaitGroup
@@ -297,10 +327,21 @@ func TestGracefulShutdownHTTP2(t *testing.T) {
297327
unregistered: make(chan struct{}),
298328
}
299329
events := &eventCollectorSink{}
300-
http2Conn.newRPCClientFunc = rpcClientFactory.newMockRPCClient
301-
http2Conn.observer.RegisterSink(events)
330+
302331
shutdownC := make(chan struct{})
303-
http2Conn.gracefulShutdownC = shutdownC
332+
obs := NewObserver(&log, &log, false)
333+
obs.RegisterSink(events)
334+
controlStream := NewControlStream(
335+
obs,
336+
mockConnectedFuse{},
337+
&NamedTunnelConfig{},
338+
http2Conn.connIndex,
339+
rpcClientFactory.newMockRPCClient,
340+
shutdownC,
341+
1*time.Second,
342+
)
343+
344+
http2Conn.controlStreamHandler = controlStream
304345

305346
ctx, cancel := context.WithCancel(context.Background())
306347
var wg sync.WaitGroup
@@ -339,7 +380,7 @@ func TestGracefulShutdownHTTP2(t *testing.T) {
339380
case <-time.Tick(time.Second):
340381
t.Fatal("timeout out waiting for unregistered signal")
341382
}
342-
assert.True(t, http2Conn.stoppedGracefully)
383+
assert.True(t, controlStream.IsStopped())
343384

344385
cancel()
345386
wg.Wait()

0 commit comments

Comments
 (0)