Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
- name: Setup and run golangci-lint
uses: golangci/golangci-lint-action@v7
with:
version: v2.1.5
version: v2.1.6
args: -v -E gocritic -E misspell -E revive -E godot --timeout 5m
test:
needs: lint
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_gc_opt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
- name: Setup and run golangci-lint
uses: golangci/golangci-lint-action@v7
with:
version: v2.1.5
version: v2.1.6
args: -v -E gocritic -E misspell -E revive -E godot --timeout 5m
test:
needs: lint
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_poll_opt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
- name: Setup and run golangci-lint
uses: golangci/golangci-lint-action@v7
with:
version: v2.1.5
version: v2.1.6
args: -v -E gocritic -E misspell -E revive -E godot
test:
needs: lint
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_poll_opt_gc_opt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
- name: Setup and run golangci-lint
uses: golangci/golangci-lint-action@v7
with:
version: v2.1.5
version: v2.1.6
args: -v -E gocritic -E misspell -E revive -E godot
test:
needs: lint
Expand Down
42 changes: 32 additions & 10 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package gnet

import (
"time"
"runtime"

"golang.org/x/sys/unix"

Expand Down Expand Up @@ -45,15 +45,26 @@ func (el *eventloop) accept0(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error
}

remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
if el.engine.opts.TCPKeepAlive > 0 && el.listeners[fd].network == "tcp" {
err = socket.SetKeepAlivePeriod(nfd, int(el.engine.opts.TCPKeepAlive.Seconds()))
if err != nil {
network := el.listeners[fd].network
if opts := el.engine.opts; opts.TCPKeepAlive > 0 && network == "tcp" &&
(runtime.GOOS != "linux" && runtime.GOOS != "freebsd" && runtime.GOOS != "dragonfly") {
// TCP keepalive options are not inherited from the listening socket
// on platforms other than Linux, FreeBSD, or DragonFlyBSD.
// We therefore need to set them on the accepted socket explicitly.
//
// Check out https://github.com/nginx/nginx/pull/337 for details.
if err = setKeepAlive(
nfd,
true,
opts.TCPKeepAlive,
opts.TCPKeepInterval,
opts.TCPKeepCount); err != nil {
el.getLogger().Errorf("failed to set TCP keepalive on fd=%d: %v", fd, err)
}
}

el := el.engine.eventLoops.next(remoteAddr)
c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr)
c := newStreamConn(network, nfd, el, sa, el.listeners[fd].addr, remoteAddr)
err = el.poller.Trigger(queue.HighPriority, el.register, c)
if err != nil {
el.getLogger().Errorf("failed to enqueue the accepted socket fd=%d to poller: %v", c.fd, err)
Expand All @@ -64,7 +75,8 @@ func (el *eventloop) accept0(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error
}

func (el *eventloop) accept(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
if el.listeners[fd].network == "udp" {
network := el.listeners[fd].network
if network == "udp" {
return el.readUDP(fd, ev, flags)
}

Expand All @@ -82,13 +94,23 @@ func (el *eventloop) accept(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) e
}

remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
if el.engine.opts.TCPKeepAlive > 0 && el.listeners[fd].network == "tcp" {
err = socket.SetKeepAlivePeriod(nfd, int(el.engine.opts.TCPKeepAlive/time.Second))
if err != nil {
if opts := el.engine.opts; opts.TCPKeepAlive > 0 && el.listeners[fd].network == "tcp" &&
(runtime.GOOS != "linux" && runtime.GOOS != "freebsd" && runtime.GOOS != "dragonfly") {
// TCP keepalive options are not inherited from the listening socket
// on platforms other than Linux, FreeBSD, or DragonFlyBSD.
// We therefore need to set them on the accepted socket explicitly.
//
// Check out https://github.com/nginx/nginx/pull/337 for details.
if err = setKeepAlive(
nfd,
true,
opts.TCPKeepAlive,
opts.TCPKeepInterval,
opts.TCPKeepCount); err != nil {
el.getLogger().Errorf("failed to set TCP keepalive on fd=%d: %v", fd, err)
}
}

c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr)
c := newStreamConn(network, nfd, el, sa, el.listeners[fd].addr, remoteAddr)
return el.register0(c)
}
2 changes: 1 addition & 1 deletion acceptor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (eng *engine) listenStream(ln net.Listener) (err error) {
return
}
el := eng.eventLoops.next(tc.RemoteAddr())
c := newTCPConn(el, tc, nil)
c := newStreamConn(el, tc, nil)
el.ch <- &openConn{c: c}
goroutine.DefaultWorkerPool.Submit(func() {
var buffer [0x10000]byte
Expand Down
17 changes: 10 additions & 7 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ type connHandler struct {

type clientEvents struct {
*BuiltinEventEngine
tester *testing.T
svr *testClient
packetLen int
tester *testing.T
svr *testClient
}

func (ev *clientEvents) OnBoot(e Engine) Action {
Expand Down Expand Up @@ -67,13 +66,14 @@ func (ev *clientEvents) OnClose(Conn, error) Action {

func (ev *clientEvents) OnTraffic(c Conn) (action Action) {
handler := c.Context().(*connHandler)
packetLen := streamLen
if handler.network == "udp" {
ev.packetLen = datagramLen
packetLen = datagramLen
}
buf, err := c.Next(-1)
assert.NoError(ev.tester, err)
handler.data = append(handler.data, buf...)
if len(handler.data) < ev.packetLen {
if len(handler.data) < packetLen {
return
}
handler.rspCh <- handler.data
Expand Down Expand Up @@ -501,9 +501,10 @@ func runClient(t *testing.T, network, addr string, conf *testConf) {
nclients: conf.clients,
}
var err error
clientEV := &clientEvents{tester: t, packetLen: streamLen, svr: ts}
clientEV := &clientEvents{tester: t, svr: ts}
ts.client, err = NewClient(
clientEV,
WithMulticore(conf.multicore),
WithEdgeTriggeredIO(conf.et),
WithEdgeTriggeredIOChunk(conf.etChunk),
WithTCPNoDelay(TCPNoDelay),
Expand All @@ -524,7 +525,9 @@ func runClient(t *testing.T, network, addr string, conf *testConf) {
WithMulticore(conf.multicore),
WithReusePort(conf.reuseport),
WithTicker(true),
WithTCPKeepAlive(time.Minute*1),
WithTCPKeepAlive(time.Minute),
WithTCPKeepInterval(time.Second*10),
WithTCPKeepCount(10),
WithLoadBalancing(conf.lb))
assert.NoError(t, err)
}
Expand Down
112 changes: 76 additions & 36 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
// Client of gnet.
type Client struct {
opts *Options
el *eventloop
eng *engine
}

// NewClient creates an instance of Client.
Expand All @@ -59,28 +59,19 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
}
logging.SetDefaultLoggerAndFlusher(logger, logFlusher)

var p *netpoll.Poller
if p, err = netpoll.OpenPoller(); err != nil {
return
}

rootCtx, shutdown := context.WithCancel(context.Background())
eg, ctx := errgroup.WithContext(rootCtx)
eng := engine{
listeners: make(map[int]*listener),
opts: options,
turnOff: shutdown,
eventHandler: eh,
eventLoops: new(leastConnectionsLoadBalancer),
concurrency: struct {
*errgroup.Group
ctx context.Context
}{eg, ctx},
}
el := eventloop{
listeners: eng.listeners,
engine: &eng,
poller: p,
}

if options.EdgeTriggeredIOChunk > 0 {
options.EdgeTriggeredIO = true
Expand All @@ -107,39 +98,82 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
default:
options.WriteBufferCap = math.CeilToPowerOfTwo(wbc)
}

el.buffer = make([]byte, options.ReadBufferCap)
el.connections.init()
el.eventHandler = eh
cli.el = &el
cli.eng = &eng
return
}

// Start starts the client event-loop, handing IO events.
func (cli *Client) Start() error {
logging.Infof("Starting gnet client with 1 event-loop")
cli.el.eventHandler.OnBoot(Engine{cli.el.engine})
cli.el.engine.concurrency.Go(cli.el.run)
numEventLoop := determineEventLoops(cli.opts)
logging.Infof("Starting gnet client with %d event loops", numEventLoop)

cli.eng.eventHandler.OnBoot(Engine{cli.eng})

var el0 *eventloop
for i := 0; i < numEventLoop; i++ {
p, err := netpoll.OpenPoller()
if err != nil {
cli.eng.closeEventLoops()
return err
}
el := eventloop{
listeners: cli.eng.listeners,
engine: cli.eng,
poller: p,
buffer: make([]byte, cli.opts.ReadBufferCap),
eventHandler: cli.eng.eventHandler,
}
el.connections.init()
cli.eng.eventLoops.register(&el)
if cli.opts.Ticker && el.idx == 0 {
el0 = &el
}
}

cli.eng.eventLoops.iterate(func(_ int, el *eventloop) bool {
cli.eng.concurrency.Go(el.run)
return true
})

// Start the ticker.
if cli.opts.Ticker {
ctx := cli.el.engine.concurrency.ctx
cli.el.engine.concurrency.Go(func() error {
cli.el.ticker(ctx)
if el0 != nil {
ctx := cli.eng.concurrency.ctx
cli.eng.concurrency.Go(func() error {
el0.ticker(ctx)
return nil
})
}

logging.Debugf("default logging level is %s", logging.LogLevel())

return nil
}

// Stop stops the client event-loop.
func (cli *Client) Stop() (err error) {
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ any) error { return errorx.ErrEngineShutdown }, nil))
err = cli.el.engine.concurrency.Wait()
logging.Error(cli.el.poller.Close())
cli.el.eventHandler.OnShutdown(Engine{cli.el.engine})
func (cli *Client) Stop() error {
cli.eng.shutdown(nil)

cli.eng.eventHandler.OnShutdown(Engine{cli.eng})

// Notify all event-loops to exit.
cli.eng.eventLoops.iterate(func(_ int, el *eventloop) bool {
logging.Error(el.poller.Trigger(queue.HighPriority,
func(_ any) error { return errorx.ErrEngineShutdown }, nil))
return true
})

// Wait for all event-loops to exit.
err := cli.eng.concurrency.Wait()

cli.eng.closeEventLoops()

// Put the engine into the shutdown state.
cli.eng.inShutdown.Store(true)

// Flush the logger.
logging.Cleanup()
return

return err
}

// Dial is like net.Dial().
Expand All @@ -156,7 +190,7 @@ func (cli *Client) DialContext(network, address string, ctx any) (Conn, error) {
return cli.EnrollContext(c, ctx)
}

// Enroll converts a net.Conn to gnet.Conn and then adds it into Client.
// Enroll converts a net.Conn to gnet.Conn and then adds it into the Client.
func (cli *Client) Enroll(c net.Conn) (Conn, error) {
return cli.EnrollContext(c, nil)
}
Expand Down Expand Up @@ -196,6 +230,7 @@ func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
}
}

el := cli.eng.eventLoops.next(nil)
var (
sockAddr unix.Sockaddr
gc *conn
Expand All @@ -208,29 +243,34 @@ func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
}
ua := c.LocalAddr().(*net.UnixAddr)
ua.Name = c.RemoteAddr().String() + "." + strconv.Itoa(dupFD)
gc = newTCPConn(dupFD, cli.el, sockAddr, c.LocalAddr(), c.RemoteAddr())
gc = newStreamConn("unix", dupFD, el, sockAddr, c.LocalAddr(), c.RemoteAddr())
case *net.TCPConn:
if cli.opts.TCPNoDelay == TCPNoDelay {
if err = socket.SetNoDelay(dupFD, 1); err != nil {
return nil, err
}
}
if cli.opts.TCPKeepAlive > 0 {
if err = socket.SetKeepAlivePeriod(dupFD, int(cli.opts.TCPKeepAlive.Seconds())); err != nil {
if err = setKeepAlive(
dupFD,
true,
cli.opts.TCPKeepAlive,
cli.opts.TCPKeepInterval,
cli.opts.TCPKeepCount); err != nil {
return nil, err
}
}
sockAddr, _, _, _, err = socket.GetTCPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String())
if err != nil {
return nil, err
}
gc = newTCPConn(dupFD, cli.el, sockAddr, c.LocalAddr(), c.RemoteAddr())
gc = newStreamConn("tcp", dupFD, el, sockAddr, c.LocalAddr(), c.RemoteAddr())
case *net.UDPConn:
sockAddr, _, _, _, err = socket.GetUDPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String())
if err != nil {
return nil, err
}
gc = newUDPConn(dupFD, cli.el, c.LocalAddr(), sockAddr, true)
gc = newUDPConn(dupFD, el, c.LocalAddr(), sockAddr, true)
default:
return nil, errorx.ErrUnsupportedProtocol
}
Expand All @@ -240,12 +280,12 @@ func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
ccb := &connWithCallback{c: gc, cb: func() {
close(connOpened)
}}
err = cli.el.poller.Trigger(queue.HighPriority, cli.el.register, ccb)
err = el.poller.Trigger(queue.HighPriority, el.register, ccb)
if err != nil {
gc.Close() //nolint:errcheck
return nil, err
}

<-connOpened

return gc, nil
}
Loading
Loading