1818#endif
1919{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
2020
21- module Simplex.Messaging.Server.Env.STM where
21+ module Simplex.Messaging.Server.Env.STM
22+ ( ServerConfig (.. ),
23+ ServerStoreCfg (.. ),
24+ AServerStoreCfg (.. ),
25+ StorePaths (.. ),
26+ StartOptions (.. ),
27+ Env (.. ),
28+ Server (.. ),
29+ ServerSubscribers (.. ),
30+ SubscribedClients ,
31+ ProxyAgent (.. ),
32+ Client (.. ),
33+ AClient (.. ),
34+ ClientId ,
35+ Subscribed ,
36+ Sub (.. ),
37+ ServerSub (.. ),
38+ SubscriptionThread (.. ),
39+ MsgStore ,
40+ AMsgStore (.. ),
41+ AStoreType (.. ),
42+ newEnv ,
43+ mkJournalStoreConfig ,
44+ newClient ,
45+ getServerClients ,
46+ getServerClient ,
47+ insertServerClient ,
48+ deleteServerClient ,
49+ getSubscribedClients ,
50+ getSubscribedClient ,
51+ upsertSubscribedClient ,
52+ lookupDeleteSubscribedClient ,
53+ deleteSubcribedClient ,
54+ sameClientId ,
55+ sameClient ,
56+ clientId' ,
57+ newSubscription ,
58+ newProhibitedSub ,
59+ defaultMsgQueueQuota ,
60+ defMsgExpirationDays ,
61+ defNtfExpirationHours ,
62+ defaultMessageExpiration ,
63+ defaultNtfExpiration ,
64+ defaultInactiveClientExpiration ,
65+ defaultProxyClientConcurrency ,
66+ defaultMaxJournalMsgCount ,
67+ defaultMaxJournalStateLines ,
68+ defaultIdleQueueInterval ,
69+ journalMsgStoreDepth ,
70+ readWriteQueueStore ,
71+ noPostgresExit ,
72+ )
73+ where
2274
2375import Control.Concurrent (ThreadId )
2476import Control.Logger.Simple
@@ -29,9 +81,12 @@ import Data.ByteString.Char8 (ByteString)
2981import Data.Int (Int64 )
3082import Data.IntMap.Strict (IntMap )
3183import qualified Data.IntMap.Strict as IM
84+ import Data.IntSet (IntSet )
85+ import qualified Data.IntSet as IS
3286import Data.Kind (Constraint )
3387import Data.List (intercalate )
3488import Data.List.NonEmpty (NonEmpty )
89+ import Data.Map.Strict (Map )
3590import Data.Maybe (isJust )
3691import qualified Data.Text as T
3792import Data.Time.Clock (getCurrentTime , nominalDay )
@@ -66,6 +121,7 @@ import Simplex.Messaging.TMap (TMap)
66121import qualified Simplex.Messaging.TMap as TM
67122import Simplex.Messaging.Transport (ATransport , VersionRangeSMP , VersionSMP )
68123import Simplex.Messaging.Transport.Server
124+ import Simplex.Messaging.Util (ifM , whenM , ($>>=) )
69125import System.Directory (doesFileExist )
70126import System.Exit (exitFailure )
71127import System.IO (IOMode (.. ))
@@ -203,7 +259,6 @@ data Env = Env
203259 serverStats :: ServerStats ,
204260 sockets :: TVar [(ServiceName , SocketState )],
205261 clientSeq :: TVar ClientId ,
206- clients :: TVar (IntMap (Maybe AClient )),
207262 proxyAgent :: ProxyAgent -- senders served on this proxy
208263 }
209264
@@ -236,17 +291,72 @@ data AMsgStore =
236291type Subscribed = Bool
237292
238293data Server = Server
239- { subscribedQ :: TQueue (RecipientId , ClientId , Subscribed ),
240- subscribers :: TMap RecipientId (TVar AClient ),
241- ntfSubscribedQ :: TQueue (NotifierId , ClientId , Subscribed ),
242- notifiers :: TMap NotifierId (TVar AClient ),
243- subClients :: TVar (IntMap AClient ), -- clients with SMP subscriptions
244- ntfSubClients :: TVar (IntMap AClient ), -- clients with Ntf subscriptions
245- pendingSubEvents :: TVar (IntMap (NonEmpty (RecipientId , Subscribed ))),
246- pendingNtfSubEvents :: TVar (IntMap (NonEmpty (NotifierId , Subscribed ))),
294+ { clients :: ServerClients ,
295+ subscribers :: ServerSubscribers ,
296+ ntfSubscribers :: ServerSubscribers ,
247297 savingLock :: Lock
248298 }
249299
300+ -- not exported, to prevent concurrent IntMap lookups inside STM transactions.
301+ newtype ServerClients = ServerClients { serverClients :: TVar (IntMap AClient )}
302+
303+ data ServerSubscribers = ServerSubscribers
304+ { subQ :: TQueue (QueueId , ClientId , Subscribed ),
305+ queueSubscribers :: SubscribedClients ,
306+ subClients :: TVar IntSet ,
307+ pendingEvents :: TVar (IntMap (NonEmpty (EntityId , BrokerMsg )))
308+ }
309+
310+ -- not exported, to prevent accidental concurrent Map lookups inside STM transactions.
311+ -- Map stores TVars with pointers to the clients rather than client ID to allow reading the same TVar
312+ -- inside transactions to ensure that transaction is re-evaluated in case subscriber changes.
313+ -- Storing Maybe allows to have continuity of subscription when the same user client disconnects and re-connects -
314+ -- any STM transaction that reads subscribed client will re-evaluate in this case.
315+ -- The subscriptions that were made at any point are not removed -
316+ -- this is a better trade-off with intermittently connected mobile clients.
317+ data SubscribedClients = SubscribedClients (TMap EntityId (TVar (Maybe AClient )))
318+
319+ getSubscribedClients :: SubscribedClients -> IO (Map EntityId (TVar (Maybe AClient )))
320+ getSubscribedClients (SubscribedClients cs) = readTVarIO cs
321+
322+ getSubscribedClient :: EntityId -> SubscribedClients -> IO (Maybe (TVar (Maybe AClient )))
323+ getSubscribedClient entId (SubscribedClients cs) = TM. lookupIO entId cs
324+ {-# INLINE getSubscribedClient #-}
325+
326+ -- insert subscribed and current client, return previously subscribed client if it is different
327+ upsertSubscribedClient :: EntityId -> AClient -> SubscribedClients -> STM (Maybe AClient )
328+ upsertSubscribedClient entId ac@ (AClient _ _ c) (SubscribedClients cs) =
329+ TM. lookup entId cs >>= \ case
330+ Nothing -> Nothing <$ TM. insertM entId (newTVar (Just ac)) cs
331+ Just cv ->
332+ readTVar cv >>= \ case
333+ Just c' | sameClientId c c' -> pure Nothing
334+ c_ -> c_ <$ writeTVar cv (Just ac)
335+
336+ -- lookup and delete currently subscribed client
337+ lookupDeleteSubscribedClient :: EntityId -> SubscribedClients -> STM (Maybe AClient )
338+ lookupDeleteSubscribedClient entId (SubscribedClients cs) =
339+ TM. lookupDelete entId cs $>>= (`swapTVar` Nothing )
340+
341+ deleteSubcribedClient :: EntityId -> Client s -> SubscribedClients -> IO ()
342+ deleteSubcribedClient entId c (SubscribedClients cs) =
343+ -- lookup of the subscribed client TVar can be in separate transaction,
344+ -- as long as the client is read in the same transaction -
345+ -- it prevents removing the next subscribed client and also avoids STM contention for the Map.
346+ TM. lookupIO entId cs >>= mapM_ (\ cv -> atomically $ whenM (sameClient c cv) $ delete cv)
347+ where
348+ delete cv = do
349+ writeTVar cv Nothing
350+ TM. delete entId cs
351+
352+ sameClientId :: Client s -> AClient -> Bool
353+ sameClientId Client {clientId} ac = clientId == clientId' ac
354+ {-# INLINE sameClientId #-}
355+
356+ sameClient :: Client s -> TVar (Maybe AClient ) -> STM Bool
357+ sameClient c cv = maybe False (sameClientId c) <$> readTVar cv
358+ {-# INLINE sameClient #-}
359+
250360newtype ProxyAgent = ProxyAgent
251361 { smpAgent :: SMPClientAgent
252362 }
@@ -288,16 +398,40 @@ data Sub = Sub
288398
289399newServer :: IO Server
290400newServer = do
291- subscribedQ <- newTQueueIO
292- subscribers <- TM. emptyIO
293- ntfSubscribedQ <- newTQueueIO
294- notifiers <- TM. emptyIO
295- subClients <- newTVarIO IM. empty
296- ntfSubClients <- newTVarIO IM. empty
297- pendingSubEvents <- newTVarIO IM. empty
298- pendingNtfSubEvents <- newTVarIO IM. empty
401+ clients <- ServerClients <$> newTVarIO mempty
402+ subscribers <- newServerSubscribers
403+ ntfSubscribers <- newServerSubscribers
299404 savingLock <- createLockIO
300- return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, subClients, ntfSubClients, pendingSubEvents, pendingNtfSubEvents, savingLock}
405+ return Server {clients, subscribers, ntfSubscribers, savingLock}
406+
407+ getServerClients :: Server -> IO (IntMap AClient )
408+ getServerClients = readTVarIO . serverClients . clients
409+ {-# INLINE getServerClients #-}
410+
411+ getServerClient :: ClientId -> Server -> IO (Maybe AClient )
412+ getServerClient cId s = IM. lookup cId <$> getServerClients s
413+ {-# INLINE getServerClient #-}
414+
415+ insertServerClient :: AClient -> Server -> IO Bool
416+ insertServerClient ac@ (AClient _ _ Client {clientId, connected}) Server {clients} =
417+ atomically $
418+ ifM
419+ (readTVar connected)
420+ (True <$ modifyTVar' (serverClients clients) (IM. insert clientId ac))
421+ (pure False )
422+ {-# INLINE insertServerClient #-}
423+
424+ deleteServerClient :: ClientId -> Server -> IO ()
425+ deleteServerClient cId Server {clients} = atomically $ modifyTVar' (serverClients clients) $ IM. delete cId
426+ {-# INLINE deleteServerClient #-}
427+
428+ newServerSubscribers :: IO ServerSubscribers
429+ newServerSubscribers = do
430+ subQ <- newTQueueIO
431+ queueSubscribers <- SubscribedClients <$> TM. emptyIO
432+ subClients <- newTVarIO IS. empty
433+ pendingEvents <- newTVarIO IM. empty
434+ pure ServerSubscribers {subQ, queueSubscribers, subClients, pendingEvents}
301435
302436newClient :: SQSType qs -> SMSType ms -> ClientId -> Natural -> VersionSMP -> ByteString -> SystemTime -> IO (Client (MsgStore qs ms ))
303437newClient _ _ clientId qSize thVersion sessionId createdAt = do
@@ -312,7 +446,24 @@ newClient _ _ clientId qSize thVersion sessionId createdAt = do
312446 connected <- newTVarIO True
313447 rcvActiveAt <- newTVarIO createdAt
314448 sndActiveAt <- newTVarIO createdAt
315- return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, procThreads, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt}
449+ return
450+ Client
451+ { clientId,
452+ subscriptions,
453+ ntfSubscriptions,
454+ rcvQ,
455+ sndQ,
456+ msgQ,
457+ procThreads,
458+ endThreads,
459+ endThreadSeq,
460+ thVersion,
461+ sessionId,
462+ connected,
463+ createdAt,
464+ rcvActiveAt,
465+ sndActiveAt
466+ }
316467
317468newSubscription :: SubscriptionThread -> STM Sub
318469newSubscription st = do
@@ -362,9 +513,24 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp
362513 serverStats <- newServerStats =<< getCurrentTime
363514 sockets <- newTVarIO []
364515 clientSeq <- newTVarIO 0
365- clients <- newTVarIO mempty
366516 proxyAgent <- newSMPProxyAgent smpAgentCfg random
367- pure Env {serverActive, config, serverInfo, server, serverIdentity, msgStore, ntfStore, random, tlsServerCreds, httpServerCreds, serverStats, sockets, clientSeq, clients, proxyAgent}
517+ pure
518+ Env
519+ { serverActive,
520+ config,
521+ serverInfo,
522+ server,
523+ serverIdentity,
524+ msgStore,
525+ ntfStore,
526+ random,
527+ tlsServerCreds,
528+ httpServerCreds,
529+ serverStats,
530+ sockets,
531+ clientSeq,
532+ proxyAgent
533+ }
368534 where
369535 loadStoreLog :: StoreQueueClass q => (RecipientId -> QueueRec -> IO q ) -> FilePath -> STMQueueStore q -> IO ()
370536 loadStoreLog mkQ f st = do
0 commit comments