Skip to content

Commit 78b2553

Browse files
authored
feat: replace LongConn with ClientConn interface and simplify message handling (#3643)
* fix: performance issues with Kafka caused by encapsulating the MQ interface * fix: admin token in standalone mode * fix: full id version * fix: resolve deadlock in cache eviction and improve GetBatch implementation * refactor: replace LongConn with ClientConn interface and simplify message handling * refactor: replace LongConn with ClientConn interface and simplify message handling
1 parent 6f33c0a commit 78b2553

File tree

5 files changed

+433
-461
lines changed

5 files changed

+433
-461
lines changed

internal/msggateway/client.go

Lines changed: 10 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package msggateway
1616

1717
import (
1818
"context"
19-
"encoding/json"
2019
"fmt"
2120
"sync"
2221
"sync/atomic"
@@ -31,7 +30,6 @@ import (
3130
"github.com/openimsdk/tools/errs"
3231
"github.com/openimsdk/tools/log"
3332
"github.com/openimsdk/tools/mcontext"
34-
"github.com/openimsdk/tools/utils/stringutil"
3533
)
3634

3735
var (
@@ -64,7 +62,7 @@ type PingPongHandler func(string) error
6462

6563
type Client struct {
6664
w *sync.Mutex
67-
conn LongConn
65+
conn ClientConn
6866
PlatformID int `json:"platformID"`
6967
IsCompress bool `json:"isCompress"`
7068
UserID string `json:"userID"`
@@ -84,10 +82,10 @@ type Client struct {
8482
}
8583

8684
// ResetClient updates the client's state with new connection and context information.
87-
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer LongConnServer) {
85+
func (c *Client) ResetClient(ctx *UserConnContext, conn ClientConn, longConnServer LongConnServer) {
8886
c.w = new(sync.Mutex)
8987
c.conn = conn
90-
c.PlatformID = stringutil.StringToInt(ctx.GetPlatformID())
88+
c.PlatformID = ctx.GetPlatformID()
9189
c.IsCompress = ctx.GetCompression()
9290
c.IsBackground = ctx.GetBackground()
9391
c.UserID = ctx.GetUserID()
@@ -112,22 +110,6 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer
112110
c.subUserIDs = make(map[string]struct{})
113111
}
114112

115-
func (c *Client) pingHandler(appData string) error {
116-
if err := c.conn.SetReadDeadline(pongWait); err != nil {
117-
return err
118-
}
119-
120-
log.ZDebug(c.ctx, "ping Handler Success.", "appData", appData)
121-
return c.writePongMsg(appData)
122-
}
123-
124-
func (c *Client) pongHandler(_ string) error {
125-
if err := c.conn.SetReadDeadline(pongWait); err != nil {
126-
return err
127-
}
128-
return nil
129-
}
130-
131113
// readMessage continuously reads messages from the connection.
132114
func (c *Client) readMessage() {
133115
defer func() {
@@ -138,52 +120,25 @@ func (c *Client) readMessage() {
138120
c.close()
139121
}()
140122

141-
c.conn.SetReadLimit(maxMessageSize)
142-
_ = c.conn.SetReadDeadline(pongWait)
143-
c.conn.SetPongHandler(c.pongHandler)
144-
c.conn.SetPingHandler(c.pingHandler)
145-
c.activeHeartbeat(c.hbCtx)
146-
147123
for {
148124
log.ZDebug(c.ctx, "readMessage")
149-
messageType, message, returnErr := c.conn.ReadMessage()
125+
message, returnErr := c.conn.ReadMessage()
150126
if returnErr != nil {
151-
log.ZWarn(c.ctx, "readMessage", returnErr, "messageType", messageType)
127+
log.ZWarn(c.ctx, "readMessage", returnErr)
152128
c.closedErr = returnErr
153129
return
154130
}
155131

156-
log.ZDebug(c.ctx, "readMessage", "messageType", messageType)
157132
if c.closed.Load() {
158133
// The scenario where the connection has just been closed, but the coroutine has not exited
159134
c.closedErr = ErrConnClosed
160135
return
161136
}
162137

163-
switch messageType {
164-
case MessageBinary:
165-
_ = c.conn.SetReadDeadline(pongWait)
166-
parseDataErr := c.handleMessage(message)
167-
if parseDataErr != nil {
168-
c.closedErr = parseDataErr
169-
return
170-
}
171-
case MessageText:
172-
_ = c.conn.SetReadDeadline(pongWait)
173-
parseDataErr := c.handlerTextMessage(message)
174-
if parseDataErr != nil {
175-
c.closedErr = parseDataErr
176-
return
177-
}
178-
case PingMessage:
179-
err := c.writePongMsg("")
180-
log.ZError(c.ctx, "writePongMsg", err)
181-
182-
case CloseMessage:
183-
c.closedErr = ErrClientClosed
138+
parseDataErr := c.handleMessage(message)
139+
if parseDataErr != nil {
140+
c.closedErr = parseDataErr
184141
return
185-
186-
default:
187142
}
188143
}
189144
}
@@ -358,109 +313,13 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
358313
c.w.Lock()
359314
defer c.w.Unlock()
360315

361-
err = c.conn.SetWriteDeadline(writeWait)
362-
if err != nil {
363-
return err
364-
}
365-
366316
if c.IsCompress {
367317
resultBuf, compressErr := c.longConnServer.CompressWithPool(encodedBuf)
368318
if compressErr != nil {
369319
return compressErr
370320
}
371-
return c.conn.WriteMessage(MessageBinary, resultBuf)
372-
}
373-
374-
return c.conn.WriteMessage(MessageBinary, encodedBuf)
375-
}
376-
377-
// Actively initiate Heartbeat when platform in Web.
378-
func (c *Client) activeHeartbeat(ctx context.Context) {
379-
if c.PlatformID == constant.WebPlatformID {
380-
go func() {
381-
defer func() {
382-
if r := recover(); r != nil {
383-
log.ZPanic(ctx, "activeHeartbeat Panic", errs.ErrPanic(r))
384-
}
385-
}()
386-
log.ZDebug(ctx, "server initiative send heartbeat start.")
387-
ticker := time.NewTicker(pingPeriod)
388-
defer ticker.Stop()
389-
390-
for {
391-
select {
392-
case <-ticker.C:
393-
if err := c.writePingMsg(); err != nil {
394-
log.ZWarn(c.ctx, "send Ping Message error.", err)
395-
return
396-
}
397-
case <-c.hbCtx.Done():
398-
return
399-
}
400-
}
401-
}()
402-
}
403-
}
404-
func (c *Client) writePingMsg() error {
405-
if c.closed.Load() {
406-
return nil
321+
return c.conn.WriteMessage(resultBuf)
407322
}
408323

409-
c.w.Lock()
410-
defer c.w.Unlock()
411-
412-
err := c.conn.SetWriteDeadline(writeWait)
413-
if err != nil {
414-
return err
415-
}
416-
417-
return c.conn.WriteMessage(PingMessage, nil)
418-
}
419-
420-
func (c *Client) writePongMsg(appData string) error {
421-
log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData)
422-
if c.closed.Load() {
423-
log.ZWarn(c.ctx, "is closed in server", nil, "appdata", appData, "closed err", c.closedErr)
424-
return nil
425-
}
426-
427-
c.w.Lock()
428-
defer c.w.Unlock()
429-
430-
err := c.conn.SetWriteDeadline(writeWait)
431-
if err != nil {
432-
log.ZWarn(c.ctx, "SetWriteDeadline in Server have error", errs.Wrap(err), "writeWait", writeWait, "appData", appData)
433-
return errs.Wrap(err)
434-
}
435-
err = c.conn.WriteMessage(PongMessage, []byte(appData))
436-
if err != nil {
437-
log.ZWarn(c.ctx, "Write Message have error", errs.Wrap(err), "Pong msg", PongMessage)
438-
}
439-
440-
return errs.Wrap(err)
441-
}
442-
443-
func (c *Client) handlerTextMessage(b []byte) error {
444-
var msg TextMessage
445-
if err := json.Unmarshal(b, &msg); err != nil {
446-
return err
447-
}
448-
switch msg.Type {
449-
case TextPong:
450-
return nil
451-
case TextPing:
452-
msg.Type = TextPong
453-
msgData, err := json.Marshal(msg)
454-
if err != nil {
455-
return err
456-
}
457-
c.w.Lock()
458-
defer c.w.Unlock()
459-
if err := c.conn.SetWriteDeadline(writeWait); err != nil {
460-
return err
461-
}
462-
return c.conn.WriteMessage(MessageText, msgData)
463-
default:
464-
return fmt.Errorf("not support message type %s", msg.Type)
465-
}
324+
return c.conn.WriteMessage(encodedBuf)
466325
}

0 commit comments

Comments
 (0)