Skip to content

Commit e49a7a4

Browse files
TUN-4597: Added HTTPProxy for QUIC
1 parent 5749425 commit e49a7a4

File tree

3 files changed

+221
-36
lines changed

3 files changed

+221
-36
lines changed

connection/quic.go

Lines changed: 78 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@ package connection
33
import (
44
"context"
55
"crypto/tls"
6+
"fmt"
7+
"io"
68
"net"
9+
"net/http"
10+
"strconv"
11+
"strings"
712

813
"github.com/lucas-clemente/quic-go"
914
"github.com/pkg/errors"
@@ -12,10 +17,20 @@ import (
1217
quicpogs "github.com/cloudflare/cloudflared/quic"
1318
)
1419

20+
const (
21+
// HTTPHeaderKey is used to get or set http headers in QUIC ALPN if the underlying proxy connection type is HTTP.
22+
HTTPHeaderKey = "HttpHeader"
23+
// HTTPMethodKey is used to get or set http method in QUIC ALPN if the underlying proxy connection type is HTTP.
24+
HTTPMethodKey = "HttpMethod"
25+
// HTTPHostKey is used to get or set http Method in QUIC ALPN if the underlying proxy connection type is HTTP.
26+
HTTPHostKey = "HttpHost"
27+
)
28+
1529
// QUICConnection represents the type that facilitates Proxying via QUIC streams.
1630
type QUICConnection struct {
17-
session quic.Session
18-
logger zerolog.Logger
31+
session quic.Session
32+
logger zerolog.Logger
33+
httpProxy OriginProxy
1934
}
2035

2136
// NewQUICConnection returns a new instance of QUICConnection.
@@ -24,6 +39,7 @@ func NewQUICConnection(
2439
quicConfig *quic.Config,
2540
edgeAddr net.Addr,
2641
tlsConfig *tls.Config,
42+
httpProxy OriginProxy,
2743
logger zerolog.Logger,
2844
) (*QUICConnection, error) {
2945
session, err := quic.DialAddr(edgeAddr.String(), tlsConfig, quicConfig)
@@ -34,8 +50,9 @@ func NewQUICConnection(
3450
//TODO: RegisterConnectionRPC here.
3551

3652
return &QUICConnection{
37-
session: session,
38-
logger: logger,
53+
session: session,
54+
httpProxy: httpProxy,
55+
logger: logger,
3956
}, nil
4057
}
4158

@@ -58,7 +75,7 @@ func (q *QUICConnection) Serve(ctx context.Context) error {
5875
}
5976
}
6077

61-
// Close calls this to close the QuicConnection stream.
78+
// Close closes the session with no errors specified.
6279
func (q *QUICConnection) Close() {
6380
q.session.CloseWithError(0, "")
6481
}
@@ -71,14 +88,66 @@ func (q *QUICConnection) handleStream(stream quic.Stream) error {
7188

7289
switch connectRequest.Type {
7390
case quicpogs.ConnectionTypeHTTP, quicpogs.ConnectionTypeWebsocket:
74-
// Temporary dummy code for the unit test.
75-
if err := quicpogs.WriteConnectResponseData(stream, nil, quicpogs.Metadata{Key: "HTTPStatus", Val: "200"}); err != nil {
91+
req, err := buildHTTPRequest(connectRequest, stream)
92+
if err != nil {
7693
return err
7794
}
7895

79-
stream.Write([]byte("OK"))
96+
w := newHTTPResponseAdapter(stream)
97+
return q.httpProxy.ProxyHTTP(w, req, connectRequest.Type == quicpogs.ConnectionTypeWebsocket)
8098
case quicpogs.ConnectionTypeTCP:
81-
99+
return errors.New("not implemented")
82100
}
83101
return nil
84102
}
103+
104+
// httpResponseAdapter translates responses written by the HTTP Proxy into ones that can be used in QUIC.
105+
type httpResponseAdapter struct {
106+
io.Writer
107+
}
108+
109+
func newHTTPResponseAdapter(w io.Writer) httpResponseAdapter {
110+
return httpResponseAdapter{w}
111+
}
112+
113+
func (hrw httpResponseAdapter) WriteRespHeaders(status int, header http.Header) error {
114+
metadata := make([]quicpogs.Metadata, 0)
115+
metadata = append(metadata, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(status)})
116+
for k, vv := range header {
117+
for _, v := range vv {
118+
httpHeaderKey := fmt.Sprintf("%s:%s", HTTPHeaderKey, k)
119+
metadata = append(metadata, quicpogs.Metadata{Key: httpHeaderKey, Val: v})
120+
}
121+
}
122+
return quicpogs.WriteConnectResponseData(hrw, nil, metadata...)
123+
}
124+
125+
func (hrw httpResponseAdapter) WriteErrorResponse(err error) {
126+
quicpogs.WriteConnectResponseData(hrw, err, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(http.StatusBadGateway)})
127+
}
128+
129+
func buildHTTPRequest(connectRequest *quicpogs.ConnectRequest, body io.Reader) (*http.Request, error) {
130+
metadata := connectRequest.MetadataMap()
131+
dest := connectRequest.Dest
132+
method := metadata[HTTPMethodKey]
133+
host := metadata[HTTPHostKey]
134+
135+
req, err := http.NewRequest(method, dest, body)
136+
if err != nil {
137+
return nil, err
138+
}
139+
140+
req.Host = host
141+
for _, metadata := range connectRequest.Metadata {
142+
if strings.Contains(metadata.Key, HTTPHeaderKey) {
143+
// metadata.Key is off the format httpHeaderKey:<HTTPHeader>
144+
httpHeaderKey := strings.Split(metadata.Key, ":")
145+
if len(httpHeaderKey) != 2 {
146+
return nil, fmt.Errorf("Header Key: %s malformed", metadata.Key)
147+
}
148+
req.Header.Add(httpHeaderKey[1], metadata.Val)
149+
}
150+
}
151+
stripWebsocketUpgradeHeader(req)
152+
return req, err
153+
}

connection/quic_test.go

Lines changed: 134 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,25 @@
11
package connection
22

33
import (
4+
"bytes"
45
"context"
56
"crypto/rand"
67
"crypto/rsa"
78
"crypto/tls"
89
"crypto/x509"
910
"encoding/pem"
10-
"io/ioutil"
11+
"fmt"
12+
"io"
1113
"math/big"
1214
"net"
15+
"net/http"
1316
"os"
1417
"sync"
1518
"testing"
1619

20+
"github.com/gobwas/ws/wsutil"
1721
"github.com/lucas-clemente/quic-go"
22+
"github.com/pkg/errors"
1823
"github.com/rs/zerolog"
1924
"github.com/stretchr/testify/assert"
2025
"github.com/stretchr/testify/require"
@@ -29,65 +34,127 @@ func TestQUICServer(t *testing.T) {
2934
KeepAlive: true,
3035
}
3136

37+
// Setup test.
3238
log := zerolog.New(os.Stdout)
3339

40+
// Start a UDP Listener for QUIC.
3441
udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
35-
3642
require.NoError(t, err)
3743
udpListener, err := net.ListenUDP(udpAddr.Network(), udpAddr)
3844
require.NoError(t, err)
3945
defer udpListener.Close()
46+
47+
// Create a simple tls config.
4048
tlsConfig := generateTLSConfig()
41-
tlsConfClient := &tls.Config{
49+
50+
// Create a client config
51+
tlsClientConfig := &tls.Config{
4252
InsecureSkipVerify: true,
4353
NextProtos: []string{"argotunnel"},
4454
}
55+
56+
// Start a mock httpProxy
57+
originProxy := &mockOriginProxyWithRequest{}
58+
59+
// This is simply a sample websocket frame message.
60+
wsBuf := &bytes.Buffer{}
61+
wsutil.WriteClientText(wsBuf, []byte("Hello"))
62+
4563
var tests = []struct {
46-
desc string
47-
dest string
48-
connectionType quicpogs.ConnectionType
49-
metadata []quicpogs.Metadata
50-
message []byte
51-
expectedMessage []byte
64+
desc string
65+
dest string
66+
connectionType quicpogs.ConnectionType
67+
metadata []quicpogs.Metadata
68+
message []byte
69+
expectedResponse []byte
5270
}{
5371
{
54-
desc: "",
55-
dest: "somehost.com",
72+
desc: "test http proxy",
73+
dest: "/ok",
74+
connectionType: quicpogs.ConnectionTypeHTTP,
75+
metadata: []quicpogs.Metadata{
76+
quicpogs.Metadata{
77+
Key: "HttpHeader:Cf-Ray",
78+
Val: "123123123",
79+
},
80+
quicpogs.Metadata{
81+
Key: "HttpHost",
82+
Val: "cf.host",
83+
},
84+
quicpogs.Metadata{
85+
Key: "HttpMethod",
86+
Val: "GET",
87+
},
88+
},
89+
expectedResponse: []byte("OK"),
90+
},
91+
{
92+
desc: "test http body request streaming",
93+
dest: "/echo_body",
94+
connectionType: quicpogs.ConnectionTypeHTTP,
95+
metadata: []quicpogs.Metadata{
96+
quicpogs.Metadata{
97+
Key: "HttpHeader:Cf-Ray",
98+
Val: "123123123",
99+
},
100+
quicpogs.Metadata{
101+
Key: "HttpHost",
102+
Val: "cf.host",
103+
},
104+
quicpogs.Metadata{
105+
Key: "HttpMethod",
106+
Val: "POST",
107+
},
108+
},
109+
message: []byte("This is the message body"),
110+
expectedResponse: []byte("This is the message body"),
111+
},
112+
{
113+
desc: "test ws proxy",
114+
dest: "/ok",
56115
connectionType: quicpogs.ConnectionTypeWebsocket,
57116
metadata: []quicpogs.Metadata{
58117
quicpogs.Metadata{
59-
Key: "key",
60-
Val: "value",
118+
Key: "HttpHeader:Cf-Cloudflared-Proxy-Connection-Upgrade",
119+
Val: "Websocket",
120+
},
121+
quicpogs.Metadata{
122+
Key: "HttpHeader:Another-Header",
123+
Val: "Misc",
124+
},
125+
quicpogs.Metadata{
126+
Key: "HttpHost",
127+
Val: "cf.host",
128+
},
129+
quicpogs.Metadata{
130+
Key: "HttpMethod",
131+
Val: "get",
61132
},
62133
},
63-
expectedMessage: []byte("OK"),
134+
message: wsBuf.Bytes(),
135+
expectedResponse: []byte{0x81, 0x5, 0x48, 0x65, 0x6c, 0x6c, 0x6f},
64136
},
65137
}
66138

67139
for _, test := range tests {
68140
t.Run(test.desc, func(t *testing.T) {
69141
ctx, cancel := context.WithCancel(context.Background())
70-
71142
var wg sync.WaitGroup
72143
go func() {
73144
wg.Add(1)
145+
defer wg.Done()
74146
quicServer(
75147
t, udpListener, tlsConfig, quicConfig,
76-
test.dest, test.connectionType, test.metadata, test.message, test.expectedMessage,
148+
test.dest, test.connectionType, test.metadata, test.message, test.expectedResponse,
77149
)
78-
wg.Done()
79150
}()
80151

81-
qC, err := NewQUICConnection(context.Background(), quicConfig, udpListener.LocalAddr(), tlsConfClient, log)
152+
qC, err := NewQUICConnection(ctx, quicConfig, udpListener.LocalAddr(), tlsClientConfig, originProxy, log)
82153
require.NoError(t, err)
154+
go qC.Serve(ctx)
83155

84-
go func() {
85-
wg.Wait()
86-
cancel()
87-
}()
88-
89-
qC.Serve(ctx)
90-
156+
wg.Wait()
157+
cancel()
91158
})
92159
}
93160

@@ -125,11 +192,12 @@ func quicServer(
125192

126193
if message != nil {
127194
// ALPN successful. Write data.
128-
_, err = stream.Write([]byte(message))
195+
_, err := stream.Write([]byte(message))
129196
require.NoError(t, err)
130197
}
131198

132-
response, err := ioutil.ReadAll(stream)
199+
response := make([]byte, len(expectedResponse))
200+
stream.Read(response)
133201
require.NoError(t, err)
134202

135203
// For now it is an echo server. Verify if the same data is returned.
@@ -159,3 +227,42 @@ func generateTLSConfig() *tls.Config {
159227
NextProtos: []string{"argotunnel"},
160228
}
161229
}
230+
231+
type mockOriginProxyWithRequest struct{}
232+
233+
func (moc *mockOriginProxyWithRequest) ProxyHTTP(w ResponseWriter, r *http.Request, isWebsocket bool) error {
234+
// These are a series of crude tests to ensure the headers and http related data is transferred from
235+
// metadata.
236+
if r.Method == "" {
237+
return errors.New("method not sent")
238+
}
239+
if r.Host == "" {
240+
return errors.New("host not sent")
241+
}
242+
if len(r.Header) == 0 {
243+
return errors.New("headers not set")
244+
}
245+
246+
if isWebsocket {
247+
return wsEndpoint(w, r)
248+
}
249+
switch r.URL.Path {
250+
case "/ok":
251+
originRespEndpoint(w, http.StatusOK, []byte(http.StatusText(http.StatusOK)))
252+
case "/echo_body":
253+
resp := &http.Response{
254+
StatusCode: http.StatusOK,
255+
}
256+
_ = w.WriteRespHeaders(resp.StatusCode, resp.Header)
257+
io.Copy(w, r.Body)
258+
case "/error":
259+
return fmt.Errorf("Failed to proxy to origin")
260+
default:
261+
originRespEndpoint(w, http.StatusNotFound, []byte("page not found"))
262+
}
263+
return nil
264+
}
265+
266+
func (moc *mockOriginProxyWithRequest) ProxyTCP(ctx context.Context, rwa ReadWriteAcker, tcpRequest *TCPRequest) error {
267+
return nil
268+
}

quic/pogs.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,15 @@ type Metadata struct {
2929
Val string `capnp:"val"`
3030
}
3131

32+
// MetadataMap returns a map format of []Metadata.
33+
func (r *ConnectRequest) MetadataMap() map[string]string {
34+
metadataMap := make(map[string]string)
35+
for _, metadata := range r.Metadata {
36+
metadataMap[metadata.Key] = metadata.Val
37+
}
38+
return metadataMap
39+
}
40+
3241
func (r *ConnectRequest) fromPogs(msg *capnp.Message) error {
3342
metadata, err := schema.ReadRootConnectRequest(msg)
3443
if err != nil {

0 commit comments

Comments
 (0)