Skip to content

Commit 17b84d0

Browse files
authored
fix: can not receive OnJoinedGroupAdded notification (#1082)
* refactor(group):let GetGroupMembersInfo use DataFetcher. * fix: can not receive OnJoinedGroupAdded notification (#1081) * fix: can not receive OnJoinedGroupAdded notification * refactor(group):let GetGroupMembersInfo use DataFetcher.
1 parent e59db4e commit 17b84d0

File tree

4 files changed

+160
-21
lines changed

4 files changed

+160
-21
lines changed

internal/conversation_msg/conversation_msg.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -851,7 +851,7 @@ func (c *Conversation) batchAddFaceURLAndName(ctx context.Context, conversations
851851
return err
852852
}
853853

854-
groupInfoList, err := c.group.GetSpecifiedGroupsInfo(ctx, groupIDs)
854+
groupInfoList, err := c.group.GetSpecifiedGroupsInfoSafe(ctx, groupIDs)
855855
if err != nil {
856856
return err
857857
}

internal/group/api.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,74 @@ func (g *Group) GetSpecifiedGroupsInfo(ctx context.Context, groupIDs []string) (
228228
return dataFetcher.FetchMissingAndCombineLocal(ctx, groupIDs)
229229
}
230230

231+
// GetSpecifiedGroupsInfoSafe fetches group info without writing to local storage or touching version sync.
232+
func (g *Group) GetSpecifiedGroupsInfoSafe(ctx context.Context, groupIDs []string) ([]*model_struct.LocalGroup, error) {
233+
if len(groupIDs) == 0 {
234+
return nil, nil
235+
}
236+
237+
dataFetcher := datafetcher.NewDataFetcher(
238+
g.db,
239+
g.groupTableName(),
240+
g.loginUserID,
241+
func(localGroup *model_struct.LocalGroup) string {
242+
return localGroup.GroupID
243+
},
244+
func(ctx context.Context, values []*model_struct.LocalGroup) error {
245+
return nil
246+
},
247+
func(ctx context.Context, groupIDs []string) ([]*model_struct.LocalGroup, bool, error) {
248+
var (
249+
res []*model_struct.LocalGroup
250+
needDB []string
251+
)
252+
253+
for _, groupID := range groupIDs {
254+
if v, ok := g.groupInfoCache.Load(groupID); ok {
255+
res = append(res, v)
256+
} else {
257+
needDB = append(needDB, groupID)
258+
}
259+
}
260+
261+
if len(needDB) == 0 {
262+
return res, false, nil
263+
}
264+
265+
localGroups, err := g.db.GetGroups(ctx, needDB)
266+
if err != nil {
267+
return nil, false, err
268+
}
269+
270+
localMap := datautil.SliceToMap(localGroups, func(e *model_struct.LocalGroup) string {
271+
return e.GroupID
272+
})
273+
for _, info := range localGroups {
274+
g.groupInfoCache.Store(info.GroupID, info)
275+
res = append(res, info)
276+
}
277+
278+
if len(localMap) == len(needDB) {
279+
return res, false, nil
280+
}
281+
282+
return res, true, nil
283+
},
284+
func(ctx context.Context, groupIDs []string) ([]*model_struct.LocalGroup, error) {
285+
serverGroupInfo, err := g.getGroupsInfoFromServer(ctx, groupIDs)
286+
if err != nil {
287+
return nil, err
288+
}
289+
converted := datautil.Batch(ServerGroupToLocalGroup, serverGroupInfo)
290+
for _, info := range converted {
291+
g.groupInfoCache.Store(info.GroupID, info)
292+
}
293+
return converted, nil
294+
},
295+
)
296+
return dataFetcher.FetchMissingAndCombineLocal(ctx, groupIDs)
297+
}
298+
231299
func (g *Group) SearchGroups(ctx context.Context, param sdk_params_callback.SearchGroupsParam) ([]*model_struct.LocalGroup, error) {
232300
if len(param.KeywordList) == 0 || (!param.IsSearchGroupName && !param.IsSearchGroupID) {
233301
return nil, sdkerrs.ErrArgs.WrapMsg("keyword is null or search field all false")

internal/group/cache.go

Lines changed: 72 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package group
33
import (
44
"context"
55

6+
"github.com/openimsdk/openim-sdk-core/v3/pkg/datafetcher"
67
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
78
"github.com/openimsdk/tools/log"
89
"github.com/openimsdk/tools/utils/datautil"
@@ -45,25 +46,79 @@ func (g *Group) GetGroupMembersInfoFunc(ctx context.Context, groupID string, use
4546
}
4647

4748
func (g *Group) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) (map[string]*model_struct.LocalGroupMember, error) {
48-
return g.GetGroupMembersInfoFunc(ctx, groupID, userIDs, func(ctx context.Context, dbKeys []string) ([]*model_struct.LocalGroupMember, error) {
49-
if len(dbKeys) == 0 {
50-
return nil, nil
51-
}
52-
dbData, err := g.db.GetGroupSomeMemberInfo(ctx, groupID, dbKeys)
53-
if err != nil {
54-
return nil, err
55-
}
56-
queryKeys := datautil.SliceSubAny(dbKeys, dbData, func(t *model_struct.LocalGroupMember) string {
57-
return t.UserID
58-
})
59-
if len(queryKeys) != 0 {
60-
queryData, err := g.getDesignatedGroupMembers(ctx, groupID, queryKeys)
49+
res := make(map[string]*model_struct.LocalGroupMember)
50+
if len(userIDs) == 0 {
51+
return res, nil
52+
}
53+
54+
dataFetcher := datafetcher.NewDataFetcher(
55+
g.db,
56+
g.groupAndMemberVersionTableName(),
57+
groupID,
58+
func(member *model_struct.LocalGroupMember) string {
59+
return member.UserID
60+
},
61+
func(ctx context.Context, values []*model_struct.LocalGroupMember) error {
62+
return nil
63+
},
64+
func(ctx context.Context, userIDs []string) ([]*model_struct.LocalGroupMember, bool, error) {
65+
var (
66+
localData []*model_struct.LocalGroupMember
67+
needDB []string
68+
)
69+
70+
for _, userID := range userIDs {
71+
key := g.buildGroupMemberKey(groupID, userID)
72+
if member, ok := g.groupMemberCache.Load(key); ok {
73+
localData = append(localData, member)
74+
} else {
75+
needDB = append(needDB, userID)
76+
}
77+
}
78+
79+
if len(needDB) == 0 {
80+
return localData, false, nil
81+
}
82+
83+
dbData, err := g.db.GetGroupSomeMemberInfo(ctx, groupID, needDB)
84+
if err != nil {
85+
return nil, false, err
86+
}
87+
88+
for _, member := range dbData {
89+
g.groupMemberCache.Store(g.buildGroupMemberKey(groupID, member.UserID), member)
90+
localData = append(localData, member)
91+
}
92+
93+
if len(dbData) == len(needDB) {
94+
return localData, false, nil
95+
}
96+
97+
return localData, true, nil
98+
},
99+
func(ctx context.Context, userIDs []string) ([]*model_struct.LocalGroupMember, error) {
100+
if len(userIDs) == 0 {
101+
return nil, nil
102+
}
103+
queryData, err := g.getDesignatedGroupMembers(ctx, groupID, userIDs)
61104
if err != nil {
62105
return nil, err
63106
}
107+
converted := datautil.Batch(ServerGroupMemberToLocalGroupMember, queryData)
108+
for _, member := range converted {
109+
g.groupMemberCache.Store(g.buildGroupMemberKey(groupID, member.UserID), member)
110+
}
111+
return converted, nil
112+
},
113+
)
64114

65-
dbData = append(dbData, datautil.Batch(ServerGroupMemberToLocalGroupMember, queryData)...)
66-
}
67-
return dbData, nil
68-
})
115+
fetchData, err := dataFetcher.FetchMissingAndCombineLocal(ctx, userIDs)
116+
if err != nil {
117+
return nil, err
118+
}
119+
120+
for _, member := range fetchData {
121+
res[member.UserID] = member
122+
}
123+
return res, nil
69124
}

internal/group/group.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ func NewGroup(
5252
}
5353
g.initSyncer()
5454
g.groupMemberCache = cache.NewCache[string, *model_struct.LocalGroupMember]()
55+
g.groupInfoCache = cache.NewCache[string, *model_struct.LocalGroup]()
5556
return g
5657
}
5758

@@ -65,15 +66,21 @@ type Group struct {
6566
groupSyncMutex sync.Mutex
6667
listenerForService open_im_sdk_callback.OnListenerForService
6768
groupMemberCache *cache.Cache[string, *model_struct.LocalGroupMember]
69+
groupInfoCache *cache.Cache[string, *model_struct.LocalGroup]
6870
filter *NotificationFilter
6971
}
7072

7173
func (g *Group) initSyncer() {
7274
g.groupSyncer = syncer.New2[*model_struct.LocalGroup, group.GetJoinedGroupListResp, string](
7375
syncer.WithInsert[*model_struct.LocalGroup, group.GetJoinedGroupListResp, string](func(ctx context.Context, value *model_struct.LocalGroup) error {
74-
return g.db.InsertGroup(ctx, value)
76+
if err := g.db.InsertGroup(ctx, value); err != nil {
77+
return err
78+
}
79+
g.groupInfoCache.Store(value.GroupID, value)
80+
return nil
7581
}),
7682
syncer.WithDelete[*model_struct.LocalGroup, group.GetJoinedGroupListResp, string](func(ctx context.Context, value *model_struct.LocalGroup) error {
83+
g.groupInfoCache.Delete(value.GroupID)
7784
if err := g.db.DeleteGroupAllMembers(ctx, value.GroupID); err != nil {
7885
return err
7986
}
@@ -84,7 +91,11 @@ func (g *Group) initSyncer() {
8491
}),
8592
syncer.WithUpdate[*model_struct.LocalGroup, group.GetJoinedGroupListResp, string](func(ctx context.Context, server, local *model_struct.LocalGroup) error {
8693
log.ZInfo(ctx, "groupSyncer trigger update function", "groupID", server.GroupID, "server", server, "local", local)
87-
return g.db.UpdateGroup(ctx, server)
94+
if err := g.db.UpdateGroup(ctx, server); err != nil {
95+
return err
96+
}
97+
g.groupInfoCache.Store(server.GroupID, server)
98+
return nil
8899
}),
89100
syncer.WithUUID[*model_struct.LocalGroup, group.GetJoinedGroupListResp, string](func(value *model_struct.LocalGroup) string {
90101
return value.GroupID
@@ -130,9 +141,14 @@ func (g *Group) initSyncer() {
130141
}),
131142

132143
syncer.WithBatchInsert[*model_struct.LocalGroup, group.GetJoinedGroupListResp, string](func(ctx context.Context, values []*model_struct.LocalGroup) error {
133-
return g.db.BatchInsertGroup(ctx, values)
144+
if err := g.db.BatchInsertGroup(ctx, values); err != nil {
145+
return err
146+
}
147+
g.groupInfoCache.StoreAll(func(v *model_struct.LocalGroup) string { return v.GroupID }, values)
148+
return nil
134149
}),
135150
syncer.WithDeleteAll[*model_struct.LocalGroup, group.GetJoinedGroupListResp, string](func(ctx context.Context, _ string) error {
151+
g.groupInfoCache.DeleteAll()
136152
return g.db.DeleteAllGroup(ctx)
137153
}),
138154
syncer.WithBatchPageReq[*model_struct.LocalGroup, group.GetJoinedGroupListResp, string](func(entityID string) page.PageReq {

0 commit comments

Comments
 (0)