Skip to content

Commit d7d712d

Browse files
mporschKais HasanMarco Porsch
authored
Use io.Reader to avoid file size limits (#26)
* allow large file download, and send the file directly to the user without reading its whole content on the server * use command-line-exposed timeout for tunnel to allow writing large requests * use command-line-exposed timeout for client write deadline * follow renamed mergo module * remove unused function argument --------- Co-authored-by: Kais Hasan <kais.hasan@extern.intenta.de> Co-authored-by: Marco Porsch <marco.porsch@intenta.de>
1 parent e38a446 commit d7d712d

File tree

4 files changed

+25
-22
lines changed

4 files changed

+25
-22
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,5 @@ require (
1212
github.com/onsi/gomega v1.27.4
1313
gopkg.in/inconshreveable/log15.v2 v2.16.0
1414
)
15+
16+
replace github.com/imdario/mergo => dario.cat/mergo latest

tunnel/ws.go

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@
33
package tunnel
44

55
import (
6-
"bytes"
76
"errors"
87
"fmt"
98
"html"
109
"io"
11-
"io/ioutil"
1210
"net/http"
11+
"sync"
1312

1413
// imported per documentation - https://golang.org/pkg/net/http/pprof/
1514
_ "net/http/pprof"
@@ -77,16 +76,14 @@ func wsHandler(t *WSTunnelServer, w http.ResponseWriter, r *http.Request) {
7776
go func() {
7877
rs.remoteName, rs.remoteWhois = ipAddrLookup(t.Log, rs.remoteAddr)
7978
}()
80-
// Set safety limits
81-
ws.SetReadLimit(100 * 1024 * 1024)
8279
// Start timeout handling
8380
wsSetPingHandler(t, ws, rs)
8481
// Create synchronization channel
8582
ch := make(chan int, 2)
8683
// Spawn goroutine to read responses
87-
go wsReader(rs, ws, t.WSTimeout, ch)
84+
go wsReader(rs, ws, t.WSTimeout, ch, &rs.readWG)
8885
// Send requests
89-
wsWriter(rs, ws, ch)
86+
wsWriter(rs, ws, t.WSTimeout, ch)
9087
}
9188

9289
func wsSetPingHandler(t *WSTunnelServer, ws *websocket.Conn, rs *remoteServer) {
@@ -111,7 +108,7 @@ func wsSetPingHandler(t *WSTunnelServer, ws *websocket.Conn, rs *remoteServer) {
111108
}
112109

113110
// Pick requests off the RemoteServer queue and send them into the tunnel
114-
func wsWriter(rs *remoteServer, ws *websocket.Conn, ch chan int) {
111+
func wsWriter(rs *remoteServer, ws *websocket.Conn, wsTimeout time.Duration, ch chan int) {
115112
var req *remoteRequest
116113
var err error
117114
for {
@@ -136,7 +133,7 @@ func wsWriter(rs *remoteServer, ws *websocket.Conn, ch chan int) {
136133
continue
137134
}
138135
// write the request into the tunnel
139-
ws.SetWriteDeadline(time.Now().Add(time.Minute))
136+
ws.SetWriteDeadline(time.Now().Add(wsTimeout))
140137
var w io.WriteCloser
141138
w, err = ws.NextWriter(websocket.BinaryMessage)
142139
// got an error, reply with a "hey, retry" to the request handler
@@ -170,11 +167,15 @@ func wsWriter(rs *remoteServer, ws *websocket.Conn, ch chan int) {
170167
}
171168

172169
// Read responses from the tunnel and fulfill pending requests
173-
func wsReader(rs *remoteServer, ws *websocket.Conn, wsTimeout time.Duration, ch chan int) {
170+
func wsReader(rs *remoteServer, ws *websocket.Conn, wsTimeout time.Duration, ch chan int, readWG *sync.WaitGroup) {
174171
var err error
175172
logToken := cutToken(rs.token)
176173
// continue reading until we get an error
177174
for {
175+
// wait if another response is being sent
176+
readWG.Wait()
177+
// increment the WaitGroup counter
178+
readWG.Add(1)
178179
ws.SetReadDeadline(time.Time{}) // no timeout, there's the ping-pong for that
179180
// read a message from the tunnel
180181
var t int
@@ -195,34 +196,31 @@ func wsReader(rs *remoteServer, ws *websocket.Conn, wsTimeout time.Duration, ch
195196
if err != nil {
196197
break
197198
}
198-
// read request itself, the size is limited by the SetReadLimit on the websocket
199-
var buf []byte
200-
buf, err = ioutil.ReadAll(r)
201-
if err != nil {
202-
break
203-
}
204-
rs.log.Info("WS RCV", "id", id, "ws", wsp(ws), "len", len(buf))
205199
// try to match request
206200
rs.requestSetMutex.Lock()
207201
req := rs.requestSet[id]
208202
rs.lastActivity = time.Now()
209203
rs.requestSetMutex.Unlock()
210204
// let's see...
211205
if req != nil {
212-
rb := responseBuffer{response: bytes.NewBuffer(buf)}
206+
rb := responseBuffer{response: r}
213207
// try to enqueue response
214208
select {
215209
case req.replyChan <- rb:
216210
// great!
211+
rs.log.Info("WS RCV enqueued response", "id", id, "ws", wsp(ws))
217212
default:
213+
readWG.Done()
218214
rs.log.Info("WS RCV can't enqueue response", "id", id, "ws", wsp(ws))
219215
}
220216
} else {
217+
readWG.Done()
221218
rs.log.Info("%s #%d: WS RCV orphan response", "id", id, "ws", wsp(ws))
222219
}
223220
}
224221
// print error message
225222
if err != nil {
223+
readWG.Done()
226224
rs.log.Info("WS closing", "token", logToken, "err", err.Error(), "ws", wsp(ws))
227225
}
228226
// close up shop

tunnel/wstuncli.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -780,7 +780,7 @@ func (wsc *WSConnection) writeResponseMessage(id int16, resp *http.Response) {
780780
wsWriterMutex.Lock()
781781
defer wsWriterMutex.Unlock()
782782
// Write response into the tunnel
783-
wsc.ws.SetWriteDeadline(time.Now().Add(time.Minute))
783+
wsc.ws.SetWriteDeadline(time.Now().Add(wsc.tun.Timeout))
784784
w, err := wsc.ws.NextWriter(websocket.BinaryMessage)
785785
// got an error, reply with a "hey, retry" to the request handler
786786
if err != nil {

tunnel/wstunsrv.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type token string
5151

5252
type responseBuffer struct {
5353
err error
54-
response *bytes.Buffer
54+
response io.Reader
5555
}
5656

5757
// A request for a remote server
@@ -77,6 +77,7 @@ type remoteServer struct {
7777
requestSet map[int16]*remoteRequest // all requests in queue/flight indexed by ID
7878
requestSetMutex sync.Mutex
7979
log log15.Logger
80+
readWG sync.WaitGroup
8081
}
8182

8283
//WSTunnelServer a wstunnel server construct
@@ -372,7 +373,7 @@ func getResponse(t *WSTunnelServer, req *remoteRequest, w http.ResponseWriter, r
372373
case resp := <-req.replyChan:
373374
// if there's no error just respond
374375
if resp.err == nil {
375-
code := writeResponse(w, resp.response)
376+
code := writeResponse(rs, w, resp.response)
376377
req.log.Info("HTTP RET", "status", code)
377378
return
378379
}
@@ -498,8 +499,9 @@ var censoredHeaders = []string{
498499
}
499500

500501
// Write an HTTP response from a byte buffer into a ResponseWriter
501-
func writeResponse(w http.ResponseWriter, buf *bytes.Buffer) int {
502-
resp, err := http.ReadResponse(bufio.NewReader(buf), nil)
502+
func writeResponse(rs *remoteServer, w http.ResponseWriter, r io.Reader) int {
503+
defer rs.readWG.Done()
504+
resp, err := http.ReadResponse(bufio.NewReader(r), nil)
503505
if err != nil {
504506
log15.Info("WriteResponse: can't parse incoming response", "err", err)
505507
w.WriteHeader(506)
@@ -512,6 +514,7 @@ func writeResponse(w http.ResponseWriter, buf *bytes.Buffer) int {
512514
copyHeader(w.Header(), resp.Header)
513515
w.WriteHeader(resp.StatusCode)
514516
io.Copy(w, resp.Body)
517+
resp.Body.Close()
515518
return resp.StatusCode
516519
}
517520

0 commit comments

Comments
 (0)