Skip to content

Commit 0e3879a

Browse files
authored
feat: implement DeleteConversations interface. (#3549)
* feat: implement DeleteConversations interface. * remove unused comment. * update logic and rename method.
1 parent 390d253 commit 0e3879a

File tree

12 files changed

+115
-10
lines changed

12 files changed

+115
-10
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ require (
1212
github.com/gorilla/websocket v1.5.1
1313
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
1414
github.com/mitchellh/mapstructure v1.5.0
15-
github.com/openimsdk/protocol v0.0.73-alpha.14
15+
github.com/openimsdk/protocol v0.0.73-alpha.17
1616
github.com/openimsdk/tools v0.0.50-alpha.103
1717
github.com/pkg/errors v0.9.1 // indirect
1818
github.com/prometheus/client_golang v1.18.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
347347
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
348348
github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw=
349349
github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
350-
github.com/openimsdk/protocol v0.0.73-alpha.14 h1:lv9wNiPRm6G7q74TfpMobKrSfeTaBlZ+Ps3O6UFPmaE=
351-
github.com/openimsdk/protocol v0.0.73-alpha.14/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
350+
github.com/openimsdk/protocol v0.0.73-alpha.17 h1:ddo0QMns1GVwAmrPIPlAQ7uKmThAYLnOt+CIOgLsJyE=
351+
github.com/openimsdk/protocol v0.0.73-alpha.17/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
352352
github.com/openimsdk/tools v0.0.50-alpha.103 h1:jYvI86cWiVu8a8iw1panw+pwIiStuUHF76h3fxA6ESI=
353353
github.com/openimsdk/tools v0.0.50-alpha.103/go.mod h1:qCExFBqXpQBMzZck3XGIFwivBayAn2KNqB3WAd++IJw=
354354
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=

internal/api/conversation.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,7 @@ func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) {
7676
func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) {
7777
a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client)
7878
}
79+
80+
func (o *ConversationApi) DeleteConversations(c *gin.Context) {
81+
a2r.Call(c, conversation.ConversationClient.DeleteConversations, o.Client)
82+
}

internal/api/router.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
277277
conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation)
278278
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
279279
conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs)
280+
conversationGroup.POST("/delete_conversations", c.DeleteConversations)
280281
}
281282

282283
{

internal/msgtransfer/online_history_msg_handler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"encoding/json"
2020
"errors"
21+
2122
"github.com/openimsdk/tools/mq"
2223

2324
"sync"

internal/rpc/conversation/conversation.go

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
3838
"github.com/openimsdk/protocol/constant"
3939
pbconversation "github.com/openimsdk/protocol/conversation"
40+
"github.com/openimsdk/protocol/msg"
4041
"github.com/openimsdk/protocol/sdkws"
4142
"github.com/openimsdk/tools/discovery"
4243
"github.com/openimsdk/tools/errs"
@@ -795,7 +796,7 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *
795796
}
796797
latestMsgDestructTime := time.UnixMilli(req.Timestamp)
797798
for i, conversation := range conversations {
798-
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
799+
if !conversation.IsMsgDestruct || conversation.MsgDestructTime == 0 {
799800
continue
800801
}
801802
seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-(conversation.MsgDestructTime*1000))
@@ -835,3 +836,53 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c
835836
c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID})
836837
return nil
837838
}
839+
840+
func (c *conversationServer) DeleteConversations(ctx context.Context, req *pbconversation.DeleteConversationsReq) (resp *pbconversation.DeleteConversationsResp, err error) {
841+
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
842+
return nil, err
843+
}
844+
if req.NeedDeleteTime == 0 && len(req.ConversationIDs) == 0 {
845+
return nil, errs.ErrArgs.WrapMsg("need_delete_time or conversationIDs need be set")
846+
}
847+
848+
if req.NeedDeleteTime != 0 && len(req.ConversationIDs) != 0 {
849+
return nil, errs.ErrArgs.WrapMsg("need_delete_time and conversationIDs cannot both be set")
850+
}
851+
852+
var needDeleteConversationIDs []string
853+
854+
if len(req.ConversationIDs) == 0 {
855+
deleteTimeThreshold := time.Now().AddDate(0, 0, -int(req.NeedDeleteTime)).UnixMilli()
856+
conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.OwnerUserID)
857+
if err != nil {
858+
return nil, err
859+
}
860+
latestMsgs, err := c.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{
861+
UserID: req.OwnerUserID,
862+
ConversationIDs: conversationIDs,
863+
})
864+
if err != nil {
865+
return nil, err
866+
}
867+
868+
for conversationID, msg := range latestMsgs.Msgs {
869+
if msg.SendTime < deleteTimeThreshold {
870+
needDeleteConversationIDs = append(needDeleteConversationIDs, conversationID)
871+
}
872+
}
873+
874+
if len(needDeleteConversationIDs) == 0 {
875+
return &pbconversation.DeleteConversationsResp{}, nil
876+
}
877+
} else {
878+
needDeleteConversationIDs = req.ConversationIDs
879+
}
880+
881+
if err := c.conversationDatabase.DeleteUsersConversations(ctx, req.OwnerUserID, needDeleteConversationIDs); err != nil {
882+
return nil, err
883+
}
884+
885+
// c.conversationNotificationSender.ConversationDeleteNotification(ctx, req.OwnerUserID, needDeleteConversationIDs)
886+
887+
return &pbconversation.DeleteConversationsResp{}, nil
888+
}

internal/rpc/conversation/notification.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,12 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification(
7373

7474
c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips)
7575
}
76+
77+
func (c *ConversationNotificationSender) ConversationDeleteNotification(ctx context.Context, userID string, conversationIDs []string) {
78+
tips := &sdkws.ConversationDeleteTips{
79+
UserID: userID,
80+
ConversationIDs: conversationIDs,
81+
}
82+
83+
c.Notification(ctx, userID, userID, constant.ConversationDeleteNotification, tips)
84+
}

pkg/common/storage/cache/conversation.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package cache
1616

1717
import (
1818
"context"
19+
1920
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
2021
)
2122

@@ -57,7 +58,7 @@ type ConversationCache interface {
5758
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
5859
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
5960
DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache
60-
DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache
61+
DelUserPinnedConversations(userIDs ...string) ConversationCache
6162
DelConversationVersionUserIDs(userIDs ...string) ConversationCache
6263

6364
FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)

pkg/common/storage/cache/redis/conversation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs
253253
return cache
254254
}
255255

256-
func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache {
256+
func (c *ConversationRedisCache) DelUserPinnedConversations(userIDs ...string) cache.ConversationCache {
257257
cache := c.CloneConversationCache()
258258
for _, userID := range userIDs {
259259
cache.AddKeys(c.getPinnedConversationIDsKey(userID))

pkg/common/storage/controller/conversation.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ type ConversationDatabase interface {
7878
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
7979
// FindRandConversation finds random conversations based on the specified timestamp and limit.
8080
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error)
81+
82+
DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error)
8183
}
8284

8385
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
@@ -120,7 +122,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
120122
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
121123
}
122124
if _, ok := fieldMap["is_pinned"]; ok {
123-
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
125+
cache = cache.DelUserPinnedConversations(userIDs...)
124126
}
125127
cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
126128
}
@@ -172,7 +174,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context,
172174
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
173175
}
174176
if _, ok := args["is_pinned"]; ok {
175-
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
177+
cache = cache.DelUserPinnedConversations(userIDs...)
176178
}
177179
return cache.ChainExecDel(ctx)
178180
}
@@ -203,7 +205,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
203205
DelUserConversationIDsHash(userIDs...).
204206
DelConversationVersionUserIDs(userIDs...).
205207
DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).
206-
DelConversationPinnedMessageUserIDs(pinnedUserIDs...).
208+
DelUserPinnedConversations(pinnedUserIDs...).
207209
ChainExecDel(ctx)
208210
}
209211

@@ -259,7 +261,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs
259261
cache := c.cache.CloneConversationCache()
260262
cache = cache.DelConversationVersionUserIDs(ownerUserID).
261263
DelConversationNotNotifyMessageUserIDs(ownerUserID).
262-
DelConversationPinnedMessageUserIDs(ownerUserID)
264+
DelUserPinnedConversations(ownerUserID)
263265

264266
groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
265267
return e.GroupID, e.GroupID != ""
@@ -429,3 +431,21 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use
429431
func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) {
430432
return c.conversationDB.FindRandConversation(ctx, ts, limit)
431433
}
434+
435+
func (c *conversationDatabase) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) {
436+
return c.tx.Transaction(ctx, func(ctx context.Context) error {
437+
err = c.conversationDB.DeleteUsersConversations(ctx, userID, conversationIDs)
438+
if err != nil {
439+
return err
440+
}
441+
cache := c.cache.CloneConversationCache()
442+
cache = cache.DelConversations(userID, conversationIDs...).
443+
DelConversationVersionUserIDs(userID).
444+
DelConversationIDs(userID).
445+
DelUserConversationIDsHash(userID).
446+
DelConversationNotNotifyMessageUserIDs(userID).
447+
DelUserPinnedConversations(userID)
448+
449+
return cache.ChainExecDel(ctx)
450+
})
451+
}

0 commit comments

Comments
 (0)