Skip to content

Commit b48020f

Browse files
author
Jeff Yanta
committed
Implement GetChats RPC limited to anonymous chat membership
1 parent c203be6 commit b48020f

File tree

7 files changed

+339
-11
lines changed

7 files changed

+339
-11
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
firebase.google.com/go/v4 v4.8.0
77
github.com/aws/aws-sdk-go-v2 v0.17.0
88
github.com/bits-and-blooms/bloom/v3 v3.1.0
9-
github.com/code-payments/code-protobuf-api v1.16.7-0.20240611151313-ca7587f92a73
9+
github.com/code-payments/code-protobuf-api v1.16.7-0.20240613143759-2e4fbcd11dcb
1010
github.com/emirpasic/gods v1.12.0
1111
github.com/envoyproxy/protoc-gen-validate v1.0.4
1212
github.com/golang-jwt/jwt/v5 v5.0.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWH
121121
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
122122
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
123123
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
124-
github.com/code-payments/code-protobuf-api v1.16.7-0.20240611151313-ca7587f92a73 h1:gdj/RvbLkcfxeWsrHJSu6Z8rkNtWvrIMZz/1WQlxVyg=
125-
github.com/code-payments/code-protobuf-api v1.16.7-0.20240611151313-ca7587f92a73/go.mod h1:pHQm75vydD6Cm2qHAzlimW6drysm489Z4tVxC2zHSsU=
124+
github.com/code-payments/code-protobuf-api v1.16.7-0.20240613143759-2e4fbcd11dcb h1:cSgl4rZkQqQ1cDgJFGxyRZFoHljN2Of73znDpqKXftY=
125+
github.com/code-payments/code-protobuf-api v1.16.7-0.20240613143759-2e4fbcd11dcb/go.mod h1:pHQm75vydD6Cm2qHAzlimW6drysm489Z4tVxC2zHSsU=
126126
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 h1:NmTXa/uVnDyp0TY5MKi197+3HWcnYWfnHGyaFthlnGw=
127127
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
128128
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=

pkg/code/data/chat/v2/memory/store.go

Lines changed: 99 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,34 @@ func (s *store) GetAllMembersByChatId(_ context.Context, chatId chat.ChatId) ([]
8080
return cloneMemberRecords(items), nil
8181
}
8282

83+
// GetAllMembersByPlatformId implements chat.store.GetAllMembersByPlatformId
84+
func (s *store) GetAllMembersByPlatformId(_ context.Context, platform chat.Platform, platformId string, cursor query.Cursor, direction query.Ordering, limit uint64) ([]*chat.MemberRecord, error) {
85+
s.mu.Lock()
86+
defer s.mu.Unlock()
87+
88+
items := s.findMembersByPlatformId(platform, platformId)
89+
items, err := s.getMemberRecordPage(items, cursor, direction, limit)
90+
if err != nil {
91+
return nil, err
92+
}
93+
94+
if len(items) == 0 {
95+
return nil, chat.ErrMemberNotFound
96+
}
97+
return cloneMemberRecords(items), nil
98+
}
99+
100+
// GetUnreadCount implements chat.store.GetUnreadCount
101+
func (s *store) GetUnreadCount(_ context.Context, chatId chat.ChatId, readPointer chat.MessageId) (uint32, error) {
102+
s.mu.Lock()
103+
defer s.mu.Unlock()
104+
105+
items := s.findMessagesByChatId(chatId)
106+
items = s.filterMessagesAfter(items, readPointer)
107+
items = s.filterNotifiedMessages(items)
108+
return uint32(len(items)), nil
109+
}
110+
83111
// GetAllMessagesByChatId implements chat.Store.GetAllMessagesByChatId
84112
func (s *store) GetAllMessagesByChatId(_ context.Context, chatId chat.ChatId, cursor query.Cursor, direction query.Ordering, limit uint64) ([]*chat.MessageRecord, error) {
85113
s.mu.Lock()
@@ -296,6 +324,55 @@ func (s *store) findMembersByChatId(chatId chat.ChatId) []*chat.MemberRecord {
296324
return res
297325
}
298326

327+
func (s *store) findMembersByPlatformId(platform chat.Platform, platformId string) []*chat.MemberRecord {
328+
var res []*chat.MemberRecord
329+
for _, item := range s.memberRecords {
330+
if platform == item.Platform && platformId == item.PlatformId {
331+
res = append(res, item)
332+
}
333+
}
334+
return res
335+
}
336+
337+
func (s *store) getMemberRecordPage(items []*chat.MemberRecord, cursor query.Cursor, direction query.Ordering, limit uint64) ([]*chat.MemberRecord, error) {
338+
if len(items) == 0 {
339+
return nil, nil
340+
}
341+
342+
var memberIdCursor *uint64
343+
if len(cursor) > 0 {
344+
cursorValue := query.FromCursor(cursor)
345+
memberIdCursor = &cursorValue
346+
}
347+
348+
var res []*chat.MemberRecord
349+
if memberIdCursor == nil {
350+
res = items
351+
} else {
352+
for _, item := range items {
353+
if item.Id > int64(*memberIdCursor) && direction == query.Ascending {
354+
res = append(res, item)
355+
}
356+
357+
if item.Id < int64(*memberIdCursor) && direction == query.Descending {
358+
res = append(res, item)
359+
}
360+
}
361+
}
362+
363+
if direction == query.Ascending {
364+
sort.Sort(chat.MembersById(res))
365+
} else {
366+
sort.Sort(sort.Reverse(chat.MembersById(res)))
367+
}
368+
369+
if len(res) >= int(limit) {
370+
return res[:limit], nil
371+
}
372+
373+
return res, nil
374+
}
375+
299376
func (s *store) findMessage(data *chat.MessageRecord) *chat.MessageRecord {
300377
for _, item := range s.messageRecords {
301378
if data.Id == item.Id {
@@ -328,6 +405,26 @@ func (s *store) findMessagesByChatId(chatId chat.ChatId) []*chat.MessageRecord {
328405
return res
329406
}
330407

408+
func (s *store) filterMessagesAfter(items []*chat.MessageRecord, pointer chat.MessageId) []*chat.MessageRecord {
409+
var res []*chat.MessageRecord
410+
for _, item := range items {
411+
if item.MessageId.After(pointer) {
412+
res = append(res, item)
413+
}
414+
}
415+
return res
416+
}
417+
418+
func (s *store) filterNotifiedMessages(items []*chat.MessageRecord) []*chat.MessageRecord {
419+
var res []*chat.MessageRecord
420+
for _, item := range items {
421+
if !item.IsSilent {
422+
res = append(res, item)
423+
}
424+
}
425+
return res
426+
}
427+
331428
func (s *store) getMessageRecordPage(items []*chat.MessageRecord, cursor query.Cursor, direction query.Ordering, limit uint64) ([]*chat.MessageRecord, error) {
332429
if len(items) == 0 {
333430
return nil, nil
@@ -358,9 +455,9 @@ func (s *store) getMessageRecordPage(items []*chat.MessageRecord, cursor query.C
358455
}
359456

360457
if direction == query.Ascending {
361-
sort.Sort(chat.MessagesById(res))
458+
sort.Sort(chat.MessagesByMessageId(res))
362459
} else {
363-
sort.Sort(sort.Reverse(chat.MessagesById(res)))
460+
sort.Sort(sort.Reverse(chat.MessagesByMessageId(res)))
364461
}
365462

366463
if len(res) >= int(limit) {

pkg/code/data/chat/v2/model.go

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,15 +97,59 @@ type MessageRecord struct {
9797
// Note: No timestamp field, since it's encoded in MessageId
9898
}
9999

100-
type MessagesById []*MessageRecord
100+
type MembersById []*MemberRecord
101101

102-
func (a MessagesById) Len() int { return len(a) }
103-
func (a MessagesById) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
104-
func (a MessagesById) Less(i, j int) bool {
102+
func (a MembersById) Len() int { return len(a) }
103+
func (a MembersById) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
104+
func (a MembersById) Less(i, j int) bool {
105+
return a[i].Id < a[j].Id
106+
}
107+
108+
type MessagesByMessageId []*MessageRecord
109+
110+
func (a MessagesByMessageId) Len() int { return len(a) }
111+
func (a MessagesByMessageId) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
112+
func (a MessagesByMessageId) Less(i, j int) bool {
105113
return a[i].MessageId.Before(a[j].MessageId)
106114
}
107115

108-
// GetChatIdFromProto gets a chat ID from the protobuf variant
116+
// GetChatTypeFromProto gets a chat type from the protobuf variant
117+
func GetChatTypeFromProto(proto chatpb.ChatMetadata_Kind) ChatType {
118+
switch proto {
119+
case chatpb.ChatMetadata_NOTIFICATION:
120+
return ChatTypeNotification
121+
case chatpb.ChatMetadata_TWO_WAY:
122+
return ChatTypeTwoWay
123+
default:
124+
return ChatTypeUnknown
125+
}
126+
}
127+
128+
// ToProto returns the proto representation of the chat type
129+
func (c ChatType) ToProto() chatpb.ChatMetadata_Kind {
130+
switch c {
131+
case ChatTypeNotification:
132+
return chatpb.ChatMetadata_NOTIFICATION
133+
case ChatTypeTwoWay:
134+
return chatpb.ChatMetadata_TWO_WAY
135+
default:
136+
return chatpb.ChatMetadata_UNKNOWN
137+
}
138+
}
139+
140+
// String returns the string representation of the chat type
141+
func (c ChatType) String() string {
142+
switch c {
143+
case ChatTypeNotification:
144+
return "notification"
145+
case ChatTypeTwoWay:
146+
return "two-way"
147+
default:
148+
return "unknown"
149+
}
150+
}
151+
152+
// GetPointerTypeFromProto gets a chat ID from the protobuf variant
109153
func GetPointerTypeFromProto(proto chatpb.Pointer_Kind) PointerType {
110154
switch proto {
111155
case chatpb.Pointer_SENT:
@@ -147,6 +191,28 @@ func (p PointerType) String() string {
147191
}
148192
}
149193

194+
// ToProto returns the proto representation of the platform
195+
func (p Platform) ToProto() chatpb.ChatMemberIdentity_Platform {
196+
switch p {
197+
case PlatformTwitter:
198+
return chatpb.ChatMemberIdentity_TWITTER
199+
default:
200+
return chatpb.ChatMemberIdentity_UNKNOWN
201+
}
202+
}
203+
204+
// String returns the string representation of the platform
205+
func (p Platform) String() string {
206+
switch p {
207+
case PlatformCode:
208+
return "code"
209+
case PlatformTwitter:
210+
return "twitter"
211+
default:
212+
return "unknown"
213+
}
214+
}
215+
150216
// Validate validates a chat Record
151217
func (r *ChatRecord) Validate() error {
152218
if err := r.ChatId.Validate(); err != nil {

pkg/code/data/chat/v2/store.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,18 @@ type Store interface {
3333
// todo: Add paging when we introduce group chats
3434
GetAllMembersByChatId(ctx context.Context, chatId ChatId) ([]*MemberRecord, error)
3535

36+
// GetAllMembersByPlatformId gets all members for a given platform user across
37+
// all chats
38+
GetAllMembersByPlatformId(ctx context.Context, platform Platform, platformId string, cursor query.Cursor, direction query.Ordering, limit uint64) ([]*MemberRecord, error)
39+
3640
// GetAllMessagesByChatId gets all messages for a given chat
3741
//
3842
// Note: Cursor is a message ID
3943
GetAllMessagesByChatId(ctx context.Context, chatId ChatId, cursor query.Cursor, direction query.Ordering, limit uint64) ([]*MessageRecord, error)
4044

45+
// GetUnreadCount gets the unread message count for a chat ID at a read pointer
46+
GetUnreadCount(ctx context.Context, chatId ChatId, readPointer MessageId) (uint32, error)
47+
4148
// PutChat creates a new chat
4249
PutChat(ctx context.Context, record *ChatRecord) error
4350

pkg/code/data/internal.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,9 @@ type DatabaseData interface {
400400
GetChatMemberByIdV2(ctx context.Context, chatId chat_v2.ChatId, memberId chat_v2.MemberId) (*chat_v2.MemberRecord, error)
401401
GetChatMessageByIdV2(ctx context.Context, chatId chat_v2.ChatId, messageId chat_v2.MessageId) (*chat_v2.MessageRecord, error)
402402
GetAllChatMembersV2(ctx context.Context, chatId chat_v2.ChatId) ([]*chat_v2.MemberRecord, error)
403+
GetPlatformUserChatMembershipV2(ctx context.Context, platform chat_v2.Platform, platformId string, opts ...query.Option) ([]*chat_v2.MemberRecord, error)
403404
GetAllChatMessagesV2(ctx context.Context, chatId chat_v2.ChatId, opts ...query.Option) ([]*chat_v2.MessageRecord, error)
405+
GetChatUnreadCountV2(ctx context.Context, chatId chat_v2.ChatId, readPointer chat_v2.MessageId) (uint32, error)
404406
PutChatV2(ctx context.Context, record *chat_v2.ChatRecord) error
405407
PutChatMemberV2(ctx context.Context, record *chat_v2.MemberRecord) error
406408
PutChatMessageV2(ctx context.Context, record *chat_v2.MessageRecord) error
@@ -1476,13 +1478,23 @@ func (dp *DatabaseProvider) GetChatMessageByIdV2(ctx context.Context, chatId cha
14761478
func (dp *DatabaseProvider) GetAllChatMembersV2(ctx context.Context, chatId chat_v2.ChatId) ([]*chat_v2.MemberRecord, error) {
14771479
return dp.chatv2.GetAllMembersByChatId(ctx, chatId)
14781480
}
1481+
func (dp *DatabaseProvider) GetPlatformUserChatMembershipV2(ctx context.Context, platform chat_v2.Platform, platformId string, opts ...query.Option) ([]*chat_v2.MemberRecord, error) {
1482+
req, err := query.DefaultPaginationHandler(opts...)
1483+
if err != nil {
1484+
return nil, err
1485+
}
1486+
return dp.chatv2.GetAllMembersByPlatformId(ctx, platform, platformId, req.Cursor, req.SortBy, req.Limit)
1487+
}
14791488
func (dp *DatabaseProvider) GetAllChatMessagesV2(ctx context.Context, chatId chat_v2.ChatId, opts ...query.Option) ([]*chat_v2.MessageRecord, error) {
14801489
req, err := query.DefaultPaginationHandler(opts...)
14811490
if err != nil {
14821491
return nil, err
14831492
}
14841493
return dp.chatv2.GetAllMessagesByChatId(ctx, chatId, req.Cursor, req.SortBy, req.Limit)
14851494
}
1495+
func (dp *DatabaseProvider) GetChatUnreadCountV2(ctx context.Context, chatId chat_v2.ChatId, readPointer chat_v2.MessageId) (uint32, error) {
1496+
return dp.chatv2.GetUnreadCount(ctx, chatId, readPointer)
1497+
}
14861498
func (dp *DatabaseProvider) PutChatV2(ctx context.Context, record *chat_v2.ChatRecord) error {
14871499
return dp.chatv2.PutChat(ctx, record)
14881500
}

0 commit comments

Comments
 (0)