88 "fmt"
99 "io"
1010 "net/http"
11+ "sync/atomic"
1112 "time"
1213
1314 "github.com/gorilla/websocket"
@@ -28,28 +29,64 @@ func NewResponseHeader(req *http.Request) http.Header {
2829 return header
2930}
3031
32+ type bidirectionalStreamStatus struct {
33+ doneChan chan struct {}
34+ anyDone uint32
35+ }
36+
37+ func newBiStreamStatus () * bidirectionalStreamStatus {
38+ return & bidirectionalStreamStatus {
39+ doneChan : make (chan struct {}, 2 ),
40+ anyDone : 0 ,
41+ }
42+ }
43+
44+ func (s * bidirectionalStreamStatus ) markUniStreamDone () {
45+ atomic .StoreUint32 (& s .anyDone , 1 )
46+ s .doneChan <- struct {}{}
47+ }
48+
49+ func (s * bidirectionalStreamStatus ) waitAnyDone () {
50+ <- s .doneChan
51+ }
52+ func (s * bidirectionalStreamStatus ) isAnyDone () bool {
53+ return atomic .LoadUint32 (& s .anyDone ) > 0
54+ }
55+
3156// Stream copies copy data to & from provided io.ReadWriters.
3257func Stream (tunnelConn , originConn io.ReadWriter , log * zerolog.Logger ) {
33- proxyDone := make ( chan struct {}, 2 )
58+ status := newBiStreamStatus ( )
3459
35- go func () {
36- _ , err := copyData (tunnelConn , originConn , "origin->tunnel" )
37- if err != nil {
38- log .Debug ().Msgf ("origin to tunnel copy: %v" , err )
39- }
40- proxyDone <- struct {}{}
41- }()
60+ go unidirectionalStream (tunnelConn , originConn , "origin->tunnel" , status , log )
61+ go unidirectionalStream (originConn , tunnelConn , "tunnel->origin" , status , log )
62+
63+ // If one side is done, we are done.
64+ status .waitAnyDone ()
65+ }
4266
43- go func () {
44- _ , err := copyData (originConn , tunnelConn , "tunnel->origin" )
45- if err != nil {
46- log .Debug ().Msgf ("tunnel to origin copy: %v" , err )
67+ func unidirectionalStream (dst io.Writer , src io.Reader , dir string , status * bidirectionalStreamStatus , log * zerolog.Logger ) {
68+ defer func () {
69+ // The bidirectional streaming spawns 2 goroutines to stream each direction.
70+ // If any ends, the callstack returns, meaning the Tunnel request/stream (depending on http2 vs quic) will
71+ // close. In such case, if the other direction did not stop (due to application level stopping, e.g., if a
72+ // server/origin listens forever until closure), it may read/write from the underlying ReadWriter (backed by
73+ // the Edge<->cloudflared transport) in an unexpected state.
74+
75+ if status .isAnyDone () {
76+ // Because of this, we set this recover() logic, which kicks-in *only* if any stream is known to have
77+ // exited. In such case, we stop a possible panic from propagating upstream.
78+ if r := recover (); r != nil {
79+ // We handle such unexpected errors only when we detect that one side of the streaming is done.
80+ log .Debug ().Msgf ("Handled gracefully error %v in Streaming for %s" , r , dir )
81+ }
4782 }
48- proxyDone <- struct {}{}
4983 }()
5084
51- // If one side is done, we are done.
52- <- proxyDone
85+ _ , err := copyData (dst , src , dir )
86+ if err != nil {
87+ log .Debug ().Msgf ("%s copy: %v" , dir , err )
88+ }
89+ status .markUniStreamDone ()
5390}
5491
5592// when set to true, enables logging of content copied to/from origin and tunnel
0 commit comments