From fa2e6c7d0364d67109e2f6fcd5b6172f8f5d9430 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E9=B8=BB?= Date: Tue, 30 Apr 2024 10:27:43 +0800 Subject: [PATCH 1/5] bugfix (share/tunnel/tunnel_out_ssh_udp.go): 1. fix the udpConns does not release new conn in udpConn map when the length is over maxConns. The length of map continues growing and eventually makes new connections cannot read. 2. make maxConns configurable --- share/tunnel/tunnel_out_ssh_udp.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/share/tunnel/tunnel_out_ssh_udp.go b/share/tunnel/tunnel_out_ssh_udp.go index d3c4c62f..0bddb195 100644 --- a/share/tunnel/tunnel_out_ssh_udp.go +++ b/share/tunnel/tunnel_out_ssh_udp.go @@ -28,6 +28,7 @@ func (t *Tunnel) handleUDP(l *cio.Logger, rwc io.ReadWriteCloser, hostPort strin }, udpConns: conns, maxMTU: settings.EnvInt("UDP_MAX_SIZE", 9012), + maxConns: settings.EnvInt("UDP_MAX_CONNS", 100), } h.Debugf("UDP max size: %d bytes", h.maxMTU) for { @@ -43,7 +44,8 @@ type udpHandler struct { hostPort string *udpChannel *udpConns - maxMTU int + maxMTU int + maxConns int } func (h *udpHandler) handleWrite(p *udpPacket) error { @@ -62,13 +64,8 @@ func (h *udpHandler) handleWrite(p *udpPacket) error { //TODO++ dont use go-routines, switch to pollable // array of listeners where all listeners are // sweeped periodically, removing the idle ones - const maxConns = 100 if !exists { - if h.udpConns.len() <= maxConns { - go h.handleRead(p, conn) - } else { - h.Debugf("exceeded max udp connections (%d)", maxConns) - } + go h.handleRead(p, conn) } _, err = conn.Write(p.Payload) if err != nil { @@ -80,6 +77,10 @@ func (h *udpHandler) handleWrite(p *udpPacket) error { func (h *udpHandler) handleRead(p *udpPacket, conn *udpConn) { //ensure connection is cleaned up defer h.udpConns.remove(conn.id) + if h.udpConns.len() > h.maxConns { + h.Debugf("exceeded max udp connections (%d)", h.maxConns) + return + } buff := make([]byte, h.maxMTU) for { //response must arrive within 15 seconds From 574cb30bc4fea4cb769619c3e8836eaed64b0815 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E9=B8=BB?= Date: Tue, 30 Apr 2024 13:01:22 +0800 Subject: [PATCH 2/5] refactor(tunnel_out_ssh_udp): add setWriteOnly to ensure the udp conns over max size can not read but can also write to ensure consistent usage with previous implementation. --- share/tunnel/tunnel_out_ssh_udp.go | 67 +++++++++++++++++++++++++----- 1 file changed, 56 insertions(+), 11 deletions(-) diff --git a/share/tunnel/tunnel_out_ssh_udp.go b/share/tunnel/tunnel_out_ssh_udp.go index 0bddb195..d2ca860f 100644 --- a/share/tunnel/tunnel_out_ssh_udp.go +++ b/share/tunnel/tunnel_out_ssh_udp.go @@ -16,6 +16,7 @@ func (t *Tunnel) handleUDP(l *cio.Logger, rwc io.ReadWriteCloser, hostPort strin conns := &udpConns{ Logger: l, m: map[string]*udpConn{}, + wm: map[string]*udpConn{}, } defer conns.closeAll() h := &udpHandler{ @@ -65,7 +66,13 @@ func (h *udpHandler) handleWrite(p *udpPacket) error { // array of listeners where all listeners are // sweeped periodically, removing the idle ones if !exists { - go h.handleRead(p, conn) + if h.udpConns.len() <= h.maxConns { + go h.handleRead(p, conn) + } else { + //write only + h.udpConns.setWriteOnly(conn.id) + h.Debugf("exceeded max udp connections (%d)", h.maxConns) + } } _, err = conn.Write(p.Payload) if err != nil { @@ -76,7 +83,10 @@ func (h *udpHandler) handleWrite(p *udpPacket) error { func (h *udpHandler) handleRead(p *udpPacket, conn *udpConn) { //ensure connection is cleaned up - defer h.udpConns.remove(conn.id) + defer func() { + h.udpConns.remove(conn.id) + conn.Close() + }() if h.udpConns.len() > h.maxConns { h.Debugf("exceeded max udp connections (%d)", h.maxConns) return @@ -107,7 +117,8 @@ func (h *udpHandler) handleRead(p *udpPacket, conn *udpConn) { type udpConns struct { *cio.Logger sync.Mutex - m map[string]*udpConn + m map[string]*udpConn + wm map[string]*udpConn //write only } func (cs *udpConns) dial(id, addr string) (*udpConn, bool, error) { @@ -115,15 +126,18 @@ func (cs *udpConns) dial(id, addr string) (*udpConn, bool, error) { defer cs.Unlock() conn, ok := cs.m[id] if !ok { - c, err := net.Dial("udp", addr) - if err != nil { - return nil, false, err - } - conn = &udpConn{ - id: id, - Conn: c, // cnet.MeterConn(cs.Logger.Fork(addr), c), + conn, ok = cs.wm[id] + if !ok { + c, err := net.Dial("udp", addr) + if err != nil { + return nil, false, err + } + conn = &udpConn{ + id: id, + Conn: c, // cnet.MeterConn(cs.Logger.Fork(addr), c), + } + cs.m[id] = conn } - cs.m[id] = conn } return conn, ok, nil } @@ -150,7 +164,38 @@ func (cs *udpConns) closeAll() { cs.Unlock() } +func (cs *udpConns) setWriteOnly(id string) { + cs.Lock() + conn, ok := cs.m[id] + if ok { + delete(cs.m, id) + conn.writeTimer = time.AfterFunc(settings.EnvDuration("UDP_DEADLINE", 15*time.Second), func() { + cs.Lock() + defer cs.Unlock() + delete(cs.wm, conn.id) + conn.Close() + }) + cs.wm[id] = conn + } + cs.Unlock() +} + type udpConn struct { id string net.Conn + writeTimer *time.Timer +} + +func (w *udpConn) Write(b []byte) (int, error) { + if w.writeTimer != nil { + w.writeTimer.Reset(settings.EnvDuration("UDP_DEADLINE", 15*time.Second)) + } + return w.Conn.Write(b) +} + +func (w *udpConn) Close() error { + if w.writeTimer != nil { + w.writeTimer.Stop() + } + return w.Conn.Close() } From a5cd1fca9c718bc13f5c1569992bf16899b30a77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E9=B8=BB?= Date: Tue, 30 Apr 2024 15:23:07 +0800 Subject: [PATCH 3/5] fix timer usage --- share/tunnel/tunnel_out_ssh_udp.go | 1 + 1 file changed, 1 insertion(+) diff --git a/share/tunnel/tunnel_out_ssh_udp.go b/share/tunnel/tunnel_out_ssh_udp.go index d2ca860f..881215b8 100644 --- a/share/tunnel/tunnel_out_ssh_udp.go +++ b/share/tunnel/tunnel_out_ssh_udp.go @@ -188,6 +188,7 @@ type udpConn struct { func (w *udpConn) Write(b []byte) (int, error) { if w.writeTimer != nil { + w.writeTimer.Stop() w.writeTimer.Reset(settings.EnvDuration("UDP_DEADLINE", 15*time.Second)) } return w.Conn.Write(b) From daa4fd6034fd18db37335c3cc7f4ce27da80f452 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E9=B8=BB?= Date: Mon, 6 May 2024 14:35:16 +0800 Subject: [PATCH 4/5] refactor(tunnel_out_ssh_udp): simplify the implementation of write only udp connection --- share/tunnel/tunnel_out_ssh_udp.go | 33 +++++++++++------------------- 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/share/tunnel/tunnel_out_ssh_udp.go b/share/tunnel/tunnel_out_ssh_udp.go index 881215b8..c09762b5 100644 --- a/share/tunnel/tunnel_out_ssh_udp.go +++ b/share/tunnel/tunnel_out_ssh_udp.go @@ -16,7 +16,6 @@ func (t *Tunnel) handleUDP(l *cio.Logger, rwc io.ReadWriteCloser, hostPort strin conns := &udpConns{ Logger: l, m: map[string]*udpConn{}, - wm: map[string]*udpConn{}, } defer conns.closeAll() h := &udpHandler{ @@ -70,7 +69,7 @@ func (h *udpHandler) handleWrite(p *udpPacket) error { go h.handleRead(p, conn) } else { //write only - h.udpConns.setWriteOnly(conn.id) + h.udpConns.setCleanUpTimer(conn.id) h.Debugf("exceeded max udp connections (%d)", h.maxConns) } } @@ -117,8 +116,7 @@ func (h *udpHandler) handleRead(p *udpPacket, conn *udpConn) { type udpConns struct { *cio.Logger sync.Mutex - m map[string]*udpConn - wm map[string]*udpConn //write only + m map[string]*udpConn } func (cs *udpConns) dial(id, addr string) (*udpConn, bool, error) { @@ -126,18 +124,15 @@ func (cs *udpConns) dial(id, addr string) (*udpConn, bool, error) { defer cs.Unlock() conn, ok := cs.m[id] if !ok { - conn, ok = cs.wm[id] - if !ok { - c, err := net.Dial("udp", addr) - if err != nil { - return nil, false, err - } - conn = &udpConn{ - id: id, - Conn: c, // cnet.MeterConn(cs.Logger.Fork(addr), c), - } - cs.m[id] = conn + c, err := net.Dial("udp", addr) + if err != nil { + return nil, false, err } + conn = &udpConn{ + id: id, + Conn: c, // cnet.MeterConn(cs.Logger.Fork(addr), c), + } + cs.m[id] = conn } return conn, ok, nil } @@ -164,18 +159,14 @@ func (cs *udpConns) closeAll() { cs.Unlock() } -func (cs *udpConns) setWriteOnly(id string) { +func (cs *udpConns) setCleanUpTimer(id string) { cs.Lock() conn, ok := cs.m[id] if ok { - delete(cs.m, id) conn.writeTimer = time.AfterFunc(settings.EnvDuration("UDP_DEADLINE", 15*time.Second), func() { - cs.Lock() - defer cs.Unlock() - delete(cs.wm, conn.id) + cs.remove(conn.id) conn.Close() }) - cs.wm[id] = conn } cs.Unlock() } From e658cedbf83f23da8a3710908c26191bf3f04921 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E9=B8=BB?= Date: Tue, 7 May 2024 17:24:46 +0800 Subject: [PATCH 5/5] fix(tunnel_out_ssh_udp): use defer func to unlock --- share/tunnel/tunnel_out_ssh_udp.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/share/tunnel/tunnel_out_ssh_udp.go b/share/tunnel/tunnel_out_ssh_udp.go index c09762b5..de5bad17 100644 --- a/share/tunnel/tunnel_out_ssh_udp.go +++ b/share/tunnel/tunnel_out_ssh_udp.go @@ -161,6 +161,7 @@ func (cs *udpConns) closeAll() { func (cs *udpConns) setCleanUpTimer(id string) { cs.Lock() + defer cs.Unlock() conn, ok := cs.m[id] if ok { conn.writeTimer = time.AfterFunc(settings.EnvDuration("UDP_DEADLINE", 15*time.Second), func() { @@ -168,7 +169,6 @@ func (cs *udpConns) setCleanUpTimer(id string) { conn.Close() }) } - cs.Unlock() } type udpConn struct {