Skip to content

Commit 4657bec

Browse files
authored
feat: add EventLoop interface and implementation (#700)
Fixes #689 Fixes #703
1 parent 122a1b8 commit 4657bec

16 files changed

+1234
-168
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ English | [中文](README_ZH.md)
4949
- [x] Running on `Linux`, `macOS`, `Windows`, and *BSD: `Darwin`/`DragonFlyBSD`/`FreeBSD`/`NetBSD`/`OpenBSD`
5050
- [x] **Edge-triggered** I/O support
5151
- [x] Multiple network addresses binding
52+
- [x] Support registering new connections to event-loops
5253

5354
## 🕊 Roadmap
5455

README_ZH.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
- [x] 支持 `Linux`, `macOS`, `Windows`*BSD 操作系统: `Darwin`/`DragonFlyBSD`/`FreeBSD`/`NetBSD`/`OpenBSD`
5050
- [x] **Edge-triggered** I/O 支持
5151
- [x] 多网络地址绑定
52+
- [x] 支持注册新的连接到事件循环
5253

5354
## 🕊 未来计划
5455

acceptor_windows.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"runtime"
2121

2222
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
23+
"github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
2324
)
2425

2526
func (eng *engine) listenStream(ln net.Listener) (err error) {
@@ -43,9 +44,9 @@ func (eng *engine) listenStream(ln net.Listener) (err error) {
4344
return
4445
}
4546
el := eng.eventLoops.next(tc.RemoteAddr())
46-
c := newTCPConn(tc, el)
47+
c := newTCPConn(el, tc, nil)
4748
el.ch <- &openConn{c: c}
48-
go func(c *conn, tc net.Conn, el *eventloop) {
49+
goroutine.DefaultWorkerPool.Submit(func() {
4950
var buffer [0x10000]byte
5051
for {
5152
n, err := tc.Read(buffer[:])
@@ -55,7 +56,7 @@ func (eng *engine) listenStream(ln net.Listener) (err error) {
5556
}
5657
el.ch <- packTCPConn(c, buffer[:n])
5758
}
58-
}(c, tc, el)
59+
})
5960
}
6061
}
6162

@@ -81,7 +82,7 @@ func (eng *engine) ListenUDP(pc net.PacketConn) (err error) {
8182
return
8283
}
8384
el := eng.eventLoops.next(addr)
84-
c := newUDPConn(el, pc, pc.LocalAddr(), addr)
85+
c := newUDPConn(el, pc, nil, pc.LocalAddr(), addr, nil)
8586
el.ch <- packUDPConn(c, buffer[:n])
8687
}
8788
}

client_test.go

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,6 @@ type testClient struct {
375375
connected int32
376376
clientActive int32
377377
disconnected int32
378-
workerPool *goPool.Pool
379378
udpReadHeader int32
380379
}
381380

@@ -404,7 +403,6 @@ func (s *testClient) OnClose(c Conn, err error) (action Action) {
404403
if atomic.LoadInt32(&s.connected) == atomic.LoadInt32(&s.disconnected) &&
405404
atomic.LoadInt32(&s.disconnected) == int32(s.nclients) {
406405
action = Shutdown
407-
s.workerPool.Release()
408406
}
409407

410408
return
@@ -431,7 +429,8 @@ func (s *testClient) OnTraffic(c Conn) (action Action) {
431429

432430
if s.async {
433431
buf := bbPool.Get()
434-
_, _ = c.WriteTo(buf)
432+
_, err := c.WriteTo(buf)
433+
assert.NoError(s.tester, err, "WriteTo error")
435434

436435
if s.network == "tcp" || s.network == "unix" {
437436
// just for test
@@ -443,17 +442,19 @@ func (s *testClient) OnTraffic(c Conn) (action Action) {
443442
atomic.AddInt32(&s.udpReadHeader, 1)
444443
buf.Reset()
445444
}
446-
_ = s.workerPool.Submit(
445+
err = goPool.DefaultWorkerPool.Submit(
447446
func() {
448447
if buf.Len() > 0 {
449448
err := c.AsyncWrite(buf.Bytes(), nil)
450449
assert.NoError(s.tester, err)
451450
}
452451
})
452+
assert.NoError(s.tester, err)
453453
return
454454
}
455455

456-
buf, _ := c.Next(-1)
456+
buf, err := c.Next(-1)
457+
assert.NoError(s.tester, err, "Reading data error")
457458
if v == nil && bytes.Equal(buf, pingMsg) {
458459
atomic.AddInt32(&s.udpReadHeader, 1)
459460
buf = nil
@@ -475,7 +476,12 @@ func (s *testClient) OnTick() (delay time.Duration, action Action) {
475476
if i%2 == 0 {
476477
netConn = true
477478
}
478-
go startGnetClient(s.tester, s.client, s.network, s.addr, s.multicore, s.async, netConn)
479+
err := goPool.DefaultWorkerPool.Submit(
480+
func() {
481+
startGnetClient(s.tester, s.client, s.network, s.addr, s.multicore, s.async, netConn)
482+
})
483+
assert.NoError(s.tester, err)
484+
479485
}
480486
}
481487
if s.network == "udp" && atomic.LoadInt32(&s.clientActive) == 0 {
@@ -487,13 +493,12 @@ func (s *testClient) OnTick() (delay time.Duration, action Action) {
487493

488494
func runClient(t *testing.T, network, addr string, conf *testConf) {
489495
ts := &testClient{
490-
tester: t,
491-
network: network,
492-
addr: addr,
493-
multicore: conf.multicore,
494-
async: conf.async,
495-
nclients: conf.clients,
496-
workerPool: goPool.Default(),
496+
tester: t,
497+
network: network,
498+
addr: addr,
499+
multicore: conf.multicore,
500+
async: conf.async,
501+
nclients: conf.clients,
497502
}
498503
var err error
499504
clientEV := &clientEvents{tester: t, packetLen: streamLen, svr: ts}
@@ -646,7 +651,10 @@ func (ev *serverEventsForWake) OnClose(_ Conn, _ error) Action {
646651

647652
func (ev *serverEventsForWake) OnTick() (time.Duration, Action) {
648653
if atomic.CompareAndSwapInt32(&ev.started, 0, 1) {
649-
go testConnWakeImmediately(ev.tester, ev.client, ev.clientEV, ev.network, ev.addr)
654+
err := goPool.DefaultWorkerPool.Submit(func() {
655+
testConnWakeImmediately(ev.tester, ev.client, ev.clientEV, ev.network, ev.addr)
656+
})
657+
assert.NoError(ev.tester, err)
650658
}
651659
return 100 * time.Millisecond, None
652660
}
@@ -699,15 +707,19 @@ func TestClientReadOnEOF(t *testing.T) {
699707
assert.NoError(t, err)
700708
defer ln.Close() //nolint:errcheck
701709

702-
go func() {
710+
err = goPool.DefaultWorkerPool.Submit(func() {
703711
for {
704712
conn, err := ln.Accept()
705713
if err != nil {
706714
break
707715
}
708-
go process(conn)
716+
err = goPool.DefaultWorkerPool.Submit(func() {
717+
process(conn)
718+
})
719+
assert.NoError(t, err)
709720
}
710-
}()
721+
})
722+
assert.NoError(t, err)
711723

712724
ev := &clientReadOnEOF{
713725
result: make(chan struct {

client_unix.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
242242
}}
243243
err = cli.el.poller.Trigger(queue.HighPriority, cli.el.register, ccb)
244244
if err != nil {
245-
_ = gc.Close()
245+
gc.Close() //nolint:errcheck
246246
return nil, err
247247
}
248248

client_windows.go

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
2727
"github.com/panjf2000/gnet/v2/pkg/logging"
28+
"github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
2829
)
2930

3031
type Client struct {
@@ -161,61 +162,56 @@ func (cli *Client) EnrollContext(nc net.Conn, ctx any) (gc Conn, err error) {
161162
}
162163
}
163164

164-
c := newTCPConn(nc, cli.el)
165-
c.SetContext(ctx)
165+
c := newTCPConn(cli.el, nc, ctx)
166166
cli.el.ch <- &openConn{c: c, cb: func() { close(connOpened) }}
167-
go func(c *conn, tc net.Conn, el *eventloop) {
167+
goroutine.DefaultWorkerPool.Submit(func() {
168168
var buffer [0x10000]byte
169169
for {
170-
n, err := tc.Read(buffer[:])
170+
n, err := nc.Read(buffer[:])
171171
if err != nil {
172-
el.ch <- &netErr{c, err}
172+
cli.el.ch <- &netErr{c, err}
173173
return
174174
}
175-
el.ch <- packTCPConn(c, buffer[:n])
175+
cli.el.ch <- packTCPConn(c, buffer[:n])
176176
}
177-
}(c, nc, cli.el)
177+
})
178178
gc = c
179179
case *net.UnixConn:
180-
c := newTCPConn(nc, cli.el)
181-
c.SetContext(ctx)
180+
c := newTCPConn(cli.el, nc, ctx)
182181
cli.el.ch <- &openConn{c: c, cb: func() { close(connOpened) }}
183-
go func(c *conn, uc net.Conn, el *eventloop) {
182+
goroutine.DefaultWorkerPool.Submit(func() {
184183
var buffer [0x10000]byte
185184
for {
186-
n, err := uc.Read(buffer[:])
185+
n, err := nc.Read(buffer[:])
187186
if err != nil {
188-
el.ch <- &netErr{c, err}
187+
cli.el.ch <- &netErr{c, err}
189188
mu.RLock()
190-
tmpDir := unixAddrDirs[uc.LocalAddr().String()]
189+
tmpDir := unixAddrDirs[nc.LocalAddr().String()]
191190
mu.RUnlock()
192191
if err := os.RemoveAll(tmpDir); err != nil {
193192
logging.Errorf("failed to remove temporary directory for unix local address: %v", err)
194193
}
195194
return
196195
}
197-
el.ch <- packTCPConn(c, buffer[:n])
196+
cli.el.ch <- packTCPConn(c, buffer[:n])
198197
}
199-
}(c, nc, cli.el)
198+
})
200199
gc = c
201200
case *net.UDPConn:
202-
c := newUDPConn(cli.el, nil, nc.LocalAddr(), nc.RemoteAddr())
203-
c.SetContext(ctx)
204-
c.rawConn = nc
201+
c := newUDPConn(cli.el, nil, nc, nc.LocalAddr(), nc.RemoteAddr(), ctx)
205202
cli.el.ch <- &openConn{c: c, cb: func() { close(connOpened) }}
206-
go func(uc net.Conn, el *eventloop) {
203+
goroutine.DefaultWorkerPool.Submit(func() {
207204
var buffer [0x10000]byte
208205
for {
209-
n, err := uc.Read(buffer[:])
206+
n, err := nc.Read(buffer[:])
210207
if err != nil {
208+
cli.el.ch <- &netErr{c, err}
211209
return
212210
}
213-
c := newUDPConn(cli.el, nil, uc.LocalAddr(), uc.RemoteAddr())
214-
c.SetContext(ctx)
215-
c.rawConn = uc
216-
el.ch <- packUDPConn(c, buffer[:n])
211+
c := newUDPConn(cli.el, nil, nc, nc.LocalAddr(), nc.RemoteAddr(), ctx)
212+
cli.el.ch <- packUDPConn(c, buffer[:n])
217213
}
218-
}(nc, cli.el)
214+
})
219215
gc = c
220216
default:
221217
return nil, errorx.ErrUnsupportedProtocol

connection_unix.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,10 @@ func (c *conn) Close() error {
519519
}, nil)
520520
}
521521

522+
func (c *conn) EventLoop() EventLoop {
523+
return c.loop
524+
}
525+
522526
func (*conn) SetDeadline(_ time.Time) error {
523527
return errorx.ErrUnsupportedOp
524528
}

connection_windows.go

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,13 @@ import (
2121
"syscall"
2222
"time"
2323

24-
"github.com/panjf2000/ants/v2"
2524
"golang.org/x/sys/windows"
2625

2726
"github.com/panjf2000/gnet/v2/pkg/buffer/elastic"
2827
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
2928
bbPool "github.com/panjf2000/gnet/v2/pkg/pool/bytebuffer"
3029
bsPool "github.com/panjf2000/gnet/v2/pkg/pool/byteslice"
31-
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
30+
"github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
3231
)
3332

3433
type netErr struct {
@@ -83,8 +82,9 @@ func packUDPConn(c *conn, buf []byte) *udpConn {
8382
return &udpConn{c}
8483
}
8584

86-
func newTCPConn(nc net.Conn, el *eventloop) (c *conn) {
85+
func newTCPConn(el *eventloop, nc net.Conn, ctx any) (c *conn) {
8786
return &conn{
87+
ctx: ctx,
8888
loop: el,
8989
buffer: bbPool.Get(),
9090
rawConn: nc,
@@ -105,9 +105,11 @@ func (c *conn) release() {
105105
c.buffer = nil
106106
}
107107

108-
func newUDPConn(el *eventloop, pc net.PacketConn, localAddr, remoteAddr net.Addr) *conn {
108+
func newUDPConn(el *eventloop, pc net.PacketConn, rc net.Conn, localAddr, remoteAddr net.Addr, ctx any) *conn {
109109
return &conn{
110+
ctx: ctx,
110111
pc: pc,
112+
rawConn: rc,
111113
loop: el,
112114
buffer: bbPool.Get(),
113115
localAddr: localAddr,
@@ -419,19 +421,6 @@ func (c *conn) SetKeepAlivePeriod(d time.Duration) error {
419421
return nil
420422
}
421423

422-
type nonBlockingPool struct {
423-
*goPool.Pool
424-
}
425-
426-
func (np *nonBlockingPool) Go(task func()) (err error) {
427-
if err = np.Submit(task); err == ants.ErrPoolOverload {
428-
go task()
429-
}
430-
return
431-
}
432-
433-
var workerPool = nonBlockingPool{Pool: goPool.Default()}
434-
435424
// Gfd return an uninitialized GFD which is not valid,
436425
// this method is only implemented for compatibility, don't use it on Windows.
437426
// func (c *conn) Gfd() gfd.GFD { return gfd.GFD{} }
@@ -450,7 +439,7 @@ func (c *conn) AsyncWrite(buf []byte, cb AsyncCallback) error {
450439
case c.loop.ch <- fn:
451440
default:
452441
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
453-
err = workerPool.Go(func() {
442+
err = goroutine.DefaultWorkerPool.Submit(func() {
454443
c.loop.ch <- fn
455444
})
456445
}
@@ -489,7 +478,7 @@ func (c *conn) Wake(cb AsyncCallback) (err error) {
489478
case c.loop.ch <- wakeFn:
490479
default:
491480
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
492-
err = workerPool.Go(func() {
481+
err = goroutine.DefaultWorkerPool.Submit(func() {
493482
c.loop.ch <- wakeFn
494483
})
495484
}
@@ -506,7 +495,7 @@ func (c *conn) Close() (err error) {
506495
case c.loop.ch <- closeFn:
507496
default:
508497
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
509-
err = workerPool.Go(func() {
498+
err = goroutine.DefaultWorkerPool.Submit(func() {
510499
c.loop.ch <- closeFn
511500
})
512501
}
@@ -527,14 +516,18 @@ func (c *conn) CloseWithCallback(cb AsyncCallback) (err error) {
527516
case c.loop.ch <- closeFn:
528517
default:
529518
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
530-
err = workerPool.Go(func() {
519+
err = goroutine.DefaultWorkerPool.Submit(func() {
531520
c.loop.ch <- closeFn
532521
})
533522
}
534523

535524
return
536525
}
537526

527+
func (c *conn) EventLoop() EventLoop {
528+
return c.loop
529+
}
530+
538531
func (*conn) SetDeadline(_ time.Time) error {
539532
return errorx.ErrUnsupportedOp
540533
}

0 commit comments

Comments
 (0)