Skip to content

Commit 3bacbe6

Browse files
committed
Add stream reactor
1 parent 8874f10 commit 3bacbe6

15 files changed

+1283
-366
lines changed
Lines changed: 47 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type channelDemuxEntry struct {
1414
stream *reactorStream
1515
}
1616

17-
type ChannelDemultiplexer struct {
17+
type ChannelPoller struct {
1818
ctx context.Context
1919
cancel context.CancelFunc
2020
mutex sync.Mutex
@@ -25,112 +25,112 @@ type ChannelDemultiplexer struct {
2525
wg sync.WaitGroup
2626
}
2727

28-
func NewChannelDemultiplexer(ctx context.Context) *ChannelDemultiplexer {
28+
func NewChannelPoller(ctx context.Context) *ChannelPoller {
2929
ctx, cancel := context.WithCancel(ctx)
30-
demux := &ChannelDemultiplexer{
30+
poller := &ChannelPoller{
3131
ctx: ctx,
3232
cancel: cancel,
3333
entries: make(map[<-chan *N.PacketBuffer]*channelDemuxEntry),
3434
updateChan: make(chan struct{}, 1),
3535
}
36-
return demux
36+
return poller
3737
}
3838

39-
func (d *ChannelDemultiplexer) Add(stream *reactorStream, channel <-chan *N.PacketBuffer) {
40-
d.mutex.Lock()
39+
func (p *ChannelPoller) Add(stream *reactorStream, channel <-chan *N.PacketBuffer) {
40+
p.mutex.Lock()
4141

42-
if d.closed.Load() {
43-
d.mutex.Unlock()
42+
if p.closed.Load() {
43+
p.mutex.Unlock()
4444
return
4545
}
4646

4747
entry := &channelDemuxEntry{
4848
channel: channel,
4949
stream: stream,
5050
}
51-
d.entries[channel] = entry
52-
if !d.running {
53-
d.running = true
54-
d.wg.Add(1)
55-
go d.run()
51+
p.entries[channel] = entry
52+
if !p.running {
53+
p.running = true
54+
p.wg.Add(1)
55+
go p.run()
5656
}
57-
d.mutex.Unlock()
58-
d.signalUpdate()
57+
p.mutex.Unlock()
58+
p.signalUpdate()
5959
}
6060

61-
func (d *ChannelDemultiplexer) Remove(channel <-chan *N.PacketBuffer) {
62-
d.mutex.Lock()
63-
delete(d.entries, channel)
64-
d.mutex.Unlock()
65-
d.signalUpdate()
61+
func (p *ChannelPoller) Remove(channel <-chan *N.PacketBuffer) {
62+
p.mutex.Lock()
63+
delete(p.entries, channel)
64+
p.mutex.Unlock()
65+
p.signalUpdate()
6666
}
6767

68-
func (d *ChannelDemultiplexer) signalUpdate() {
68+
func (p *ChannelPoller) signalUpdate() {
6969
select {
70-
case d.updateChan <- struct{}{}:
70+
case p.updateChan <- struct{}{}:
7171
default:
7272
}
7373
}
7474

75-
func (d *ChannelDemultiplexer) Close() error {
76-
d.mutex.Lock()
77-
d.closed.Store(true)
78-
d.mutex.Unlock()
75+
func (p *ChannelPoller) Close() error {
76+
p.mutex.Lock()
77+
p.closed.Store(true)
78+
p.mutex.Unlock()
7979

80-
d.cancel()
81-
d.signalUpdate()
82-
d.wg.Wait()
80+
p.cancel()
81+
p.signalUpdate()
82+
p.wg.Wait()
8383
return nil
8484
}
8585

86-
func (d *ChannelDemultiplexer) run() {
87-
defer d.wg.Done()
86+
func (p *ChannelPoller) run() {
87+
defer p.wg.Done()
8888

8989
for {
90-
d.mutex.Lock()
91-
if len(d.entries) == 0 {
92-
d.running = false
93-
d.mutex.Unlock()
90+
p.mutex.Lock()
91+
if len(p.entries) == 0 {
92+
p.running = false
93+
p.mutex.Unlock()
9494
return
9595
}
9696

97-
cases := make([]reflect.SelectCase, 0, len(d.entries)+2)
97+
cases := make([]reflect.SelectCase, 0, len(p.entries)+2)
9898

9999
cases = append(cases, reflect.SelectCase{
100100
Dir: reflect.SelectRecv,
101-
Chan: reflect.ValueOf(d.ctx.Done()),
101+
Chan: reflect.ValueOf(p.ctx.Done()),
102102
})
103103

104104
cases = append(cases, reflect.SelectCase{
105105
Dir: reflect.SelectRecv,
106-
Chan: reflect.ValueOf(d.updateChan),
106+
Chan: reflect.ValueOf(p.updateChan),
107107
})
108108

109-
entryList := make([]*channelDemuxEntry, 0, len(d.entries))
110-
for _, entry := range d.entries {
109+
entryList := make([]*channelDemuxEntry, 0, len(p.entries))
110+
for _, entry := range p.entries {
111111
cases = append(cases, reflect.SelectCase{
112112
Dir: reflect.SelectRecv,
113113
Chan: reflect.ValueOf(entry.channel),
114114
})
115115
entryList = append(entryList, entry)
116116
}
117-
d.mutex.Unlock()
117+
p.mutex.Unlock()
118118

119119
chosen, recv, recvOK := reflect.Select(cases)
120120

121121
switch chosen {
122122
case 0:
123-
d.mutex.Lock()
124-
d.running = false
125-
d.mutex.Unlock()
123+
p.mutex.Lock()
124+
p.running = false
125+
p.mutex.Unlock()
126126
return
127127
case 1:
128128
continue
129129
default:
130130
entry := entryList[chosen-2]
131-
d.mutex.Lock()
132-
delete(d.entries, entry.channel)
133-
d.mutex.Unlock()
131+
p.mutex.Lock()
132+
delete(p.entries, entry.channel)
133+
p.mutex.Unlock()
134134

135135
if recvOK {
136136
packet := recv.Interface().(*N.PacketBuffer)

common/bufio/fd_demux_stub.go

Lines changed: 0 additions & 25 deletions
This file was deleted.

common/bufio/fd_handler.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package bufio
2+
3+
// FDHandler is the interface for handling FD ready events
4+
// Implemented by both reactorStream (UDP) and streamDirection (TCP)
5+
type FDHandler interface {
6+
// HandleFDEvent is called when the FD has data ready to read
7+
// The handler should start processing data in a new goroutine
8+
HandleFDEvent()
9+
}

0 commit comments

Comments
 (0)