Skip to content

Commit 122a1b8

Browse files
authored
feat: implement Engine.DupListener for multiple listeners (#701)
Fixes #699
1 parent dc150c1 commit 122a1b8

File tree

4 files changed

+177
-121
lines changed

4 files changed

+177
-121
lines changed

client_test.go

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"time"
1616

1717
"github.com/stretchr/testify/assert"
18-
"github.com/stretchr/testify/require"
1918
"go.uber.org/zap"
2019

2120
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
@@ -39,7 +38,12 @@ type clientEvents struct {
3938

4039
func (ev *clientEvents) OnBoot(e Engine) Action {
4140
fd, err := e.Dup()
42-
require.ErrorIsf(ev.tester, err, errorx.ErrEmptyEngine, "expected error: %v, but got: %v",
41+
assert.ErrorIsf(ev.tester, err, errorx.ErrEmptyEngine, "expected error: %v, but got: %v",
42+
errorx.ErrUnsupportedOp, err)
43+
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
44+
45+
fd, err = e.DupListener("tcp", "abc")
46+
assert.ErrorIsf(ev.tester, err, errorx.ErrEmptyEngine, "expected error: %v, but got: %v",
4347
errorx.ErrUnsupportedOp, err)
4448
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
4549
return None
@@ -84,7 +88,12 @@ func (ev *clientEvents) OnTick() (delay time.Duration, action Action) {
8488

8589
func (ev *clientEvents) OnShutdown(e Engine) {
8690
fd, err := e.Dup()
87-
require.ErrorIsf(ev.tester, err, errorx.ErrEmptyEngine, "expected error: %v, but got: %v",
91+
assert.ErrorIsf(ev.tester, err, errorx.ErrEmptyEngine, "expected error: %v, but got: %v",
92+
errorx.ErrUnsupportedOp, err)
93+
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
94+
95+
fd, err = e.DupListener("tcp", "abc")
96+
assert.ErrorIsf(ev.tester, err, errorx.ErrEmptyEngine, "expected error: %v, but got: %v",
8897
errorx.ErrUnsupportedOp, err)
8998
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
9099
}
@@ -378,8 +387,8 @@ func (s *testClient) OnBoot(eng Engine) (action Action) {
378387
func (s *testClient) OnOpen(c Conn) (out []byte, action Action) {
379388
c.SetContext(&sync.Once{})
380389
atomic.AddInt32(&s.connected, 1)
381-
require.NotNil(s.tester, c.LocalAddr(), "nil local addr")
382-
require.NotNil(s.tester, c.RemoteAddr(), "nil remote addr")
390+
assert.NotNil(s.tester, c.LocalAddr(), "nil local addr")
391+
assert.NotNil(s.tester, c.RemoteAddr(), "nil remote addr")
383392
return
384393
}
385394

@@ -388,7 +397,7 @@ func (s *testClient) OnClose(c Conn, err error) (action Action) {
388397
logging.Debugf("error occurred on closed, %v\n", err)
389398
}
390399
if s.network != "udp" {
391-
require.IsType(s.tester, c.Context(), new(sync.Once), "invalid context")
400+
assert.IsType(s.tester, c.Context(), new(sync.Once), "invalid context")
392401
}
393402

394403
atomic.AddInt32(&s.disconnected, 1)
@@ -403,17 +412,17 @@ func (s *testClient) OnClose(c Conn, err error) (action Action) {
403412

404413
func (s *testClient) OnShutdown(Engine) {
405414
if s.network == "udp" {
406-
require.EqualValues(s.tester, int32(s.nclients), atomic.LoadInt32(&s.udpReadHeader))
415+
assert.EqualValues(s.tester, int32(s.nclients), atomic.LoadInt32(&s.udpReadHeader))
407416
}
408417
}
409418

410419
func (s *testClient) OnTraffic(c Conn) (action Action) {
411420
readHeader := func() {
412421
ping := make([]byte, len(pingMsg))
413422
n, err := io.ReadFull(c, ping)
414-
require.NoError(s.tester, err)
415-
require.EqualValues(s.tester, len(pingMsg), n)
416-
require.Equal(s.tester, string(pingMsg), string(ping), "bad header")
423+
assert.NoError(s.tester, err)
424+
assert.EqualValues(s.tester, len(pingMsg), n)
425+
assert.Equal(s.tester, string(pingMsg), string(ping), "bad header")
417426
}
418427
v := c.Context()
419428
if v != nil {
@@ -438,7 +447,7 @@ func (s *testClient) OnTraffic(c Conn) (action Action) {
438447
func() {
439448
if buf.Len() > 0 {
440449
err := c.AsyncWrite(buf.Bytes(), nil)
441-
require.NoError(s.tester, err)
450+
assert.NoError(s.tester, err)
442451
}
443452
})
444453
return
@@ -451,8 +460,8 @@ func (s *testClient) OnTraffic(c Conn) (action Action) {
451460
}
452461
if len(buf) > 0 {
453462
n, err := c.Write(buf)
454-
require.NoError(s.tester, err)
455-
require.EqualValues(s.tester, len(buf), n)
463+
assert.NoError(s.tester, err)
464+
assert.EqualValues(s.tester, len(buf), n)
456465
}
457466
return
458467
}
@@ -527,15 +536,15 @@ func startGnetClient(t *testing.T, cli *Client, network, addr string, multicore,
527536
if netDial {
528537
var netConn net.Conn
529538
netConn, err = stdDial(network, addr)
530-
require.NoError(t, err)
539+
assert.NoError(t, err)
531540
c, err = cli.EnrollContext(netConn, handler)
532541
} else {
533542
c, err = cli.DialContext(network, addr, handler)
534543
}
535-
require.NoError(t, err)
544+
assert.NoError(t, err)
536545
defer c.Close() //nolint:errcheck
537546
err = c.Wake(nil)
538-
require.NoError(t, err)
547+
assert.NoError(t, err)
539548
rspCh := handler.rspCh
540549
duration := time.Duration((rand.Float64()*2+1)*float64(time.Second)) / 2
541550
logging.Debugf("test duration: %v", duration)
@@ -546,14 +555,14 @@ func startGnetClient(t *testing.T, cli *Client, network, addr string, multicore,
546555
reqData = reqData[:datagramLen]
547556
}
548557
_, err = crand.Read(reqData)
549-
require.NoError(t, err)
558+
assert.NoError(t, err)
550559
err = c.AsyncWrite(reqData, nil)
551-
require.NoError(t, err)
560+
assert.NoError(t, err)
552561
respData := <-rspCh
553-
require.NoError(t, err)
562+
assert.NoError(t, err)
554563
if !async {
555-
// require.Equalf(t, reqData, respData, "response mismatch with protocol:%s, multi-core:%t, content of bytes: %d vs %d", network, multicore, string(reqData), string(respData))
556-
require.Equalf(
564+
// assert.Equalf(t, reqData, respData, "response mismatch with protocol:%s, multi-core:%t, content of bytes: %d vs %d", network, multicore, string(reqData), string(respData))
565+
assert.Equalf(
557566
t,
558567
reqData,
559568
respData,

gnet.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,19 +80,40 @@ func (e Engine) CountConnections() (count int) {
8080
// Dup returns a copy of the underlying file descriptor of listener.
8181
// It is the caller's responsibility to close dupFD when finished.
8282
// Closing listener does not affect dupFD, and closing dupFD does not affect listener.
83+
//
84+
// Note that this method is only available when the engine has only one listener.
8385
func (e Engine) Dup() (fd int, err error) {
8486
if err := e.Validate(); err != nil {
8587
return -1, err
8688
}
89+
8790
if len(e.eng.listeners) > 1 {
8891
return -1, errors.ErrUnsupportedOp
8992
}
93+
9094
for _, ln := range e.eng.listeners {
9195
fd, err = ln.dup()
9296
}
97+
9398
return
9499
}
95100

101+
// DupListener is like Dup, but it duplicates the listener with the given network and address.
102+
// This is useful when there are multiple listeners.
103+
func (e Engine) DupListener(network, addr string) (int, error) {
104+
if err := e.Validate(); err != nil {
105+
return -1, err
106+
}
107+
108+
for _, ln := range e.eng.listeners {
109+
if ln.network == network && ln.address == addr {
110+
return ln.dup()
111+
}
112+
}
113+
114+
return -1, errors.ErrInvalidNetworkAddress
115+
}
116+
96117
// Stop gracefully shuts down this Engine without interrupting any active event-loops,
97118
// it waits indefinitely for connections and event-loops to be closed and then shuts down.
98119
func (e Engine) Stop(ctx context.Context) error {

0 commit comments

Comments
 (0)