Skip to content

Commit e59db4e

Browse files
authored
fix: optimize conversation message handling and improve message synchronization (#1079)
* fix: optimize conversation message handling and improve message synchronization * fix: optimize conversation message handling and improve message synchronization * fix: optimize conversation message handling and improve message synchronization * fix: optimize conversation message handling and improve message synchronization
1 parent 31a30f4 commit e59db4e

File tree

6 files changed

+495
-51
lines changed

6 files changed

+495
-51
lines changed

internal/conversation_msg/conversation_msg.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
239239

240240
for conversationID, msgs := range allMsg {
241241
conversationIDs = append(conversationIDs, conversationID)
242+
242243
log.ZDebug(ctx, "parse message in one conversation", "conversationID", conversationID, "message length", len(msgs.Msgs), "data", msgs.Msgs)
243244

244245
clientIDs := make([]string, 0, len(msgs.Msgs))
@@ -450,27 +451,27 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
450451
}
451452
}
452453

453-
if err := c.db.BatchUpdateConversationList(ctx, append(mapConversationToList(conversationChangedSet), mapConversationToList(phConversationChangedSet)...)); err != nil {
454+
if err := c.db.BatchUpdateConversationList(ctx, append(datautil.Values(conversationChangedSet), datautil.Values(phConversationChangedSet)...)); err != nil {
454455
log.ZError(ctx, "insert changed conversation err :", err)
455456
}
456457
//New conversation storage
457458

458-
if err := c.db.BatchInsertConversationList(ctx, mapConversationToList(phNewConversationSet)); err != nil {
459+
if err := c.db.BatchInsertConversationList(ctx, datautil.Values(phNewConversationSet)); err != nil {
459460
log.ZError(ctx, "insert new conversation err:", err)
460461
}
461462
log.ZDebug(ctx, "before trigger msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg))
462463

463464
c.newMessage(ctx, newMessages, conversationChangedSet, newConversationSet, onlineMap)
464465

465466
if len(newConversationSet) > 0 {
466-
c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{Action: constant.NewConDirect, Args: utils.StructToJsonString(mapConversationToList(newConversationSet))}})
467+
c.ConversationListener().OnNewConversation(utils.StructToJsonString(datautil.Values(newConversationSet)))
467468
}
468469
if len(conversationChangedSet) > 0 {
469-
c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{Action: constant.ConChangeDirect, Args: utils.StructToJsonString(mapConversationToList(conversationChangedSet))}})
470+
c.ConversationListener().OnConversationChanged(utils.StructToJsonString(datautil.Values(conversationChangedSet)))
470471
}
471472

472473
if isTriggerUnReadCount {
473-
c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{Action: constant.TotalUnreadMessageChanged, Args: ""}})
474+
_ = c.OnTotalUnreadMessageCountChanged(ctx)
474475
}
475476

476477
for _, msgs := range allMsg {
@@ -488,6 +489,18 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
488489
log.ZDebug(ctx, "insert msg", "duration", fmt.Sprintf("%dms", time.Since(b)), "len", len(allMsg))
489490
}
490491

492+
func (c *Conversation) OnTotalUnreadMessageCountChanged(ctx context.Context) error {
493+
log.ZInfo(ctx, "OnTotalUnreadMessageCountChanged", "caller", common.GetCaller(2))
494+
totalUnreadCount, err := c.db.GetTotalUnreadMsgCountDB(ctx)
495+
if err != nil {
496+
log.ZWarn(ctx, "TotalUnreadMessageChanged GetTotalUnreadMsgCountDB err", err)
497+
} else {
498+
log.ZDebug(ctx, "TotalUnreadMessageChanged", "totalUnreadCount", totalUnreadCount)
499+
c.ConversationListener().OnTotalUnreadMessageCountChanged(totalUnreadCount)
500+
}
501+
return nil
502+
}
503+
491504
func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) {
492505
allMsg := c2v.Value.(sdk_struct.CmdMsgSyncInReinstall).Msgs
493506
ctx := c2v.Ctx

internal/conversation_msg/conversion.go

Lines changed: 109 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -97,44 +97,119 @@ func MsgDataToLocalChatLog(serverMessage *sdkws.MsgData) *model_struct.LocalChat
9797
return localMessage
9898
}
9999

100-
func LocalChatLogToMsgStruct(localMessage *model_struct.LocalChatLog) *sdk_struct.MsgStruct {
101-
message := &sdk_struct.MsgStruct{
102-
ClientMsgID: localMessage.ClientMsgID,
103-
ServerMsgID: localMessage.ServerMsgID,
104-
CreateTime: localMessage.CreateTime,
105-
SendTime: localMessage.SendTime,
106-
SessionType: localMessage.SessionType,
107-
SendID: localMessage.SendID,
108-
RecvID: localMessage.RecvID,
109-
MsgFrom: localMessage.MsgFrom,
110-
ContentType: localMessage.ContentType,
111-
SenderPlatformID: localMessage.SenderPlatformID,
112-
SenderNickname: localMessage.SenderNickname,
113-
SenderFaceURL: localMessage.SenderFaceURL,
114-
Content: localMessage.Content,
115-
Seq: localMessage.Seq,
116-
IsRead: localMessage.IsRead,
117-
Status: localMessage.Status,
118-
Ex: localMessage.Ex,
119-
LocalEx: localMessage.LocalEx,
100+
func LocalChatLogToMsgStruct(local *model_struct.LocalChatLog) *sdk_struct.MsgStruct {
101+
if local == nil {
102+
return nil
120103
}
121-
var attachedInfo sdk_struct.AttachedInfoElem
122-
err := utils.JsonStringToStruct(localMessage.AttachedInfo, &attachedInfo)
123-
if err != nil {
124-
log.ZWarn(context.Background(), "JsonStringToStruct error", err, "localMessage.AttachedInfo", localMessage.AttachedInfo)
104+
msg := &sdk_struct.MsgStruct{
105+
ClientMsgID: local.ClientMsgID,
106+
ServerMsgID: local.ServerMsgID,
107+
CreateTime: local.CreateTime,
108+
SendTime: local.SendTime,
109+
SessionType: local.SessionType,
110+
SendID: local.SendID,
111+
RecvID: local.RecvID,
112+
MsgFrom: local.MsgFrom,
113+
ContentType: local.ContentType,
114+
SenderPlatformID: local.SenderPlatformID,
115+
SenderNickname: local.SenderNickname,
116+
SenderFaceURL: local.SenderFaceURL,
117+
Content: local.Content,
118+
Seq: local.Seq,
119+
IsRead: local.IsRead,
120+
Status: local.Status,
121+
Ex: local.Ex,
122+
LocalEx: local.LocalEx,
123+
AttachedInfo: local.AttachedInfo,
125124
}
126-
message.AttachedInfoElem = &attachedInfo
127-
errParse := msgHandleByContentType(message)
128-
if errParse != nil {
129-
log.ZWarn(context.Background(), "Parsing data error", err, "messageContent", message.Content)
125+
126+
if err := PopulateMsgStructByContentType(msg); err != nil {
127+
log.ZWarn(context.Background(), "Parsing data error", err, "messageContent", msg.Content)
130128
}
131-
switch localMessage.SessionType {
132-
case constant.WriteGroupChatType:
133-
fallthrough
134-
case constant.ReadGroupChatType:
135-
message.GroupID = localMessage.RecvID
129+
130+
switch local.SessionType {
131+
case constant.WriteGroupChatType, constant.ReadGroupChatType:
132+
msg.GroupID = local.RecvID
136133
}
137-
return message
134+
return msg
135+
}
136+
137+
func PopulateMsgStructByContentType(msg *sdk_struct.MsgStruct) (err error) {
138+
switch msg.ContentType {
139+
case constant.Text:
140+
elem := sdk_struct.TextElem{}
141+
err = utils.JsonStringToStruct(msg.Content, &elem)
142+
msg.TextElem = &elem
143+
case constant.Picture:
144+
elem := sdk_struct.PictureElem{}
145+
err = utils.JsonStringToStruct(msg.Content, &elem)
146+
msg.PictureElem = &elem
147+
case constant.Sound:
148+
elem := sdk_struct.SoundElem{}
149+
err = utils.JsonStringToStruct(msg.Content, &elem)
150+
msg.SoundElem = &elem
151+
case constant.Video:
152+
elem := sdk_struct.VideoElem{}
153+
err = utils.JsonStringToStruct(msg.Content, &elem)
154+
msg.VideoElem = &elem
155+
case constant.File:
156+
elem := sdk_struct.FileElem{}
157+
err = utils.JsonStringToStruct(msg.Content, &elem)
158+
msg.FileElem = &elem
159+
case constant.AdvancedText:
160+
elem := sdk_struct.AdvancedTextElem{}
161+
err = utils.JsonStringToStruct(msg.Content, &elem)
162+
msg.AdvancedTextElem = &elem
163+
case constant.AtText:
164+
elem := sdk_struct.AtTextElem{}
165+
err = utils.JsonStringToStruct(msg.Content, &elem)
166+
msg.AtTextElem = &elem
167+
case constant.Location:
168+
elem := sdk_struct.LocationElem{}
169+
err = utils.JsonStringToStruct(msg.Content, &elem)
170+
msg.LocationElem = &elem
171+
case constant.Custom, constant.CustomMsgNotTriggerConversation, constant.CustomMsgOnlineOnly:
172+
elem := sdk_struct.CustomElem{}
173+
err = utils.JsonStringToStruct(msg.Content, &elem)
174+
msg.CustomElem = &elem
175+
case constant.Typing:
176+
elem := sdk_struct.TypingElem{}
177+
err = utils.JsonStringToStruct(msg.Content, &elem)
178+
msg.TypingElem = &elem
179+
case constant.Quote:
180+
elem := sdk_struct.QuoteElem{}
181+
err = utils.JsonStringToStruct(msg.Content, &elem)
182+
msg.QuoteElem = &elem
183+
case constant.Merger:
184+
elem := sdk_struct.MergeElem{}
185+
err = utils.JsonStringToStruct(msg.Content, &elem)
186+
msg.MergeElem = &elem
187+
case constant.Face:
188+
elem := sdk_struct.FaceElem{}
189+
err = utils.JsonStringToStruct(msg.Content, &elem)
190+
msg.FaceElem = &elem
191+
case constant.Card:
192+
elem := sdk_struct.CardElem{}
193+
err = utils.JsonStringToStruct(msg.Content, &elem)
194+
msg.CardElem = &elem
195+
case constant.MarkdownText:
196+
elem := sdk_struct.MarkdownTextElem{}
197+
err = utils.JsonStringToStruct(msg.Content, &elem)
198+
msg.MarkdownTextElem = &elem
199+
default:
200+
elem := sdk_struct.NotificationElem{}
201+
err = utils.JsonStringToStruct(msg.Content, &elem)
202+
msg.NotificationElem = &elem
203+
}
204+
var attachedInfo sdk_struct.AttachedInfoElem
205+
if msg.AttachedInfo != "" {
206+
if err := utils.JsonStringToStruct(msg.AttachedInfo, &attachedInfo); err != nil {
207+
log.ZWarn(context.Background(), "JsonStringToStruct error", err, "localMessage.AttachedInfo", msg.AttachedInfo)
208+
}
209+
}
210+
msg.AttachedInfoElem = &attachedInfo
211+
msg.Content = ""
212+
return errs.Wrap(err)
138213
}
139214

140215
func msgHandleByContentType(msg *sdk_struct.MsgStruct) (err error) {

internal/interaction/long_conn_mgr.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package interaction
1616

1717
import (
18+
"bytes"
1819
"context"
1920
"encoding/json"
2021
"errors"
@@ -29,6 +30,7 @@ import (
2930

3031
"github.com/golang/protobuf/proto"
3132
"github.com/gorilla/websocket"
33+
"github.com/openimsdk/tools/mcontext"
3234

3335
"github.com/openimsdk/openim-sdk-core/v3/pkg/cliconf"
3436
"github.com/openimsdk/openim-sdk-core/v3/version"
@@ -106,6 +108,8 @@ type LongConnMgr struct {
106108
connWrite *sync.Mutex
107109

108110
sub *subscription
111+
112+
mb *MessageBatcher
109113
}
110114

111115
type Message struct {
@@ -129,6 +133,7 @@ func NewLongConnMgr(ctx context.Context, userOnline func(map[string][]int32), pu
129133
l.conn = NewWebSocket(WebSocket)
130134
l.connWrite = new(sync.Mutex)
131135
l.ctx = ctx
136+
l.mb = NewMessageBatcher(l.doBatch)
132137
return l
133138
}
134139

@@ -513,10 +518,11 @@ func (c *LongConnMgr) handleMessage(message []byte) error {
513518
if err := c.Syncer.NotifyResp(ctx, wsResp); err != nil {
514519
log.ZError(ctx, "notifyResp failed", err, "wsResp", wsResp)
515520
}
521+
c.mb.Close()
516522
return sdkerrs.ErrLoginOut
517523
case constant.KickOnlineMsg:
518524
log.ZDebug(ctx, "socket receive client kicked offline")
519-
525+
c.mb.Close()
520526
err = errs.ErrTokenKicked.WrapMsg("socket receive client kicked offline")
521527
ccontext.GetApiErrCodeCallback(ctx).OnError(ctx, err)
522528
return err
@@ -727,8 +733,37 @@ func (c *LongConnMgr) doPushMsg(ctx context.Context, wsResp GeneralWsResp) error
727733
if err != nil {
728734
return err
729735
}
730-
return common.DispatchPushMsg(ctx, &msg, c.pushMsgAndMaxSeqCh)
736+
log.ZDebug(ctx, "recv push msg", "msgNum", len(msg.Msgs), "notificationNum", len(msg.NotificationMsgs), "msg", &msg)
737+
c.mb.Enqueue(ctx, &msg)
738+
return nil
739+
}
740+
741+
func (c *LongConnMgr) doBatch(ctxs []context.Context, msg *sdkws.PushMessages) {
742+
var ctx context.Context
743+
switch len(ctxs) {
744+
case 0:
745+
return
746+
case 1:
747+
ctx = ctxs[0]
748+
default:
749+
var buf bytes.Buffer
750+
buf.WriteString("Batch_")
751+
for _, v := range ctxs {
752+
operationID := mcontext.GetOperationID(v)
753+
if operationID != "" {
754+
buf.WriteString(operationID)
755+
buf.WriteString("$")
756+
}
757+
}
758+
data := buf.Bytes()
759+
data = data[:len(data)-1]
760+
ctx = mcontext.SetOperationID(ctxs[0], string(data))
761+
}
762+
if err := common.DispatchPushMsg(ctx, msg, c.pushMsgAndMaxSeqCh); err != nil {
763+
log.ZError(ctx, "doBatch DispatchPushMsg", err, "msg", msg)
764+
}
731765
}
766+
732767
func (c *LongConnMgr) Close(ctx context.Context) {
733768
if c.GetConnectionStatus() == Connected {
734769
log.ZInfo(ctx, "network change conn close")

0 commit comments

Comments
 (0)