@@ -54,18 +54,18 @@ type server struct {
5454 chatLocks * sync_util.StripedLock
5555 chatEventChans * sync_util.StripedChannel
5656
57- streamsMu sync.RWMutex
58- streams map [string ]* chatEventStream
59-
6057 chatpb.UnimplementedChatServer
6158}
6259
63- func NewChatServer (data code_data.Provider , auth * auth_util.RPCSignatureVerifier ) chatpb.ChatServer {
64- return & server {
65- log : logrus .StandardLogger ().WithField ("type" , "chat/server" ),
66- data : data ,
67- auth : auth ,
68- streams : make (map [string ]* chatEventStream ),
60+ func NewChatServer (data code_data.Provider , auth * auth_util.RPCSignatureVerifier , pusher push_lib.Provider ) chatpb.ChatServer {
61+ s := & server {
62+ log : logrus .StandardLogger ().WithField ("type" , "chat/v1/server" ),
63+ data : data ,
64+ auth : auth ,
65+ pusher : pusher ,
66+ streams : make (map [string ]* chatEventStream ),
67+ chatLocks : sync_util .NewStripedLock (64 ), // todo: configurable parameters
68+ chatEventChans : sync_util .NewStripedChannel (64 , 100_000 ), // todo: configurable parameters
6969 }
7070
7171 for i , channel := range s .chatEventChans .GetChannels () {
@@ -403,21 +403,9 @@ func (s *server) AdvancePointer(ctx context.Context, req *chatpb.AdvancePointerR
403403 Pointers : []* chatpb.Pointer {req .Pointer },
404404 }
405405
406- s .streamsMu .RLock ()
407- for key , stream := range s .streams {
408- if ! strings .HasPrefix (key , chatId .String ()) {
409- continue
410- }
411-
412- if strings .HasSuffix (key , owner .PublicKey ().ToBase58 ()) {
413- continue
414- }
415-
416- if err := stream .notify (event , streamNotifyTimeout ); err != nil {
417- log .WithError (err ).Warnf ("failed to notify session stream, closing streamer (stream=%p)" , stream )
418- }
406+ if err := s .asyncNotifyAll (chatId , owner , event ); err != nil {
407+ log .WithError (err ).Warn ("failure notifying chat event" )
419408 }
420- s .streamsMu .RUnlock ()
421409
422410 return & chatpb.AdvancePointerResponse {
423411 Result : chatpb .AdvancePointerResponse_OK ,
@@ -428,7 +416,7 @@ func (s *server) AdvancePointer(ctx context.Context, req *chatpb.AdvancePointerR
428416 return nil , status .Error (codes .InvalidArgument , "Pointer.Kind must be READ" )
429417 }
430418
431- chatRecord , err := s .data .GetChatById (ctx , chatId )
419+ chatRecord , err := s .data .GetChatByIdV1 (ctx , chatId )
432420 if err == chat .ErrChatNotFound {
433421 return & chatpb.AdvancePointerResponse {
434422 Result : chatpb .AdvancePointerResponse_CHAT_NOT_FOUND ,
@@ -656,6 +644,22 @@ func (s *server) StreamChatEvents(streamer chatpb.Chat_StreamChatEventsServer) e
656644
657645 s .streamsMu .Unlock ()
658646
647+ defer func () {
648+ s .streamsMu .Lock ()
649+
650+ log .Tracef ("closing streamer (stream=%s)" , streamRef )
651+
652+ // We check to see if the current active stream is the one that we created.
653+ // If it is, we can just remove it since it's closed. Otherwise, we leave it
654+ // be, as another OpenMessageStream() call is handling it.
655+ liveStream , exists := s .streams [streamKey ]
656+ if exists && liveStream == stream {
657+ delete (s .streams , streamKey )
658+ }
659+
660+ s .streamsMu .Unlock ()
661+ }()
662+
659663 sendPingCh := time .After (0 )
660664 streamHealthCh := monitorChatEventStreamHealth (ctx , log , streamRef , streamer )
661665
@@ -734,6 +738,10 @@ func (s *server) SendMessage(ctx context.Context, req *chatpb.SendMessageRequest
734738 return nil , status .Error (codes .InvalidArgument , "content[0] must be Text or ThankYou" )
735739 }
736740
741+ chatLock := s .chatLocks .Get (chatId [:])
742+ chatLock .Lock ()
743+ defer chatLock .Unlock ()
744+
737745 // todo: Revisit message IDs
738746 messageId , err := common .NewRandomAccount ()
739747 if err != nil {
@@ -749,28 +757,54 @@ func (s *server) SendMessage(ctx context.Context, req *chatpb.SendMessageRequest
749757 Cursor : nil , // todo: Don't have cursor until we save it to the DB
750758 }
751759
760+ // todo: Save the message to the DB
761+
752762 event := & chatpb.ChatStreamEvent {
753763 Messages : []* chatpb.ChatMessage {chatMessage },
754764 }
755765
756- s .streamsMu .RLock ()
757- for key , stream := range s .streams {
758- if ! strings .HasPrefix (key , chatId .String ()) {
759- continue
760- }
761-
762- if strings .HasSuffix (key , owner .PublicKey ().ToBase58 ()) {
763- continue
764- }
765-
766- if err := stream .notify (event , streamNotifyTimeout ); err != nil {
767- log .WithError (err ).Warnf ("failed to notify session stream, closing streamer (stream=%p)" , stream )
768- }
766+ if err := s .asyncNotifyAll (chatId , owner , event ); err != nil {
767+ log .WithError (err ).Warn ("failure notifying chat event" )
769768 }
770- s .streamsMu .RUnlock ()
769+
770+ s .asyncPushChatMessage (owner , chatId , chatMessage )
771771
772772 return & chatpb.SendMessageResponse {
773773 Result : chatpb .SendMessageResponse_OK ,
774774 Message : chatMessage ,
775775 }, nil
776776}
777+
778+ // todo: doesn't respect mute/unsubscribe rules
779+ // todo: only sends pushes to active stream listeners instead of all message recipients
780+ func (s * server ) asyncPushChatMessage (sender * common.Account , chatId chat.ChatId , chatMessage * chatpb.ChatMessage ) {
781+ ctx := context .TODO ()
782+
783+ go func () {
784+ s .streamsMu .RLock ()
785+ for key := range s .streams {
786+ if ! strings .HasPrefix (key , chatId .String ()) {
787+ continue
788+ }
789+
790+ receiver , err := common .NewAccountFromPublicKeyString (strings .Split (key , ":" )[1 ])
791+ if err != nil {
792+ continue
793+ }
794+
795+ if bytes .Equal (sender .PublicKey ().ToBytes (), receiver .PublicKey ().ToBytes ()) {
796+ continue
797+ }
798+
799+ go push_util .SendChatMessagePushNotification (
800+ ctx ,
801+ s .data ,
802+ s .pusher ,
803+ "TontonTwitch" ,
804+ receiver ,
805+ chatMessage ,
806+ )
807+ }
808+ s .streamsMu .RUnlock ()
809+ }()
810+ }
0 commit comments