Skip to content

Commit 579db3b

Browse files
authored
bugfix(conversation):removed unexpectedly called functions and itself to avoid out of index query. (#3668)
# Conflicts: # internal/rpc/conversation/conversation.go # pkg/common/storage/database/mgo/conversation.go
1 parent a0e6d9a commit 579db3b

File tree

6 files changed

+10
-87
lines changed

6 files changed

+10
-87
lines changed

internal/rpc/conversation/conversation.go

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -513,14 +513,6 @@ func (c *conversationServer) GetUserConversationIDsHash(ctx context.Context, req
513513
return &pbconversation.GetUserConversationIDsHashResp{Hash: hash}, nil
514514
}
515515

516-
func (c *conversationServer) GetConversationsByConversationID(ctx context.Context, req *pbconversation.GetConversationsByConversationIDReq) (*pbconversation.GetConversationsByConversationIDResp, error) {
517-
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, req.ConversationIDs)
518-
if err != nil {
519-
return nil, err
520-
}
521-
return &pbconversation.GetConversationsByConversationIDResp{Conversations: convert.ConversationsDB2Pb(conversations)}, nil
522-
}
523-
524516
func (c *conversationServer) GetConversationOfflinePushUserIDs(ctx context.Context, req *pbconversation.GetConversationOfflinePushUserIDsReq) (*pbconversation.GetConversationOfflinePushUserIDsResp, error) {
525517
if req.ConversationID == "" {
526518
return nil, errs.ErrArgs.WrapMsg("conversationID is empty")
@@ -717,56 +709,6 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco
717709
}, nil
718710
}
719711

720-
func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _ *pbconversation.GetConversationsNeedClearMsgReq) (*pbconversation.GetConversationsNeedClearMsgResp, error) {
721-
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
722-
if err != nil {
723-
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
724-
return nil, err
725-
}
726-
const batchNum = 100
727-
728-
if num == 0 {
729-
return nil, errs.New("Need Destruct Msg is nil").Wrap()
730-
}
731-
732-
maxPage := (num + batchNum - 1) / batchNum
733-
734-
temp := make([]*dbModel.Conversation, 0, maxPage*batchNum)
735-
736-
for pageNumber := 0; pageNumber < int(maxPage); pageNumber++ {
737-
pagination := &sdkws.RequestPagination{
738-
PageNumber: int32(pageNumber),
739-
ShowNumber: batchNum,
740-
}
741-
742-
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
743-
if err != nil {
744-
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
745-
continue
746-
}
747-
748-
// log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
749-
if len(conversationIDs) == 0 {
750-
continue
751-
}
752-
753-
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs)
754-
if err != nil {
755-
log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs)
756-
continue
757-
}
758-
759-
for _, conversation := range conversations {
760-
if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && ((time.Now().UnixMilli() > (conversation.MsgDestructTime + conversation.LatestMsgDestructTime.UnixMilli() + 8*60*60)) || // 8*60*60 is UTC+8
761-
conversation.LatestMsgDestructTime.IsZero()) {
762-
temp = append(temp, conversation)
763-
}
764-
}
765-
}
766-
767-
return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
768-
}
769-
770712
func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) {
771713
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
772714
return nil, err

internal/rpc/msg/delete.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919

2020
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
2121
"github.com/openimsdk/protocol/constant"
22-
"github.com/openimsdk/protocol/conversation"
2322
"github.com/openimsdk/protocol/msg"
2423
"github.com/openimsdk/protocol/sdkws"
2524
"github.com/openimsdk/tools/log"
@@ -74,7 +73,7 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms
7473
if err := m.MsgDatabase.DeleteMsgsPhysicalBySeqs(ctx, req.ConversationID, req.Seqs); err != nil {
7574
return nil, err
7675
}
77-
conv, err := m.conversationClient.GetConversationsByConversationID(ctx, req.ConversationID)
76+
conv, err := m.conversationClient.GetConversation(ctx, req.ConversationID, req.UserID)
7877
if err != nil {
7978
return nil, err
8079
}
@@ -116,14 +115,12 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy
116115
}
117116

118117
func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []string, userID string, deleteSyncOpt *msg.DeleteSyncOpt) error {
119-
conversations, err := m.conversationClient.GetConversationsByConversationIDs(ctx, conversationIDs)
118+
conversations, err := m.conversationClient.GetConversations(ctx, conversationIDs, userID)
120119
if err != nil {
121120
return err
122121
}
123-
var existConversations []*conversation.Conversation
124122
var existConversationIDs []string
125123
for _, conversation := range conversations {
126-
existConversations = append(existConversations, conversation)
127124
existConversationIDs = append(existConversationIDs, conversation.ConversationID)
128125
}
129126
log.ZDebug(ctx, "ClearConversationsMsg", "existConversationIDs", existConversationIDs)
@@ -152,7 +149,7 @@ func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []str
152149
if err := m.MsgDatabase.SetMinSeqs(ctx, m.getMinSeqs(maxSeqs)); err != nil {
153150
return err
154151
}
155-
for _, conversation := range existConversations {
152+
for _, conversation := range conversations {
156153
tips := &sdkws.ClearConversationTips{UserID: userID, ConversationIDs: []string{conversation.ConversationID}}
157154
m.notificationSender.NotificationWithSessionType(ctx, userID, m.conversationAndGetRecvID(conversation, userID), constant.ClearConversationNotification, conversation.ConversationType, tips)
158155
}

pkg/common/storage/controller/conversation.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,6 @@ type ConversationDatabase interface {
6161
GetAllConversationIDsNumber(ctx context.Context) (int64, error)
6262
// PageConversationIDs paginates through conversation IDs based on the specified pagination settings.
6363
PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error)
64-
// GetConversationsByConversationID retrieves conversations by their IDs.
65-
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error)
6664
// GetConversationIDsNeedDestruct fetches conversations that need to be destructed based on specific criteria.
6765
GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error)
6866
// GetConversationNotReceiveMessageUserIDs gets user IDs for users in a conversation who have not received messages.
@@ -375,10 +373,6 @@ func (c *conversationDatabase) PageConversationIDs(ctx context.Context, paginati
375373
return c.conversationDB.PageConversationIDs(ctx, pagination)
376374
}
377375

378-
func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error) {
379-
return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs)
380-
}
381-
382376
func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error) {
383377
return c.conversationDB.GetConversationIDsNeedDestruct(ctx)
384378
}

pkg/common/storage/database/conversation.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ type Conversation interface {
3939
GetAllConversationIDs(ctx context.Context) ([]string, error)
4040
GetAllConversationIDsNumber(ctx context.Context) (int64, error)
4141
PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error)
42-
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error)
4342
GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error)
4443
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
4544
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)

pkg/common/storage/database/mgo/conversation.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) {
4747
},
4848
Options: options.Index(),
4949
},
50+
{
51+
Keys: bson.D{
52+
{Key: "conversation_id", Value: 1},
53+
},
54+
Options: options.Index().SetUnique(true),
55+
},
5056
})
5157
if err != nil {
5258
return nil, errs.Wrap(err)
@@ -232,10 +238,6 @@ func (c *ConversationMgo) PageConversationIDs(ctx context.Context, pagination pa
232238
return mongoutil.FindPageOnly[string](ctx, c.coll, bson.M{}, pagination, options.Find().SetProjection(bson.M{"conversation_id": 1}))
233239
}
234240

235-
func (c *ConversationMgo) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error) {
236-
return mongoutil.Find[*model.Conversation](ctx, c.coll, bson.M{"conversation_id": bson.M{"$in": conversationIDs}})
237-
}
238-
239241
func (c *ConversationMgo) GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) {
240242
// "is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)"
241243
return mongoutil.Find[*model.Conversation](ctx, c.coll, bson.M{

pkg/rpcli/conversation.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package rpcli
22

33
import (
44
"context"
5+
56
"github.com/openimsdk/protocol/conversation"
67
"google.golang.org/grpc"
78
)
@@ -30,18 +31,6 @@ func (x *ConversationClient) SetConversations(ctx context.Context, ownerUserIDs
3031
return ignoreResp(x.ConversationClient.SetConversations(ctx, req))
3132
}
3233

33-
func (x *ConversationClient) GetConversationsByConversationIDs(ctx context.Context, conversationIDs []string) ([]*conversation.Conversation, error) {
34-
if len(conversationIDs) == 0 {
35-
return nil, nil
36-
}
37-
req := &conversation.GetConversationsByConversationIDReq{ConversationIDs: conversationIDs}
38-
return extractField(ctx, x.ConversationClient.GetConversationsByConversationID, req, (*conversation.GetConversationsByConversationIDResp).GetConversations)
39-
}
40-
41-
func (x *ConversationClient) GetConversationsByConversationID(ctx context.Context, conversationID string) (*conversation.Conversation, error) {
42-
return firstValue(x.GetConversationsByConversationIDs(ctx, []string{conversationID}))
43-
}
44-
4534
func (x *ConversationClient) SetConversationMinSeq(ctx context.Context, conversationID string, ownerUserIDs []string, minSeq int64) error {
4635
if len(ownerUserIDs) == 0 {
4736
return nil

0 commit comments

Comments
 (0)