Skip to content

Commit a3d1a72

Browse files
authored
agent: optimize subscriptions memory usage (#1651)
* agent: optimize subscriptions memory usage more (do not store subscribed queues in memory) WIP * use new session subscriptions data * version * remove old data structure * remove version * batch deletions * test TSessionSubs * comment
1 parent 17b71cf commit a3d1a72

File tree

10 files changed

+499
-456
lines changed

10 files changed

+499
-456
lines changed

simplexmq.cabal

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ library
108108
Simplex.Messaging.Agent.Store.Migrations.App
109109
Simplex.Messaging.Agent.Store.Postgres.Options
110110
Simplex.Messaging.Agent.Store.Shared
111-
Simplex.Messaging.Agent.TRcvQueues
111+
Simplex.Messaging.Agent.TSessionSubs
112112
Simplex.Messaging.Client
113113
Simplex.Messaging.Client.Agent
114114
Simplex.Messaging.Compression
@@ -474,7 +474,7 @@ test-suite simplexmq-test
474474
CoreTests.RetryIntervalTests
475475
CoreTests.SOCKSSettings
476476
CoreTests.StoreLogTests
477-
CoreTests.TRcvQueuesTests
477+
CoreTests.TSessionSubs
478478
CoreTests.UtilTests
479479
CoreTests.VersionRangeTests
480480
FileDescriptionTests

src/Simplex/Messaging/Agent.hs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -555,13 +555,14 @@ testProtocolServer c nm userId srv = withAgentEnv' c $ case protocolTypeI @p of
555555
-- | set SOCKS5 proxy on/off and optionally set TCP timeouts for fast network
556556
setNetworkConfig :: AgentClient -> NetworkConfig -> IO ()
557557
setNetworkConfig c@AgentClient {useNetworkConfig, proxySessTs} cfg' = do
558-
(spChanged, changed) <- atomically $ do
558+
ts <- getCurrentTime
559+
changed <- atomically $ do
559560
(_, cfg) <- readTVar useNetworkConfig
560561
let changed = cfg /= cfg'
561562
!cfgSlow = slowNetworkConfig cfg'
562563
when changed $ writeTVar useNetworkConfig (cfgSlow, cfg')
563-
pure (socksProxy cfg /= socksProxy cfg', changed)
564-
when spChanged $ getCurrentTime >>= atomically . writeTVar proxySessTs
564+
when (socksProxy cfg /= socksProxy cfg') $ writeTVar proxySessTs ts
565+
pure changed
565566
when changed $ reconnectAllServers c
566567

567568
setUserNetworkInfo :: AgentClient -> UserNetworkInfo -> IO ()
@@ -1270,7 +1271,7 @@ subscribeConnections_ c conns = do
12701271
let (subRs, cs) = foldr partitionResultsConns ([], []) conns
12711272
resumeDelivery cs
12721273
resumeConnCmds c $ map fst cs
1273-
rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concatMap rcvQueues cs)
1274+
rcvRs <- lift $ connResults <$> subscribeQueues c (concatMap rcvQueues cs) False
12741275
rcvRs' <- storeClientServiceAssocs rcvRs
12751276
ns <- asks ntfSupervisor
12761277
lift $ whenM (liftIO $ hasInstantNotifications ns) . void . forkIO . void $ sendNtfCreate ns rcvRs' cs

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 175 additions & 128 deletions
Large diffs are not rendered by default.

src/Simplex/Messaging/Agent/TRcvQueues.hs

Lines changed: 0 additions & 104 deletions
This file was deleted.
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
{-# LANGUAGE FlexibleInstances #-}
2+
{-# LANGUAGE LambdaCase #-}
3+
{-# LANGUAGE OverloadedStrings #-}
4+
5+
module Simplex.Messaging.Agent.TSessionSubs
6+
( TSessionSubs (sessionSubs),
7+
SessSubs (..),
8+
emptyIO,
9+
clear,
10+
hasActiveSub,
11+
hasPendingSub,
12+
addPendingSub,
13+
setSessionId,
14+
addActiveSub,
15+
batchAddPendingSubs,
16+
deletePendingSub,
17+
deleteSub,
18+
batchDeleteSubs,
19+
hasPendingSubs,
20+
getPendingSubs,
21+
getActiveSubs,
22+
setSubsPending,
23+
foldSessionSubs,
24+
mapSubs,
25+
)
26+
where
27+
28+
import Control.Concurrent.STM
29+
import Control.Monad
30+
import Data.Map.Strict (Map)
31+
import qualified Data.Map.Strict as M
32+
import Data.Maybe (isJust)
33+
import qualified Data.Set as S
34+
import Simplex.Messaging.Agent.Protocol (SMPQueue (..))
35+
import Simplex.Messaging.Agent.Store (RcvQueueSub (..), SomeRcvQueue)
36+
import Simplex.Messaging.Client (SMPTransportSession, TransportSessionMode (..))
37+
import Simplex.Messaging.Protocol (RecipientId)
38+
import Simplex.Messaging.TMap (TMap)
39+
import qualified Simplex.Messaging.TMap as TM
40+
import Simplex.Messaging.Transport
41+
import Simplex.Messaging.Util (($>>=))
42+
43+
data TSessionSubs = TSessionSubs
44+
{ sessionSubs :: TMap SMPTransportSession SessSubs
45+
}
46+
47+
data SessSubs = SessSubs
48+
{ subsSessId :: TVar (Maybe SessionId),
49+
activeSubs :: TMap RecipientId RcvQueueSub,
50+
pendingSubs :: TMap RecipientId RcvQueueSub
51+
}
52+
53+
emptyIO :: IO TSessionSubs
54+
emptyIO = TSessionSubs <$> TM.emptyIO
55+
{-# INLINE emptyIO #-}
56+
57+
clear :: TSessionSubs -> STM ()
58+
clear = TM.clear . sessionSubs
59+
{-# INLINE clear #-}
60+
61+
lookupSubs :: SMPTransportSession -> TSessionSubs -> STM (Maybe SessSubs)
62+
lookupSubs tSess = TM.lookup tSess . sessionSubs
63+
{-# INLINE lookupSubs #-}
64+
65+
getSessSubs :: SMPTransportSession -> TSessionSubs -> STM SessSubs
66+
getSessSubs tSess ss = lookupSubs tSess ss >>= maybe new pure
67+
where
68+
new = do
69+
s <- SessSubs <$> newTVar Nothing <*> newTVar M.empty <*> newTVar M.empty
70+
TM.insert tSess s $ sessionSubs ss
71+
pure s
72+
73+
hasActiveSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool
74+
hasActiveSub = hasQueue_ activeSubs
75+
{-# INLINE hasActiveSub #-}
76+
77+
hasPendingSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool
78+
hasPendingSub = hasQueue_ pendingSubs
79+
{-# INLINE hasPendingSub #-}
80+
81+
hasQueue_ :: (SessSubs -> TMap RecipientId RcvQueueSub) -> RecipientId -> SMPTransportSession -> TSessionSubs -> STM Bool
82+
hasQueue_ subs rId tSess ss = isJust <$> (lookupSubs tSess ss $>>= TM.lookup rId . subs)
83+
{-# INLINE hasQueue_ #-}
84+
85+
addPendingSub :: RcvQueueSub -> SMPTransportSession -> TSessionSubs -> STM ()
86+
addPendingSub rq tSess ss = getSessSubs tSess ss >>= TM.insert (rcvId rq) rq . pendingSubs
87+
88+
setSessionId :: SessionId -> SMPTransportSession -> TSessionSubs -> STM ()
89+
setSessionId sessId tSess ss = do
90+
s <- getSessSubs tSess ss
91+
readTVar (subsSessId s) >>= \case
92+
Nothing -> writeTVar (subsSessId s) (Just sessId)
93+
Just sessId' -> unless (sessId == sessId') $ void $ setSubsPending_ s $ Just sessId
94+
95+
addActiveSub :: SessionId -> RcvQueueSub -> SMPTransportSession -> TSessionSubs -> STM ()
96+
addActiveSub sessId rq tSess ss = do
97+
s <- getSessSubs tSess ss
98+
sessId' <- readTVar $ subsSessId s
99+
let rId = rcvId rq
100+
if Just sessId == sessId'
101+
then do
102+
TM.insert rId rq $ activeSubs s
103+
TM.delete rId $ pendingSubs s
104+
else TM.insert rId rq $ pendingSubs s
105+
106+
batchAddPendingSubs :: [RcvQueueSub] -> SMPTransportSession -> TSessionSubs -> STM ()
107+
batchAddPendingSubs rqs tSess ss = do
108+
s <- getSessSubs tSess ss
109+
modifyTVar' (pendingSubs s) $ M.union $ M.fromList $ map (\rq -> (rcvId rq, rq)) rqs
110+
111+
deletePendingSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM ()
112+
deletePendingSub rId tSess = lookupSubs tSess >=> mapM_ (TM.delete rId . pendingSubs)
113+
114+
deleteSub :: RecipientId -> SMPTransportSession -> TSessionSubs -> STM ()
115+
deleteSub rId tSess = lookupSubs tSess >=> mapM_ (\s -> TM.delete rId (activeSubs s) >> TM.delete rId (pendingSubs s))
116+
117+
batchDeleteSubs :: SomeRcvQueue q => [q] -> SMPTransportSession -> TSessionSubs -> STM ()
118+
batchDeleteSubs rqs tSess = lookupSubs tSess >=> mapM_ (\s -> delete (activeSubs s) >> delete (pendingSubs s))
119+
where
120+
rIds = S.fromList $ map queueId rqs
121+
delete = (`modifyTVar'` (`M.withoutKeys` rIds))
122+
123+
hasPendingSubs :: SMPTransportSession -> TSessionSubs -> STM Bool
124+
hasPendingSubs tSess = lookupSubs tSess >=> maybe (pure False) (fmap (not . null) . readTVar . pendingSubs)
125+
126+
getPendingSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
127+
getPendingSubs = getSubs_ pendingSubs
128+
{-# INLINE getPendingSubs #-}
129+
130+
getActiveSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
131+
getActiveSubs = getSubs_ activeSubs
132+
{-# INLINE getActiveSubs #-}
133+
134+
getSubs_ :: (SessSubs -> TMap RecipientId RcvQueueSub) -> SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
135+
getSubs_ subs tSess = lookupSubs tSess >=> maybe (pure M.empty) (readTVar . subs)
136+
137+
setSubsPending :: TransportSessionMode -> SMPTransportSession -> SessionId -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
138+
setSubsPending mode tSess@(uId, srv, connId_) sessId tss@(TSessionSubs ss)
139+
| entitySession == isJust connId_ =
140+
TM.lookup tSess ss >>= withSessSubs (`setSubsPending_` Nothing)
141+
| otherwise =
142+
TM.lookupDelete tSess ss >>= withSessSubs setPendingChangeMode
143+
where
144+
entitySession = mode == TSMEntity
145+
sessEntId = if entitySession then Just else const Nothing
146+
withSessSubs run = \case
147+
Nothing -> pure M.empty
148+
Just s -> do
149+
sessId' <- readTVar $ subsSessId s
150+
if Just sessId == sessId' then run s else pure M.empty
151+
setPendingChangeMode s = do
152+
subs <- M.union <$> readTVar (activeSubs s) <*> readTVar (pendingSubs s)
153+
unless (null subs) $
154+
forM_ subs $ \rq -> addPendingSub rq (uId, srv, sessEntId (connId rq)) tss
155+
pure subs
156+
157+
setSubsPending_ :: SessSubs -> Maybe SessionId -> STM (Map RecipientId RcvQueueSub)
158+
setSubsPending_ s sessId_ = do
159+
writeTVar (subsSessId s) sessId_
160+
let as = activeSubs s
161+
subs <- readTVar as
162+
unless (null subs) $ do
163+
writeTVar as M.empty
164+
modifyTVar' (pendingSubs s) $ M.union subs
165+
pure subs
166+
167+
foldSessionSubs :: (a -> (SMPTransportSession, SessSubs) -> IO a) -> a -> TSessionSubs -> IO a
168+
foldSessionSubs f a = foldM f a . M.assocs <=< readTVarIO . sessionSubs
169+
170+
mapSubs :: (Map RecipientId RcvQueueSub -> a) -> SessSubs -> IO (a, a)
171+
mapSubs f s = do
172+
active <- readTVarIO $ activeSubs s
173+
pending <- readTVarIO $ pendingSubs s
174+
pure (f active, f pending)

src/Simplex/Messaging/Client.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
module Simplex.Messaging.Client
3030
( -- * Connect (disconnect) client to (from) SMP server
3131
TransportSession,
32+
SMPTransportSession,
3233
ProtocolClient (thParams, sessionTs),
3334
SMPClient,
3435
ProxiedRelay (..),
@@ -549,6 +550,8 @@ type UserId = Int64
549550
-- Please note that for SMP connection ID is used as entity ID, not queue ID.
550551
type TransportSession msg = (UserId, ProtoServer msg, Maybe ByteString)
551552

553+
type SMPTransportSession = TransportSession BrokerMsg
554+
552555
-- | Connects to 'ProtocolServer' using passed client configuration
553556
-- and queue for messages and notifications.
554557
--

tests/AgentTests/FunctionalAPITests.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3569,6 +3569,7 @@ testTwoUsers = withAgentClients2 $ \a b -> do
35693569
liftIO $ threadDelay 250000
35703570
("", "", DOWN _ _) <- nGet a
35713571
("", "", UP _ _) <- nGet a
3572+
("", "", UP _ _) <- nGet a
35723573
a `hasClients` 2
35733574

35743575
exchangeGreetingsMsgId 4 a bId1 b aId1
@@ -3595,6 +3596,8 @@ testTwoUsers = withAgentClients2 $ \a b -> do
35953596
("", "", DOWN _ _) <- nGet a
35963597
("", "", UP _ _) <- nGet a
35973598
("", "", UP _ _) <- nGet a
3599+
("", "", UP _ _) <- nGet a
3600+
("", "", UP _ _) <- nGet a
35983601
a `hasClients` 4
35993602
exchangeGreetingsMsgId 6 a bId1 b aId1
36003603
exchangeGreetingsMsgId 6 a bId1' b aId1'

0 commit comments

Comments
 (0)