Skip to content

Commit 8ca0d86

Browse files
committed
TUN-3863: Consolidate header handling logic in the connection package; move headers definitions from h2mux to packages that manage them; cleanup header conversions
All header transformation code from h2mux has been consolidated in the connection package since it's used by both h2mux and http2 logic. Exported headers used by proxying between edge and cloudflared so then can be shared by tunnel service on the edge. Moved access-related headers to corresponding packages that have the code that sets/uses these headers. Removed tunnel hostname tracking from h2mux since it wasn't used by anything. We will continue to set the tunnel hostname header from the edge for backward compatibilty, but it's no longer used by cloudflared. Move bastion-related logic into carrier package, untangled dependencies between carrier, origin, and websocket packages.
1 parent ebf5292 commit 8ca0d86

29 files changed

+540
-712
lines changed

carrier/carrier.go

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,25 @@ package carrier
55

66
import (
77
"crypto/tls"
8+
"fmt"
89
"io"
910
"net"
1011
"net/http"
12+
"net/url"
1113
"os"
1214
"strings"
1315

1416
"github.com/pkg/errors"
1517
"github.com/rs/zerolog"
1618

17-
"github.com/cloudflare/cloudflared/h2mux"
1819
"github.com/cloudflare/cloudflared/token"
1920
)
2021

21-
const LogFieldOriginURL = "originURL"
22+
const (
23+
LogFieldOriginURL = "originURL"
24+
CFAccessTokenHeader = "Cf-Access-Token"
25+
cfJumpDestinationHeader = "Cf-Access-Jump-Destination"
26+
)
2227

2328
type StartOptions struct {
2429
AppInfo *token.AppInfo
@@ -32,15 +37,11 @@ type StartOptions struct {
3237
type Connection interface {
3338
// ServeStream is used to forward data from the client to the edge
3439
ServeStream(*StartOptions, io.ReadWriter) error
35-
36-
// StartServer is used to listen for incoming connections from the edge to the origin
37-
StartServer(net.Listener, string, <-chan struct{}) error
3840
}
3941

4042
// StdinoutStream is empty struct for wrapping stdin/stdout
4143
// into a single ReadWriter
42-
type StdinoutStream struct {
43-
}
44+
type StdinoutStream struct{}
4445

4546
// Read will read from Stdin
4647
func (c *StdinoutStream) Read(p []byte) (int, error) {
@@ -149,7 +150,7 @@ func BuildAccessRequest(options *StartOptions, log *zerolog.Logger) (*http.Reque
149150
if err != nil {
150151
return nil, err
151152
}
152-
originRequest.Header.Set(h2mux.CFAccessTokenHeader, token)
153+
originRequest.Header.Set(CFAccessTokenHeader, token)
153154

154155
for k, v := range options.Headers {
155156
if len(v) >= 1 {
@@ -159,3 +160,26 @@ func BuildAccessRequest(options *StartOptions, log *zerolog.Logger) (*http.Reque
159160

160161
return originRequest, nil
161162
}
163+
164+
func SetBastionDest(header http.Header, destination string) {
165+
if destination != "" {
166+
header.Set(cfJumpDestinationHeader, destination)
167+
}
168+
}
169+
170+
func ResolveBastionDest(r *http.Request) (string, error) {
171+
jumpDestination := r.Header.Get(cfJumpDestinationHeader)
172+
if jumpDestination == "" {
173+
return "", fmt.Errorf("Did not receive final destination from client. The --destination flag is likely not set on the client side")
174+
}
175+
// Strip scheme and path set by client. Without a scheme
176+
// Parsing a hostname and path without scheme might not return an error due to parsing ambiguities
177+
if jumpURL, err := url.Parse(jumpDestination); err == nil && jumpURL.Host != "" {
178+
return removePath(jumpURL.Host), nil
179+
}
180+
return removePath(jumpDestination), nil
181+
}
182+
183+
func removePath(dest string) string {
184+
return strings.SplitN(dest, "/", 2)[0]
185+
}

carrier/carrier_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,3 +156,99 @@ func testRequest(t *testing.T, url string, stream io.ReadWriter) *http.Request {
156156

157157
return req
158158
}
159+
160+
func TestBastionDestination(t *testing.T) {
161+
tests := []struct {
162+
name string
163+
header http.Header
164+
expectedDest string
165+
wantErr bool
166+
}{
167+
{
168+
name: "hostname destination",
169+
header: http.Header{
170+
cfJumpDestinationHeader: []string{"localhost"},
171+
},
172+
expectedDest: "localhost",
173+
},
174+
{
175+
name: "hostname destination with port",
176+
header: http.Header{
177+
cfJumpDestinationHeader: []string{"localhost:9000"},
178+
},
179+
expectedDest: "localhost:9000",
180+
},
181+
{
182+
name: "hostname destination with scheme and port",
183+
header: http.Header{
184+
cfJumpDestinationHeader: []string{"ssh://localhost:9000"},
185+
},
186+
expectedDest: "localhost:9000",
187+
},
188+
{
189+
name: "full hostname url",
190+
header: http.Header{
191+
cfJumpDestinationHeader: []string{"ssh://localhost:9000/metrics"},
192+
},
193+
expectedDest: "localhost:9000",
194+
},
195+
{
196+
name: "hostname destination with port and path",
197+
header: http.Header{
198+
cfJumpDestinationHeader: []string{"localhost:9000/metrics"},
199+
},
200+
expectedDest: "localhost:9000",
201+
},
202+
{
203+
name: "ip destination",
204+
header: http.Header{
205+
cfJumpDestinationHeader: []string{"127.0.0.1"},
206+
},
207+
expectedDest: "127.0.0.1",
208+
},
209+
{
210+
name: "ip destination with port",
211+
header: http.Header{
212+
cfJumpDestinationHeader: []string{"127.0.0.1:9000"},
213+
},
214+
expectedDest: "127.0.0.1:9000",
215+
},
216+
{
217+
name: "ip destination with port and path",
218+
header: http.Header{
219+
cfJumpDestinationHeader: []string{"127.0.0.1:9000/metrics"},
220+
},
221+
expectedDest: "127.0.0.1:9000",
222+
},
223+
{
224+
name: "ip destination with schem and port",
225+
header: http.Header{
226+
cfJumpDestinationHeader: []string{"tcp://127.0.0.1:9000"},
227+
},
228+
expectedDest: "127.0.0.1:9000",
229+
},
230+
{
231+
name: "full ip url",
232+
header: http.Header{
233+
cfJumpDestinationHeader: []string{"ssh://127.0.0.1:9000/metrics"},
234+
},
235+
expectedDest: "127.0.0.1:9000",
236+
},
237+
{
238+
name: "no destination",
239+
wantErr: true,
240+
},
241+
}
242+
for _, test := range tests {
243+
r := &http.Request{
244+
Header: test.header,
245+
}
246+
dest, err := ResolveBastionDest(r)
247+
if test.wantErr {
248+
assert.Error(t, err, "Test %s expects error", test.name)
249+
} else {
250+
assert.NoError(t, err, "Test %s expects no error, got error %v", test.name, err)
251+
assert.Equal(t, test.expectedDest, dest, "Test %s expect dest %s, got %s", test.name, test.expectedDest, dest)
252+
}
253+
}
254+
}

carrier/websocket.go

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,13 @@
11
package carrier
22

33
import (
4-
"fmt"
54
"io"
6-
"net"
75
"net/http"
86
"net/http/httputil"
97

108
"github.com/gorilla/websocket"
119
"github.com/rs/zerolog"
1210

13-
"github.com/cloudflare/cloudflared/ingress"
14-
"github.com/cloudflare/cloudflared/socks"
1511
"github.com/cloudflare/cloudflared/token"
1612
cfwebsocket "github.com/cloudflare/cloudflared/websocket"
1713
)
@@ -23,20 +19,6 @@ type Websocket struct {
2319
isSocks bool
2420
}
2521

26-
type wsdialer struct {
27-
conn *cfwebsocket.GorillaConn
28-
}
29-
30-
func (d *wsdialer) Dial(address string) (io.ReadWriteCloser, *socks.AddrSpec, error) {
31-
local, ok := d.conn.LocalAddr().(*net.TCPAddr)
32-
if !ok {
33-
return nil, nil, fmt.Errorf("not a tcp connection")
34-
}
35-
36-
addr := socks.AddrSpec{IP: local.IP, Port: local.Port}
37-
return d.conn, &addr, nil
38-
}
39-
4022
// NewWSConnection returns a new connection object
4123
func NewWSConnection(log *zerolog.Logger) Connection {
4224
return &Websocket{
@@ -54,16 +36,10 @@ func (ws *Websocket) ServeStream(options *StartOptions, conn io.ReadWriter) erro
5436
}
5537
defer wsConn.Close()
5638

57-
ingress.Stream(wsConn, conn, ws.log)
39+
cfwebsocket.Stream(wsConn, conn, ws.log)
5840
return nil
5941
}
6042

61-
// StartServer creates a Websocket server to listen for connections.
62-
// This is used on the origin (tunnel) side to take data from the muxer and send it to the origin
63-
func (ws *Websocket) StartServer(listener net.Listener, remote string, shutdownC <-chan struct{}) error {
64-
return cfwebsocket.StartProxyServer(ws.log, listener, remote, shutdownC, ingress.DefaultStreamHandler)
65-
}
66-
6743
// createWebsocketStream will create a WebSocket connection to stream data over
6844
// It also handles redirects from Access and will present that flow if
6945
// the token is not present on the request

cmd/cloudflared/access/carrier.go

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,20 @@ import (
66
"net/http"
77
"strings"
88

9+
"github.com/pkg/errors"
10+
"github.com/rs/zerolog"
11+
"github.com/urfave/cli/v2"
12+
913
"github.com/cloudflare/cloudflared/carrier"
1014
"github.com/cloudflare/cloudflared/config"
11-
"github.com/cloudflare/cloudflared/h2mux"
1215
"github.com/cloudflare/cloudflared/logger"
1316
"github.com/cloudflare/cloudflared/validation"
14-
15-
"github.com/pkg/errors"
16-
"github.com/rs/zerolog"
17-
"github.com/urfave/cli/v2"
1817
)
1918

2019
const (
21-
LogFieldHost = "host"
20+
LogFieldHost = "host"
21+
cfAccessClientIDHeader = "Cf-Access-Client-Id"
22+
cfAccessClientSecretHeader = "Cf-Access-Client-Secret"
2223
)
2324

2425
// StartForwarder starts a client side websocket forward
@@ -31,16 +32,14 @@ func StartForwarder(forwarder config.Forwarder, shutdown <-chan struct{}, log *z
3132
// get the headers from the config file and add to the request
3233
headers := make(http.Header)
3334
if forwarder.TokenClientID != "" {
34-
headers.Set(h2mux.CFAccessClientIDHeader, forwarder.TokenClientID)
35+
headers.Set(cfAccessClientIDHeader, forwarder.TokenClientID)
3536
}
3637

3738
if forwarder.TokenSecret != "" {
38-
headers.Set(h2mux.CFAccessClientSecretHeader, forwarder.TokenSecret)
39+
headers.Set(cfAccessClientSecretHeader, forwarder.TokenSecret)
3940
}
4041

41-
if forwarder.Destination != "" {
42-
headers.Add(h2mux.CFJumpDestinationHeader, forwarder.Destination)
43-
}
42+
carrier.SetBastionDest(headers, forwarder.Destination)
4443

4544
options := &carrier.StartOptions{
4645
OriginURL: forwarder.URL,
@@ -72,16 +71,13 @@ func ssh(c *cli.Context) error {
7271
// get the headers from the cmdline and add them
7372
headers := buildRequestHeaders(c.StringSlice(sshHeaderFlag))
7473
if c.IsSet(sshTokenIDFlag) {
75-
headers.Set(h2mux.CFAccessClientIDHeader, c.String(sshTokenIDFlag))
74+
headers.Set(cfAccessClientIDHeader, c.String(sshTokenIDFlag))
7675
}
7776
if c.IsSet(sshTokenSecretFlag) {
78-
headers.Set(h2mux.CFAccessClientSecretHeader, c.String(sshTokenSecretFlag))
77+
headers.Set(cfAccessClientSecretHeader, c.String(sshTokenSecretFlag))
7978
}
8079

81-
destination := c.String(sshDestinationFlag)
82-
if destination != "" {
83-
headers.Add(h2mux.CFJumpDestinationHeader, destination)
84-
}
80+
carrier.SetBastionDest(headers, c.String(sshDestinationFlag))
8581

8682
options := &carrier.StartOptions{
8783
OriginURL: originURL,

cmd/cloudflared/access/cmd.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919

2020
"github.com/cloudflare/cloudflared/carrier"
2121
"github.com/cloudflare/cloudflared/cmd/cloudflared/cliutil"
22-
"github.com/cloudflare/cloudflared/h2mux"
2322
"github.com/cloudflare/cloudflared/logger"
2423
"github.com/cloudflare/cloudflared/sshgen"
2524
"github.com/cloudflare/cloudflared/token"
@@ -286,7 +285,7 @@ func curl(c *cli.Context) error {
286285
}
287286

288287
cmdArgs = append(cmdArgs, "-H")
289-
cmdArgs = append(cmdArgs, fmt.Sprintf("%s: %s", h2mux.CFAccessTokenHeader, tok))
288+
cmdArgs = append(cmdArgs, fmt.Sprintf("%s: %s", carrier.CFAccessTokenHeader, tok))
290289
return run("curl", cmdArgs...)
291290
}
292291

@@ -472,10 +471,10 @@ func isFileThere(candidate string) bool {
472471
func verifyTokenAtEdge(appUrl *url.URL, appInfo *token.AppInfo, c *cli.Context, log *zerolog.Logger) error {
473472
headers := buildRequestHeaders(c.StringSlice(sshHeaderFlag))
474473
if c.IsSet(sshTokenIDFlag) {
475-
headers.Add(h2mux.CFAccessClientIDHeader, c.String(sshTokenIDFlag))
474+
headers.Add(cfAccessClientIDHeader, c.String(sshTokenIDFlag))
476475
}
477476
if c.IsSet(sshTokenSecretFlag) {
478-
headers.Add(h2mux.CFAccessClientSecretHeader, c.String(sshTokenSecretFlag))
477+
headers.Add(cfAccessClientSecretHeader, c.String(sshTokenSecretFlag))
479478
}
480479
options := &carrier.StartOptions{AppInfo: appInfo, OriginURL: appUrl.String(), Headers: headers}
481480

connection/h2mux.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ func (h *h2muxConnection) newRequest(stream *h2mux.MuxedStream) (*http.Request,
234234
if err != nil {
235235
return nil, errors.Wrap(err, "Unexpected error from http.NewRequest")
236236
}
237-
err = h2mux.H2RequestHeadersToH1Request(stream.Headers, req)
237+
err = H2RequestHeadersToH1Request(stream.Headers, req)
238238
if err != nil {
239239
return nil, errors.Wrap(err, "invalid request received")
240240
}
@@ -246,15 +246,15 @@ type h2muxRespWriter struct {
246246
}
247247

248248
func (rp *h2muxRespWriter) WriteRespHeaders(status int, header http.Header) error {
249-
headers := h2mux.H1ResponseToH2ResponseHeaders(status, header)
250-
headers = append(headers, h2mux.Header{Name: ResponseMetaHeaderField, Value: responseMetaHeaderOrigin})
249+
headers := H1ResponseToH2ResponseHeaders(status, header)
250+
headers = append(headers, h2mux.Header{Name: ResponseMetaHeader, Value: responseMetaHeaderOrigin})
251251
return rp.WriteHeaders(headers)
252252
}
253253

254254
func (rp *h2muxRespWriter) WriteErrorResponse() {
255255
_ = rp.WriteHeaders([]h2mux.Header{
256256
{Name: ":status", Value: "502"},
257-
{Name: ResponseMetaHeaderField, Value: responseMetaHeaderCfd},
257+
{Name: ResponseMetaHeader, Value: responseMetaHeaderCfd},
258258
})
259259
_, _ = rp.Write([]byte("502 Bad Gateway"))
260260
}

connection/h2mux_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,9 @@ func TestServeStreamHTTP(t *testing.T) {
115115
require.True(t, hasHeader(stream, ":status", strconv.Itoa(test.expectedStatus)))
116116

117117
if test.isProxyError {
118-
assert.True(t, hasHeader(stream, ResponseMetaHeaderField, responseMetaHeaderCfd))
118+
assert.True(t, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderCfd))
119119
} else {
120-
assert.True(t, hasHeader(stream, ResponseMetaHeaderField, responseMetaHeaderOrigin))
120+
assert.True(t, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderOrigin))
121121
body := make([]byte, len(test.expectedBody))
122122
_, err = stream.Read(body)
123123
require.NoError(t, err)
@@ -164,7 +164,7 @@ func TestServeStreamWS(t *testing.T) {
164164
require.NoError(t, err)
165165

166166
require.True(t, hasHeader(stream, ":status", strconv.Itoa(http.StatusSwitchingProtocols)))
167-
assert.True(t, hasHeader(stream, ResponseMetaHeaderField, responseMetaHeaderOrigin))
167+
assert.True(t, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderOrigin))
168168

169169
data := []byte("test websocket")
170170
err = wsutil.WriteClientText(writePipe, data)
@@ -268,7 +268,7 @@ func benchmarkServeStreamHTTPSimple(b *testing.B, test testRequest) {
268268
b.StopTimer()
269269

270270
require.NoError(b, openstreamErr)
271-
assert.True(b, hasHeader(stream, ResponseMetaHeaderField, responseMetaHeaderOrigin))
271+
assert.True(b, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderOrigin))
272272
require.True(b, hasHeader(stream, ":status", strconv.Itoa(http.StatusOK)))
273273
require.NoError(b, readBodyErr)
274274
require.Equal(b, test.expectedBody, body)

0 commit comments

Comments
 (0)