@@ -767,33 +767,15 @@ func (s *server) SendMessage(ctx context.Context, req *chatpb.SendMessageRequest
767767 chatLock .Lock ()
768768 defer chatLock .Unlock ()
769769
770- messageId := chat .GenerateMessageId ()
771- ts , _ := messageId .GetTimestamp ()
772-
773- chatMessage := & chatpb.ChatMessage {
774- MessageId : messageId .ToProto (),
775- SenderId : req .MemberId ,
776- Content : req .Content ,
777- Ts : timestamppb .New (ts ),
778- Cursor : & chatpb.Cursor {Value : messageId [:]},
779- }
770+ chatMessage := newProtoChatMessage (memberId , req .Content ... )
780771
781772 err = s .persistChatMessage (ctx , chatId , chatMessage )
782773 if err != nil {
783774 log .WithError (err ).Warn ("failure persisting chat message" )
784775 return nil , status .Error (codes .Internal , "" )
785776 }
786777
787- event := & chatpb.ChatStreamEvent {
788- Type : & chatpb.ChatStreamEvent_Message {
789- Message : chatMessage ,
790- },
791- }
792- if err := s .asyncNotifyAll (chatId , memberId , event ); err != nil {
793- log .WithError (err ).Warn ("failure notifying chat event" )
794- }
795-
796- // todo: send the push
778+ s .onPersistChatMessage (log , chatId , chatMessage )
797779
798780 return & chatpb.SendMessageResponse {
799781 Result : chatpb .SendMessageResponse_OK ,
@@ -946,7 +928,7 @@ func (s *server) AdvancePointer(ctx context.Context, req *chatpb.AdvancePointerR
946928 Pointer : req .Pointer ,
947929 },
948930 }
949- if err := s .asyncNotifyAll (chatId , memberId , event ); err != nil {
931+ if err := s .asyncNotifyAll (chatId , event ); err != nil {
950932 log .WithError (err ).Warn ("failure notifying chat event" )
951933 }
952934 }
@@ -1061,20 +1043,17 @@ func (s *server) RevealIdentity(ctx context.Context, req *chatpb.RevealIdentityR
10611043 chatLock .Lock ()
10621044 defer chatLock .Unlock ()
10631045
1064- messageId := chat .GenerateMessageId ()
1065- ts , _ := messageId .GetTimestamp ()
1066-
1067- chatMessage := & chatpb.ChatMessage {
1068- MessageId : messageId .ToProto (),
1069- SenderId : req .MemberId ,
1070- Content : []* chatpb.Content {
1071- {
1072- Type : & chatpb.Content_IdentityRevealed {},
1046+ chatMessage := newProtoChatMessage (
1047+ memberId ,
1048+ & chatpb.Content {
1049+ Type : & chatpb.Content_IdentityRevealed {
1050+ IdentityRevealed : & chatpb.IdentityRevealedContent {
1051+ MemberId : req .MemberId ,
1052+ Identity : req .Identity ,
1053+ },
10731054 },
10741055 },
1075- Ts : timestamppb .New (ts ),
1076- Cursor : & chatpb.Cursor {Value : messageId [:]},
1077- }
1056+ )
10781057
10791058 err = s .data .ExecuteInTx (ctx , sql .LevelDefault , func (ctx context.Context ) error {
10801059 err = s .data .UpgradeChatMemberIdentityV2 (ctx , chatId , memberId , platform , req .Identity .Username )
@@ -1094,16 +1073,7 @@ func (s *server) RevealIdentity(ctx context.Context, req *chatpb.RevealIdentityR
10941073 })
10951074
10961075 if err == nil {
1097- event := & chatpb.ChatStreamEvent {
1098- Type : & chatpb.ChatStreamEvent_Message {
1099- Message : chatMessage ,
1100- },
1101- }
1102- if err := s .asyncNotifyAll (chatId , memberId , event ); err != nil {
1103- log .WithError (err ).Warn ("failure notifying chat event" )
1104- }
1105-
1106- // todo: send the push
1076+ s .onPersistChatMessage (log , chatId , chatMessage )
11071077 }
11081078
11091079 switch err {
@@ -1251,66 +1221,11 @@ func (s *server) SetSubscriptionState(ctx context.Context, req *chatpb.SetSubscr
12511221 }, nil
12521222}
12531223
1254- func (s * server ) getProtoChatMessages (ctx context.Context , chatId chat.ChatId , owner * common.Account , queryOptions ... query.Option ) ([]* chatpb.ChatMessage , error ) {
1255- messageRecords , err := s .data .GetAllChatMessagesV2 (
1256- ctx ,
1257- chatId ,
1258- queryOptions ... ,
1259- )
1260- if err == chat .ErrMessageNotFound {
1261- return nil , err
1262- }
1263-
1264- var userLocale * language.Tag // Loaded lazily when required
1265- var res []* chatpb.ChatMessage
1266- for _ , messageRecord := range messageRecords {
1267- var protoChatMessage chatpb.ChatMessage
1268- err = proto .Unmarshal (messageRecord .Data , & protoChatMessage )
1269- if err != nil {
1270- return nil , errors .Wrap (err , "error unmarshalling proto chat message" )
1271- }
1272-
1273- ts , err := messageRecord .GetTimestamp ()
1274- if err != nil {
1275- return nil , errors .Wrap (err , "error getting message timestamp" )
1276- }
1277-
1278- for _ , content := range protoChatMessage .Content {
1279- switch typed := content .Type .(type ) {
1280- case * chatpb.Content_Localized :
1281- if userLocale == nil {
1282- loadedUserLocale , err := s .data .GetUserLocale (ctx , owner .PublicKey ().ToBase58 ())
1283- if err != nil {
1284- return nil , errors .Wrap (err , "error getting user locale" )
1285- }
1286- userLocale = & loadedUserLocale
1287- }
1288-
1289- typed .Localized .KeyOrText = localization .LocalizeWithFallback (
1290- * userLocale ,
1291- localization .GetLocalizationKeyForUserAgent (ctx , typed .Localized .KeyOrText ),
1292- typed .Localized .KeyOrText ,
1293- )
1294- }
1295- }
1296-
1297- protoChatMessage .MessageId = messageRecord .MessageId .ToProto ()
1298- if messageRecord .Sender != nil {
1299- protoChatMessage .SenderId = messageRecord .Sender .ToProto ()
1300- }
1301- protoChatMessage .Ts = timestamppb .New (ts )
1302- protoChatMessage .Cursor = & chatpb.Cursor {Value : messageRecord .MessageId [:]}
1303-
1304- res = append (res , & protoChatMessage )
1305- }
1306-
1307- return res , nil
1308- }
1309-
13101224func (s * server ) toProtoChat (ctx context.Context , chatRecord * chat.ChatRecord , memberRecords []* chat.MemberRecord , myIdentitiesByPlatform map [chat.Platform ]string ) (* chatpb.ChatMetadata , error ) {
13111225 protoChat := & chatpb.ChatMetadata {
13121226 ChatId : chatRecord .ChatId .ToProto (),
13131227 Kind : chatRecord .ChatType .ToProto (),
1228+ Cursor : & chatpb.Cursor {Value : query .ToCursor (uint64 (chatRecord .Id ))},
13141229 }
13151230
13161231 switch chatRecord .ChatType {
@@ -1390,6 +1305,75 @@ func (s *server) toProtoChat(ctx context.Context, chatRecord *chat.ChatRecord, m
13901305 return protoChat , nil
13911306}
13921307
1308+ func (s * server ) getProtoChatMessages (ctx context.Context , chatId chat.ChatId , owner * common.Account , queryOptions ... query.Option ) ([]* chatpb.ChatMessage , error ) {
1309+ messageRecords , err := s .data .GetAllChatMessagesV2 (
1310+ ctx ,
1311+ chatId ,
1312+ queryOptions ... ,
1313+ )
1314+ if err == chat .ErrMessageNotFound {
1315+ return nil , err
1316+ }
1317+
1318+ var userLocale * language.Tag // Loaded lazily when required
1319+ var res []* chatpb.ChatMessage
1320+ for _ , messageRecord := range messageRecords {
1321+ var protoChatMessage chatpb.ChatMessage
1322+ err = proto .Unmarshal (messageRecord .Data , & protoChatMessage )
1323+ if err != nil {
1324+ return nil , errors .Wrap (err , "error unmarshalling proto chat message" )
1325+ }
1326+
1327+ ts , err := messageRecord .GetTimestamp ()
1328+ if err != nil {
1329+ return nil , errors .Wrap (err , "error getting message timestamp" )
1330+ }
1331+
1332+ for _ , content := range protoChatMessage .Content {
1333+ switch typed := content .Type .(type ) {
1334+ case * chatpb.Content_Localized :
1335+ if userLocale == nil {
1336+ loadedUserLocale , err := s .data .GetUserLocale (ctx , owner .PublicKey ().ToBase58 ())
1337+ if err != nil {
1338+ return nil , errors .Wrap (err , "error getting user locale" )
1339+ }
1340+ userLocale = & loadedUserLocale
1341+ }
1342+
1343+ typed .Localized .KeyOrText = localization .LocalizeWithFallback (
1344+ * userLocale ,
1345+ localization .GetLocalizationKeyForUserAgent (ctx , typed .Localized .KeyOrText ),
1346+ typed .Localized .KeyOrText ,
1347+ )
1348+ }
1349+ }
1350+
1351+ protoChatMessage .MessageId = messageRecord .MessageId .ToProto ()
1352+ if messageRecord .Sender != nil {
1353+ protoChatMessage .SenderId = messageRecord .Sender .ToProto ()
1354+ }
1355+ protoChatMessage .Ts = timestamppb .New (ts )
1356+ protoChatMessage .Cursor = & chatpb.Cursor {Value : messageRecord .MessageId [:]}
1357+
1358+ res = append (res , & protoChatMessage )
1359+ }
1360+
1361+ return res , nil
1362+ }
1363+
1364+ func (s * server ) onPersistChatMessage (log * logrus.Entry , chatId chat.ChatId , chatMessage * chatpb.ChatMessage ) {
1365+ event := & chatpb.ChatStreamEvent {
1366+ Type : & chatpb.ChatStreamEvent_Message {
1367+ Message : chatMessage ,
1368+ },
1369+ }
1370+ if err := s .asyncNotifyAll (chatId , event ); err != nil {
1371+ log .WithError (err ).Warn ("failure notifying chat event" )
1372+ }
1373+
1374+ // todo: send the push
1375+ }
1376+
13931377func (s * server ) getAllIdentities (ctx context.Context , owner * common.Account ) (map [chat.Platform ]string , error ) {
13941378 identities := map [chat.Platform ]string {
13951379 chat .PlatformCode : owner .PublicKey ().ToBase58 (),
@@ -1466,3 +1450,16 @@ func (s *server) getOwnedTwitterUsername(ctx context.Context, owner *common.Acco
14661450 return "" , false , errors .Wrap (err , "error getting twitter user" )
14671451 }
14681452}
1453+
1454+ func newProtoChatMessage (sender chat.MemberId , content ... * chatpb.Content ) * chatpb.ChatMessage {
1455+ messageId := chat .GenerateMessageId ()
1456+ ts , _ := messageId .GetTimestamp ()
1457+
1458+ return & chatpb.ChatMessage {
1459+ MessageId : messageId .ToProto (),
1460+ SenderId : sender .ToProto (),
1461+ Content : content ,
1462+ Ts : timestamppb .New (ts ),
1463+ Cursor : & chatpb.Cursor {Value : messageId [:]},
1464+ }
1465+ }
0 commit comments