Skip to content

Commit f4bc1f0

Browse files
authored
ntf server: skip duplicates when importing tokens and subscriptions (#1526)
* ntf server: skip duplicates when importing tokens and subscriptions * skip imported last notifications when no token or subscription present * fix skipped imported notifications count * all tests * fix test
1 parent 42dbb88 commit f4bc1f0

File tree

4 files changed

+111
-73
lines changed

4 files changed

+111
-73
lines changed

src/Simplex/Messaging/Notifications/Server.hs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,6 @@ import UnliftIO.STM
7979
import GHC.Conc (listThreads)
8080
#endif
8181

82-
import qualified Data.ByteString.Base64 as B64
83-
8482
runNtfServer :: NtfServerConfig -> IO ()
8583
runNtfServer cfg = do
8684
started <- newEmptyTMVarIO

src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs

Lines changed: 95 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ import Data.Containers.ListUtils (nubOrd)
3333
import Data.Either (fromRight)
3434
import Data.Functor (($>))
3535
import Data.Int (Int64)
36-
import Data.List (foldl', intercalate)
36+
import Data.List (findIndex, foldl')
3737
import Data.List.NonEmpty (NonEmpty (..))
3838
import qualified Data.List.NonEmpty as L
3939
import qualified Data.Map.Strict as M
40-
import Data.Maybe (catMaybes, fromMaybe, mapMaybe)
40+
import Data.Maybe (fromMaybe, mapMaybe)
4141
import qualified Data.Set as S
4242
import Data.Text (Text)
4343
import qualified Data.Text as T
@@ -587,89 +587,96 @@ toLastNtf (qRow :. (ts, nonce, Binary encMeta)) =
587587

588588
importNtfSTMStore :: NtfPostgresStore -> NtfSTMStore -> IO (Int64, Int64, Int64)
589589
importNtfSTMStore NtfPostgresStore {dbStore = s} stmStore = do
590-
(tCnt, tIds) <- importTokens
591-
sCnt <- importSubscriptions tIds
592-
nCnt <- importLastNtfs
590+
(tIds, tCnt) <- importTokens
591+
subLookup <- readTVarIO $ subscriptionLookup stmStore
592+
sCnt <- importSubscriptions tIds subLookup
593+
nCnt <- importLastNtfs tIds subLookup
593594
pure (tCnt, sCnt, nCnt)
594595
where
595596
importTokens = do
596597
allTokens <- M.elems <$> readTVarIO (tokens stmStore)
597598
tokens <- filterTokens allTokens
598599
let skipped = length allTokens - length tokens
599600
when (skipped /= 0) $ putStrLn $ "Total skipped tokens " <> show skipped
600-
tCnt <- withConnection s $ \db -> foldM (insertToken db) 0 tokens
601-
void $ checkCount "token" (length tokens) tCnt
601+
-- uncomment this line instead of the next to import tokens one by one.
602+
-- tCnt <- withConnection s $ \db -> foldM (importTkn db) 0 tokens
603+
tRows <- mapM (fmap ntfTknToRow . mkTknRec) tokens
604+
tCnt <- withConnection s $ \db -> DB.executeMany db insertNtfTknQuery tRows
602605
let tokenIds = S.fromList $ map (\NtfTknData {ntfTknId} -> ntfTknId) tokens
603-
pure (tCnt, tokenIds)
606+
(tokenIds,) <$> checkCount "token" (length tokens) tCnt
604607
where
605608
filterTokens tokens = do
606609
let deviceTokens = foldl' (\m t -> M.alter (Just . (t :) . fromMaybe []) (tokenKey t) m) M.empty tokens
607610
tokenSubs <- readTVarIO (tokenSubscriptions stmStore)
608611
filterM (keepTokenRegistration deviceTokens tokenSubs) tokens
609612
tokenKey NtfTknData {token, tknVerifyKey} = strEncode token <> ":" <> C.toPubKey C.pubKeyBytes tknVerifyKey
610-
keepTokenRegistration deviceTokens tokenSubs tkn@NtfTknData {ntfTknId, token, tknStatus} =
613+
keepTokenRegistration deviceTokens tokenSubs tkn@NtfTknData {ntfTknId, tknStatus} =
611614
case M.lookup (tokenKey tkn) deviceTokens of
612615
Just ts
613-
| length ts >= 2 ->
616+
| length ts < 2 -> pure True
617+
| otherwise ->
614618
readTVarIO tknStatus >>= \case
615619
NTConfirmed -> do
616-
anyActive <- anyM $ map (\NtfTknData {tknStatus = tknStatus'} -> (NTActive ==) <$> readTVarIO tknStatus') ts
617-
noSubs <- S.null <$> maybe (pure S.empty) readTVarIO (M.lookup ntfTknId tokenSubs)
618-
if anyActive
619-
then (
620-
if noSubs
621-
then False <$ putStrLn ("Skipped inactive token " <> enc ntfTknId <> " (no subscriptions)")
622-
else pure True
623-
)
620+
hasSubs <- maybe (pure False) (\v -> not . S.null <$> readTVarIO v) $ M.lookup ntfTknId tokenSubs
621+
if hasSubs
622+
then pure True
624623
else do
625-
let noSubsStr = if noSubs then " no subscriptions" else " has subscriptions"
626-
putStrLn $ "Error: more than one registration for token " <> enc ntfTknId <> " " <> show token <> noSubsStr
627-
pure True
624+
anyActive <- anyM $ map (\NtfTknData {tknStatus = tknStatus'} -> (NTActive ==) <$> readTVarIO tknStatus') ts
625+
if anyActive
626+
then False <$ putStrLn ("Skipped duplicate inactive token " <> enc ntfTknId)
627+
else case findIndex (\NtfTknData {ntfTknId = tId} -> tId == ntfTknId) ts of
628+
Just 0 -> pure True -- keeping the first token
629+
Just _ -> False <$ putStrLn ("Skipped duplicate inactive token " <> enc ntfTknId <> " (no active token)")
630+
Nothing -> True <$ putStrLn "Error: no device token in the list"
628631
_ -> pure True
629-
| otherwise -> pure True
630632
Nothing -> True <$ putStrLn "Error: no device token in lookup map"
631-
insertToken db !n tkn@NtfTknData {ntfTknId} = do
632-
tknRow <- ntfTknToRow <$> mkTknRec tkn
633-
(DB.execute db insertNtfTknQuery tknRow >>= pure . (n + )) `E.catch` \(e :: E.SomeException) ->
634-
putStrLn ("Error inserting token " <> enc ntfTknId <> " " <> show e) $> n
635-
importSubscriptions tIds = do
636-
allSubs <- M.elems <$> readTVarIO (subscriptions stmStore)
637-
let subs = filter (\NtfSubData {tokenId} -> S.member tokenId tIds) allSubs
638-
skipped = length allSubs - length subs
639-
when (skipped /= 0) $ putStrLn $ "Skipped subscriptions (no tokens) " <> show skipped
633+
-- importTkn db !n tkn@NtfTknData {ntfTknId} = do
634+
-- tknRow <- ntfTknToRow <$> mkTknRec tkn
635+
-- (DB.execute db insertNtfTknQuery tknRow >>= pure . (n + )) `E.catch` \(e :: E.SomeException) ->
636+
-- putStrLn ("Error inserting token " <> enc ntfTknId <> " " <> show e) $> n
637+
importSubscriptions :: S.Set NtfTokenId -> M.Map SMPQueueNtf NtfSubscriptionId -> IO Int64
638+
importSubscriptions tIds subLookup = do
639+
subs <- filterSubs . M.elems =<< readTVarIO (subscriptions stmStore)
640640
srvIds <- importServers subs
641641
putStrLn $ "Importing " <> show (length subs) <> " subscriptions..."
642-
-- uncomment this line instead of the next 2 lines to import subs one by one.
643-
(sCnt, missingTkns) <- withConnection s $ \db -> foldM (importSub db srvIds) (0, M.empty) subs
644-
-- sCnt <- foldM (importSubs srvIds) 0 $ toChunks 100000 subs
645-
-- let missingTkns = M.empty
646-
putStrLn $ "Imported " <> show sCnt <> " subscriptions"
647-
unless (M.null missingTkns) $ do
648-
putStrLn $ show (M.size missingTkns) <> " missing tokens:"
649-
forM_ (M.assocs missingTkns) $ \(tId, sIds) ->
650-
putStrLn $ "Token " <> enc tId <> " " <> show (length sIds) <> " subscriptions: " <> intercalate ", " (map enc sIds)
642+
-- uncomment this line instead of the next to import subs one by one.
643+
-- (sCnt, errTkns) <- withConnection s $ \db -> foldM (importSub db srvIds) (0, M.empty) subs
644+
sCnt <- foldM (importSubs srvIds) 0 $ toChunks 500000 subs
651645
checkCount "subscription" (length subs) sCnt
652646
where
647+
filterSubs allSubs = do
648+
let subs = filter (\NtfSubData {tokenId} -> S.member tokenId tIds) allSubs
649+
skipped = length allSubs - length subs
650+
when (skipped /= 0) $ putStrLn $ "Skipped " <> show skipped <> " subscriptions of missing tokens"
651+
let (removedSubTokens, removeSubs, dupQueues) = foldl' addSubToken (S.empty, S.empty, S.empty) subs
652+
unless (null removeSubs) $ putStrLn $ "Skipped " <> show (S.size removeSubs) <> " duplicate subscriptions of " <> show (S.size removedSubTokens) <> " tokens for " <> show (S.size dupQueues) <> " queues"
653+
pure $ filter (\NtfSubData {ntfSubId} -> S.notMember ntfSubId removeSubs) subs
654+
where
655+
addSubToken acc@(!stIds, !sIds, !qs) NtfSubData {ntfSubId, smpQueue, tokenId} =
656+
case M.lookup smpQueue subLookup of
657+
Just sId | sId /= ntfSubId ->
658+
(S.insert tokenId stIds, S.insert ntfSubId sIds, S.insert smpQueue qs)
659+
_ -> acc
653660
importSubs srvIds !n subs = do
654661
rows <- mapM (ntfSubRow srvIds) subs
655662
cnt <- withConnection s $ \db -> DB.executeMany db insertNtfSubQuery $ L.toList rows
656663
let n' = n + cnt
657664
putStr $ "Imported " <> show n' <> " subscriptions" <> "\r"
658665
hFlush stdout
659666
pure n'
660-
importSub db srvIds (!n, !missingTkns) sub@NtfSubData {ntfSubId = sId, tokenId} = do
661-
subRow <- ntfSubRow srvIds sub
662-
E.try (DB.execute db insertNtfSubQuery subRow) >>= \case
663-
Right i -> do
664-
let n' = n + i
665-
when (n' `mod` 100000 == 0) $ do
666-
putStr $ "Imported " <> show n' <> " subscriptions" <> "\r"
667-
hFlush stdout
668-
pure (n', missingTkns)
669-
Left (e :: E.SomeException) -> do
670-
when (n `mod` 100000 == 0) $ putStrLn ""
671-
putStrLn $ "Error inserting subscription " <> enc sId <> " for token " <> enc tokenId <> " " <> show e
672-
pure (n, M.alter (Just . (sId :) . fromMaybe []) tokenId missingTkns)
667+
-- importSub db srvIds (!n, !errTkns) sub@NtfSubData {ntfSubId = sId, tokenId} = do
668+
-- subRow <- ntfSubRow srvIds sub
669+
-- E.try (DB.execute db insertNtfSubQuery subRow) >>= \case
670+
-- Right i -> do
671+
-- let n' = n + i
672+
-- when (n' `mod` 100000 == 0) $ do
673+
-- putStr $ "Imported " <> show n' <> " subscriptions" <> "\r"
674+
-- hFlush stdout
675+
-- pure (n', errTkns)
676+
-- Left (e :: E.SomeException) -> do
677+
-- when (n `mod` 100000 == 0) $ putStrLn ""
678+
-- putStrLn $ "Error inserting subscription " <> enc sId <> " for token " <> enc tokenId <> " " <> show e
679+
-- pure (n, M.alter (Just . maybe [sId] (sId :)) tokenId errTkns)
673680
ntfSubRow srvIds sub = case M.lookup srv srvIds of
674681
Just sId -> ntfSubToRow sId <$> mkSubRec sub
675682
Nothing -> E.throwIO $ userError $ "no matching server ID for server " <> show srv
@@ -682,19 +689,32 @@ importNtfSTMStore NtfPostgresStore {dbStore = s} stmStore = do
682689
where
683690
srvQuery = "INSERT INTO smp_servers (smp_host, smp_port, smp_keyhash) VALUES (?, ?, ?) RETURNING smp_server_id"
684691
srvs = nubOrd $ map ntfSubServer subs
685-
importLastNtfs = do
686-
subLookup <- readTVarIO $ subscriptionLookup stmStore
687-
ntfRows <- fmap concat . mapM (lastNtfRows subLookup) . M.assocs =<< readTVarIO (tokenLastNtfs stmStore)
692+
importLastNtfs :: S.Set NtfTokenId -> M.Map SMPQueueNtf NtfSubscriptionId -> IO Int64
693+
importLastNtfs tIds subLookup = do
694+
ntfs <- readTVarIO (tokenLastNtfs stmStore)
695+
ntfRows <- filterLastNtfRows ntfs
688696
nCnt <- withConnection s $ \db -> DB.executeMany db lastNtfQuery ntfRows
689697
checkCount "last notification" (length ntfRows) nCnt
690698
where
691699
lastNtfQuery = "INSERT INTO last_notifications(token_id, subscription_id, sent_at, nmsg_nonce, nmsg_data) VALUES (?,?,?,?,?)"
692-
lastNtfRows :: M.Map SMPQueueNtf NtfSubscriptionId -> (NtfTokenId, TVar (NonEmpty PNMessageData)) -> IO [(NtfTokenId, NtfSubscriptionId, SystemTime, C.CbNonce, Binary ByteString)]
693-
lastNtfRows subLookup (tId, ntfs) = fmap catMaybes . mapM ntfRow . L.toList =<< readTVarIO ntfs
700+
filterLastNtfRows ntfs = do
701+
(skippedTkns, ntfCnt, (skippedQueues, ntfRows)) <- foldM lastNtfRows (S.empty, 0, (S.empty, [])) $ M.assocs ntfs
702+
let skipped = ntfCnt - length ntfRows
703+
when (skipped /= 0) $ putStrLn $ "Skipped last notifications " <> show skipped <> " for " <> show (S.size skippedTkns) <> " missing tokens and " <> show (S.size skippedQueues) <> " missing subscriptions with token present"
704+
pure ntfRows
705+
lastNtfRows (!stIds, !cnt, !acc) (tId, ntfVar) = do
706+
ntfs <- L.toList <$> readTVarIO ntfVar
707+
let cnt' = cnt + length ntfs
708+
pure $
709+
if S.member tId tIds
710+
then (stIds, cnt', foldl' ntfRow acc ntfs)
711+
else (S.insert tId stIds, cnt', acc)
694712
where
695-
ntfRow PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} = case M.lookup smpQueue subLookup of
696-
Just ntfSubId -> pure $ Just (tId, ntfSubId, ntfTs, nmsgNonce, Binary encNMsgMeta)
697-
Nothing -> Nothing <$ putStrLn ("Error: no subscription " <> show smpQueue <> " for notification of token " <> enc tId)
713+
ntfRow (!qs, !rows) PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} = case M.lookup smpQueue subLookup of
714+
Just ntfSubId ->
715+
let row = (tId, ntfSubId, ntfTs, nmsgNonce, Binary encNMsgMeta)
716+
in (qs, row : rows)
717+
Nothing -> (S.insert smpQueue qs, rows)
698718
checkCount name expected inserted
699719
| fromIntegral expected == inserted = do
700720
putStrLn $ "Imported " <> show inserted <> " " <> name <> "s."
@@ -711,12 +731,21 @@ exportNtfDbStore NtfPostgresStore {dbStoreLog = Nothing} _ =
711731
exportNtfDbStore NtfPostgresStore {dbStore = s, dbStoreLog = Just sl} lastNtfsFile =
712732
(,,) <$> exportTokens <*> exportSubscriptions <*> exportLastNtfs
713733
where
714-
exportTokens =
715-
withConnection s $ \db -> DB.fold_ db ntfTknQuery 0 $ \ !i tkn ->
734+
exportTokens = do
735+
tCnt <- withConnection s $ \db -> DB.fold_ db ntfTknQuery 0 $ \ !i tkn ->
716736
logCreateToken sl (rowToNtfTkn tkn) $> (i + 1)
717-
exportSubscriptions =
718-
withConnection s $ \db -> DB.fold_ db ntfSubQuery 0 $ \ !i sub ->
719-
logCreateSubscription sl (toNtfSub sub) $> (i + 1)
737+
putStrLn $ "Exported " <> show tCnt <> " tokens"
738+
pure tCnt
739+
exportSubscriptions = do
740+
sCnt <- withConnection s $ \db -> DB.fold_ db ntfSubQuery 0 $ \ !i sub -> do
741+
let i' = i + 1
742+
logCreateSubscription sl (toNtfSub sub)
743+
when (i' `mod` 500000 == 0) $ do
744+
putStr $ "Exported " <> show i' <> " subscriptions" <> "\r"
745+
hFlush stdout
746+
pure i'
747+
putStrLn $ "Exported " <> show sCnt <> " subscriptions"
748+
pure sCnt
720749
where
721750
ntfSubQuery =
722751
[sql|

tests/AgentTests/NotificationTests.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -570,15 +570,15 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag
570570
threadDelay 500000
571571
suspendAgent alice 0
572572
closeDBStore store
573-
callCommand "sync"
573+
threadDelay 500000 >> callCommand "sync" >> threadDelay 500000
574574
putStrLn "before opening the database from another agent"
575575

576576
-- aliceNtf client doesn't have subscription and is allowed to get notification message
577577
withAgent 3 aliceCfg initAgentServers testDB $ \aliceNtf -> do
578578
(Just SMPMsgMeta {msgFlags = MsgFlags True}) :| _ <- getConnectionMessages aliceNtf [cId]
579579
pure ()
580580

581-
callCommand "sync"
581+
threadDelay 500000 >> callCommand "sync" >> threadDelay 500000
582582
putStrLn "after closing the database in another agent"
583583
reopenDBStore store
584584
foregroundAgent alice

tests/CLITests.hs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
{-# LANGUAGE CPP #-}
22
{-# LANGUAGE LambdaCase #-}
33
{-# LANGUAGE OverloadedStrings #-}
4+
{-# LANGUAGE NamedFieldPuns #-}
45

56
module CLITests where
67

78
import AgentTests.FunctionalAPITests (runRight_)
89
import Control.Logger.Simple
910
import Control.Monad
1011
import qualified Crypto.PubKey.RSA as RSA
12+
import qualified Data.ByteString.Char8 as B
1113
import qualified Data.ByteString.Lazy as BL
1214
import qualified Data.HashMap.Strict as HM
1315
import Data.Ini (Ini (..), lookupValue, readIniFile, writeIniFile)
@@ -41,8 +43,11 @@ import UnliftIO.Concurrent (threadDelay)
4143
import UnliftIO.Exception (bracket)
4244

4345
#if defined(dbServerPostgres)
44-
import NtfClient (ntfTestServerDBConnectInfo)
46+
import qualified Database.PostgreSQL.Simple as PSQL
47+
import Database.PostgreSQL.Simple.Types (Query (..))
48+
import NtfClient (ntfTestServerDBConnectInfo, ntfTestServerDBConnstr, ntfTestStoreDBOpts)
4549
import SMPClient (postgressBracket)
50+
import Simplex.Messaging.Agent.Store.Postgres.Options (DBOpts (..))
4651
import Simplex.Messaging.Notifications.Server.Main
4752
#endif
4853

@@ -77,7 +82,7 @@ cliTests = do
7782
it "with store log, no password" $ smpServerTest True False
7883
it "static files" smpServerTestStatic
7984
#if defined(dbServerPostgres)
80-
aroundAll_ (postgressBracket ntfTestServerDBConnectInfo) $
85+
around_ (postgressBracket ntfTestServerDBConnectInfo) $ before_ (createNtfSchema ntfTestServerDBConnectInfo ntfTestStoreDBOpts) $
8186
describe "Ntf server CLI" $ do
8287
it "should initialize, start and delete the server (no store log)" $ ntfServerTest False
8388
it "should initialize, start and delete the server (with store log)" $ ntfServerTest True
@@ -192,9 +197,15 @@ smpServerTestStatic = do
192197
in map (X.signedObject . X.getSigned) cc
193198

194199
#if defined(dbServerPostgres)
200+
createNtfSchema :: PSQL.ConnectInfo -> DBOpts -> IO ()
201+
createNtfSchema connInfo DBOpts {schema} = do
202+
db <- PSQL.connect connInfo
203+
void $ PSQL.execute_ db $ Query $ "CREATE SCHEMA " <> schema
204+
PSQL.close db
205+
195206
ntfServerTest :: Bool -> IO ()
196207
ntfServerTest storeLog = do
197-
capture_ (withArgs (["init"] <> ["--disable-store-log" | not storeLog]) $ ntfServerCLI ntfCfgPath ntfLogPath)
208+
capture_ (withArgs (["init", "--database=" <> B.unpack ntfTestServerDBConnstr] <> ["--disable-store-log" | not storeLog]) $ ntfServerCLI ntfCfgPath ntfLogPath)
198209
>>= (`shouldSatisfy` (("Server initialized, you can modify configuration in " <> ntfCfgPath <> "/ntf-server.ini") `isPrefixOf`))
199210
Right ini <- readIniFile $ ntfCfgPath <> "/ntf-server.ini"
200211
lookupValue "STORE_LOG" "enable" ini `shouldBe` Right (if storeLog then "on" else "off")

0 commit comments

Comments
 (0)