@@ -20,14 +20,14 @@ const dropBlockTimeout = 10 * time.Minute
2020// Conn connection
2121type Conn struct {
2222 sync.RWMutex
23- cfg * global.Configure
24- conn * network.Conn
23+ cfg * global.Configure // configure
24+ conn * network.Conn // connection wrap, read write with timeout
2525 read map [string ]chan * network.Msg // link id => channel
2626 unknownRead chan * network.Msg // read message without link
27- onDisconnect chan string
28- write chan * network.Msg
29- lockDrop sync.RWMutex
30- drop map [string ]time.Time
27+ onDisconnect chan string // on disconnect channel, the value is clientid
28+ write chan * network.Msg // write queue
29+ lockDrop sync.RWMutex // drop mutex
30+ drop map [string ]time.Time // drop cache, drop message when this link is closed
3131 // runtime
3232 ctx context.Context
3333 cancel context.CancelFunc
@@ -52,11 +52,12 @@ func New(cfg *global.Configure) *Conn {
5252 return conn
5353}
5454
55+ // connect connect server and write handshake packet
5556func (conn * Conn ) connect () error {
5657 var dial net.Conn
5758 var err error
5859 if conn .cfg .UseSSL {
59- if conn .cfg .SSLInsecure {
60+ if conn .cfg .SSLInsecure { // disable sni
6061 rawConn , err := net .Dial ("tcp" , conn .cfg .Server )
6162 if err != nil {
6263 logging .Error ("raw dial: %v" , err )
@@ -96,6 +97,7 @@ func (conn *Conn) close() {
9697 }
9798}
9899
100+ // writeHandshake send handshake message, default timeout is 5 seconds
99101func writeHandshake (conn * network.Conn , cfg * global.Configure ) error {
100102 var msg network.Msg
101103 msg .XType = network .Msg_handshake
@@ -109,13 +111,22 @@ func writeHandshake(conn *network.Conn, cfg *global.Configure) error {
109111 return conn .WriteMessage (& msg , 5 * time .Second )
110112}
111113
114+ // isDrop check the message is dropped by linkid
112115func (conn * Conn ) isDrop (linkID string ) bool {
113116 conn .lockDrop .RLock ()
114117 defer conn .lockDrop .RUnlock ()
115118 _ , ok := conn .drop [linkID ]
116119 return ok
117120}
118121
122+ // addDrop add to drop queue
123+ func (conn * Conn ) addDrop (linkID string ) {
124+ conn .lockDrop .Lock ()
125+ defer conn .lockDrop .Unlock ()
126+ conn .drop [linkID ] = time .Now ().Add (dropBlockTimeout )
127+ }
128+
129+ // getChan get read channel by linkid
119130func (conn * Conn ) getChan (linkID string ) chan * network.Msg {
120131 conn .RLock ()
121132 ch := conn .read [linkID ]
@@ -126,50 +137,66 @@ func (conn *Conn) getChan(linkID string) chan *network.Msg {
126137 return ch
127138}
128139
129- func (conn * Conn ) hookDispatch (ch chan * network.Msg , msg * network.Msg ) bool {
140+ // hookDispatch hook message before dispatcher
141+ func (conn * Conn ) hookDispatch (msg * network.Msg ) bool {
130142 switch msg .GetXType () {
143+ // if disconnected add linkid to drop list, and break the handle chain
131144 case network .Msg_disconnect :
132- conn .lockDrop .Lock ()
133- conn .drop [msg .GetLinkId ()] = time .Now ().Add (dropBlockTimeout )
134- conn .lockDrop .Unlock ()
135- conn .onDisconnect <- msg .GetLinkId ()
145+ conn .addDrop (msg .GetLinkId ())
146+ // TODO: no need will block
147+ // conn.onDisconnect <- msg.GetLinkId()
136148 logging .Info ("connection %s disconnected" , msg .GetLinkId ())
137149 return false
138150 }
139151 return true
140152}
141153
154+ // handleLinkedMessage linked message handler, return false means break read loop
155+ func (conn * Conn ) handleLinkedMessage (msg * network.Msg ) bool {
156+ linkID := msg .GetLinkId ()
157+ if conn .isDrop (linkID ) {
158+ return true
159+ }
160+ if ! conn .hookDispatch (msg ) {
161+ return true
162+ }
163+ ch := conn .getChan (linkID )
164+ select {
165+ case ch <- msg :
166+ case <- time .After (conn .cfg .WriteTimeout ):
167+ logging .Error ("drop message: %s" , msg .GetXType ().String ())
168+ conn .addDrop (linkID )
169+ case <- conn .ctx .Done ():
170+ return false
171+ }
172+ return true
173+ }
174+
175+ // handleUnlinkedMessage unlinked message handler, return false means break read loop
176+ func (conn * Conn ) handleUnlinkedMessage (msg * network.Msg ) bool {
177+ // TODO
178+ return true
179+ }
180+
181+ // loopRead loop read message
142182func (conn * Conn ) loopRead () {
143183 defer utils .Recover ("loopRead" )
144184 defer conn .close ()
145185 defer conn .cancel ()
146186 var timeout int
147187 run := func (msg * network.Msg ) bool {
148188 timeout = 0
189+ // skip keepalive message
149190 if msg .GetXType () == network .Msg_keepalive {
150191 return true
151192 }
152193 logging .Debug ("read message %s(%s) from %s" ,
153194 msg .GetXType ().String (), msg .GetLinkId (), msg .GetFrom ())
154195 linkID := msg .GetLinkId ()
155- if conn .isDrop (linkID ) {
156- return true
157- }
158- ch := conn .getChan (linkID )
159- if ! conn .hookDispatch (ch , msg ) {
160- return true
161- }
162- select {
163- case ch <- msg :
164- case <- time .After (conn .cfg .WriteTimeout ):
165- logging .Error ("drop message: %s" , msg .GetXType ().String ())
166- conn .lockDrop .Lock ()
167- conn .drop [msg .GetLinkId ()] = time .Now ().Add (dropBlockTimeout )
168- conn .lockDrop .Unlock ()
169- case <- conn .ctx .Done ():
170- return false
196+ if len (linkID ) > 0 {
197+ return conn .handleLinkedMessage (msg )
171198 }
172- return true
199+ return conn . handleUnlinkedMessage ( msg )
173200 }
174201 for {
175202 msg , _ , err := conn .conn .ReadMessage (conn .cfg .ReadTimeout )
@@ -191,6 +218,7 @@ func (conn *Conn) loopRead() {
191218 }
192219}
193220
221+ // loopWrite loop write message
194222func (conn * Conn ) loopWrite () {
195223 defer utils .Recover ("loopWrite" )
196224 defer conn .close ()
@@ -212,6 +240,7 @@ func (conn *Conn) loopWrite() {
212240 }
213241}
214242
243+ // keepalive loop send keepalive message
215244func (conn * Conn ) keepalive () {
216245 defer utils .Recover ("keepalive" )
217246 defer conn .close ()
@@ -237,8 +266,8 @@ func (conn *Conn) AddLink(id string) {
237266 conn .Unlock ()
238267}
239268
240- // Reset reset message next read
241- func (conn * Conn ) Reset (id string , msg * network.Msg ) {
269+ // Requeue requeue for next read
270+ func (conn * Conn ) Requeue (id string , msg * network.Msg ) {
242271 conn .RLock ()
243272 ch := conn .read [id ]
244273 conn .RUnlock ()
@@ -262,6 +291,7 @@ func (conn *Conn) ChanDisconnect() <-chan string {
262291 return conn .onDisconnect
263292}
264293
294+ // checkDrop clear timeouted drop queue
265295func (conn * Conn ) checkDrop () {
266296 for {
267297 time .Sleep (time .Second )
@@ -298,7 +328,5 @@ func (conn *Conn) ChanClose(id string) {
298328 delete (conn .read , id )
299329 conn .Unlock ()
300330
301- conn .lockDrop .Lock ()
302- conn .drop [id ] = time .Now ().Add (dropBlockTimeout )
303- conn .lockDrop .Unlock ()
331+ conn .addDrop (id )
304332}
0 commit comments