Skip to content

Commit 8842f5f

Browse files
committed
fix: optimize conversation message handling and improve message synchronization
1 parent 1b22456 commit 8842f5f

File tree

2 files changed

+58
-20
lines changed

2 files changed

+58
-20
lines changed

internal/conversation_msg/conversation_msg.go

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import (
88
"math"
99
"sync"
1010

11+
"sort"
12+
"time"
13+
1114
"github.com/openimsdk/openim-sdk-core/v3/internal/group"
1215
"github.com/openimsdk/openim-sdk-core/v3/internal/interaction"
1316
"github.com/openimsdk/openim-sdk-core/v3/internal/relation"
@@ -30,9 +33,6 @@ import (
3033
"github.com/openimsdk/tools/log"
3134
"github.com/openimsdk/tools/utils/datautil"
3235

33-
"sort"
34-
"time"
35-
3636
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
3737
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
3838
)
@@ -225,6 +225,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
225225

226226
for conversationID, msgs := range allMsg {
227227
conversationIDs = append(conversationIDs, conversationID)
228+
228229
log.ZDebug(ctx, "parse message in one conversation", "conversationID", conversationID, "message length", len(msgs.Msgs), "data", msgs.Msgs)
229230

230231
clientIDs := make([]string, 0, len(msgs.Msgs))
@@ -436,27 +437,30 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
436437
}
437438
}
438439

439-
if err := c.db.BatchUpdateConversationList(ctx, append(mapConversationToList(conversationChangedSet), mapConversationToList(phConversationChangedSet)...)); err != nil {
440+
if err := c.db.BatchUpdateConversationList(ctx, append(datautil.Values(conversationChangedSet), datautil.Values(phConversationChangedSet)...)); err != nil {
440441
log.ZError(ctx, "insert changed conversation err :", err)
441442
}
442443
//New conversation storage
443444

444-
if err := c.db.BatchInsertConversationList(ctx, mapConversationToList(phNewConversationSet)); err != nil {
445+
if err := c.db.BatchInsertConversationList(ctx, datautil.Values(phNewConversationSet)); err != nil {
445446
log.ZError(ctx, "insert new conversation err:", err)
446447
}
447448
log.ZDebug(ctx, "before trigger msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg))
448449

449-
c.newMessage(ctx, newMessages, conversationChangedSet, newConversationSet, onlineMap)
450-
450+
if c.batchMsgListener() != nil {
451+
c.batchNewMessages(ctx, newMessages, conversationChangedSet, newConversationSet, onlineMap)
452+
} else {
453+
c.newMessage(ctx, newMessages, conversationChangedSet, newConversationSet, onlineMap)
454+
}
451455
if len(newConversationSet) > 0 {
452-
c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{Action: constant.NewConDirect, Args: utils.StructToJsonString(mapConversationToList(newConversationSet))}})
456+
c.ConversationListener().OnNewConversation(utils.StructToJsonString(datautil.Values(newConversationSet)))
453457
}
454458
if len(conversationChangedSet) > 0 {
455-
c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{Action: constant.ConChangeDirect, Args: utils.StructToJsonString(mapConversationToList(conversationChangedSet))}})
459+
c.ConversationListener().OnConversationChanged(utils.StructToJsonString(datautil.Values(conversationChangedSet)))
456460
}
457461

458462
if isTriggerUnReadCount {
459-
c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{Action: constant.TotalUnreadMessageChanged, Args: ""}})
463+
_ = c.OnTotalUnreadMessageCountChanged(ctx)
460464
}
461465

462466
for _, msgs := range allMsg {
@@ -474,6 +478,18 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
474478
log.ZDebug(ctx, "insert msg", "duration", fmt.Sprintf("%dms", time.Since(b)), "len", len(allMsg))
475479
}
476480

481+
func (c *Conversation) OnTotalUnreadMessageCountChanged(ctx context.Context) error {
482+
log.ZInfo(ctx, "OnTotalUnreadMessageCountChanged", "caller", common.GetCaller(2))
483+
totalUnreadCount, err := c.db.GetTotalUnreadMsgCountDB(ctx)
484+
if err != nil {
485+
log.ZWarn(ctx, "TotalUnreadMessageChanged GetTotalUnreadMsgCountDB err", err)
486+
} else {
487+
log.ZDebug(ctx, "TotalUnreadMessageChanged", "totalUnreadCount", totalUnreadCount)
488+
c.ConversationListener().OnTotalUnreadMessageCountChanged(totalUnreadCount)
489+
}
490+
return nil
491+
}
492+
477493
func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) {
478494
allMsg := c2v.Value.(sdk_struct.CmdMsgSyncInReinstall).Msgs
479495
ctx := c2v.Ctx

internal/interaction/msg_sync.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -317,31 +317,53 @@ func (m *MsgSyncer) doPushMsg(ctx context.Context, push *sdkws.PushMessages) {
317317
m.pushTriggerAndSync(ctx, push.NotificationMsgs, m.triggerNotification)
318318
}
319319

320-
func (m *MsgSyncer) pushTriggerAndSync(ctx context.Context, pushMessages map[string]*sdkws.PullMsgs, triggerFunc func(ctx context.Context, msgs map[string]*sdkws.PullMsgs) error) {
320+
func (m *MsgSyncer) pushTriggerAndSync(ctx context.Context, pushMessages map[string]*sdkws.PullMsgs,
321+
triggerFunc func(ctx context.Context, msgs map[string]*sdkws.PullMsgs) error) {
321322
if len(pushMessages) == 0 {
322323
return
323324
}
325+
324326
needSyncSeqMap := make(map[string][2]int64)
325-
var lastSeq int64
326-
var storageMsgs []*sdkws.MsgData
327+
res := make(map[string]*sdkws.PullMsgs)
328+
327329
for conversationID, msgs := range pushMessages {
330+
var (
331+
lastSeq int64
332+
storageMsgs []*sdkws.MsgData
333+
)
334+
328335
for _, msg := range msgs.Msgs {
329336
if msg.Seq == 0 {
330-
_ = triggerFunc(ctx, map[string]*sdkws.PullMsgs{conversationID: {Msgs: []*sdkws.MsgData{msg}}})
337+
_ = triggerFunc(ctx, map[string]*sdkws.PullMsgs{
338+
conversationID: {Msgs: []*sdkws.MsgData{msg}},
339+
})
331340
continue
332341
}
333342
lastSeq = msg.Seq
334343
storageMsgs = append(storageMsgs, msg)
335344
}
336-
if lastSeq == m.syncedMaxSeqs[conversationID]+int64(len(storageMsgs)) && lastSeq != 0 {
337-
log.ZDebug(ctx, "trigger msgs", "msgs", storageMsgs)
338-
_ = triggerFunc(ctx, map[string]*sdkws.PullMsgs{conversationID: {Msgs: storageMsgs}})
345+
346+
if len(storageMsgs) == 0 {
347+
continue
348+
}
349+
350+
expectedLast := m.syncedMaxSeqs[conversationID] + int64(len(storageMsgs))
351+
if lastSeq == expectedLast {
352+
log.ZDebug(ctx, "trigger msgs", "conversationID", conversationID, "msgs", storageMsgs)
353+
res[conversationID] = &sdkws.PullMsgs{Msgs: storageMsgs}
339354
m.syncedMaxSeqs[conversationID] = lastSeq
340-
} else if lastSeq != 0 && lastSeq > m.syncedMaxSeqs[conversationID] {
341-
//must pull message when message type is notification
342-
needSyncSeqMap[conversationID] = [2]int64{m.syncedMaxSeqs[conversationID] + 1, lastSeq}
355+
} else if lastSeq > m.syncedMaxSeqs[conversationID] {
356+
// must pull message when message type is notification
357+
needSyncSeqMap[conversationID] = [2]int64{
358+
m.syncedMaxSeqs[conversationID] + 1,
359+
lastSeq,
360+
}
343361
}
344362
}
363+
364+
if len(res) > 0 {
365+
_ = triggerFunc(ctx, res)
366+
}
345367
m.syncAndTriggerMsgs(ctx, needSyncSeqMap, defaultPullNums)
346368
}
347369

0 commit comments

Comments
 (0)