Skip to content

Commit 7ab8f15

Browse files
authored
Merge pull request #285 from xiongyuhmba/master
connection模块修复与优化
2 parents e22f95c + 153fe8d commit 7ab8f15

File tree

2 files changed

+72
-45
lines changed

2 files changed

+72
-45
lines changed

go.mod

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,23 @@ go 1.17
55
require (
66
github.com/golang/protobuf v1.5.3
77
github.com/gorilla/websocket v1.5.0
8+
github.com/stretchr/testify v1.8.1
9+
github.com/xtaci/kcp-go v5.4.20+incompatible
10+
)
11+
12+
require (
13+
github.com/davecgh/go-spew v1.1.1 // indirect
14+
github.com/klauspost/cpuid/v2 v2.1.1 // indirect
815
github.com/klauspost/reedsolomon v1.11.8 // indirect
916
github.com/pkg/errors v0.9.1 // indirect
10-
github.com/stretchr/testify v1.8.1
17+
github.com/pmezard/go-difflib v1.0.0 // indirect
1118
github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 // indirect
1219
github.com/templexxx/xor v0.0.0-20191217153810-f85b25db303b // indirect
1320
github.com/tjfoc/gmsm v1.4.1 // indirect
14-
github.com/xtaci/kcp-go v5.4.20+incompatible
1521
github.com/xtaci/lossyconn v0.0.0-20200209145036-adba10fffc37 // indirect
22+
golang.org/x/crypto v0.11.0 // indirect
1623
golang.org/x/net v0.12.0 // indirect
24+
golang.org/x/sys v0.10.0 // indirect
25+
google.golang.org/protobuf v1.26.0 // indirect
26+
gopkg.in/yaml.v3 v3.0.1 // indirect
1727
)

znet/connection.go

Lines changed: 60 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net"
88
"strconv"
99
"sync"
10+
"sync/atomic"
1011
"time"
1112

1213
"github.com/aceld/zinx/zconf"
@@ -54,9 +55,9 @@ type Connection struct {
5455
// (有缓冲管道,用于读、写两个goroutine之间的消息通信)
5556
msgBuffChan chan []byte
5657

57-
// Lock for user message reception and transmission
58-
// (用户收发消息的Lock)
59-
msgLock sync.RWMutex
58+
// Go StartWriter Flag
59+
// (开始初始化写协程标志)
60+
startWriterFlag int32
6061

6162
// Connection properties
6263
// (链接属性)
@@ -68,7 +69,7 @@ type Connection struct {
6869

6970
// The current connection's close state
7071
// (当前连接的关闭状态)
71-
isClosed bool
72+
closed int32
7273

7374
// Which Connection Manager the current connection belongs to
7475
// (当前链接是属于哪个Connection Manager的)
@@ -117,15 +118,16 @@ func newServerConn(server ziface.IServer, conn net.Conn, connID uint64) ziface.I
117118

118119
// Initialize Conn properties
119120
c := &Connection{
120-
conn: conn,
121-
connID: connID,
122-
connIdStr: strconv.FormatUint(connID, 10),
123-
isClosed: false,
124-
msgBuffChan: nil,
125-
property: nil,
126-
name: server.ServerName(),
127-
localAddr: conn.LocalAddr().String(),
128-
remoteAddr: conn.RemoteAddr().String(),
121+
conn: conn,
122+
connID: connID,
123+
connIdStr: strconv.FormatUint(connID, 10),
124+
closed: 0,
125+
startWriterFlag: 0,
126+
msgBuffChan: nil,
127+
property: nil,
128+
name: server.ServerName(),
129+
localAddr: conn.LocalAddr().String(),
130+
remoteAddr: conn.RemoteAddr().String(),
129131
}
130132

131133
lengthField := server.GetLengthField()
@@ -154,15 +156,16 @@ func newServerConn(server ziface.IServer, conn net.Conn, connID uint64) ziface.I
154156
// (创建一个Client服务端特性的连接的方法)
155157
func newClientConn(client ziface.IClient, conn net.Conn) ziface.IConnection {
156158
c := &Connection{
157-
conn: conn,
158-
connID: 0, // client ignore
159-
connIdStr: "", // client ignore
160-
isClosed: false,
161-
msgBuffChan: nil,
162-
property: nil,
163-
name: client.GetName(),
164-
localAddr: conn.LocalAddr().String(),
165-
remoteAddr: conn.RemoteAddr().String(),
159+
conn: conn,
160+
connID: 0, // client ignore
161+
connIdStr: "", // client ignore
162+
closed: 0,
163+
startWriterFlag: 0,
164+
msgBuffChan: nil,
165+
property: nil,
166+
name: client.GetName(),
167+
localAddr: conn.LocalAddr().String(),
168+
remoteAddr: conn.RemoteAddr().String(),
166169
}
167170

168171
lengthField := client.GetLengthField()
@@ -346,9 +349,7 @@ func (c *Connection) LocalAddr() net.Addr {
346349
}
347350

348351
func (c *Connection) Send(data []byte) error {
349-
c.msgLock.RLock()
350-
defer c.msgLock.RUnlock()
351-
if c.isClosed == true {
352+
if c.isClosed() == true {
352353
return errors.New("connection closed when send msg")
353354
}
354355

@@ -362,10 +363,8 @@ func (c *Connection) Send(data []byte) error {
362363
}
363364

364365
func (c *Connection) SendToQueue(data []byte) error {
365-
c.msgLock.RLock()
366-
defer c.msgLock.RUnlock()
367366

368-
if c.msgBuffChan == nil {
367+
if c.msgBuffChan == nil && c.setStartWriterFlag() {
369368
c.msgBuffChan = make(chan []byte, zconf.GlobalObject.MaxMsgChanLen)
370369
// Start a Goroutine to write data back to the client
371370
// This method only reads data from the MsgBuffChan without allocating memory or starting a Goroutine
@@ -377,7 +376,7 @@ func (c *Connection) SendToQueue(data []byte) error {
377376
idleTimeout := time.NewTimer(5 * time.Millisecond)
378377
defer idleTimeout.Stop()
379378

380-
if c.isClosed == true {
379+
if c.isClosed() == true {
381380
return errors.New("Connection closed when send buff msg")
382381
}
383382

@@ -398,7 +397,7 @@ func (c *Connection) SendToQueue(data []byte) error {
398397
// SendMsg directly sends Message data to the remote TCP client.
399398
// (直接将Message数据发送数据给远程的TCP客户端)
400399
func (c *Connection) SendMsg(msgID uint32, data []byte) error {
401-
if c.isClosed == true {
400+
if c.isClosed() == true {
402401
return errors.New("connection closed when send msg")
403402
}
404403
// Pack data and send it
@@ -418,10 +417,10 @@ func (c *Connection) SendMsg(msgID uint32, data []byte) error {
418417
}
419418

420419
func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error {
421-
if c.isClosed == true {
420+
if c.isClosed() == true {
422421
return errors.New("connection closed when send buff msg")
423422
}
424-
if c.msgBuffChan == nil {
423+
if c.msgBuffChan == nil && c.setStartWriterFlag() {
425424
c.msgBuffChan = make(chan []byte, zconf.GlobalObject.MaxMsgChanLen)
426425
// Start a Goroutine to write data back to the client
427426
// This method only reads data from the MsgBuffChan without allocating memory or starting a Goroutine
@@ -481,18 +480,20 @@ func (c *Connection) Context() context.Context {
481480
}
482481

483482
func (c *Connection) finalizer() {
484-
// Call the callback function registered by the user when closing the connection if it exists
485-
// (如果用户注册了该链接的 关闭回调业务,那么在此刻应该显示调用)
486-
c.callOnConnStop()
487-
488-
c.msgLock.Lock()
489-
defer c.msgLock.Unlock()
490-
491483
// If the connection has already been closed
492-
if c.isClosed == true {
484+
if c.isClosed() == true {
493485
return
494486
}
495487

488+
//set closed
489+
if !c.setClose() {
490+
return
491+
}
492+
493+
// Call the callback function registered by the user when closing the connection if it exists
494+
// (如果用户注册了该链接的 关闭回调业务,那么在此刻应该显示调用)
495+
c.callOnConnStop()
496+
496497
// Stop the heartbeat detector associated with the connection
497498
if c.hc != nil {
498499
c.hc.Stop()
@@ -511,8 +512,6 @@ func (c *Connection) finalizer() {
511512
close(c.msgBuffChan)
512513
}
513514

514-
c.isClosed = true
515-
516515
zlog.Ins().InfoF("Conn Stop()...ConnID = %d", c.connID)
517516
}
518517

@@ -531,7 +530,7 @@ func (c *Connection) callOnConnStop() {
531530
}
532531

533532
func (c *Connection) IsAlive() bool {
534-
if c.isClosed {
533+
if c.isClosed() {
535534
return false
536535
}
537536
// Check the last activity time of the connection. If it's beyond the heartbeat interval,
@@ -563,3 +562,21 @@ func (c *Connection) GetName() string {
563562
func (c *Connection) GetMsgHandler() ziface.IMsgHandle {
564563
return c.msgHandler
565564
}
565+
566+
func (c *Connection) isClosed() bool {
567+
return atomic.LoadInt32(&c.closed) != 0
568+
}
569+
570+
func (c *Connection) setClose() bool {
571+
if atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
572+
return true
573+
}
574+
return false
575+
}
576+
577+
func (c *Connection) setStartWriterFlag() bool {
578+
if atomic.CompareAndSwapInt32(&c.startWriterFlag, 0, 1) {
579+
return true
580+
}
581+
return false
582+
}

0 commit comments

Comments
 (0)