Skip to content

Commit afb338a

Browse files
authored
ntf server: optimize in-memory storage (#1516)
* ntf server: optimize in-memory storage * test * ntf server: fix store log parser for token status
1 parent 1e29f7c commit afb338a

File tree

4 files changed

+79
-51
lines changed

4 files changed

+79
-51
lines changed

src/Simplex/Messaging/Client/Agent.hs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ smpSubscribeQueues party ca smp srv subs = do
358358
pending <- maybe (pure M.empty) readTVar =<< TM.lookup srv (pendingSrvSubs ca)
359359
let acc@(_, _, oks, notPending) = foldr (groupSub pending) (False, [], [], []) (L.zip subs rs)
360360
unless (null oks) $ addSubscriptions ca srv party oks
361-
unless (null notPending) $ removePendingSubs ca srv party notPending
361+
unless (null notPending) $ removePendingSubs ca srv party $ S.fromList notPending
362362
pure acc
363363
sessId = sessionId $ thParams smp
364364
groupSub :: Map SMPSub C.APrivateAuthKey -> ((QueueId, C.APrivateAuthKey), Either SMPClientError ()) -> (Bool, [(QueueId, SMPClientError)], [(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]) -> (Bool, [(QueueId, SMPClientError)], [(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId])
@@ -412,14 +412,22 @@ removeSubscription :: SMPClientAgent -> SMPServer -> SMPSub -> STM ()
412412
removeSubscription = removeSub_ . srvSubs
413413
{-# INLINE removeSubscription #-}
414414

415+
removePendingSub :: SMPClientAgent -> SMPServer -> SMPSub -> STM ()
416+
removePendingSub = removeSub_ . pendingSrvSubs
417+
{-# INLINE removePendingSub #-}
418+
415419
removeSub_ :: TMap SMPServer (TMap SMPSub s) -> SMPServer -> SMPSub -> STM ()
416420
removeSub_ subs srv s = TM.lookup srv subs >>= mapM_ (TM.delete s)
417421

418-
removePendingSubs :: SMPClientAgent -> SMPServer -> SMPSubParty -> [QueueId] -> STM ()
422+
removeSubscriptions :: SMPClientAgent -> SMPServer -> SMPSubParty -> Set QueueId -> STM ()
423+
removeSubscriptions = removeSubs_ . srvSubs
424+
{-# INLINE removeSubscriptions #-}
425+
426+
removePendingSubs :: SMPClientAgent -> SMPServer -> SMPSubParty -> Set QueueId -> STM ()
419427
removePendingSubs = removeSubs_ . pendingSrvSubs
420428
{-# INLINE removePendingSubs #-}
421429

422-
removeSubs_ :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey) -> SMPServer -> SMPSubParty -> [QueueId] -> STM ()
430+
removeSubs_ :: TMap SMPServer (TMap SMPSub s) -> SMPServer -> SMPSubParty -> Set QueueId -> STM ()
423431
removeSubs_ subs srv party qs = TM.lookup srv subs >>= mapM_ (`modifyTVar'` (`M.withoutKeys` ss))
424432
where
425-
ss = S.fromList $ map (party,) qs
433+
ss = S.map (party,) qs

src/Simplex/Messaging/Notifications/Server.hs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -691,9 +691,10 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
691691
TDEL -> do
692692
logDebug "TDEL"
693693
st <- asks store
694-
qs <- atomically $ deleteNtfToken st tknId
695-
forM_ qs $ \SMPQueueNtf {smpServer, notifierId} ->
696-
atomically $ removeSubscription ca smpServer (SPNotifier, notifierId)
694+
ss <- atomically $ deleteNtfToken st tknId
695+
forM_ (M.assocs ss) $ \(smpServer, nIds) -> do
696+
atomically $ removeSubscriptions ca smpServer SPNotifier nIds
697+
atomically $ removePendingSubs ca smpServer SPNotifier nIds
697698
cancelInvervalNotifications tknId
698699
withNtfLog (`logDeleteToken` tknId)
699700
incNtfStatT token tknDeleted
@@ -732,9 +733,10 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
732733
subId <- getId
733734
sub <- atomically $ mkNtfSubData subId newSub
734735
resp <-
735-
atomically (addNtfSubscription st subId sub) >>= \case
736-
Just _ -> atomically (writeTBQueue newSubQ [NtfSub sub]) $> NRSubId subId
737-
_ -> pure $ NRErr AUTH
736+
ifM
737+
(atomically $ addNtfSubscription st subId sub)
738+
(atomically (writeTBQueue newSubQ [NtfSub sub]) $> NRSubId subId)
739+
(pure $ NRErr AUTH)
738740
withNtfLog (`logCreateSubscription` sub)
739741
incNtfStat subCreated
740742
pure (corrId, NoEntity, resp)
@@ -756,6 +758,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
756758
st <- asks store
757759
atomically $ deleteNtfSubscription st subId
758760
atomically $ removeSubscription ca smpServer (SPNotifier, notifierId)
761+
atomically $ removePendingSub ca smpServer (SPNotifier, notifierId)
759762
withNtfLog (`logDeleteSubscription` subId)
760763
incNtfStat subDeleted
761764
pure NROk

src/Simplex/Messaging/Notifications/Server/Store.hs

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
{-# LANGUAGE OverloadedLists #-}
99
{-# LANGUAGE OverloadedStrings #-}
1010
{-# LANGUAGE ScopedTypeVariables #-}
11+
{-# LANGUAGE StandaloneDeriving #-}
1112

1213
module Simplex.Messaging.Notifications.Server.Store where
1314

@@ -16,15 +17,16 @@ import Control.Monad
1617
import Data.ByteString.Char8 (ByteString)
1718
import Data.Functor (($>))
1819
import Data.List.NonEmpty (NonEmpty (..), (<|))
20+
import Data.Map.Strict (Map)
1921
import qualified Data.Map.Strict as M
20-
import Data.Maybe (catMaybes)
22+
import Data.Maybe (isNothing)
2123
import Data.Set (Set)
2224
import qualified Data.Set as S
2325
import Data.Word (Word16)
2426
import qualified Simplex.Messaging.Crypto as C
2527
import Simplex.Messaging.Encoding.String
2628
import Simplex.Messaging.Notifications.Protocol
27-
import Simplex.Messaging.Protocol (NtfPrivateAuthKey, NtfPublicAuthKey, SMPServer)
29+
import Simplex.Messaging.Protocol (NotifierId, NtfPrivateAuthKey, NtfPublicAuthKey, SMPServer)
2830
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime)
2931
import Simplex.Messaging.TMap (TMap)
3032
import qualified Simplex.Messaging.TMap as TM
@@ -35,8 +37,11 @@ data NtfStore = NtfStore
3537
-- multiple registrations exist to protect from malicious registrations if token is compromised
3638
tokenRegistrations :: TMap DeviceToken (TMap ByteString NtfTokenId),
3739
subscriptions :: TMap NtfSubscriptionId NtfSubData,
38-
tokenSubscriptions :: TMap NtfTokenId (TVar (Set NtfSubscriptionId)),
39-
subscriptionLookup :: TMap SMPQueueNtf NtfSubscriptionId,
40+
-- the first set is used to delete from `subscriptions` when token is deleted, the second - to cancel SMP subsriptions.
41+
-- TODO [notifications] it can be simplified once NtfSubData is fully removed.
42+
tokenSubscriptions :: TMap NtfTokenId (TMap SMPServer (TVar (Set NtfSubscriptionId), TVar (Set NotifierId))),
43+
-- TODO [notifications] for subscriptions that "migrated" to server subscription, we may replace NtfSubData with NtfTokenId here (Either NtfSubData NtfTokenId).
44+
subscriptionLookup :: TMap SMPServer (TMap NotifierId NtfSubData),
4045
tokenLastNtfs :: TMap NtfTokenId (TVar (NonEmpty PNMessageData))
4146
}
4247

@@ -134,7 +139,7 @@ removeTokenRegistration st NtfTknData {ntfTknId = tId, token, tknVerifyKey} =
134139
>>= mapM_ (\tId' -> when (tId == tId') $ TM.delete k regs)
135140
k = C.toPubKey C.pubKeyBytes tknVerifyKey
136141

137-
deleteNtfToken :: NtfStore -> NtfTokenId -> STM [SMPQueueNtf]
142+
deleteNtfToken :: NtfStore -> NtfTokenId -> STM (Map SMPServer (Set NotifierId))
138143
deleteNtfToken st tknId = do
139144
void $
140145
TM.lookupDelete tknId (tokens st) $>>= \NtfTknData {token, tknVerifyKey} ->
@@ -147,25 +152,25 @@ deleteNtfToken st tknId = do
147152
regs = tokenRegistrations st
148153
regKey = C.toPubKey C.pubKeyBytes
149154

150-
deleteTokenSubs :: NtfStore -> NtfTokenId -> STM [SMPQueueNtf]
151-
deleteTokenSubs st tknId = do
152-
qs <-
153-
TM.lookupDelete tknId (tokenSubscriptions st)
154-
>>= mapM (readTVar >=> mapM deleteSub . S.toList)
155-
pure $ maybe [] catMaybes qs
155+
deleteTokenSubs :: NtfStore -> NtfTokenId -> STM (Map SMPServer (Set NotifierId))
156+
deleteTokenSubs st tknId =
157+
TM.lookupDelete tknId (tokenSubscriptions st)
158+
>>= maybe (pure M.empty) (readTVar >=> deleteSrvSubs)
156159
where
157-
deleteSub subId = do
158-
TM.lookupDelete subId (subscriptions st)
159-
$>>= \NtfSubData {smpQueue} ->
160-
TM.delete smpQueue (subscriptionLookup st) $> Just smpQueue
160+
deleteSrvSubs :: Map SMPServer (TVar (Set NtfSubscriptionId), TVar (Set NotifierId)) -> STM (Map SMPServer (Set NotifierId))
161+
deleteSrvSubs = M.traverseWithKey $ \smpServer (sVar, nVar) -> do
162+
sIds <- readTVar sVar
163+
modifyTVar' (subscriptions st) (`M.withoutKeys` sIds)
164+
nIds <- readTVar nVar
165+
TM.lookup smpServer (subscriptionLookup st) >>= mapM_ (`modifyTVar'` (`M.withoutKeys` nIds))
166+
pure nIds
161167

162168
getNtfSubscriptionIO :: NtfStore -> NtfSubscriptionId -> IO (Maybe NtfSubData)
163169
getNtfSubscriptionIO st subId = TM.lookupIO subId (subscriptions st)
164170

165171
findNtfSubscription :: NtfStore -> SMPQueueNtf -> STM (Maybe NtfSubData)
166-
findNtfSubscription st smpQueue = do
167-
TM.lookup smpQueue (subscriptionLookup st)
168-
$>>= \subId -> TM.lookup subId (subscriptions st)
172+
findNtfSubscription st SMPQueueNtf {smpServer, notifierId} =
173+
TM.lookup smpServer (subscriptionLookup st) $>>= TM.lookup notifierId
169174

170175
findNtfSubscriptionToken :: NtfStore -> SMPQueueNtf -> STM (Maybe NtfTknData)
171176
findNtfSubscriptionToken st smpQueue = do
@@ -183,30 +188,45 @@ mkNtfSubData ntfSubId (NewNtfSub tokenId smpQueue notifierKey) = do
183188
subStatus <- newTVar NSNew
184189
pure NtfSubData {ntfSubId, smpQueue, tokenId, subStatus, notifierKey}
185190

186-
addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM (Maybe ())
187-
addNtfSubscription st subId sub@NtfSubData {smpQueue, tokenId} =
188-
TM.lookup tokenId (tokenSubscriptions st) >>= maybe newTokenSub pure >>= insertSub
191+
-- returns False if subscription existed before
192+
addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM Bool
193+
addNtfSubscription st subId sub@NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, tokenId} =
194+
TM.lookup tokenId (tokenSubscriptions st)
195+
>>= maybe newTokenSubs pure
196+
>>= \ts -> TM.lookup smpServer ts
197+
>>= maybe (newTokenSrvSubs ts) pure
198+
>>= insertSub
189199
where
190-
newTokenSub = do
191-
ts <- newTVar S.empty
200+
newTokenSubs = do
201+
ts <- newTVar M.empty
192202
TM.insert tokenId ts $ tokenSubscriptions st
193203
pure ts
194-
insertSub ts = do
195-
modifyTVar' ts $ S.insert subId
204+
newTokenSrvSubs ts = do
205+
tss <- (,) <$> newTVar S.empty <*> newTVar S.empty
206+
TM.insert smpServer tss ts
207+
pure tss
208+
insertSub :: (TVar (Set NtfSubscriptionId), TVar (Set NotifierId)) -> STM Bool
209+
insertSub (sIds, nIds) = do
210+
modifyTVar' sIds $ S.insert subId
211+
modifyTVar' nIds $ S.insert notifierId
196212
TM.insert subId sub $ subscriptions st
197-
TM.insert smpQueue subId (subscriptionLookup st)
198-
-- return Nothing if subscription existed before
199-
pure $ Just ()
213+
TM.lookup smpServer (subscriptionLookup st)
214+
>>= maybe newSubs pure
215+
>>= fmap isNothing . TM.lookupInsert notifierId sub
216+
newSubs = do
217+
ss <- newTVar M.empty
218+
TM.insert smpServer ss $ subscriptionLookup st
219+
pure ss
200220

201221
deleteNtfSubscription :: NtfStore -> NtfSubscriptionId -> STM ()
202-
deleteNtfSubscription st subId = do
203-
TM.lookupDelete subId (subscriptions st)
204-
>>= mapM_
205-
( \NtfSubData {smpQueue, tokenId} -> do
206-
TM.delete smpQueue $ subscriptionLookup st
207-
ts_ <- TM.lookup tokenId (tokenSubscriptions st)
208-
forM_ ts_ $ \ts -> modifyTVar' ts $ S.delete subId
209-
)
222+
deleteNtfSubscription st subId = TM.lookupDelete subId (subscriptions st) >>= mapM_ deleteSubIndices
223+
where
224+
deleteSubIndices NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, tokenId} = do
225+
TM.lookup smpServer (subscriptionLookup st) >>= mapM_ (TM.delete notifierId)
226+
tss_ <- TM.lookup tokenId (tokenSubscriptions st) $>>= TM.lookup smpServer
227+
forM_ tss_ $ \(sIds, nIds) -> do
228+
modifyTVar' sIds $ S.delete subId
229+
modifyTVar' nIds $ S.delete notifierId
210230

211231
addTokenLastNtf :: NtfStore -> NtfTokenId -> PNMessageData -> IO (NonEmpty PNMessageData)
212232
addTokenLastNtf st tknId newNtf =

tests/ServerTests/SchemaDump.hs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import Simplex.Messaging.Server.QueueStore.Postgres.Migrations (serverMigrations
1818
import Simplex.Messaging.Util (ifM)
1919
import System.Directory (doesFileExist, removeFile)
2020
import System.Environment (lookupEnv)
21-
import System.Process (readCreateProcess, readCreateProcessWithExitCode, shell)
21+
import System.Process (readCreateProcess, shell)
2222
import Test.Hspec
2323

2424
testDBSchema :: B.ByteString
@@ -87,10 +87,7 @@ getSchema schemaPath = do
8787
("pg_dump " <> B.unpack testServerDBConnstr <> " --schema " <> B.unpack testDBSchema)
8888
<> " --schema-only --no-owner --no-privileges --no-acl --no-subscriptions --no-tablespaces > "
8989
<> schemaPath
90-
(code, out, err) <- readCreateProcessWithExitCode (shell cmd) ""
91-
print code
92-
putStrLn $ "out: " <> out
93-
putStrLn $ "err: " <> err
90+
void $ readCreateProcess (shell cmd) ""
9491
threadDelay 20000
9592
let sed = (if ci then "sed -i" else "sed -i ''")
9693
void $ readCreateProcess (shell $ sed <> " '/^--/d' " <> schemaPath) ""

0 commit comments

Comments
 (0)