Skip to content

Commit 26bf2b7

Browse files
author
Jeff Yanta
committed
Have chat v2 store indicate if pointer was advanced
1 parent febc5f2 commit 26bf2b7

File tree

4 files changed

+27
-27
lines changed

4 files changed

+27
-27
lines changed

pkg/code/data/chat/v2/memory/store.go

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -201,19 +201,19 @@ func (s *store) PutMessage(_ context.Context, record *chat.MessageRecord) error
201201
}
202202

203203
// AdvancePointer implements chat.Store.AdvancePointer
204-
func (s *store) AdvancePointer(_ context.Context, chatId chat.ChatId, memberId chat.MemberId, pointerType chat.PointerType, pointer chat.MessageId) error {
204+
func (s *store) AdvancePointer(_ context.Context, chatId chat.ChatId, memberId chat.MemberId, pointerType chat.PointerType, pointer chat.MessageId) (bool, error) {
205205
switch pointerType {
206206
case chat.PointerTypeDelivered, chat.PointerTypeRead:
207207
default:
208-
return chat.ErrInvalidPointerType
208+
return false, chat.ErrInvalidPointerType
209209
}
210210

211211
s.mu.Lock()
212212
defer s.mu.Unlock()
213213

214214
item := s.findMemberById(chatId, memberId)
215215
if item == nil {
216-
return chat.ErrMemberNotFound
216+
return false, chat.ErrMemberNotFound
217217
}
218218

219219
var currentPointer *chat.MessageId
@@ -224,20 +224,19 @@ func (s *store) AdvancePointer(_ context.Context, chatId chat.ChatId, memberId c
224224
currentPointer = item.ReadPointer
225225
}
226226

227-
if currentPointer != nil && currentPointer.After(pointer) {
228-
return nil
229-
}
227+
if currentPointer == nil || currentPointer.Before(pointer) {
228+
switch pointerType {
229+
case chat.PointerTypeDelivered:
230+
cloned := pointer.Clone()
231+
item.DeliveryPointer = &cloned
232+
case chat.PointerTypeRead:
233+
cloned := pointer.Clone()
234+
item.ReadPointer = &cloned
235+
}
230236

231-
switch pointerType {
232-
case chat.PointerTypeDelivered:
233-
cloned := pointer.Clone()
234-
item.DeliveryPointer = &cloned
235-
case chat.PointerTypeRead:
236-
cloned := pointer.Clone()
237-
item.ReadPointer = &cloned
237+
return true, nil
238238
}
239-
240-
return nil
239+
return false, nil
241240
}
242241

243242
// SetMuteState implements chat.Store.SetMuteState

pkg/code/data/chat/v2/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type Store interface {
5555
PutMessage(ctx context.Context, record *MessageRecord) error
5656

5757
// AdvancePointer advances a chat pointer for a chat member
58-
AdvancePointer(ctx context.Context, chatId ChatId, memberId MemberId, pointerType PointerType, pointer MessageId) error
58+
AdvancePointer(ctx context.Context, chatId ChatId, memberId MemberId, pointerType PointerType, pointer MessageId) (bool, error)
5959

6060
// SetMuteState updates the mute state for a chat member
6161
SetMuteState(ctx context.Context, chatId ChatId, memberId MemberId, isMuted bool) error

pkg/code/data/internal.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ type DatabaseData interface {
406406
PutChatV2(ctx context.Context, record *chat_v2.ChatRecord) error
407407
PutChatMemberV2(ctx context.Context, record *chat_v2.MemberRecord) error
408408
PutChatMessageV2(ctx context.Context, record *chat_v2.MessageRecord) error
409-
AdvanceChatPointerV2(ctx context.Context, chatId chat_v2.ChatId, memberId chat_v2.MemberId, pointerType chat_v2.PointerType, pointer chat_v2.MessageId) error
409+
AdvanceChatPointerV2(ctx context.Context, chatId chat_v2.ChatId, memberId chat_v2.MemberId, pointerType chat_v2.PointerType, pointer chat_v2.MessageId) (bool, error)
410410
SetChatMuteStateV2(ctx context.Context, chatId chat_v2.ChatId, memberId chat_v2.MemberId, isMuted bool) error
411411
SetChatSubscriptionStateV2(ctx context.Context, chatId chat_v2.ChatId, memberId chat_v2.MemberId, isSubscribed bool) error
412412

@@ -1504,7 +1504,7 @@ func (dp *DatabaseProvider) PutChatMemberV2(ctx context.Context, record *chat_v2
15041504
func (dp *DatabaseProvider) PutChatMessageV2(ctx context.Context, record *chat_v2.MessageRecord) error {
15051505
return dp.chatv2.PutMessage(ctx, record)
15061506
}
1507-
func (dp *DatabaseProvider) AdvanceChatPointerV2(ctx context.Context, chatId chat_v2.ChatId, memberId chat_v2.MemberId, pointerType chat_v2.PointerType, pointer chat_v2.MessageId) error {
1507+
func (dp *DatabaseProvider) AdvanceChatPointerV2(ctx context.Context, chatId chat_v2.ChatId, memberId chat_v2.MemberId, pointerType chat_v2.PointerType, pointer chat_v2.MessageId) (bool, error) {
15081508
return dp.chatv2.AdvancePointer(ctx, chatId, memberId, pointerType, pointer)
15091509
}
15101510
func (dp *DatabaseProvider) SetChatMuteStateV2(ctx context.Context, chatId chat_v2.ChatId, memberId chat_v2.MemberId, isMuted bool) error {

pkg/code/server/grpc/chat/v2/server.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -851,20 +851,21 @@ func (s *server) AdvancePointer(ctx context.Context, req *chatpb.AdvancePointerR
851851
return nil, status.Error(codes.Internal, "")
852852
}
853853

854-
// Note: Guarantees that pointer will never be advanced to some point in the past
855-
err = s.data.AdvanceChatPointerV2(ctx, chatId, memberId, pointerType, pointerValue)
854+
isAdvanced, err := s.data.AdvanceChatPointerV2(ctx, chatId, memberId, pointerType, pointerValue)
856855
if err != nil {
857856
log.WithError(err).Warn("failure advancing chat pointer")
858857
return nil, status.Error(codes.Internal, "")
859858
}
860859

861-
event := &chatpb.ChatStreamEvent{
862-
Type: &chatpb.ChatStreamEvent_Pointer{
863-
Pointer: req.Pointer,
864-
},
865-
}
866-
if err := s.asyncNotifyAll(chatId, memberId, event); err != nil {
867-
log.WithError(err).Warn("failure notifying chat event")
860+
if isAdvanced {
861+
event := &chatpb.ChatStreamEvent{
862+
Type: &chatpb.ChatStreamEvent_Pointer{
863+
Pointer: req.Pointer,
864+
},
865+
}
866+
if err := s.asyncNotifyAll(chatId, memberId, event); err != nil {
867+
log.WithError(err).Warn("failure notifying chat event")
868+
}
868869
}
869870

870871
return &chatpb.AdvancePointerResponse{

0 commit comments

Comments
 (0)