diff --git a/connector/connection.go b/connector/connection.go new file mode 100644 index 0000000..4cadaf7 --- /dev/null +++ b/connector/connection.go @@ -0,0 +1,287 @@ +package connector + +import ( + "errors" + "fmt" + "net" + "runtime" + "sync" + "syscall" + "time" + + "github.com/Allenxuxu/gev/connection" + "github.com/Allenxuxu/gev/eventloop" + "github.com/Allenxuxu/gev/log" + "github.com/Allenxuxu/gev/poller" + "github.com/RussellLuo/timingwheel" + reuseport "github.com/libp2p/go-reuseport" + "golang.org/x/net/context" + "golang.org/x/sys/unix" +) + +var ( + ErrDialTimeout = errors.New("i/o timeout") + ErrConnectionHandle = errors.New("cannot handle connection") + ErrInvalidArguments = errors.New("invalid arguments") +) + +type connectionSocketState uint8 + +const ( + connectingConnectionSocketState connectionSocketState = iota + 1 + connectedConnectionSocketState + disconnectedConnectionSocketState +) + +type Connection struct { + state connectionSocketState + stateMu sync.Mutex + + loop *eventloop.EventLoop + *connection.Connection + ctx context.Context + result chan error + fd int + protocol connection.Protocol + tw *timingwheel.TimingWheel + idleTime time.Duration + callBack connection.CallBack +} + +func newConnection( + ctx context.Context, + network, address string, + loop *eventloop.EventLoop, + protocol connection.Protocol, + tw *timingwheel.TimingWheel, + idleTime time.Duration, + callBack connection.CallBack) (*Connection, error) { + + connectResult := make(chan error) + + conn := &Connection{ + state: connectingConnectionSocketState, + loop: loop, + ctx: ctx, + result: connectResult, + protocol: protocol, + tw: tw, + idleTime: idleTime, + callBack: callBack, + } + + fd, err := unixOpenConnect(network, address) + conn.fd = fd + switch err { + case unix.EINPROGRESS, unix.EALREADY, unix.EINTR: + conn.state = connectingConnectionSocketState + loop.QueueInLoop(func() { + if err := loop.AddSocketAndEnableRead(fd, conn); err != nil { + log.Info("addSocketAndEnableRead error:", err) + } + if err := loop.EnableReadWrite(fd); err != nil { + log.Info("EnableReadWrite error: ", err) + } + }) + case nil, syscall.EISCONN: + runtime.KeepAlive(fd) + conn.state = connectedConnectionSocketState + if err := checkConn(fd); err != nil { + conn.closeUnconnected() + return nil, fmt.Errorf("checkConn err: %v", err) + } + + sa, err := unix.Getpeername(fd) + if err != nil { + conn.closeUnconnected() + return nil, fmt.Errorf("getPeerName err: %v", err) + } + + conn.Connection = connection.New(fd, loop, sa, protocol, tw, idleTime, callBack) + + loop.QueueInLoop(func() { + if err := loop.AddSocketAndEnableRead(fd, conn.Connection); err != nil { + log.Info("[AddSocketAndEnableRead] error: ", err) + return + } + }) + return conn, nil + default: + return nil, err + } + + defer close(connectResult) + + select { + case e := <-connectResult: + if e != nil { + return nil, e + } + + loop.QueueInLoop(func() { + if err := loop.AddSocketAndEnableRead(fd, conn.Connection); err != nil { + log.Info("[AddSocketAndEnableRead] error: ", err) + return + } + }) + return conn, nil + case <-ctx.Done(): + conn.stateMu.Lock() + defer conn.stateMu.Unlock() + + switch conn.state { + case connectingConnectionSocketState: + conn.state = disconnectedConnectionSocketState + conn.closeUnconnected() + return nil, ErrDialTimeout + case connectedConnectionSocketState: + loop.QueueInLoop(func() { + if err := loop.AddSocketAndEnableRead(fd, conn.Connection); err != nil { + log.Info("[AddSocketAndEnableRead] error: ", err) + return + } + }) + return conn, nil + default: + return nil, ErrDialTimeout + } + } +} + +func parseError(errorNo unix.Errno) error { + switch errorNo { + case unix.EINVAL: + return ErrInvalidArguments + default: + return errors.New(unix.ErrnoName(errorNo)) + } +} + +func (c *Connection) HandleEvent(fd int, events poller.Event) { + if c.state == connectingConnectionSocketState { + c.stateMu.Lock() + defer c.stateMu.Unlock() + + if c.state != connectingConnectionSocketState { + return + } + + if events&poller.EventErr != 0 { + c.state = disconnectedConnectionSocketState + c.closeUnconnected() + c.result <- ErrConnectionHandle + } else if events&poller.EventWrite != 0 { + if err := checkConn(fd); err != nil { + c.closeUnconnected() + c.result <- err + return + } + + sa, err := unix.Getpeername(fd) + if err != nil { + c.closeUnconnected() + c.result <- parseError(err.(unix.Errno)) + return + } + + c.Connection = connection.New(c.fd, c.loop, sa, c.protocol, c.tw, c.idleTime, c.callBack) + c.state = connectedConnectionSocketState + c.loop.DeleteFdInLoop(fd) + c.result <- nil + } else { + c.state = disconnectedConnectionSocketState + c.closeUnconnected() + + c.result <- fmt.Errorf("wrong_event %v", events) + } + } +} + +func (c *Connection) closeUnconnected() { + c.loop.DeleteFdInLoop(c.fd) + _ = unix.Close(c.fd) +} + +func (c *Connection) Close() error { + if err := c.Connection.Close(); err != nil { + return err + } + + return nil +} + +func checkConn(fd int) error { + nerr, err := unix.GetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_ERROR) + if err != nil { + return err + } + + unixError := unix.Errno(nerr) + if unixError != unix.Errno(0) { + return err + } + + return nil +} + +func unixOpenConnect(network, address string) (fd int, err error) { + defer func() { + if fd > 0 { + switch err { + case unix.EINPROGRESS, unix.EALREADY, unix.EINTR: + default: + _ = unix.Close(fd) + } + } + }() + + addr, err := reuseport.ResolveAddr(network, address) + if err != nil { + return + } + + var sa unix.Sockaddr + + // net dial.go + var domain, typ int + switch ra := addr.(type) { + case *net.TCPAddr: + domain = unix.AF_INET + typ = unix.SOCK_STREAM + ipaddr := ra.IP.To4() + if len(ipaddr) == net.IPv4len { + addr := &unix.SockaddrInet4{Port: ra.Port} + copy(addr.Addr[:], ipaddr) + sa = addr + } else if len(ipaddr) == net.IPv6len { + addr := &unix.SockaddrInet6{Port: ra.Port} + copy(addr.Addr[:], ipaddr) + sa = addr + } + case *net.UnixAddr: + domain = unix.AF_UNIX + typ = unix.SOCK_STREAM + sa = &unix.SockaddrUnix{Name: ra.Name} + + default: + return 0, errors.New("unsupported network/address type") + } + + fd, err = unix.Socket(domain, typ, unix.PROT_NONE) + if err != nil { + return + } + + if fd == 0 { + err = errors.New("wrong fd value") + return + } + + if err = unix.SetNonblock(fd, true); err != nil { + err = fmt.Errorf("SetNonblock error: %v", err) + return + } + + err = unix.Connect(fd, sa) + return +} diff --git a/connector/connector.go b/connector/connector.go new file mode 100644 index 0000000..3c65cd0 --- /dev/null +++ b/connector/connector.go @@ -0,0 +1,118 @@ +package connector + +import ( + "errors" + "runtime" + "time" + + "golang.org/x/net/context" + + "github.com/Allenxuxu/gev/connection" + "github.com/Allenxuxu/gev/eventloop" + "github.com/Allenxuxu/gev/log" + "github.com/Allenxuxu/toolkit/sync" + "github.com/Allenxuxu/toolkit/sync/atomic" + "github.com/RussellLuo/timingwheel" +) + +type Connector struct { + workLoops []*eventloop.EventLoop + opts *Options + timingWheel *timingwheel.TimingWheel + running atomic.Bool +} + +func (c *Connector) Dial(network, address string, callback connection.CallBack, protocol connection.Protocol, idleTime time.Duration) (*Connection, error) { + return c.DialWithTimeout(0, network, address, callback, protocol, idleTime) +} + +func (c *Connector) DialWithTimeout(timeout time.Duration, network, address string, callback connection.CallBack, protocol connection.Protocol, idleTime time.Duration) (*Connection, error) { + if callback == nil { + return nil, errors.New("callback is nil") + } + + if protocol == nil { + protocol = &connection.DefaultProtocol{} + } + + loop := c.opts.Strategy(c.workLoops) + + dialCtx := context.Background() + if timeout > 0 { + subCtx, cancel := context.WithDeadline(dialCtx, time.Now().Add(timeout)) + dialCtx = subCtx + defer cancel() + } + + for { + select { + case <-dialCtx.Done(): + return nil, ErrDialTimeout + default: + conn, err := newConnection(dialCtx, network, address, loop, protocol, c.timingWheel, idleTime, callback) + if err != nil { + time.Sleep(time.Millisecond * 100) + continue + } + + return conn, nil + } + + } +} + +func NewConnector(opts ...Option) (connector *Connector, err error) { + connector = new(Connector) + connector.opts = newOptions(opts...) + + connector.timingWheel = timingwheel.NewTimingWheel(connector.opts.tick, connector.opts.wheelSize) + if connector.opts.NumLoops <= 0 { + connector.opts.NumLoops = runtime.NumCPU() + } + + wloops := make([]*eventloop.EventLoop, connector.opts.NumLoops) + for i := 0; i < connector.opts.NumLoops; i++ { + l, err := eventloop.New() + if err != nil { + for j := 0; j < i; j++ { + _ = wloops[j].Stop() + } + return nil, err + } + wloops[i] = l + } + + connector.workLoops = wloops + return +} + +func (c *Connector) Start() { + sw := sync.WaitGroupWrapper{} + c.timingWheel.Start() + + length := len(c.workLoops) + for i := 0; i < length; i++ { + sw.AddAndRun(c.workLoops[i].RunLoop) + } + + c.running.Set(true) + sw.Wait() +} + +func (c *Connector) Stop() { + if c.running.Get() { + c.running.Set(false) + + c.timingWheel.Stop() + + for k := range c.workLoops { + if err := c.workLoops[k].Stop(); err != nil { + log.Error(err) + } + } + } +} + +func (c *Connector) Options() Options { + return *c.opts +} diff --git a/connector/connector_test.go b/connector/connector_test.go new file mode 100644 index 0000000..c74f83b --- /dev/null +++ b/connector/connector_test.go @@ -0,0 +1,42 @@ +package connector + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/Allenxuxu/gev/connection" +) + +var ( + dialer *Connector +) + +type exampleCallback struct { +} + +func (e exampleCallback) OnMessage(c *connection.Connection, ctx interface{}, data []byte) []byte { + panic("implement me") +} + +func (e exampleCallback) OnClose(c *connection.Connection) { + panic("implement ") +} + +func init() { + var err error + + if dialer, err = NewConnector(); err != nil { + panic(err) + } + + go dialer.Start() + time.Sleep(time.Second * 3) +} + +func TestConnection_ListenerNotExist(t *testing.T) { + cb := new(exampleCallback) + _, err := dialer.DialWithTimeout(time.Second*5, "tcp", "127.0.0.1:9001", cb, nil, 0) + assert.Equal(t, ErrDialTimeout, err) +} diff --git a/connector/options.go b/connector/options.go new file mode 100644 index 0000000..c85cb7b --- /dev/null +++ b/connector/options.go @@ -0,0 +1,61 @@ +package connector + +import ( + "time" + + "github.com/Allenxuxu/gev/eventloop" +) + +// Options 服务配置 +type Options struct { + NumLoops int + IdleTime time.Duration + Strategy eventloop.LoadBalanceStrategy + + tick time.Duration + wheelSize int64 +} + +// Option ... +type Option func(*Options) + +func newOptions(opt ...Option) *Options { + opts := Options{} + + for _, o := range opt { + o(&opts) + } + + if opts.tick == 0 { + opts.tick = 1 * time.Millisecond + } + if opts.wheelSize == 0 { + opts.wheelSize = 1000 + } + + if opts.Strategy == nil { + opts.Strategy = eventloop.RoundRobin() + } + + return &opts +} + +// NumLoops work eventloop 的数量 +func NumLoops(n int) Option { + return func(o *Options) { + o.NumLoops = n + } +} + +// IdleTime 最大空闲时间(秒) +func IdleTime(t time.Duration) Option { + return func(o *Options) { + o.IdleTime = t + } +} + +func LoadBalance(strategy eventloop.LoadBalanceStrategy) Option { + return func(o *Options) { + o.Strategy = strategy + } +} diff --git a/benchmark_load_balance_test.go b/eventloop/benchmark_load_balance_test.go similarity index 98% rename from benchmark_load_balance_test.go rename to eventloop/benchmark_load_balance_test.go index 018b7c4..153d94f 100644 --- a/benchmark_load_balance_test.go +++ b/eventloop/benchmark_load_balance_test.go @@ -1,4 +1,4 @@ -package gev +package eventloop import ( "container/heap" diff --git a/load_balance.go b/eventloop/load_balance.go similarity index 57% rename from load_balance.go rename to eventloop/load_balance.go index f32dbbd..419cfca 100644 --- a/load_balance.go +++ b/eventloop/load_balance.go @@ -1,12 +1,10 @@ -package gev +package eventloop -import "github.com/Allenxuxu/gev/eventloop" - -type LoadBalanceStrategy func([]*eventloop.EventLoop) *eventloop.EventLoop +type LoadBalanceStrategy func([]*EventLoop) *EventLoop func RoundRobin() LoadBalanceStrategy { var nextLoopIndex int - return func(loops []*eventloop.EventLoop) *eventloop.EventLoop { + return func(loops []*EventLoop) *EventLoop { l := loops[nextLoopIndex] nextLoopIndex = (nextLoopIndex + 1) % len(loops) return l @@ -14,7 +12,7 @@ func RoundRobin() LoadBalanceStrategy { } func LeastConnection() LoadBalanceStrategy { - return func(loops []*eventloop.EventLoop) *eventloop.EventLoop { + return func(loops []*EventLoop) *EventLoop { l := loops[0] for i := 1; i < len(loops); i++ { diff --git a/load_balance_test.go b/eventloop/load_balance_test.go similarity index 80% rename from load_balance_test.go rename to eventloop/load_balance_test.go index 8e99208..1e3331f 100644 --- a/load_balance_test.go +++ b/eventloop/load_balance_test.go @@ -1,23 +1,21 @@ -package gev +package eventloop import ( "math/rand" "testing" "github.com/stretchr/testify/assert" - - "github.com/Allenxuxu/gev/eventloop" ) func TestLeastConnection(t *testing.T) { var ( - loops []*eventloop.EventLoop + loops []*EventLoop n = 100 min int64 ) for i := 0; i < n; i++ { - l := &eventloop.EventLoop{} + l := &EventLoop{} connCount := int64(rand.Intn(n)) l.ConnCunt.Swap(connCount) loops = append(loops, l) @@ -37,12 +35,12 @@ func TestLeastConnection(t *testing.T) { func TestRoundRobin(t *testing.T) { var ( - loops []*eventloop.EventLoop + loops []*EventLoop n = 100 ) for i := 0; i < n; i++ { - l := &eventloop.EventLoop{} + l := &EventLoop{} l.ConnCunt.Swap(int64(i)) loops = append(loops, l) } diff --git a/options.go b/options.go index 4f72288..b2fbb44 100644 --- a/options.go +++ b/options.go @@ -3,6 +3,8 @@ package gev import ( "time" + "github.com/Allenxuxu/gev/eventloop" + "github.com/Allenxuxu/gev/connection" ) @@ -14,7 +16,7 @@ type Options struct { ReusePort bool IdleTime time.Duration Protocol connection.Protocol - Strategy LoadBalanceStrategy + Strategy eventloop.LoadBalanceStrategy tick time.Duration wheelSize int64 @@ -47,7 +49,7 @@ func newOptions(opt ...Option) *Options { opts.Protocol = &connection.DefaultProtocol{} } if opts.Strategy == nil { - opts.Strategy = RoundRobin() + opts.Strategy = eventloop.RoundRobin() } return &opts @@ -95,7 +97,7 @@ func IdleTime(t time.Duration) Option { } } -func LoadBalance(strategy LoadBalanceStrategy) Option { +func LoadBalance(strategy eventloop.LoadBalanceStrategy) Option { return func(o *Options) { o.Strategy = strategy } diff --git a/plugins/protobuf/protobuf.go b/plugins/protobuf/protobuf.go index 7ea1435..f93f256 100644 --- a/plugins/protobuf/protobuf.go +++ b/plugins/protobuf/protobuf.go @@ -5,11 +5,11 @@ import "encoding/binary" // PackMessage 按自定义协议打包数据 func PackMessage(msgType string, data []byte) []byte { typeLen := len(msgType) - len := len(data) + typeLen + 2 + length := len(data) + typeLen + 2 - ret := make([]byte, len+4) + ret := make([]byte, length+4) - binary.BigEndian.PutUint32(ret, uint32(len)) + binary.BigEndian.PutUint32(ret, uint32(length)) binary.BigEndian.PutUint16(ret[4:], uint16(typeLen)) copy(ret[6:], msgType) copy(ret[6+typeLen:], data) diff --git a/plugins/protobuf/protocol.go b/plugins/protobuf/protocol.go index 698e26a..bc71764 100644 --- a/plugins/protobuf/protocol.go +++ b/plugins/protobuf/protocol.go @@ -26,8 +26,8 @@ func New() *Protocol { // UnPacket ... func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) { if buffer.Length() > 6 { - len := int(buffer.PeekUint32()) - if buffer.Length() >= len+4 { + length := int(buffer.PeekUint32()) + if buffer.Length() >= length+4 { buffer.Retrieve(4) typeLen := int(buffer.PeekUint16()) @@ -36,7 +36,7 @@ func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuf typeByte := pbytes.GetLen(typeLen) _, _ = buffer.Read(typeByte) - dataLen := len - 2 - typeLen + dataLen := length - 2 - typeLen data := make([]byte, dataLen) _, _ = buffer.Read(data) diff --git a/server_conn_test.go b/server_conn_test.go index 1facf8e..0d200e4 100644 --- a/server_conn_test.go +++ b/server_conn_test.go @@ -6,12 +6,11 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" - - "github.com/Allenxuxu/toolkit/sync" - "github.com/Allenxuxu/gev/connection" + "github.com/Allenxuxu/gev/eventloop" "github.com/Allenxuxu/gev/log" + "github.com/Allenxuxu/toolkit/sync" + "github.com/stretchr/testify/assert" ) type example2 struct { @@ -131,7 +130,7 @@ func TestConnLoadBalanceLeastConnection(t *testing.T) { Address(":1840"), NumLoops(4), ReusePort(true), - LoadBalance(LeastConnection())) + LoadBalance(eventloop.LeastConnection())) if err != nil { t.Fatal(err) } @@ -163,7 +162,7 @@ func TestConnLoadBalanceRoundRobin(t *testing.T) { Address(":1841"), NumLoops(4), ReusePort(true), - LoadBalance(RoundRobin())) + LoadBalance(eventloop.RoundRobin())) if err != nil { t.Fatal(err) } diff --git a/server_test.go b/server_test.go index 3607a59..f06353b 100644 --- a/server_test.go +++ b/server_test.go @@ -10,24 +10,27 @@ import ( "time" "github.com/Allenxuxu/gev/connection" + "github.com/Allenxuxu/gev/connector" "github.com/Allenxuxu/gev/log" "github.com/Allenxuxu/toolkit/sync" "github.com/Allenxuxu/toolkit/sync/atomic" ) type example struct { - Count atomic.Int64 + Count atomic.Int64 + JustCount atomic.Int64 } func (s *example) OnConnect(c *connection.Connection) { s.Count.Add(1) - //log.Println(" OnConnect : ", c.PeerAddr()) + s.JustCount.Add(1) + // log.Info(" OnConnect : ", c.PeerAddr()) } func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) { - //log.Println("OnMessage") + // log.Info("OnMessage") - //out = data + // out = data msg := append([]byte{}, data...) if err := c.Send(msg); err != nil { panic(err) @@ -37,7 +40,6 @@ func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []by func (s *example) OnClose(c *connection.Connection) { s.Count.Add(-1) - //log.Println("OnClose") } func TestServer_Start(t *testing.T) { @@ -97,6 +99,147 @@ func startClient(network, addr string) { } } +func TestServer_StopWithClient(t *testing.T) { + handler := new(example) + + s, err := NewServer(handler, + Network("tcp"), + Address("127.0.0.1:1835"), + NumLoops(8), + ReusePort(true)) + if err != nil { + t.Fatal(err) + } + + go s.Start() + + cb := new(clientCallback) + var success, failed atomic.Int64 + + connector, err := connector.NewConnector() + if err != nil { + t.Fatal(err) + } + defer connector.Stop() + go connector.Start() + + time.Sleep(time.Second * 3) + + wg := &sync.WaitGroupWrapper{} + for i := 0; i < 100; i++ { + wg.AddAndRun(func() { + conn, err := connector.DialWithTimeout(time.Second*10, "tcp", "127.0.0.1:1835", cb, nil, 0) + if err != nil { + failed.Add(1) + log.Info("error", err) + return + } + success.Add(1) + if err := conn.Close(); err != nil { + panic(err) + } + }) + } + + wg.Wait() + time.Sleep(time.Second * 3) + log.Infof("Success: %d Failed: %d\n", success, failed) + + count := handler.Count.Get() + if count != 0 { + t.Fatal(count) + } + + if failed.Get() > 0 { + t.Fatal(failed.Get()) + } + + connCount := handler.JustCount.Get() + if connCount != 100 { + t.Fatal(connCount) + } + + s.Stop() +} + +func TestServer_StopAndSendWithClient(t *testing.T) { + handler := new(example) + + s, err := NewServer(handler, + Network("tcp"), + Address("127.0.0.1:1836"), + NumLoops(8), + ReusePort(true)) + if err != nil { + t.Fatal(err) + } + + go s.Start() + cb := new(clientCallback) + var success, failed atomic.Int64 + wg := &sync.WaitGroupWrapper{} + + connector, err := connector.NewConnector() + if err != nil { + t.Fatal(err) + } + + defer connector.Stop() + go connector.Start() + + log.Info("start handling") + time.Sleep(time.Second * 3) + for i := 0; i < 100; i++ { + wg.AddAndRun(func() { + conn, err := connector.DialWithTimeout(time.Second*10, "tcp", "127.0.0.1:1836", cb, nil, 0) + if err != nil { + failed.Add(1) + log.Info("error", err) + return + } + + err = conn.Send([]byte("data_test")) + if err != nil { + panic(err) + } + // waiting for callback executed + time.Sleep(time.Second * 2) + if err := conn.Close(); err != nil { + panic(err) + } + success.Add(1) + }) + } + + wg.Wait() + log.Infof("Success: %d Failed: %d\n", success, failed) + + time.Sleep(time.Second * 3) + count := handler.Count.Get() + if count != 0 { + t.Fatal(count) + } + if cb.reqCount.Get() != 100 { + t.Fatal(cb.reqCount.Get()) + } + + s.Stop() +} + +type clientCallback struct { + reqCount atomic.Int64 +} + +func (cc *clientCallback) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) { + // log.Info("client OnMessage", string(data)) + cc.reqCount.Add(1) + return +} + +func (cc *clientCallback) OnClose(c *connection.Connection) { + // log.Info("client OnClose") +} + func ExampleServer_RunAfter() { handler := new(example) @@ -204,13 +347,13 @@ type example1 struct { func (s *example1) OnConnect(c *connection.Connection) { s.Count.Add(1) _ = c.Send([]byte("hello gev")) - //log.Println(" OnConnect : ", c.PeerAddr()) + // log.Println(" OnConnect : ", c.PeerAddr()) } func (s *example1) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) { - //log.Println("OnMessage") + // log.Println("OnMessage") - //out = data + // out = data if err := c.Send(data); err != nil { panic(err) } @@ -219,7 +362,7 @@ func (s *example1) OnMessage(c *connection.Connection, ctx interface{}, data []b func (s *example1) OnClose(c *connection.Connection) { s.Count.Add(-1) - //log.Println("OnClose") + // log.Println("OnClose") } func TestServer_Stop1(t *testing.T) {