Skip to content

Commit 1163438

Browse files
authored
Limit the maximum number of udp connections
1 parent 0aaed67 commit 1163438

File tree

2 files changed

+73
-60
lines changed

2 files changed

+73
-60
lines changed

daze.go

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"net/url"
2323
"os"
2424
"path/filepath"
25-
"slices"
2625
"strconv"
2726
"strings"
2827
"sync"
@@ -50,12 +49,14 @@ import (
5049
var Conf = struct {
5150
DialerTimeout time.Duration
5251
RouterLruSize int
52+
Socks5LruSize int
5353
}{
5454
DialerTimeout: time.Second * 8,
5555
// A single cache entry represents a single host or DNS name lookup. Make the cache as large as the maximum number
5656
// of clients that access your web site concurrently. Note that setting the cache size too high is a waste of
5757
// memory and degrades performance.
5858
RouterLruSize: 64,
59+
Socks5LruSize: 8,
5960
}
6061

6162
// ResolverDns returns a DNS resolver.
@@ -409,6 +410,28 @@ func (l *Locale) ServeSocks5TCP(ctx *Context, cli io.ReadWriteCloser, dst string
409410
return err
410411
}
411412

413+
// ServeSocks5UDPRead handles the reading and forwarding of udp data.
414+
func (l *Locale) ServeSocks5UDPRead(srv io.Reader, bnd *net.UDPConn, app *net.UDPAddr, pre []byte) error {
415+
var (
416+
buf = make([]byte, 2048)
417+
err error
418+
m = len(pre)
419+
n int
420+
)
421+
copy(buf[:m], pre)
422+
for {
423+
n, err = srv.Read(buf[m:])
424+
if err != nil {
425+
break
426+
}
427+
_, err = bnd.WriteToUDP(buf[:m+n], app)
428+
if err != nil {
429+
break
430+
}
431+
}
432+
return err
433+
}
434+
412435
// ServeSocks5UDP serves socks5 UDP protocol.
413436
func (l *Locale) ServeSocks5UDP(ctx *Context, cli io.ReadWriteCloser) error {
414437
var (
@@ -420,7 +443,7 @@ func (l *Locale) ServeSocks5UDP(ctx *Context, cli io.ReadWriteCloser) error {
420443
bndPort uint16
421444
bnd *net.UDPConn
422445
buf = make([]byte, 2048)
423-
cpl = map[string]io.ReadWriteCloser{}
446+
cpl = lru.New[string, io.ReadWriteCloser](Conf.Socks5LruSize)
424447
dstHost string
425448
dstPort uint16
426449
dst string
@@ -437,6 +460,9 @@ func (l *Locale) ServeSocks5UDP(ctx *Context, cli io.ReadWriteCloser) error {
437460
if err != nil {
438461
return err
439462
}
463+
cpl.Drop = func(k string, v io.ReadWriteCloser) {
464+
v.Close()
465+
}
440466

441467
// https://datatracker.ietf.org/doc/html/rfc1928, Page 7, UDP ASSOCIATE:
442468
// A UDP association terminates when the TCP connection that the UDP ASSOCIATE request arrived on terminates.
@@ -492,30 +518,26 @@ func (l *Locale) ServeSocks5UDP(ctx *Context, cli io.ReadWriteCloser) error {
492518
dstPort = binary.BigEndian.Uint16(appHead[20:22])
493519
}
494520
dst = dstHost + ":" + strconv.Itoa(int(dstPort))
495-
srv = cpl[dst]
496-
if srv == nil {
521+
if !cpl.Has(dst) {
497522
log.Printf("conn: %08x proto format=socks5", ctx.Cid)
498523
srv, err = l.Dialer.Dial(ctx, "udp", dst)
499524
if err != nil {
500525
log.Printf("conn: %08x error %s", ctx.Cid, err)
501526
continue
502527
}
503-
cpl[dst] = srv
504-
retHead := slices.Clone(appHead)
505-
go ReadCall(srv, func(data []byte) error {
506-
return doa.Err(bnd.WriteToUDP(append(retHead, data...), appAddr))
507-
})
528+
cpl.Set(dst, srv)
529+
go l.ServeSocks5UDPRead(srv, bnd, appAddr, appHead)
508530
}
531+
srv = cpl.Get(dst)
509532
_, err = srv.Write(buf[appHeadSize:appSize])
510533
if err != nil {
511534
log.Printf("conn: %08x error %s", ctx.Cid, err)
512-
srv.Close()
513-
delete(cpl, dst)
535+
cpl.Del(dst)
514536
continue
515537
}
516538
}
517-
for _, e := range cpl {
518-
e.Close()
539+
for k := range cpl.C {
540+
cpl.Del(k)
519541
}
520542
return nil
521543
}
@@ -1030,25 +1052,6 @@ func (r *RandomReader) Read(p []byte) (int, error) {
10301052
return len(p), nil
10311053
}
10321054

1033-
// ReadCall reads data from the given io.Reader and passes it to the provided call function.
1034-
func ReadCall(conn io.Reader, call func([]byte) error) error {
1035-
var (
1036-
buf = make([]byte, 2048)
1037-
err error
1038-
n int
1039-
)
1040-
for {
1041-
n, err = conn.Read(buf)
1042-
if err != nil {
1043-
break
1044-
}
1045-
if err = call(buf[:n]); err != nil {
1046-
break
1047-
}
1048-
}
1049-
return err
1050-
}
1051-
10521055
// Salt converts the stupid password passed in by the user to 32-sized byte array.
10531056
func Salt(s string) []byte {
10541057
h := sha256.Sum256([]byte(s))

lib/lru/lru.go

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,15 @@ func (l *List[K, V]) Insert(e *Elem[K, V]) *Elem[K, V] {
3838
return e
3939
}
4040

41+
// Remove removes e from its list, decrements l's size.
42+
func (l *List[K, V]) Remove(e *Elem[K, V]) {
43+
e.Prev.Next = e.Next
44+
e.Next.Prev = e.Prev
45+
e.Prev = nil // Avoid memory leaks
46+
e.Next = nil // Avoid memory leaks
47+
l.Size--
48+
}
49+
4150
// Update e to next to root.
4251
func (l *List[K, V]) Update(e *Elem[K, V]) {
4352
if l.Root.Next == e {
@@ -51,42 +60,17 @@ func (l *List[K, V]) Update(e *Elem[K, V]) {
5160
e.Next.Prev = e
5261
}
5362

54-
// Remove removes e from its list, decrements l's size.
55-
func (l *List[K, V]) Remove(e *Elem[K, V]) {
56-
e.Prev.Next = e.Next
57-
e.Next.Prev = e.Prev
58-
e.Prev = nil // Avoid memory leaks
59-
e.Next = nil // Avoid memory leaks
60-
l.Size--
61-
}
62-
6363
// Lru cache. It is safe for concurrent access.
6464
type Lru[K comparable, V any] struct {
65-
// Size is the maximum number of cache entries before
66-
// an item is evicted. Zero means no limit.
65+
// Drop is called automatically when an elem is deleted.
66+
Drop func(k K, v V)
67+
// Size is the maximum number of cache entries before an item is evicted. Zero means no limit.
6768
Size int
6869
List *List[K, V]
6970
C map[K]*Elem[K, V]
7071
M *sync.Mutex
7172
}
7273

73-
// Set adds a value to the cache.
74-
func (l *Lru[K, V]) Set(k K, v V) {
75-
l.M.Lock()
76-
defer l.M.Unlock()
77-
if e, ok := l.C[k]; ok {
78-
l.List.Update(e)
79-
e.K = k
80-
e.V = v
81-
return
82-
}
83-
if l.List.Size == l.Size {
84-
delete(l.C, l.List.Root.Prev.K)
85-
l.List.Remove(l.List.Root.Prev)
86-
}
87-
l.C[k] = l.List.Insert(&Elem[K, V]{K: k, V: v})
88-
}
89-
9074
// Get looks up a key's value from the cache.
9175
func (l *Lru[K, V]) GetExists(k K) (v V, ok bool) {
9276
l.M.Lock()
@@ -111,21 +95,47 @@ func (l *Lru[K, V]) Del(k K) {
11195
l.M.Lock()
11296
defer l.M.Unlock()
11397
if e, ok := l.C[k]; ok {
98+
l.Drop(k, e.V)
11499
delete(l.C, k)
115100
l.List.Remove(e)
116101
}
117102
}
118103

104+
// Has returns true if a key exists.
105+
func (l *Lru[K, V]) Has(k K) bool {
106+
_, b := l.C[k]
107+
return b
108+
}
109+
119110
// Len returns the number of items in the cache.
120111
func (l *Lru[K, V]) Len() int {
121112
l.M.Lock()
122113
defer l.M.Unlock()
123114
return l.List.Size
124115
}
125116

117+
// Set adds a value to the cache.
118+
func (l *Lru[K, V]) Set(k K, v V) {
119+
l.M.Lock()
120+
defer l.M.Unlock()
121+
if e, ok := l.C[k]; ok {
122+
l.List.Update(e)
123+
e.K = k
124+
e.V = v
125+
return
126+
}
127+
if l.List.Size == l.Size {
128+
l.Drop(l.List.Root.Prev.K, l.List.Root.Prev.V)
129+
delete(l.C, l.List.Root.Prev.K)
130+
l.List.Remove(l.List.Root.Prev)
131+
}
132+
l.C[k] = l.List.Insert(&Elem[K, V]{K: k, V: v})
133+
}
134+
126135
// New returns a new LRU cache. If size is zero, the cache has no limit.
127136
func New[K comparable, V any](size int) *Lru[K, V] {
128137
return &Lru[K, V]{
138+
Drop: func(k K, v V) {},
129139
Size: size,
130140
List: new(List[K, V]).Init(),
131141
C: map[K]*Elem[K, V]{},

0 commit comments

Comments
 (0)