Skip to content

Commit 025e8b6

Browse files
committed
Add packet reactor
1 parent 449f8b4 commit 025e8b6

18 files changed

+3710
-0
lines changed

common/bufio/channel_demux.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package bufio
2+
3+
import (
4+
"context"
5+
"reflect"
6+
"sync"
7+
"sync/atomic"
8+
9+
N "github.com/sagernet/sing/common/network"
10+
)
11+
12+
type channelDemuxEntry struct {
13+
channel <-chan *N.PacketBuffer
14+
stream *reactorStream
15+
}
16+
17+
type ChannelDemultiplexer struct {
18+
ctx context.Context
19+
cancel context.CancelFunc
20+
mutex sync.Mutex
21+
entries map[<-chan *N.PacketBuffer]*channelDemuxEntry
22+
updateChan chan struct{}
23+
running bool
24+
closed atomic.Bool
25+
wg sync.WaitGroup
26+
}
27+
28+
func NewChannelDemultiplexer(ctx context.Context) *ChannelDemultiplexer {
29+
ctx, cancel := context.WithCancel(ctx)
30+
demux := &ChannelDemultiplexer{
31+
ctx: ctx,
32+
cancel: cancel,
33+
entries: make(map[<-chan *N.PacketBuffer]*channelDemuxEntry),
34+
updateChan: make(chan struct{}, 1),
35+
}
36+
return demux
37+
}
38+
39+
func (p *ChannelDemultiplexer) Add(stream *reactorStream, channel <-chan *N.PacketBuffer) {
40+
p.mutex.Lock()
41+
42+
if p.closed.Load() {
43+
p.mutex.Unlock()
44+
return
45+
}
46+
47+
entry := &channelDemuxEntry{
48+
channel: channel,
49+
stream: stream,
50+
}
51+
p.entries[channel] = entry
52+
if !p.running {
53+
p.running = true
54+
p.wg.Add(1)
55+
go p.run()
56+
}
57+
p.mutex.Unlock()
58+
p.signalUpdate()
59+
}
60+
61+
func (p *ChannelDemultiplexer) Remove(channel <-chan *N.PacketBuffer) {
62+
p.mutex.Lock()
63+
delete(p.entries, channel)
64+
p.mutex.Unlock()
65+
p.signalUpdate()
66+
}
67+
68+
func (p *ChannelDemultiplexer) signalUpdate() {
69+
select {
70+
case p.updateChan <- struct{}{}:
71+
default:
72+
}
73+
}
74+
75+
func (p *ChannelDemultiplexer) Close() error {
76+
p.mutex.Lock()
77+
p.closed.Store(true)
78+
p.mutex.Unlock()
79+
80+
p.cancel()
81+
p.signalUpdate()
82+
p.wg.Wait()
83+
return nil
84+
}
85+
86+
func (p *ChannelDemultiplexer) run() {
87+
defer p.wg.Done()
88+
89+
for {
90+
p.mutex.Lock()
91+
if len(p.entries) == 0 {
92+
p.running = false
93+
p.mutex.Unlock()
94+
return
95+
}
96+
97+
cases := make([]reflect.SelectCase, 0, len(p.entries)+2)
98+
99+
cases = append(cases, reflect.SelectCase{
100+
Dir: reflect.SelectRecv,
101+
Chan: reflect.ValueOf(p.ctx.Done()),
102+
})
103+
104+
cases = append(cases, reflect.SelectCase{
105+
Dir: reflect.SelectRecv,
106+
Chan: reflect.ValueOf(p.updateChan),
107+
})
108+
109+
entryList := make([]*channelDemuxEntry, 0, len(p.entries))
110+
for _, entry := range p.entries {
111+
cases = append(cases, reflect.SelectCase{
112+
Dir: reflect.SelectRecv,
113+
Chan: reflect.ValueOf(entry.channel),
114+
})
115+
entryList = append(entryList, entry)
116+
}
117+
p.mutex.Unlock()
118+
119+
chosen, recv, recvOK := reflect.Select(cases)
120+
121+
switch chosen {
122+
case 0:
123+
p.mutex.Lock()
124+
p.running = false
125+
p.mutex.Unlock()
126+
return
127+
case 1:
128+
continue
129+
default:
130+
entry := entryList[chosen-2]
131+
p.mutex.Lock()
132+
delete(p.entries, entry.channel)
133+
p.mutex.Unlock()
134+
135+
if recvOK {
136+
packet := recv.Interface().(*N.PacketBuffer)
137+
go entry.stream.runActiveLoop(packet)
138+
} else {
139+
go entry.stream.closeWithError(nil)
140+
}
141+
}
142+
}
143+
}

common/bufio/fd_demux_darwin.go

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
//go:build darwin
2+
3+
package bufio
4+
5+
import (
6+
"context"
7+
"sync"
8+
"sync/atomic"
9+
10+
"golang.org/x/sys/unix"
11+
)
12+
13+
type fdDemuxEntry struct {
14+
fd int
15+
stream *reactorStream
16+
}
17+
18+
type FDDemultiplexer struct {
19+
ctx context.Context
20+
cancel context.CancelFunc
21+
kqFD int
22+
mutex sync.Mutex
23+
entries map[int]*fdDemuxEntry
24+
running bool
25+
closed atomic.Bool
26+
wg sync.WaitGroup
27+
pipeFDs [2]int
28+
}
29+
30+
func NewFDDemultiplexer(ctx context.Context) (*FDDemultiplexer, error) {
31+
kqFD, err := unix.Kqueue()
32+
if err != nil {
33+
return nil, err
34+
}
35+
36+
var pipeFDs [2]int
37+
err = unix.Pipe(pipeFDs[:])
38+
if err != nil {
39+
unix.Close(kqFD)
40+
return nil, err
41+
}
42+
43+
unix.SetNonblock(pipeFDs[0], true)
44+
unix.SetNonblock(pipeFDs[1], true)
45+
46+
_, err = unix.Kevent(kqFD, []unix.Kevent_t{{
47+
Ident: uint64(pipeFDs[0]),
48+
Filter: unix.EVFILT_READ,
49+
Flags: unix.EV_ADD,
50+
}}, nil, nil)
51+
if err != nil {
52+
unix.Close(pipeFDs[0])
53+
unix.Close(pipeFDs[1])
54+
unix.Close(kqFD)
55+
return nil, err
56+
}
57+
58+
ctx, cancel := context.WithCancel(ctx)
59+
demux := &FDDemultiplexer{
60+
ctx: ctx,
61+
cancel: cancel,
62+
kqFD: kqFD,
63+
entries: make(map[int]*fdDemuxEntry),
64+
pipeFDs: pipeFDs,
65+
}
66+
return demux, nil
67+
}
68+
69+
func (p *FDDemultiplexer) Add(stream *reactorStream, fd int) error {
70+
p.mutex.Lock()
71+
defer p.mutex.Unlock()
72+
73+
if p.closed.Load() {
74+
return unix.EINVAL
75+
}
76+
77+
_, err := unix.Kevent(p.kqFD, []unix.Kevent_t{{
78+
Ident: uint64(fd),
79+
Filter: unix.EVFILT_READ,
80+
Flags: unix.EV_ADD,
81+
}}, nil, nil)
82+
if err != nil {
83+
return err
84+
}
85+
86+
entry := &fdDemuxEntry{
87+
fd: fd,
88+
stream: stream,
89+
}
90+
p.entries[fd] = entry
91+
92+
if !p.running {
93+
p.running = true
94+
p.wg.Add(1)
95+
go p.run()
96+
}
97+
98+
return nil
99+
}
100+
101+
func (p *FDDemultiplexer) Remove(fd int) {
102+
p.mutex.Lock()
103+
defer p.mutex.Unlock()
104+
105+
_, ok := p.entries[fd]
106+
if !ok {
107+
return
108+
}
109+
110+
unix.Kevent(p.kqFD, []unix.Kevent_t{{
111+
Ident: uint64(fd),
112+
Filter: unix.EVFILT_READ,
113+
Flags: unix.EV_DELETE,
114+
}}, nil, nil)
115+
delete(p.entries, fd)
116+
}
117+
118+
func (p *FDDemultiplexer) wakeup() {
119+
unix.Write(p.pipeFDs[1], []byte{0})
120+
}
121+
122+
func (p *FDDemultiplexer) Close() error {
123+
p.mutex.Lock()
124+
p.closed.Store(true)
125+
p.mutex.Unlock()
126+
127+
p.cancel()
128+
p.wakeup()
129+
p.wg.Wait()
130+
131+
p.mutex.Lock()
132+
defer p.mutex.Unlock()
133+
134+
if p.kqFD != -1 {
135+
unix.Close(p.kqFD)
136+
p.kqFD = -1
137+
}
138+
if p.pipeFDs[0] != -1 {
139+
unix.Close(p.pipeFDs[0])
140+
unix.Close(p.pipeFDs[1])
141+
p.pipeFDs[0] = -1
142+
p.pipeFDs[1] = -1
143+
}
144+
return nil
145+
}
146+
147+
func (p *FDDemultiplexer) run() {
148+
defer p.wg.Done()
149+
150+
events := make([]unix.Kevent_t, 64)
151+
var buf [1]byte
152+
153+
for {
154+
select {
155+
case <-p.ctx.Done():
156+
p.mutex.Lock()
157+
p.running = false
158+
p.mutex.Unlock()
159+
return
160+
default:
161+
}
162+
163+
n, err := unix.Kevent(p.kqFD, nil, events, nil)
164+
if err != nil {
165+
if err == unix.EINTR {
166+
continue
167+
}
168+
p.mutex.Lock()
169+
p.running = false
170+
p.mutex.Unlock()
171+
return
172+
}
173+
174+
for i := 0; i < n; i++ {
175+
ev := events[i]
176+
fd := int(ev.Ident)
177+
178+
if fd == p.pipeFDs[0] {
179+
unix.Read(p.pipeFDs[0], buf[:])
180+
continue
181+
}
182+
183+
if ev.Flags&unix.EV_ERROR != 0 {
184+
continue
185+
}
186+
187+
p.mutex.Lock()
188+
entry, ok := p.entries[fd]
189+
if !ok {
190+
p.mutex.Unlock()
191+
continue
192+
}
193+
194+
unix.Kevent(p.kqFD, []unix.Kevent_t{{
195+
Ident: uint64(fd),
196+
Filter: unix.EVFILT_READ,
197+
Flags: unix.EV_DELETE,
198+
}}, nil, nil)
199+
delete(p.entries, fd)
200+
p.mutex.Unlock()
201+
202+
go entry.stream.runActiveLoop(nil)
203+
}
204+
205+
p.mutex.Lock()
206+
if len(p.entries) == 0 {
207+
p.running = false
208+
p.mutex.Unlock()
209+
return
210+
}
211+
p.mutex.Unlock()
212+
}
213+
}

0 commit comments

Comments
 (0)