Skip to content

Commit 8b79a76

Browse files
authored
feat: support message cache (#3007)
* pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * redis msg cache * redis msg cache * redis msg cache * redis msg cache * redis msg cache * redis msg cache * redis msg cache
1 parent 1110af9 commit 8b79a76

File tree

14 files changed

+277
-1063
lines changed

14 files changed

+277
-1063
lines changed

internal/msgtransfer/init.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,11 @@ func Start(ctx context.Context, index int, config *Config) error {
108108
cm.Watch(ctx)
109109
}
110110

111-
msgModel := redis.NewMsgCache(rdb)
112111
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
113112
if err != nil {
114113
return err
115114
}
115+
msgModel := redis.NewMsgCache(rdb, msgDocModel)
116116
seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
117117
if err != nil {
118118
return err

internal/msgtransfer/online_msg_to_mongo_handler.go

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -77,27 +77,13 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
7777
for _, msg := range msgFromMQ.MsgData {
7878
seqs = append(seqs, msg.Seq)
7979
}
80-
err = mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs)
81-
if err != nil {
82-
log.ZError(
83-
ctx,
84-
"remove cache msg from redis err",
85-
err,
86-
"msg",
87-
msgFromMQ.MsgData,
88-
"conversationID",
89-
msgFromMQ.ConversationID,
90-
)
91-
}
9280
}
9381

94-
func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
82+
func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
83+
9584
func (*OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
9685

97-
func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(
98-
sess sarama.ConsumerGroupSession,
99-
claim sarama.ConsumerGroupClaim,
100-
) error { // an instance in the consumer group
86+
func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // an instance in the consumer group
10187
log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
10288
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
10389
for msg := range claim.Messages() {

internal/rpc/msg/delete.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,8 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy
106106
return nil, err
107107
}
108108
remainTime := timeutil.GetCurrentTimestampBySecond() - req.Timestamp
109-
for _, conversationID := range req.ConversationIDs {
110-
if err := m.MsgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, remainTime); err != nil {
111-
log.ZWarn(ctx, "DeleteConversationMsgsAndSetMinSeq error", err, "conversationID", conversationID, "err", err)
112-
}
109+
if _, err := m.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: remainTime, Limit: 9999}); err != nil {
110+
return nil, err
113111
}
114112
return &msg.DeleteMsgPhysicalResp{}, nil
115113
}

internal/rpc/msg/revoke.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
6363
log.ZDebug(ctx, "GetMsgBySeqs", "conversationID", req.ConversationID, "seq", req.Seq, "msg", string(data))
6464
var role int32
6565
if !authverify.IsAppManagerUid(ctx, m.config.Share.IMAdminUserID) {
66-
switch msgs[0].SessionType {
66+
sessionType := msgs[0].SessionType
67+
switch sessionType {
6768
case constant.SingleChatType:
6869
if err := authverify.CheckAccessV3(ctx, msgs[0].SendID, m.config.Share.IMAdminUserID); err != nil {
6970
return nil, err
@@ -89,7 +90,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
8990
role = member.RoleLevel
9091
}
9192
default:
92-
return nil, errs.ErrInternalServer.WrapMsg("msg sessionType not supported")
93+
return nil, errs.ErrInternalServer.WrapMsg("msg sessionType not supported", "sessionType", sessionType)
9394
}
9495
}
9596
now := time.Now().UnixMilli()

internal/rpc/msg/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
8989
if err != nil {
9090
return err
9191
}
92-
msgModel := redis.NewMsgCache(rdb)
92+
msgModel := redis.NewMsgCache(rdb, msgDocModel)
9393
seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
9494
if err != nil {
9595
return err

pkg/common/storage/cache/cachekey/msg.go

Lines changed: 4 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -15,50 +15,16 @@
1515
package cachekey
1616

1717
import (
18-
"github.com/openimsdk/protocol/constant"
1918
"strconv"
2019
)
2120

2221
const (
23-
messageCache = "MESSAGE_CACHE:"
24-
messageDelUserList = "MESSAGE_DEL_USER_LIST:"
25-
userDelMessagesList = "USER_DEL_MESSAGES_LIST:"
26-
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
27-
exTypeKeyLocker = "EX_LOCK:"
28-
reactionExSingle = "EX_SINGLE_"
29-
reactionWriteGroup = "EX_GROUP_"
30-
reactionReadGroup = "EX_SUPER_GROUP_"
31-
reactionNotification = "EX_NOTIFICATION_"
22+
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
23+
messageCache = "MSG_CACHE:"
3224
)
3325

34-
func GetMessageCacheKey(conversationID string, seq int64) string {
35-
return messageCache + conversationID + "_" + strconv.Itoa(int(seq))
36-
}
37-
38-
func GetMessageDelUserListKey(conversationID string, seq int64) string {
39-
return messageDelUserList + conversationID + ":" + strconv.Itoa(int(seq))
40-
}
41-
42-
func GetUserDelListKey(conversationID, userID string) string {
43-
return userDelMessagesList + conversationID + ":" + userID
44-
}
45-
46-
func GetMessageReactionExKey(clientMsgID string, sessionType int32) string {
47-
switch sessionType {
48-
case constant.SingleChatType:
49-
return reactionExSingle + clientMsgID
50-
case constant.WriteGroupChatType:
51-
return reactionWriteGroup + clientMsgID
52-
case constant.ReadGroupChatType:
53-
return reactionReadGroup + clientMsgID
54-
case constant.NotificationChatType:
55-
return reactionNotification + clientMsgID
56-
}
57-
58-
return ""
59-
}
60-
func GetLockMessageTypeKey(clientMsgID string, TypeKey string) string {
61-
return exTypeKeyLocker + clientMsgID + "_" + TypeKey
26+
func GetMsgCacheKey(conversationID string, seq int64) string {
27+
return messageCache + conversationID + ":" + strconv.Itoa(int(seq))
6228
}
6329

6430
func GetSendMsgKey(id string) string {

pkg/common/storage/cache/msg.go

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,14 @@ package cache
1616

1717
import (
1818
"context"
19-
"time"
20-
21-
"github.com/openimsdk/protocol/sdkws"
19+
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
2220
)
2321

2422
type MsgCache interface {
25-
GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
26-
SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error)
27-
DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
2823
SetSendMsgStatus(ctx context.Context, id string, status int32) error
2924
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
30-
JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
31-
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
32-
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
33-
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
34-
GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error)
35-
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
36-
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
37-
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
25+
26+
GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error)
27+
DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error
28+
SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgInfoModel) error
3829
}

pkg/common/storage/cache/redis/msg.go

Lines changed: 52 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -2,87 +2,39 @@ package redis
22

33
import (
44
"context"
5+
"encoding/json"
6+
"github.com/dtm-labs/rockscache"
57
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
68
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
7-
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
8-
"github.com/openimsdk/protocol/sdkws"
9+
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
10+
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
911
"github.com/openimsdk/tools/errs"
1012
"github.com/openimsdk/tools/utils/datautil"
1113
"github.com/redis/go-redis/v9"
1214
"time"
1315
) //
1416

1517
// msgCacheTimeout is expiration time of message cache, 86400 seconds
16-
const msgCacheTimeout = 86400
18+
const msgCacheTimeout = time.Hour * 24
1719

18-
func NewMsgCache(client redis.UniversalClient) cache.MsgCache {
19-
return &msgCache{rdb: client}
20+
func NewMsgCache(client redis.UniversalClient, db database.Msg) cache.MsgCache {
21+
return &msgCache{
22+
rdb: client,
23+
rcClient: rockscache.NewClient(client, *GetRocksCacheOptions()),
24+
msgDocDatabase: db,
25+
}
2026
}
2127

2228
type msgCache struct {
23-
rdb redis.UniversalClient
24-
}
25-
26-
func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string {
27-
return cachekey.GetMessageCacheKey(conversationID, seq)
28-
}
29-
func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string {
30-
return cachekey.GetMessageDelUserListKey(conversationID, seq)
31-
}
32-
33-
func (c *msgCache) getUserDelList(conversationID, userID string) string {
34-
return cachekey.GetUserDelListKey(conversationID, userID)
29+
rdb redis.UniversalClient
30+
rcClient *rockscache.Client
31+
msgDocDatabase database.Msg
3532
}
3633

3734
func (c *msgCache) getSendMsgKey(id string) string {
3835
return cachekey.GetSendMsgKey(id)
3936
}
4037

41-
func (c *msgCache) getLockMessageTypeKey(clientMsgID string, TypeKey string) string {
42-
return cachekey.GetLockMessageTypeKey(clientMsgID, TypeKey)
43-
}
44-
45-
func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
46-
return cachekey.GetMessageReactionExKey(clientMsgID, sessionType)
47-
}
48-
49-
func (c *msgCache) SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
50-
msgMap := datautil.SliceToMap(msgs, func(msg *sdkws.MsgData) string {
51-
return c.getMessageCacheKey(conversationID, msg.Seq)
52-
})
53-
keys := datautil.Slice(msgs, func(msg *sdkws.MsgData) string {
54-
return c.getMessageCacheKey(conversationID, msg.Seq)
55-
})
56-
err := ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
57-
var values []string
58-
for _, key := range keys {
59-
if msg, ok := msgMap[key]; ok {
60-
s, err := msgprocessor.Pb2String(msg)
61-
if err != nil {
62-
return err
63-
}
64-
values = append(values, s)
65-
}
66-
}
67-
return LuaSetBatchWithCommonExpire(ctx, c.rdb, keys, values, msgCacheTimeout)
68-
})
69-
if err != nil {
70-
return 0, err
71-
}
72-
return len(msgs), nil
73-
}
74-
75-
func (c *msgCache) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
76-
var keys []string
77-
for _, seq := range seqs {
78-
keys = append(keys, c.getMessageCacheKey(conversationID, seq))
79-
}
80-
81-
return ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
82-
return LuaDeleteBatch(ctx, c.rdb, keys)
83-
})
84-
}
85-
8638
func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
8739
return errs.Wrap(c.rdb.Set(ctx, c.getSendMsgKey(id), status, time.Hour*24).Err())
8840
}
@@ -92,81 +44,53 @@ func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, erro
9244
return int32(result), errs.Wrap(err)
9345
}
9446

95-
func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
96-
key := c.getLockMessageTypeKey(clientMsgID, TypeKey)
97-
return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err())
98-
}
99-
100-
func (c *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
101-
key := c.getLockMessageTypeKey(clientMsgID, TypeKey)
102-
return errs.Wrap(c.rdb.Del(ctx, key).Err())
47+
func (c *msgCache) GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) {
48+
if len(seqs) == 0 {
49+
return nil, nil
50+
}
51+
getKey := func(seq int64) string {
52+
return cachekey.GetMsgCacheKey(conversationID, seq)
53+
}
54+
getMsgID := func(msg *model.MsgInfoModel) int64 {
55+
return msg.Msg.Seq
56+
}
57+
find := func(ctx context.Context, seqs []int64) ([]*model.MsgInfoModel, error) {
58+
return c.msgDocDatabase.FindSeqs(ctx, conversationID, seqs)
59+
}
60+
return batchGetCache2(ctx, c.rcClient, msgCacheTimeout, seqs, getKey, getMsgID, find)
10361
}
10462

105-
func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
106-
n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
63+
func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error {
64+
if len(seqs) == 0 {
65+
return nil
66+
}
67+
keys := datautil.Slice(seqs, func(seq int64) string {
68+
return cachekey.GetMsgCacheKey(conversationID, seq)
69+
})
70+
slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(c.rcClient), keys)
10771
if err != nil {
108-
return false, errs.Wrap(err)
72+
return err
10973
}
110-
111-
return n > 0, nil
112-
}
113-
114-
func (c *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
115-
return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err())
116-
}
117-
118-
func (c *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
119-
val, err := c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()
120-
return val, errs.Wrap(err)
121-
}
122-
123-
func (c *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
124-
val, err := c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()
125-
return val, errs.Wrap(err)
126-
}
127-
128-
func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
129-
val, err := c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
130-
return val, errs.Wrap(err)
131-
}
132-
133-
func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
134-
return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
74+
for _, keys := range slotKeys {
75+
if err := c.rcClient.TagAsDeletedBatch2(ctx, keys); err != nil {
76+
return err
77+
}
78+
}
79+
return nil
13580
}
13681

137-
func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
138-
var keys []string
139-
keySeqMap := make(map[string]int64, 10)
140-
for _, seq := range seqs {
141-
key := c.getMessageCacheKey(conversationID, seq)
142-
keys = append(keys, key)
143-
keySeqMap[key] = seq
144-
}
145-
err = ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
146-
result, err := LuaGetBatch(ctx, c.rdb, keys)
82+
func (c *msgCache) SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgInfoModel) error {
83+
for _, msg := range msgs {
84+
if msg == nil || msg.Msg == nil || msg.Msg.Seq <= 0 {
85+
continue
86+
}
87+
data, err := json.Marshal(msg)
14788
if err != nil {
14889
return err
14990
}
150-
for i, value := range result {
151-
seq := keySeqMap[keys[i]]
152-
if value == nil {
153-
failedSeqs = append(failedSeqs, seq)
154-
continue
155-
}
156-
157-
msg := &sdkws.MsgData{}
158-
msgString, ok := value.(string)
159-
if !ok || msgprocessor.String2Pb(msgString, msg) != nil {
160-
failedSeqs = append(failedSeqs, seq)
161-
continue
162-
}
163-
seqMsgs = append(seqMsgs, msg)
164-
91+
if err := c.rcClient.RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Msg.Seq), string(data), msgCacheTimeout); err != nil {
92+
return err
16593
}
166-
return nil
167-
})
168-
if err != nil {
169-
return nil, nil, err
17094
}
171-
return seqMsgs, failedSeqs, nil
95+
return nil
17296
}

0 commit comments

Comments
 (0)