Skip to content

Commit 6b60f8b

Browse files
committed
Revert "ntf server: optimize in-memory storage (#1516)"
This reverts commit afb338a.
1 parent aa9b93e commit 6b60f8b

File tree

4 files changed

+51
-79
lines changed

4 files changed

+51
-79
lines changed

src/Simplex/Messaging/Client/Agent.hs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ smpSubscribeQueues party ca smp srv subs = do
358358
pending <- maybe (pure M.empty) readTVar =<< TM.lookup srv (pendingSrvSubs ca)
359359
let acc@(_, _, oks, notPending) = foldr (groupSub pending) (False, [], [], []) (L.zip subs rs)
360360
unless (null oks) $ addSubscriptions ca srv party oks
361-
unless (null notPending) $ removePendingSubs ca srv party $ S.fromList notPending
361+
unless (null notPending) $ removePendingSubs ca srv party notPending
362362
pure acc
363363
sessId = sessionId $ thParams smp
364364
groupSub :: Map SMPSub C.APrivateAuthKey -> ((QueueId, C.APrivateAuthKey), Either SMPClientError ()) -> (Bool, [(QueueId, SMPClientError)], [(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]) -> (Bool, [(QueueId, SMPClientError)], [(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId])
@@ -412,22 +412,14 @@ removeSubscription :: SMPClientAgent -> SMPServer -> SMPSub -> STM ()
412412
removeSubscription = removeSub_ . srvSubs
413413
{-# INLINE removeSubscription #-}
414414

415-
removePendingSub :: SMPClientAgent -> SMPServer -> SMPSub -> STM ()
416-
removePendingSub = removeSub_ . pendingSrvSubs
417-
{-# INLINE removePendingSub #-}
418-
419415
removeSub_ :: TMap SMPServer (TMap SMPSub s) -> SMPServer -> SMPSub -> STM ()
420416
removeSub_ subs srv s = TM.lookup srv subs >>= mapM_ (TM.delete s)
421417

422-
removeSubscriptions :: SMPClientAgent -> SMPServer -> SMPSubParty -> Set QueueId -> STM ()
423-
removeSubscriptions = removeSubs_ . srvSubs
424-
{-# INLINE removeSubscriptions #-}
425-
426-
removePendingSubs :: SMPClientAgent -> SMPServer -> SMPSubParty -> Set QueueId -> STM ()
418+
removePendingSubs :: SMPClientAgent -> SMPServer -> SMPSubParty -> [QueueId] -> STM ()
427419
removePendingSubs = removeSubs_ . pendingSrvSubs
428420
{-# INLINE removePendingSubs #-}
429421

430-
removeSubs_ :: TMap SMPServer (TMap SMPSub s) -> SMPServer -> SMPSubParty -> Set QueueId -> STM ()
422+
removeSubs_ :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey) -> SMPServer -> SMPSubParty -> [QueueId] -> STM ()
431423
removeSubs_ subs srv party qs = TM.lookup srv subs >>= mapM_ (`modifyTVar'` (`M.withoutKeys` ss))
432424
where
433-
ss = S.map (party,) qs
425+
ss = S.fromList $ map (party,) qs

src/Simplex/Messaging/Notifications/Server.hs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -691,10 +691,9 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
691691
TDEL -> do
692692
logDebug "TDEL"
693693
st <- asks store
694-
ss <- atomically $ deleteNtfToken st tknId
695-
forM_ (M.assocs ss) $ \(smpServer, nIds) -> do
696-
atomically $ removeSubscriptions ca smpServer SPNotifier nIds
697-
atomically $ removePendingSubs ca smpServer SPNotifier nIds
694+
qs <- atomically $ deleteNtfToken st tknId
695+
forM_ qs $ \SMPQueueNtf {smpServer, notifierId} ->
696+
atomically $ removeSubscription ca smpServer (SPNotifier, notifierId)
698697
cancelInvervalNotifications tknId
699698
withNtfLog (`logDeleteToken` tknId)
700699
incNtfStatT token tknDeleted
@@ -733,10 +732,9 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
733732
subId <- getId
734733
sub <- atomically $ mkNtfSubData subId newSub
735734
resp <-
736-
ifM
737-
(atomically $ addNtfSubscription st subId sub)
738-
(atomically (writeTBQueue newSubQ [NtfSub sub]) $> NRSubId subId)
739-
(pure $ NRErr AUTH)
735+
atomically (addNtfSubscription st subId sub) >>= \case
736+
Just _ -> atomically (writeTBQueue newSubQ [NtfSub sub]) $> NRSubId subId
737+
_ -> pure $ NRErr AUTH
740738
withNtfLog (`logCreateSubscription` sub)
741739
incNtfStat subCreated
742740
pure (corrId, NoEntity, resp)
@@ -758,7 +756,6 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
758756
st <- asks store
759757
atomically $ deleteNtfSubscription st subId
760758
atomically $ removeSubscription ca smpServer (SPNotifier, notifierId)
761-
atomically $ removePendingSub ca smpServer (SPNotifier, notifierId)
762759
withNtfLog (`logDeleteSubscription` subId)
763760
incNtfStat subDeleted
764761
pure NROk

src/Simplex/Messaging/Notifications/Server/Store.hs

Lines changed: 36 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
{-# LANGUAGE OverloadedLists #-}
99
{-# LANGUAGE OverloadedStrings #-}
1010
{-# LANGUAGE ScopedTypeVariables #-}
11-
{-# LANGUAGE StandaloneDeriving #-}
1211

1312
module Simplex.Messaging.Notifications.Server.Store where
1413

@@ -17,16 +16,15 @@ import Control.Monad
1716
import Data.ByteString.Char8 (ByteString)
1817
import Data.Functor (($>))
1918
import Data.List.NonEmpty (NonEmpty (..), (<|))
20-
import Data.Map.Strict (Map)
2119
import qualified Data.Map.Strict as M
22-
import Data.Maybe (isNothing)
20+
import Data.Maybe (catMaybes)
2321
import Data.Set (Set)
2422
import qualified Data.Set as S
2523
import Data.Word (Word16)
2624
import qualified Simplex.Messaging.Crypto as C
2725
import Simplex.Messaging.Encoding.String
2826
import Simplex.Messaging.Notifications.Protocol
29-
import Simplex.Messaging.Protocol (NotifierId, NtfPrivateAuthKey, NtfPublicAuthKey, SMPServer)
27+
import Simplex.Messaging.Protocol (NtfPrivateAuthKey, NtfPublicAuthKey, SMPServer)
3028
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime)
3129
import Simplex.Messaging.TMap (TMap)
3230
import qualified Simplex.Messaging.TMap as TM
@@ -37,11 +35,8 @@ data NtfStore = NtfStore
3735
-- multiple registrations exist to protect from malicious registrations if token is compromised
3836
tokenRegistrations :: TMap DeviceToken (TMap ByteString NtfTokenId),
3937
subscriptions :: TMap NtfSubscriptionId NtfSubData,
40-
-- the first set is used to delete from `subscriptions` when token is deleted, the second - to cancel SMP subsriptions.
41-
-- TODO [notifications] it can be simplified once NtfSubData is fully removed.
42-
tokenSubscriptions :: TMap NtfTokenId (TMap SMPServer (TVar (Set NtfSubscriptionId), TVar (Set NotifierId))),
43-
-- TODO [notifications] for subscriptions that "migrated" to server subscription, we may replace NtfSubData with NtfTokenId here (Either NtfSubData NtfTokenId).
44-
subscriptionLookup :: TMap SMPServer (TMap NotifierId NtfSubData),
38+
tokenSubscriptions :: TMap NtfTokenId (TVar (Set NtfSubscriptionId)),
39+
subscriptionLookup :: TMap SMPQueueNtf NtfSubscriptionId,
4540
tokenLastNtfs :: TMap NtfTokenId (TVar (NonEmpty PNMessageData))
4641
}
4742

@@ -139,7 +134,7 @@ removeTokenRegistration st NtfTknData {ntfTknId = tId, token, tknVerifyKey} =
139134
>>= mapM_ (\tId' -> when (tId == tId') $ TM.delete k regs)
140135
k = C.toPubKey C.pubKeyBytes tknVerifyKey
141136

142-
deleteNtfToken :: NtfStore -> NtfTokenId -> STM (Map SMPServer (Set NotifierId))
137+
deleteNtfToken :: NtfStore -> NtfTokenId -> STM [SMPQueueNtf]
143138
deleteNtfToken st tknId = do
144139
void $
145140
TM.lookupDelete tknId (tokens st) $>>= \NtfTknData {token, tknVerifyKey} ->
@@ -152,25 +147,25 @@ deleteNtfToken st tknId = do
152147
regs = tokenRegistrations st
153148
regKey = C.toPubKey C.pubKeyBytes
154149

155-
deleteTokenSubs :: NtfStore -> NtfTokenId -> STM (Map SMPServer (Set NotifierId))
156-
deleteTokenSubs st tknId =
157-
TM.lookupDelete tknId (tokenSubscriptions st)
158-
>>= maybe (pure M.empty) (readTVar >=> deleteSrvSubs)
150+
deleteTokenSubs :: NtfStore -> NtfTokenId -> STM [SMPQueueNtf]
151+
deleteTokenSubs st tknId = do
152+
qs <-
153+
TM.lookupDelete tknId (tokenSubscriptions st)
154+
>>= mapM (readTVar >=> mapM deleteSub . S.toList)
155+
pure $ maybe [] catMaybes qs
159156
where
160-
deleteSrvSubs :: Map SMPServer (TVar (Set NtfSubscriptionId), TVar (Set NotifierId)) -> STM (Map SMPServer (Set NotifierId))
161-
deleteSrvSubs = M.traverseWithKey $ \smpServer (sVar, nVar) -> do
162-
sIds <- readTVar sVar
163-
modifyTVar' (subscriptions st) (`M.withoutKeys` sIds)
164-
nIds <- readTVar nVar
165-
TM.lookup smpServer (subscriptionLookup st) >>= mapM_ (`modifyTVar'` (`M.withoutKeys` nIds))
166-
pure nIds
157+
deleteSub subId = do
158+
TM.lookupDelete subId (subscriptions st)
159+
$>>= \NtfSubData {smpQueue} ->
160+
TM.delete smpQueue (subscriptionLookup st) $> Just smpQueue
167161

168162
getNtfSubscriptionIO :: NtfStore -> NtfSubscriptionId -> IO (Maybe NtfSubData)
169163
getNtfSubscriptionIO st subId = TM.lookupIO subId (subscriptions st)
170164

171165
findNtfSubscription :: NtfStore -> SMPQueueNtf -> STM (Maybe NtfSubData)
172-
findNtfSubscription st SMPQueueNtf {smpServer, notifierId} =
173-
TM.lookup smpServer (subscriptionLookup st) $>>= TM.lookup notifierId
166+
findNtfSubscription st smpQueue = do
167+
TM.lookup smpQueue (subscriptionLookup st)
168+
$>>= \subId -> TM.lookup subId (subscriptions st)
174169

175170
findNtfSubscriptionToken :: NtfStore -> SMPQueueNtf -> STM (Maybe NtfTknData)
176171
findNtfSubscriptionToken st smpQueue = do
@@ -188,45 +183,30 @@ mkNtfSubData ntfSubId (NewNtfSub tokenId smpQueue notifierKey) = do
188183
subStatus <- newTVar NSNew
189184
pure NtfSubData {ntfSubId, smpQueue, tokenId, subStatus, notifierKey}
190185

191-
-- returns False if subscription existed before
192-
addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM Bool
193-
addNtfSubscription st subId sub@NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, tokenId} =
194-
TM.lookup tokenId (tokenSubscriptions st)
195-
>>= maybe newTokenSubs pure
196-
>>= \ts -> TM.lookup smpServer ts
197-
>>= maybe (newTokenSrvSubs ts) pure
198-
>>= insertSub
186+
addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM (Maybe ())
187+
addNtfSubscription st subId sub@NtfSubData {smpQueue, tokenId} =
188+
TM.lookup tokenId (tokenSubscriptions st) >>= maybe newTokenSub pure >>= insertSub
199189
where
200-
newTokenSubs = do
201-
ts <- newTVar M.empty
190+
newTokenSub = do
191+
ts <- newTVar S.empty
202192
TM.insert tokenId ts $ tokenSubscriptions st
203193
pure ts
204-
newTokenSrvSubs ts = do
205-
tss <- (,) <$> newTVar S.empty <*> newTVar S.empty
206-
TM.insert smpServer tss ts
207-
pure tss
208-
insertSub :: (TVar (Set NtfSubscriptionId), TVar (Set NotifierId)) -> STM Bool
209-
insertSub (sIds, nIds) = do
210-
modifyTVar' sIds $ S.insert subId
211-
modifyTVar' nIds $ S.insert notifierId
194+
insertSub ts = do
195+
modifyTVar' ts $ S.insert subId
212196
TM.insert subId sub $ subscriptions st
213-
TM.lookup smpServer (subscriptionLookup st)
214-
>>= maybe newSubs pure
215-
>>= fmap isNothing . TM.lookupInsert notifierId sub
216-
newSubs = do
217-
ss <- newTVar M.empty
218-
TM.insert smpServer ss $ subscriptionLookup st
219-
pure ss
197+
TM.insert smpQueue subId (subscriptionLookup st)
198+
-- return Nothing if subscription existed before
199+
pure $ Just ()
220200

221201
deleteNtfSubscription :: NtfStore -> NtfSubscriptionId -> STM ()
222-
deleteNtfSubscription st subId = TM.lookupDelete subId (subscriptions st) >>= mapM_ deleteSubIndices
223-
where
224-
deleteSubIndices NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, tokenId} = do
225-
TM.lookup smpServer (subscriptionLookup st) >>= mapM_ (TM.delete notifierId)
226-
tss_ <- TM.lookup tokenId (tokenSubscriptions st) $>>= TM.lookup smpServer
227-
forM_ tss_ $ \(sIds, nIds) -> do
228-
modifyTVar' sIds $ S.delete subId
229-
modifyTVar' nIds $ S.delete notifierId
202+
deleteNtfSubscription st subId = do
203+
TM.lookupDelete subId (subscriptions st)
204+
>>= mapM_
205+
( \NtfSubData {smpQueue, tokenId} -> do
206+
TM.delete smpQueue $ subscriptionLookup st
207+
ts_ <- TM.lookup tokenId (tokenSubscriptions st)
208+
forM_ ts_ $ \ts -> modifyTVar' ts $ S.delete subId
209+
)
230210

231211
addTokenLastNtf :: NtfStore -> NtfTokenId -> PNMessageData -> IO (NonEmpty PNMessageData)
232212
addTokenLastNtf st tknId newNtf =

tests/ServerTests/SchemaDump.hs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import Simplex.Messaging.Server.QueueStore.Postgres.Migrations (serverMigrations
1818
import Simplex.Messaging.Util (ifM)
1919
import System.Directory (doesFileExist, removeFile)
2020
import System.Environment (lookupEnv)
21-
import System.Process (readCreateProcess, shell)
21+
import System.Process (readCreateProcess, readCreateProcessWithExitCode, shell)
2222
import Test.Hspec
2323

2424
testDBSchema :: B.ByteString
@@ -87,7 +87,10 @@ getSchema schemaPath = do
8787
("pg_dump " <> B.unpack testServerDBConnstr <> " --schema " <> B.unpack testDBSchema)
8888
<> " --schema-only --no-owner --no-privileges --no-acl --no-subscriptions --no-tablespaces > "
8989
<> schemaPath
90-
void $ readCreateProcess (shell cmd) ""
90+
(code, out, err) <- readCreateProcessWithExitCode (shell cmd) ""
91+
print code
92+
putStrLn $ "out: " <> out
93+
putStrLn $ "err: " <> err
9194
threadDelay 20000
9295
let sed = (if ci then "sed -i" else "sed -i ''")
9396
void $ readCreateProcess (shell $ sed <> " '/^--/d' " <> schemaPath) ""

0 commit comments

Comments
 (0)