Skip to content

Commit 7bf8a91

Browse files
committed
save
1 parent fddde90 commit 7bf8a91

File tree

4 files changed

+184
-63
lines changed

4 files changed

+184
-63
lines changed

common/bufio/packet_reactor.go

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
)
1616

1717
const (
18-
batchReadTimeout = 250 * time.Millisecond
18+
batchReadTimeout = 5 *time.Second
1919
)
2020

2121
const (
@@ -141,12 +141,14 @@ type reactorStream struct {
141141
destination N.PacketWriter
142142
originSource N.PacketReader
143143

144-
pushable N.PacketPushable
145-
pollable N.PacketPollable
146-
options N.ReadWaitOptions
147-
readWaiter N.PacketReadWaiter
148-
readCounters []N.CountFunc
149-
writeCounters []N.CountFunc
144+
pushable N.PacketPushable
145+
pushableCallback func()
146+
pollable N.PacketPollable
147+
deadlineSetter deadlineSetter
148+
options N.ReadWaitOptions
149+
readWaiter N.PacketReadWaiter
150+
readCounters []N.CountFunc
151+
writeCounters []N.CountFunc
150152

151153
state atomic.Int32
152154
}
@@ -245,11 +247,27 @@ func (r *PacketReactor) registerStream(stream *reactorStream) {
245247
return
246248
}
247249

250+
// Check if deadline setter is available and functional
251+
if setter, ok := stream.source.(deadlineSetter); ok {
252+
err := setter.SetReadDeadline(time.Time{})
253+
if err != nil {
254+
r.logger.Trace("packet stream: SetReadDeadline not supported, using legacy copy")
255+
go stream.runLegacyCopy()
256+
return
257+
}
258+
stream.deadlineSetter = setter
259+
} else {
260+
r.logger.Trace("packet stream: no deadline setter, using legacy copy")
261+
go stream.runLegacyCopy()
262+
return
263+
}
264+
248265
if stream.pushable != nil {
249266
r.logger.Trace("packet stream: using pushable mode")
250-
stream.pushable.SetOnDataReady(func() {
267+
stream.pushableCallback = func() {
251268
go stream.runActiveLoop(nil)
252-
})
269+
}
270+
stream.pushable.SetOnDataReady(stream.pushableCallback)
253271
if stream.pushable.HasPendingData() {
254272
go stream.runActiveLoop(nil)
255273
}
@@ -293,6 +311,10 @@ func (s *reactorStream) runActiveLoop(firstPacket *N.PacketBuffer) {
293311
return
294312
}
295313

314+
if s.pushable != nil {
315+
s.pushable.SetOnDataReady(nil)
316+
}
317+
296318
notFirstTime := false
297319

298320
if firstPacket != nil {
@@ -309,8 +331,12 @@ func (s *reactorStream) runActiveLoop(firstPacket *N.PacketBuffer) {
309331
return
310332
}
311333

312-
if setter, ok := s.source.(interface{ SetReadDeadline(time.Time) error }); ok {
313-
setter.SetReadDeadline(time.Now().Add(batchReadTimeout))
334+
deadlineErr := s.deadlineSetter.SetReadDeadline(time.Now().Add(batchReadTimeout))
335+
if deadlineErr != nil {
336+
s.connection.reactor.logger.Trace("packet stream: SetReadDeadline failed, switching to legacy copy")
337+
s.state.Store(stateIdle)
338+
go s.runLegacyCopy()
339+
return
314340
}
315341

316342
var (
@@ -341,16 +367,16 @@ func (s *reactorStream) runActiveLoop(firstPacket *N.PacketBuffer) {
341367

342368
if err != nil {
343369
if E.IsTimeout(err) {
344-
if setter, ok := s.source.(interface{ SetReadDeadline(time.Time) error }); ok {
345-
setter.SetReadDeadline(time.Time{})
346-
}
370+
s.deadlineSetter.SetReadDeadline(time.Time{})
347371
if !s.state.CompareAndSwap(stateActive, stateIdle) {
348372
return
349373
}
350374
s.connection.reactor.logger.Trace("packet stream: timeout, returning to idle pool")
351375
if s.pushable != nil {
376+
s.pushable.SetOnDataReady(s.pushableCallback)
352377
if s.pushable.HasPendingData() {
353378
if s.state.CompareAndSwap(stateIdle, stateActive) {
379+
s.pushable.SetOnDataReady(nil)
354380
continue
355381
}
356382
}

common/bufio/stream_reactor.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
)
1717

1818
const (
19-
streamBatchReadTimeout = 250 * time.Millisecond
19+
streamBatchReadTimeout = 5*time.Second
2020
)
2121

2222
func CreateStreamPollable(reader io.Reader) (N.StreamPollable, bool) {
@@ -88,18 +88,23 @@ type streamConnection struct {
8888
err error
8989
}
9090

91+
type deadlineSetter interface {
92+
SetReadDeadline(time.Time) error
93+
}
94+
9195
type streamDirection struct {
9296
connection *streamConnection
9397

9498
source io.Reader
9599
destination io.Writer
96100
originSource net.Conn
97101

98-
pollable N.StreamPollable
99-
options N.ReadWaitOptions
100-
readWaiter N.ReadWaiter
101-
readCounters []N.CountFunc
102-
writeCounters []N.CountFunc
102+
pollable N.StreamPollable
103+
deadlineSetter deadlineSetter
104+
options N.ReadWaitOptions
105+
readWaiter N.ReadWaiter
106+
readCounters []N.CountFunc
107+
writeCounters []N.CountFunc
103108

104109
isUpload bool
105110
state atomic.Int32
@@ -221,6 +226,21 @@ func (r *StreamReactor) registerDirection(direction *streamDirection) {
221226
return
222227
}
223228

229+
// Check if deadline setter is available and functional
230+
if setter, ok := direction.originSource.(deadlineSetter); ok {
231+
err := setter.SetReadDeadline(time.Time{})
232+
if err != nil {
233+
r.logger.Trace("stream direction: SetReadDeadline not supported, using legacy copy")
234+
go direction.runLegacyCopy()
235+
return
236+
}
237+
direction.deadlineSetter = setter
238+
} else {
239+
r.logger.Trace("stream direction: no deadline setter, using legacy copy")
240+
go direction.runLegacyCopy()
241+
return
242+
}
243+
224244
// Check if there's buffered data that needs processing first
225245
if direction.pollable != nil && direction.pollable.Buffered() > 0 {
226246
r.logger.Trace("stream direction: has buffered data, starting active loop")
@@ -261,8 +281,12 @@ func (d *streamDirection) runActiveLoop() {
261281
}
262282

263283
// Set batch read timeout
264-
if setter, ok := d.originSource.(interface{ SetReadDeadline(time.Time) error }); ok {
265-
setter.SetReadDeadline(time.Now().Add(streamBatchReadTimeout))
284+
deadlineErr := d.deadlineSetter.SetReadDeadline(time.Now().Add(streamBatchReadTimeout))
285+
if deadlineErr != nil {
286+
d.connection.reactor.logger.Trace("stream direction: SetReadDeadline failed, switching to legacy copy")
287+
d.state.Store(stateIdle)
288+
go d.runLegacyCopy()
289+
return
266290
}
267291

268292
var (
@@ -284,9 +308,7 @@ func (d *streamDirection) runActiveLoop() {
284308
if err != nil {
285309
if E.IsTimeout(err) {
286310
// Timeout: check buffer and return to pool
287-
if setter, ok := d.originSource.(interface{ SetReadDeadline(time.Time) error }); ok {
288-
setter.SetReadDeadline(time.Time{})
289-
}
311+
d.deadlineSetter.SetReadDeadline(time.Time{})
290312
if d.state.CompareAndSwap(stateActive, stateIdle) {
291313
d.connection.reactor.logger.Trace("stream direction: timeout, returning to idle pool")
292314
d.returnToPool()

common/udpnat2/conn.go

Lines changed: 105 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -40,31 +40,45 @@ type natConn struct {
4040
queueMutex sync.Mutex
4141
onDataReady func()
4242

43+
deadlineMutex sync.Mutex
44+
deadlineTimer *time.Timer
45+
deadlineChan chan struct{}
46+
dataSignal chan struct{}
47+
4348
closeOnce sync.Once
4449
doneChan chan struct{}
4550
}
4651

47-
func (c *natConn) ReadPacket(buffer *buf.Buffer) (addr M.Socksaddr, err error) {
48-
select {
49-
case <-c.doneChan:
50-
return M.Socksaddr{}, io.ErrClosedPipe
51-
default:
52-
}
52+
func (c *natConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr, err error) {
53+
for {
54+
select {
55+
case <-c.doneChan:
56+
return M.Socksaddr{}, io.ErrClosedPipe
57+
default:
58+
}
5359

54-
c.queueMutex.Lock()
55-
if len(c.dataQueue) == 0 {
60+
c.queueMutex.Lock()
61+
if len(c.dataQueue) > 0 {
62+
packet := c.dataQueue[0]
63+
c.dataQueue = c.dataQueue[1:]
64+
c.queueMutex.Unlock()
65+
_, err = buffer.ReadOnceFrom(packet.Buffer)
66+
destination = packet.Destination
67+
packet.Buffer.Release()
68+
N.PutPacketBuffer(packet)
69+
return
70+
}
5671
c.queueMutex.Unlock()
57-
return M.Socksaddr{}, os.ErrDeadlineExceeded
58-
}
59-
packet := c.dataQueue[0]
60-
c.dataQueue = c.dataQueue[1:]
61-
c.queueMutex.Unlock()
6272

63-
_, err = buffer.ReadOnceFrom(packet.Buffer)
64-
destination := packet.Destination
65-
packet.Buffer.Release()
66-
N.PutPacketBuffer(packet)
67-
return destination, err
73+
select {
74+
case <-c.doneChan:
75+
return M.Socksaddr{}, io.ErrClosedPipe
76+
case <-c.waitDeadline():
77+
return M.Socksaddr{}, os.ErrDeadlineExceeded
78+
case <-c.dataSignal:
79+
continue
80+
}
81+
}
6882
}
6983

7084
func (c *natConn) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error {
@@ -79,25 +93,34 @@ func (c *natConn) InitializeReadWaiter(options N.ReadWaitOptions) (needCopy bool
7993
}
8094

8195
func (c *natConn) WaitReadPacket() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
82-
select {
83-
case <-c.doneChan:
84-
return nil, M.Socksaddr{}, io.ErrClosedPipe
85-
default:
86-
}
96+
for {
97+
select {
98+
case <-c.doneChan:
99+
return nil, M.Socksaddr{}, io.ErrClosedPipe
100+
default:
101+
}
87102

88-
c.queueMutex.Lock()
89-
if len(c.dataQueue) == 0 {
103+
c.queueMutex.Lock()
104+
if len(c.dataQueue) > 0 {
105+
packet := c.dataQueue[0]
106+
c.dataQueue = c.dataQueue[1:]
107+
c.queueMutex.Unlock()
108+
buffer = c.readWaitOptions.Copy(packet.Buffer)
109+
destination = packet.Destination
110+
N.PutPacketBuffer(packet)
111+
return
112+
}
90113
c.queueMutex.Unlock()
91-
return nil, M.Socksaddr{}, os.ErrDeadlineExceeded
92-
}
93-
packet := c.dataQueue[0]
94-
c.dataQueue = c.dataQueue[1:]
95-
c.queueMutex.Unlock()
96114

97-
buffer = c.readWaitOptions.Copy(packet.Buffer)
98-
destination = packet.Destination
99-
N.PutPacketBuffer(packet)
100-
return
115+
select {
116+
case <-c.doneChan:
117+
return nil, M.Socksaddr{}, io.ErrClosedPipe
118+
case <-c.waitDeadline():
119+
return nil, M.Socksaddr{}, os.ErrDeadlineExceeded
120+
case <-c.dataSignal:
121+
continue
122+
}
123+
}
101124
}
102125

103126
func (c *natConn) SetHandler(handler N.UDPHandlerEx) {
@@ -141,6 +164,11 @@ func (c *natConn) PushPacket(packet *N.PacketBuffer) {
141164
callback := c.onDataReady
142165
c.queueMutex.Unlock()
143166

167+
select {
168+
case c.dataSignal <- struct{}{}:
169+
default:
170+
}
171+
144172
if callback != nil {
145173
callback()
146174
}
@@ -187,9 +215,52 @@ func (c *natConn) SetDeadline(t time.Time) error {
187215
}
188216

189217
func (c *natConn) SetReadDeadline(t time.Time) error {
218+
c.deadlineMutex.Lock()
219+
defer c.deadlineMutex.Unlock()
220+
221+
if c.deadlineTimer != nil && !c.deadlineTimer.Stop() {
222+
<-c.deadlineChan
223+
}
224+
c.deadlineTimer = nil
225+
226+
if t.IsZero() {
227+
if isClosedChan(c.deadlineChan) {
228+
c.deadlineChan = make(chan struct{})
229+
}
230+
return nil
231+
}
232+
233+
if duration := time.Until(t); duration > 0 {
234+
if isClosedChan(c.deadlineChan) {
235+
c.deadlineChan = make(chan struct{})
236+
}
237+
c.deadlineTimer = time.AfterFunc(duration, func() {
238+
close(c.deadlineChan)
239+
})
240+
return nil
241+
}
242+
243+
if !isClosedChan(c.deadlineChan) {
244+
close(c.deadlineChan)
245+
}
190246
return nil
191247
}
192248

249+
func (c *natConn) waitDeadline() chan struct{} {
250+
c.deadlineMutex.Lock()
251+
defer c.deadlineMutex.Unlock()
252+
return c.deadlineChan
253+
}
254+
255+
func isClosedChan(channel <-chan struct{}) bool {
256+
select {
257+
case <-channel:
258+
return true
259+
default:
260+
return false
261+
}
262+
}
263+
193264
func (c *natConn) SetWriteDeadline(t time.Time) error {
194265
return os.ErrInvalid
195266
}

common/udpnat2/service.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,12 @@ func (s *Service) NewPacket(bufferSlices [][]byte, source M.Socksaddr, destinati
5656
return nil, false
5757
}
5858
newConn := &natConn{
59-
cache: s.cache,
60-
writer: writer,
61-
localAddr: source,
62-
doneChan: make(chan struct{}),
59+
cache: s.cache,
60+
writer: writer,
61+
localAddr: source,
62+
deadlineChan: make(chan struct{}),
63+
dataSignal: make(chan struct{}, 1),
64+
doneChan: make(chan struct{}),
6365
}
6466
go s.handler.NewPacketConnectionEx(ctx, newConn, source, destination, onClose)
6567
return newConn, true

0 commit comments

Comments
 (0)