From 2aa4c14ad7e97fa0db7e082a0dbb0a4c682bc8d8 Mon Sep 17 00:00:00 2001 From: Kais Hasan Date: Mon, 31 Jul 2023 10:43:24 +0300 Subject: [PATCH 01/10] allow large file download, and send the file directly to the user without reading its whole content on the server --- tunnel/ws.go | 26 ++++++++++++-------------- tunnel/wstunsrv.go | 11 +++++++---- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/tunnel/ws.go b/tunnel/ws.go index eba8dbe..f9cdba1 100644 --- a/tunnel/ws.go +++ b/tunnel/ws.go @@ -3,13 +3,12 @@ package tunnel import ( - "bytes" "errors" "fmt" "html" "io" - "io/ioutil" "net/http" + "sync" // imported per documentation - https://golang.org/pkg/net/http/pprof/ _ "net/http/pprof" @@ -77,14 +76,12 @@ func wsHandler(t *WSTunnelServer, w http.ResponseWriter, r *http.Request) { go func() { rs.remoteName, rs.remoteWhois = ipAddrLookup(t.Log, rs.remoteAddr) }() - // Set safety limits - ws.SetReadLimit(100 * 1024 * 1024) // Start timeout handling wsSetPingHandler(t, ws, rs) // Create synchronization channel ch := make(chan int, 2) // Spawn goroutine to read responses - go wsReader(rs, ws, t.WSTimeout, ch) + go wsReader(rs, ws, t.WSTimeout, ch, &rs.readWG) // Send requests wsWriter(rs, ws, ch) } @@ -170,11 +167,15 @@ func wsWriter(rs *remoteServer, ws *websocket.Conn, ch chan int) { } // Read responses from the tunnel and fulfill pending requests -func wsReader(rs *remoteServer, ws *websocket.Conn, wsTimeout time.Duration, ch chan int) { +func wsReader(rs *remoteServer, ws *websocket.Conn, wsTimeout time.Duration, ch chan int, readWG *sync.WaitGroup) { var err error logToken := cutToken(rs.token) // continue reading until we get an error for { + // wait if another response is being sent + readWG.Wait() + // increment the WaitGroup counter + readWG.Add(1) ws.SetReadDeadline(time.Time{}) // no timeout, there's the ping-pong for that // read a message from the tunnel var t int @@ -195,13 +196,6 @@ func wsReader(rs *remoteServer, ws *websocket.Conn, wsTimeout time.Duration, ch if err != nil { break } - // read request itself, the size is limited by the SetReadLimit on the websocket - var buf []byte - buf, err = ioutil.ReadAll(r) - if err != nil { - break - } - rs.log.Info("WS RCV", "id", id, "ws", wsp(ws), "len", len(buf)) // try to match request rs.requestSetMutex.Lock() req := rs.requestSet[id] @@ -209,20 +203,24 @@ func wsReader(rs *remoteServer, ws *websocket.Conn, wsTimeout time.Duration, ch rs.requestSetMutex.Unlock() // let's see... if req != nil { - rb := responseBuffer{response: bytes.NewBuffer(buf)} + rb := responseBuffer{response: r} // try to enqueue response select { case req.replyChan <- rb: // great! + rs.log.Info("WS RCV enqueued response", "id", id, "ws", wsp(ws)) default: + readWG.Done() rs.log.Info("WS RCV can't enqueue response", "id", id, "ws", wsp(ws)) } } else { + readWG.Done() rs.log.Info("%s #%d: WS RCV orphan response", "id", id, "ws", wsp(ws)) } } // print error message if err != nil { + readWG.Done() rs.log.Info("WS closing", "token", logToken, "err", err.Error(), "ws", wsp(ws)) } // close up shop diff --git a/tunnel/wstunsrv.go b/tunnel/wstunsrv.go index 034ff56..e41b810 100644 --- a/tunnel/wstunsrv.go +++ b/tunnel/wstunsrv.go @@ -51,7 +51,7 @@ type token string type responseBuffer struct { err error - response *bytes.Buffer + response io.Reader } // A request for a remote server @@ -77,6 +77,7 @@ type remoteServer struct { requestSet map[int16]*remoteRequest // all requests in queue/flight indexed by ID requestSetMutex sync.Mutex log log15.Logger + readWG sync.WaitGroup } //WSTunnelServer a wstunnel server construct @@ -372,7 +373,7 @@ func getResponse(t *WSTunnelServer, req *remoteRequest, w http.ResponseWriter, r case resp := <-req.replyChan: // if there's no error just respond if resp.err == nil { - code := writeResponse(w, resp.response) + code := writeResponse(t, rs, w, resp.response) req.log.Info("HTTP RET", "status", code) return } @@ -498,8 +499,9 @@ var censoredHeaders = []string{ } // Write an HTTP response from a byte buffer into a ResponseWriter -func writeResponse(w http.ResponseWriter, buf *bytes.Buffer) int { - resp, err := http.ReadResponse(bufio.NewReader(buf), nil) +func writeResponse(t *WSTunnelServer, rs *remoteServer, w http.ResponseWriter, r io.Reader) int { + defer rs.readWG.Done() + resp, err := http.ReadResponse(bufio.NewReader(r), nil) if err != nil { log15.Info("WriteResponse: can't parse incoming response", "err", err) w.WriteHeader(506) @@ -512,6 +514,7 @@ func writeResponse(w http.ResponseWriter, buf *bytes.Buffer) int { copyHeader(w.Header(), resp.Header) w.WriteHeader(resp.StatusCode) io.Copy(w, resp.Body) + resp.Body.Close() return resp.StatusCode } From 01c553e6e67ffb785292e6a54467c53b5a967851 Mon Sep 17 00:00:00 2001 From: Kais Hasan Date: Thu, 3 Aug 2023 15:33:31 +0300 Subject: [PATCH 02/10] use command-line-exposed timeout for tunnel to allow writing large requests --- tunnel/ws.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tunnel/ws.go b/tunnel/ws.go index f9cdba1..f2c3678 100644 --- a/tunnel/ws.go +++ b/tunnel/ws.go @@ -83,7 +83,7 @@ func wsHandler(t *WSTunnelServer, w http.ResponseWriter, r *http.Request) { // Spawn goroutine to read responses go wsReader(rs, ws, t.WSTimeout, ch, &rs.readWG) // Send requests - wsWriter(rs, ws, ch) + wsWriter(rs, ws, t.WSTimeout, ch) } func wsSetPingHandler(t *WSTunnelServer, ws *websocket.Conn, rs *remoteServer) { @@ -108,7 +108,7 @@ func wsSetPingHandler(t *WSTunnelServer, ws *websocket.Conn, rs *remoteServer) { } // Pick requests off the RemoteServer queue and send them into the tunnel -func wsWriter(rs *remoteServer, ws *websocket.Conn, ch chan int) { +func wsWriter(rs *remoteServer, ws *websocket.Conn, wsTimeout time.Duration, ch chan int) { var req *remoteRequest var err error for { @@ -133,7 +133,7 @@ func wsWriter(rs *remoteServer, ws *websocket.Conn, ch chan int) { continue } // write the request into the tunnel - ws.SetWriteDeadline(time.Now().Add(time.Minute)) + ws.SetWriteDeadline(time.Now().Add(wsTimeout)) var w io.WriteCloser w, err = ws.NextWriter(websocket.BinaryMessage) // got an error, reply with a "hey, retry" to the request handler From 6be4a9c2718f8c90bda62e32b19334d6a8e59df3 Mon Sep 17 00:00:00 2001 From: Marco Porsch Date: Thu, 10 Aug 2023 09:45:15 +0200 Subject: [PATCH 03/10] use command-line-exposed timeout for client write deadline --- tunnel/wstuncli.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tunnel/wstuncli.go b/tunnel/wstuncli.go index 1ffd00d..09615e1 100644 --- a/tunnel/wstuncli.go +++ b/tunnel/wstuncli.go @@ -758,7 +758,7 @@ func (wsc *WSConnection) writeResponseMessage(id int16, resp *http.Response) { wsWriterMutex.Lock() defer wsWriterMutex.Unlock() // Write response into the tunnel - wsc.ws.SetWriteDeadline(time.Now().Add(time.Minute)) + wsc.ws.SetWriteDeadline(time.Now().Add(wsc.tun.Timeout)) w, err := wsc.ws.NextWriter(websocket.BinaryMessage) // got an error, reply with a "hey, retry" to the request handler if err != nil { From 5eb4389f37bd8136bacacead3bf0509e92bffdba Mon Sep 17 00:00:00 2001 From: Marco Porsch Date: Wed, 16 Aug 2023 12:51:43 +0200 Subject: [PATCH 04/10] follow renamed mergo module --- go.mod | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go.mod b/go.mod index be0925f..b0d4b0f 100644 --- a/go.mod +++ b/go.mod @@ -13,3 +13,5 @@ require ( golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 // indirect gopkg.in/inconshreveable/log15.v2 v2.0.0-20200109203555-b30bc20e4fd1 ) + +replace github.com/imdario/mergo => dario.cat/mergo latest From 4875ff0f1c480df8cb95d56c1328eb71c13a3704 Mon Sep 17 00:00:00 2001 From: Marco Porsch <1180665+mporsch@users.noreply.github.com> Date: Mon, 21 Aug 2023 15:31:35 +0200 Subject: [PATCH 05/10] remove unused function argument --- tunnel/wstunsrv.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tunnel/wstunsrv.go b/tunnel/wstunsrv.go index e41b810..de862ed 100644 --- a/tunnel/wstunsrv.go +++ b/tunnel/wstunsrv.go @@ -373,7 +373,7 @@ func getResponse(t *WSTunnelServer, req *remoteRequest, w http.ResponseWriter, r case resp := <-req.replyChan: // if there's no error just respond if resp.err == nil { - code := writeResponse(t, rs, w, resp.response) + code := writeResponse(rs, w, resp.response) req.log.Info("HTTP RET", "status", code) return } @@ -499,7 +499,7 @@ var censoredHeaders = []string{ } // Write an HTTP response from a byte buffer into a ResponseWriter -func writeResponse(t *WSTunnelServer, rs *remoteServer, w http.ResponseWriter, r io.Reader) int { +func writeResponse(rs *remoteServer, w http.ResponseWriter, r io.Reader) int { defer rs.readWG.Done() resp, err := http.ReadResponse(bufio.NewReader(r), nil) if err != nil { From 58f2cdef4c02f1b9c6ef4676820d367856dceaef Mon Sep 17 00:00:00 2001 From: Marco Porsch Date: Mon, 16 Oct 2023 09:58:41 +0200 Subject: [PATCH 06/10] disable write deadline this reverts "use command-line-exposed timeout for client write deadline" and "use command-line-exposed timeout for tunnel to allow writing large requests" --- tunnel/ws.go | 6 +++--- tunnel/wstuncli.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tunnel/ws.go b/tunnel/ws.go index f2c3678..ca6c7c4 100644 --- a/tunnel/ws.go +++ b/tunnel/ws.go @@ -83,7 +83,7 @@ func wsHandler(t *WSTunnelServer, w http.ResponseWriter, r *http.Request) { // Spawn goroutine to read responses go wsReader(rs, ws, t.WSTimeout, ch, &rs.readWG) // Send requests - wsWriter(rs, ws, t.WSTimeout, ch) + wsWriter(rs, ws, ch) } func wsSetPingHandler(t *WSTunnelServer, ws *websocket.Conn, rs *remoteServer) { @@ -108,7 +108,7 @@ func wsSetPingHandler(t *WSTunnelServer, ws *websocket.Conn, rs *remoteServer) { } // Pick requests off the RemoteServer queue and send them into the tunnel -func wsWriter(rs *remoteServer, ws *websocket.Conn, wsTimeout time.Duration, ch chan int) { +func wsWriter(rs *remoteServer, ws *websocket.Conn, ch chan int) { var req *remoteRequest var err error for { @@ -133,7 +133,7 @@ func wsWriter(rs *remoteServer, ws *websocket.Conn, wsTimeout time.Duration, ch continue } // write the request into the tunnel - ws.SetWriteDeadline(time.Now().Add(wsTimeout)) + ws.SetWriteDeadline(time.Time{}) // no timeout, there's the ping-pong for that var w io.WriteCloser w, err = ws.NextWriter(websocket.BinaryMessage) // got an error, reply with a "hey, retry" to the request handler diff --git a/tunnel/wstuncli.go b/tunnel/wstuncli.go index 09615e1..c72993d 100644 --- a/tunnel/wstuncli.go +++ b/tunnel/wstuncli.go @@ -758,7 +758,7 @@ func (wsc *WSConnection) writeResponseMessage(id int16, resp *http.Response) { wsWriterMutex.Lock() defer wsWriterMutex.Unlock() // Write response into the tunnel - wsc.ws.SetWriteDeadline(time.Now().Add(wsc.tun.Timeout)) + wsc.ws.SetWriteDeadline(time.Time{}) // separate ping-pong routine does timeout w, err := wsc.ws.NextWriter(websocket.BinaryMessage) // got an error, reply with a "hey, retry" to the request handler if err != nil { From 51c1280ee8cc7659e08ebcbcfa9175a7ff6d54df Mon Sep 17 00:00:00 2001 From: Marco Porsch Date: Tue, 17 Oct 2023 14:43:18 +0200 Subject: [PATCH 07/10] disable read deadline --- tunnel/ws.go | 6 ++---- tunnel/wstuncli.go | 2 -- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/tunnel/ws.go b/tunnel/ws.go index ca6c7c4..7f98cce 100644 --- a/tunnel/ws.go +++ b/tunnel/ws.go @@ -81,7 +81,7 @@ func wsHandler(t *WSTunnelServer, w http.ResponseWriter, r *http.Request) { // Create synchronization channel ch := make(chan int, 2) // Spawn goroutine to read responses - go wsReader(rs, ws, t.WSTimeout, ch, &rs.readWG) + go wsReader(rs, ws, ch, &rs.readWG) // Send requests wsWriter(rs, ws, ch) } @@ -167,7 +167,7 @@ func wsWriter(rs *remoteServer, ws *websocket.Conn, ch chan int) { } // Read responses from the tunnel and fulfill pending requests -func wsReader(rs *remoteServer, ws *websocket.Conn, wsTimeout time.Duration, ch chan int, readWG *sync.WaitGroup) { +func wsReader(rs *remoteServer, ws *websocket.Conn, ch chan int, readWG *sync.WaitGroup) { var err error logToken := cutToken(rs.token) // continue reading until we get an error @@ -188,8 +188,6 @@ func wsReader(rs *remoteServer, ws *websocket.Conn, wsTimeout time.Duration, ch err = fmt.Errorf("non-binary message received, type=%d", t) break } - // give the sender a fixed time to get us the data - ws.SetReadDeadline(time.Now().Add(wsTimeout)) // get request id var id int16 _, err = fmt.Fscanf(io.LimitReader(r, 4), "%04x", &id) diff --git a/tunnel/wstuncli.go b/tunnel/wstuncli.go index c72993d..1d5e5fa 100644 --- a/tunnel/wstuncli.go +++ b/tunnel/wstuncli.go @@ -364,8 +364,6 @@ func (wsc *WSConnection) handleRequests() { wsc.Log.Warn("WS invalid message type", "type", typ) break } - // give the sender a minute to produce the request - wsc.ws.SetReadDeadline(time.Now().Add(time.Minute)) // read request id var id int16 _, err = fmt.Fscanf(io.LimitReader(r, 4), "%04x", &id) From d6000e0ff270c57dcced2861f99e06e8a5d7a3de Mon Sep 17 00:00:00 2001 From: Marco Porsch Date: Mon, 5 Aug 2024 16:47:48 +0200 Subject: [PATCH 08/10] replace sync.WaitGroup with sync.Cond This fixes a race condition caused by the invalid use of sync.WaitGroup which caused "panic: sync: WaitGroup is reused before previous Wait has returned". --- tunnel/ws.go | 24 ++++++++++++------------ tunnel/wstunsrv.go | 11 +++++++---- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/tunnel/ws.go b/tunnel/ws.go index 7f98cce..c113aeb 100644 --- a/tunnel/ws.go +++ b/tunnel/ws.go @@ -8,7 +8,6 @@ import ( "html" "io" "net/http" - "sync" // imported per documentation - https://golang.org/pkg/net/http/pprof/ _ "net/http/pprof" @@ -25,7 +24,7 @@ func httpError(log log15.Logger, w http.ResponseWriter, token, err string, code http.Error(w, html.EscapeString(err), code) } -//websocket error constants +// websocket error constants const ( wsReadClose = iota wsReadError = iota @@ -81,7 +80,7 @@ func wsHandler(t *WSTunnelServer, w http.ResponseWriter, r *http.Request) { // Create synchronization channel ch := make(chan int, 2) // Spawn goroutine to read responses - go wsReader(rs, ws, ch, &rs.readWG) + go wsReader(rs, ws, ch) // Send requests wsWriter(rs, ws, ch) } @@ -167,15 +166,19 @@ func wsWriter(rs *remoteServer, ws *websocket.Conn, ch chan int) { } // Read responses from the tunnel and fulfill pending requests -func wsReader(rs *remoteServer, ws *websocket.Conn, ch chan int, readWG *sync.WaitGroup) { +func wsReader(rs *remoteServer, ws *websocket.Conn, ch chan int) { var err error logToken := cutToken(rs.token) + + // the mutex remains locked unless we are within Cond.Wait() + rs.readCond.L.Lock() + defer func() { + rs.readCond.L.Unlock() + rs.readCond.Signal() + }() + // continue reading until we get an error for { - // wait if another response is being sent - readWG.Wait() - // increment the WaitGroup counter - readWG.Add(1) ws.SetReadDeadline(time.Time{}) // no timeout, there's the ping-pong for that // read a message from the tunnel var t int @@ -205,20 +208,17 @@ func wsReader(rs *remoteServer, ws *websocket.Conn, ch chan int, readWG *sync.Wa // try to enqueue response select { case req.replyChan <- rb: - // great! rs.log.Info("WS RCV enqueued response", "id", id, "ws", wsp(ws)) + rs.readCond.Wait() // wait for response to be sent default: - readWG.Done() rs.log.Info("WS RCV can't enqueue response", "id", id, "ws", wsp(ws)) } } else { - readWG.Done() rs.log.Info("%s #%d: WS RCV orphan response", "id", id, "ws", wsp(ws)) } } // print error message if err != nil { - readWG.Done() rs.log.Info("WS closing", "token", logToken, "err", err.Error(), "ws", wsp(ws)) } // close up shop diff --git a/tunnel/wstunsrv.go b/tunnel/wstunsrv.go index de862ed..15643fa 100644 --- a/tunnel/wstunsrv.go +++ b/tunnel/wstunsrv.go @@ -77,7 +77,8 @@ type remoteServer struct { requestSet map[int16]*remoteRequest // all requests in queue/flight indexed by ID requestSetMutex sync.Mutex log log15.Logger - readWG sync.WaitGroup + readMutex sync.Mutex // ensure that no more than one goroutine calls the websocket read methods concurrently + readCond *sync.Cond // (NextReader, SetReadDeadline, SetPingHandler, ...) } //WSTunnelServer a wstunnel server construct @@ -350,8 +351,10 @@ func getResponse(t *WSTunnelServer, req *remoteRequest, w http.ResponseWriter, r } // Ensure we retire the request when we pop out of this function + // and release the lock on reading new requests defer func() { rs.RetireRequest(req) + rs.readCond.Signal() }() // enqueue request @@ -373,7 +376,7 @@ func getResponse(t *WSTunnelServer, req *remoteRequest, w http.ResponseWriter, r case resp := <-req.replyChan: // if there's no error just respond if resp.err == nil { - code := writeResponse(rs, w, resp.response) + code := writeResponse(w, resp.response) req.log.Info("HTTP RET", "status", code) return } @@ -427,6 +430,7 @@ func (t *WSTunnelServer) getRemoteServer(tok token, create bool) *remoteServer { requestSet: make(map[int16]*remoteRequest), log: log15.New("token", cutToken(tok)), } + rs.readCond = sync.NewCond(&rs.readMutex) t.serverRegistry[tok] = rs return rs } @@ -499,8 +503,7 @@ var censoredHeaders = []string{ } // Write an HTTP response from a byte buffer into a ResponseWriter -func writeResponse(rs *remoteServer, w http.ResponseWriter, r io.Reader) int { - defer rs.readWG.Done() +func writeResponse(w http.ResponseWriter, r io.Reader) int { resp, err := http.ReadResponse(bufio.NewReader(r), nil) if err != nil { log15.Info("WriteResponse: can't parse incoming response", "err", err) From 828c58714cb0085cd2edfc7b69fe57e25ac6c391 Mon Sep 17 00:00:00 2001 From: Marco Porsch Date: Thu, 8 Aug 2024 13:43:46 +0200 Subject: [PATCH 09/10] fix mutex release on retry causing HTTP 506 error --- tunnel/wstunsrv.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tunnel/wstunsrv.go b/tunnel/wstunsrv.go index 15643fa..7201b9c 100644 --- a/tunnel/wstunsrv.go +++ b/tunnel/wstunsrv.go @@ -354,7 +354,9 @@ func getResponse(t *WSTunnelServer, req *remoteRequest, w http.ResponseWriter, r // and release the lock on reading new requests defer func() { rs.RetireRequest(req) - rs.readCond.Signal() + if !retry { + rs.readCond.Signal() + } }() // enqueue request From 032a8fad7a938620382bf97f2438cad49616363d Mon Sep 17 00:00:00 2001 From: Marco Porsch Date: Mon, 19 Aug 2024 13:06:06 +0200 Subject: [PATCH 10/10] fix race condition causing server tunnel reader to be stuck waiting encountered when the chan in the server HTTP payloadHandler sends the signal before the tunnel reader started waiting fixed by obtaining the lock before sending the signal which is only available if the tunnel reader is in wait or has closed already --- tunnel/wstunsrv.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tunnel/wstunsrv.go b/tunnel/wstunsrv.go index 7201b9c..fcabf19 100644 --- a/tunnel/wstunsrv.go +++ b/tunnel/wstunsrv.go @@ -351,11 +351,13 @@ func getResponse(t *WSTunnelServer, req *remoteRequest, w http.ResponseWriter, r } // Ensure we retire the request when we pop out of this function - // and release the lock on reading new requests + // and signal the tunnel reader to continue defer func() { rs.RetireRequest(req) if !retry { + rs.readCond.L.Lock() // make sure the reader is in Wait() rs.readCond.Signal() + rs.readCond.L.Unlock() } }() @@ -373,6 +375,7 @@ func getResponse(t *WSTunnelServer, req *remoteRequest, w http.ResponseWriter, r } req.log.Info("HTTP RCV", "verb", r.Method, "url", r.URL, "addr", req.remoteAddr, "x-host", r.Header.Get("X-Host"), "try", try) + // wait for response select { case resp := <-req.replyChan: