Skip to content

Commit f0dc600

Browse files
committed
ntf server: remove shared queue for all notification subscriptions (#1543)
* ntf server: remove shared queue for all notification subscriptions * wait for subscriber with timeout * safer * refactor * log
1 parent f44ea0a commit f0dc600

File tree

2 files changed

+109
-91
lines changed

2 files changed

+109
-91
lines changed

src/Simplex/Messaging/Notifications/Server.hs

Lines changed: 101 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ module Simplex.Messaging.Notifications.Server where
1919

2020
import Control.Concurrent (threadDelay)
2121
import Control.Concurrent.Async (mapConcurrently)
22+
import qualified Control.Exception as E
2223
import Control.Logger.Simple
2324
import Control.Monad
2425
import Control.Monad.Except
@@ -68,8 +69,8 @@ import Simplex.Messaging.Server.Control (CPClientRole (..))
6869
import Simplex.Messaging.Server.Env.STM (StartOptions (..))
6970
import Simplex.Messaging.Server.QueueStore (getSystemDate)
7071
import Simplex.Messaging.Server.Stats (PeriodStats (..), PeriodStatCounts (..), periodStatCounts, periodStatDataCounts, updatePeriodStats)
72+
import Simplex.Messaging.Session
7173
import Simplex.Messaging.TMap (TMap)
72-
import qualified Simplex.Messaging.TMap as TM
7374
import Simplex.Messaging.Transport (ATransport (..), THandle (..), THandleAuth (..), THandleParams (..), TProxy, Transport (..), TransportPeer (..), defaultSupportedParams)
7475
import Simplex.Messaging.Transport.Buffer (trimCR)
7576
import Simplex.Messaging.Transport.Server (AddHTTP, runTransportServer, runLocalTCPServer)
@@ -78,7 +79,8 @@ import System.Environment (lookupEnv)
7879
import System.Exit (exitFailure, exitSuccess)
7980
import System.IO (BufferMode (..), hClose, hPrint, hPutStrLn, hSetBuffering, hSetNewlineMode, universalNewlineMode)
8081
import System.Mem.Weak (deRefWeak)
81-
import UnliftIO (IOMode (..), UnliftIO, askUnliftIO, unliftIO, withFile)
82+
import System.Timeout (timeout)
83+
import UnliftIO (IOMode (..), UnliftIO, askUnliftIO, race_, unliftIO, withFile)
8284
import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId)
8385
import UnliftIO.Directory (doesFileExist, renameFile)
8486
import UnliftIO.Exception
@@ -140,9 +142,13 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
140142
logNote "Saving server state..."
141143
saveServer
142144
NtfSubscriber {smpSubscribers, smpAgent} <- asks subscriber
143-
liftIO $ readTVarIO smpSubscribers >>= mapM_ (\SMPSubscriber {subThreadId} -> readTVarIO subThreadId >>= mapM_ (deRefWeak >=> mapM_ killThread))
145+
liftIO $ readTVarIO smpSubscribers >>= mapM_ stopSubscriber
144146
liftIO $ closeSMPClientAgent smpAgent
145147
logNote "Server stopped"
148+
where
149+
stopSubscriber v =
150+
atomically (tryReadTMVar $ sessionVar v)
151+
>>= mapM (deRefWeak . subThreadId >=> mapM_ killThread)
146152

147153
saveServer :: M ()
148154
saveServer = asks store >>= liftIO . closeNtfDbStore >> saveServerStats
@@ -292,7 +298,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
292298
| otherwise = (own, other + 1)
293299
where
294300
host = safeDecodeUtf8 $ strEncode h
295-
301+
296302

297303
controlPortThread_ :: NtfServerConfig -> [M ()]
298304
controlPortThread_ NtfServerConfig {controlPort = Just port} = [runCPServer port]
@@ -440,98 +446,101 @@ resubscribe NtfSubscriber {smpAgent = ca} = do
440446
afterSubId_' = Just $ fst $ last subs
441447
if len < dbBatchSize then pure n' else loop n' afterSubId_'
442448

449+
-- this function is concurrency-safe - only onle subscriber per server can be created at a time,
450+
-- other threads would wait for the first thread to create it.
451+
subscribeNtfs :: NtfSubscriber -> NtfPostgresStore -> SMPServer -> NonEmpty ServerNtfSub -> IO ()
452+
subscribeNtfs NtfSubscriber {smpSubscribers, subscriberSeq, smpAgent = ca} st smpServer ntfSubs =
453+
getSubscriberVar
454+
>>= either createSMPSubscriber waitForSMPSubscriber
455+
>>= mapM_ (\sub -> atomically $ writeTQueue (subscriberSubQ sub) ntfSubs)
456+
where
457+
getSubscriberVar :: IO (Either SMPSubscriberVar SMPSubscriberVar)
458+
getSubscriberVar = atomically . getSessVar subscriberSeq smpServer smpSubscribers =<< getCurrentTime
459+
460+
createSMPSubscriber :: SMPSubscriberVar -> IO (Maybe SMPSubscriber)
461+
createSMPSubscriber v =
462+
E.handle (\(e :: SomeException) -> logError ("SMP subscriber exception: " <> tshow e) >> removeSubscriber v) $ do
463+
q <- newTQueueIO
464+
tId <- mkWeakThreadId =<< forkIO (runSMPSubscriber q)
465+
let sub = SMPSubscriber {smpServer, subscriberSubQ = q, subThreadId = tId}
466+
atomically $ putTMVar (sessionVar v) sub -- this makes it available for other threads
467+
pure $ Just sub
468+
469+
waitForSMPSubscriber :: SMPSubscriberVar -> IO (Maybe SMPSubscriber)
470+
waitForSMPSubscriber v =
471+
-- reading without timeout first to avoid creating extra thread for timeout
472+
atomically (tryReadTMVar $ sessionVar v)
473+
>>= maybe (timeout 10000000 $ atomically $ readTMVar $ sessionVar v) (pure . Just)
474+
>>= maybe (logError "SMP subscriber timeout" >> removeSubscriber v) (pure . Just)
475+
476+
-- create/waitForSMPSubscriber should never throw, removing it from map in case it did
477+
removeSubscriber v = do
478+
atomically $ removeSessVar v smpServer smpSubscribers
479+
pure Nothing
480+
481+
runSMPSubscriber :: TQueue (NonEmpty ServerNtfSub) -> IO ()
482+
runSMPSubscriber q = forever $ do
483+
-- TODO [ntfdb] possibly, the subscriptions can be batched here and sent every say 5 seconds
484+
-- this should be analysed once we have prometheus stats
485+
subs <- atomically $ readTQueue q
486+
updated <- batchUpdateSubStatus st subs NSPending
487+
logSubStatus smpServer "subscribing" (L.length subs) updated
488+
subscribeQueuesNtfs ca smpServer $ L.map snd subs
489+
443490
ntfSubscriber :: NtfSubscriber -> M ()
444-
ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = do
445-
raceAny_ [subscribe, receiveSMP, receiveAgent]
491+
ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
492+
race_ receiveSMP receiveAgent
446493
where
447-
subscribe :: M ()
448-
subscribe = forever $ do
449-
(srv, subs) <- atomically $ readTBQueue newSubQ
450-
SMPSubscriber {subscriberSubQ} <- getSMPSubscriber srv
451-
atomically $ writeTQueue subscriberSubQ subs
452-
453-
-- TODO [ntfdb] this does not guarantee that only one subscriber per server is created (there should be TMVar in the map)
454-
-- This does not need changing if single newSubQ remains, but if it is removed, it need to change
455-
getSMPSubscriber :: SMPServer -> M SMPSubscriber
456-
getSMPSubscriber smpServer =
457-
liftIO (TM.lookupIO smpServer smpSubscribers) >>= maybe createSMPSubscriber pure
458-
where
459-
createSMPSubscriber = do
460-
sub@SMPSubscriber {subThreadId} <- liftIO $ newSMPSubscriber smpServer
461-
atomically $ TM.insert smpServer sub smpSubscribers
462-
tId <- mkWeakThreadId =<< forkIO (runSMPSubscriber sub)
463-
atomically . writeTVar subThreadId $ Just tId
464-
pure sub
465-
466-
runSMPSubscriber :: SMPSubscriber -> M ()
467-
runSMPSubscriber SMPSubscriber {smpServer, subscriberSubQ} = do
494+
receiveSMP = do
468495
st <- asks store
469-
forever $ do
470-
-- TODO [ntfdb] possibly, the subscriptions can be batched here and sent every say 5 seconds
471-
-- this should be analysed once we have prometheus stats
472-
subs <- atomically $ readTQueue subscriberSubQ
473-
updated <- liftIO $ batchUpdateSubStatus st subs NSPending
474-
logSubStatus smpServer "subscribing" (L.length subs) updated
475-
liftIO $ subscribeQueuesNtfs ca smpServer $ L.map snd subs
476-
477-
receiveSMP :: M ()
478-
receiveSMP = forever $ do
479-
((_, srv, _), _thVersion, sessionId, ts) <- atomically $ readTBQueue msgQ
480-
forM ts $ \(ntfId, t) -> case t of
481-
STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen
482-
STResponse {} -> pure () -- it was already reported as timeout error
483-
STEvent msgOrErr -> do
484-
let smpQueue = SMPQueueNtf srv ntfId
485-
case msgOrErr of
486-
Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do
487-
ntfTs <- liftIO getSystemTime
488-
st <- asks store
489-
NtfPushServer {pushQ} <- asks pushServer
490-
stats <- asks serverStats
491-
liftIO $ updatePeriodStats (activeSubs stats) ntfId
492-
let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}
493-
ntfs_ <- liftIO $ addTokenLastNtf st newNtf
494-
forM_ ntfs_ $ \(tkn, lastNtfs) -> atomically $ writeTBQueue pushQ (tkn, PNMessage lastNtfs)
495-
incNtfStat ntfReceived
496-
Right SMP.END -> do
497-
whenM (atomically $ activeClientSession' ca sessionId srv) $ do
498-
st <- asks store
499-
void $ liftIO $ updateSrvSubStatus st smpQueue NSEnd
500-
Right SMP.DELD -> do
501-
st <- asks store
502-
void $ liftIO $ updateSrvSubStatus st smpQueue NSDeleted
503-
Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e
504-
Right _ -> logError "SMP server unexpected response"
505-
Left e -> logError $ "SMP client error: " <> tshow e
496+
NtfPushServer {pushQ} <- asks pushServer
497+
stats <- asks serverStats
498+
liftIO $ forever $ do
499+
((_, srv, _), _thVersion, sessionId, ts) <- atomically $ readTBQueue msgQ
500+
forM ts $ \(ntfId, t) -> case t of
501+
STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen
502+
STResponse {} -> pure () -- it was already reported as timeout error
503+
STEvent msgOrErr -> do
504+
let smpQueue = SMPQueueNtf srv ntfId
505+
case msgOrErr of
506+
Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do
507+
ntfTs <- getSystemTime
508+
updatePeriodStats (activeSubs stats) ntfId
509+
let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}
510+
ntfs_ <- addTokenLastNtf st newNtf
511+
forM_ ntfs_ $ \(tkn, lastNtfs) -> atomically $ writeTBQueue pushQ (tkn, PNMessage lastNtfs)
512+
incNtfStat_ stats ntfReceived
513+
Right SMP.END ->
514+
whenM (atomically $ activeClientSession' ca sessionId srv) $
515+
void $ updateSrvSubStatus st smpQueue NSEnd
516+
Right SMP.DELD ->
517+
void $ updateSrvSubStatus st smpQueue NSDeleted
518+
Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e
519+
Right _ -> logError "SMP server unexpected response"
520+
Left e -> logError $ "SMP client error: " <> tshow e
506521

507522
receiveAgent = do
508523
st <- asks store
509-
forever $
524+
liftIO $ forever $
510525
atomically (readTBQueue agentQ) >>= \case
511526
CAConnected srv ->
512527
logInfo $ "SMP server reconnected " <> showServer' srv
513528
CADisconnected srv subs -> do
514529
forM_ (L.nonEmpty $ map snd $ S.toList subs) $ \nIds -> do
515-
updated <- liftIO $ batchUpdateSrvSubStatus st srv nIds NSInactive
530+
updated <- batchUpdateSrvSubStatus st srv nIds NSInactive
516531
logSubStatus srv "disconnected" (L.length nIds) updated
517532
CASubscribed srv _ nIds -> do
518-
updated <- liftIO $ batchUpdateSrvSubStatus st srv nIds NSActive
533+
updated <- batchUpdateSrvSubStatus st srv nIds NSActive
519534
logSubStatus srv "subscribed" (L.length nIds) updated
520535
CASubError srv _ errs -> do
521536
forM_ (L.nonEmpty $ mapMaybe (\(nId, err) -> (nId,) <$> subErrorStatus err) $ L.toList errs) $ \subStatuses -> do
522-
updated <- liftIO $ batchUpdateSrvSubStatuses st srv subStatuses
537+
updated <- batchUpdateSrvSubStatuses st srv subStatuses
523538
logSubErrors srv subStatuses updated
524539

525-
logSubStatus :: SMPServer -> T.Text -> Int -> Int64 -> M ()
526-
logSubStatus srv event n updated =
527-
logInfo $ "SMP server " <> event <> " " <> showServer' srv <> " (" <> tshow n <> " subs, " <> tshow updated <> " subs updated)"
528-
529-
logSubErrors :: SMPServer -> NonEmpty (SMP.NotifierId, NtfSubStatus) -> Int64 -> M ()
540+
logSubErrors :: SMPServer -> NonEmpty (SMP.NotifierId, NtfSubStatus) -> Int64 -> IO ()
530541
logSubErrors srv subs updated = forM_ (L.group $ L.sort $ L.map snd subs) $ \ss -> do
531542
logError $ "SMP server subscription errors " <> showServer' srv <> ": " <> tshow (L.head ss) <> " (" <> tshow (length ss) <> " errors, " <> tshow updated <> " subs updated)"
532543

533-
showServer' = decodeLatin1 . strEncode . host
534-
535544
subErrorStatus :: SMPClientError -> Maybe NtfSubStatus
536545
subErrorStatus = \case
537546
PCEProtocolError AUTH -> Just NSAuth
@@ -549,6 +558,13 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
549558
updateErr :: Show e => ByteString -> e -> Maybe NtfSubStatus
550559
updateErr errType e = Just $ NSErr $ errType <> bshow e
551560

561+
logSubStatus :: SMPServer -> T.Text -> Int -> Int64 -> IO ()
562+
logSubStatus srv event n updated =
563+
logInfo $ "SMP server " <> event <> " " <> showServer' srv <> " (" <> tshow n <> " subs, " <> tshow updated <> " subs updated)"
564+
565+
showServer' :: SMPServer -> Text
566+
showServer' = decodeLatin1 . strEncode . host
567+
552568
ntfPush :: NtfPushServer -> M ()
553569
ntfPush s@NtfPushServer {pushQ} = forever $ do
554570
(tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ)
@@ -703,7 +719,7 @@ verifyNtfTransmission st auth_ (tAuth, authorized, (corrId, entId, _)) = \case
703719
e -> VRFailed e
704720

705721
client :: NtfServerClient -> NtfSubscriber -> NtfPushServer -> M ()
706-
client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPushServer {pushQ} =
722+
client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServer {pushQ} =
707723
forever $
708724
atomically (readTBQueue rcvQ)
709725
>>= mapM processCommand
@@ -781,7 +797,8 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
781797
resp <-
782798
withNtfStore (`addNtfSubscription` sub) $ \case
783799
True -> do
784-
atomically $ writeTBQueue newSubQ (srv, [(subId, (nId, nKey))])
800+
st <- asks store
801+
liftIO $ subscribeNtfs ns st srv [(subId, (nId, nKey))]
785802
incNtfStat subCreated
786803
pure $ NRSubId subId
787804
False -> pure $ NRErr AUTH
@@ -823,11 +840,15 @@ withNtfStore stAction continue = do
823840
incNtfStatT :: DeviceToken -> (NtfServerStats -> IORef Int) -> M ()
824841
incNtfStatT (DeviceToken PPApnsNull _) _ = pure ()
825842
incNtfStatT _ statSel = incNtfStat statSel
843+
{-# INLINE incNtfStatT #-}
826844

827845
incNtfStat :: (NtfServerStats -> IORef Int) -> M ()
828-
incNtfStat statSel = do
829-
stats <- asks serverStats
830-
liftIO $ atomicModifyIORef'_ (statSel stats) (+ 1)
846+
incNtfStat statSel = asks serverStats >>= liftIO . (`incNtfStat_` statSel)
847+
{-# INLINE incNtfStat #-}
848+
849+
incNtfStat_ :: NtfServerStats -> (NtfServerStats -> IORef Int) -> IO ()
850+
incNtfStat_ stats statSel = atomicModifyIORef'_ (statSel stats) (+ 1)
851+
{-# INLINE incNtfStat_ #-}
831852

832853
restoreServerLastNtfs :: NtfSTMStore -> FilePath -> IO ()
833854
restoreServerLastNtfs st f =

src/Simplex/Messaging/Notifications/Server/Env.hs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import Simplex.Messaging.Server.Env.STM (StartOptions (..))
3636
import Simplex.Messaging.Server.Expiration
3737
import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..))
3838
import Simplex.Messaging.Server.StoreLog (closeStoreLog)
39+
import Simplex.Messaging.Session
3940
import Simplex.Messaging.TMap (TMap)
4041
import qualified Simplex.Messaging.TMap as TM
4142
import Simplex.Messaging.Transport (ATransport, THandleParams, TransportPeer (..))
@@ -113,30 +114,26 @@ newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsCo
113114
exitFailure
114115

115116
data NtfSubscriber = NtfSubscriber
116-
{ smpSubscribers :: TMap SMPServer SMPSubscriber,
117-
newSubQ :: TBQueue (SMPServer, NonEmpty ServerNtfSub),
117+
{ smpSubscribers :: TMap SMPServer SMPSubscriberVar,
118+
subscriberSeq :: TVar Int,
118119
smpAgent :: SMPClientAgent
119120
}
120121

122+
type SMPSubscriberVar = SessionVar SMPSubscriber
123+
121124
newNtfSubscriber :: Natural -> SMPClientAgentConfig -> TVar ChaChaDRG -> IO NtfSubscriber
122125
newNtfSubscriber qSize smpAgentCfg random = do
123126
smpSubscribers <- TM.emptyIO
124-
newSubQ <- newTBQueueIO qSize
127+
subscriberSeq <- newTVarIO 0
125128
smpAgent <- newSMPClientAgent smpAgentCfg random
126-
pure NtfSubscriber {smpSubscribers, newSubQ, smpAgent}
129+
pure NtfSubscriber {smpSubscribers, subscriberSeq, smpAgent}
127130

128131
data SMPSubscriber = SMPSubscriber
129132
{ smpServer :: SMPServer,
130133
subscriberSubQ :: TQueue (NonEmpty ServerNtfSub),
131-
subThreadId :: TVar (Maybe (Weak ThreadId))
134+
subThreadId :: Weak ThreadId
132135
}
133136

134-
newSMPSubscriber :: SMPServer -> IO SMPSubscriber
135-
newSMPSubscriber smpServer = do
136-
subscriberSubQ <- newTQueueIO
137-
subThreadId <- newTVarIO Nothing
138-
pure SMPSubscriber {smpServer, subscriberSubQ, subThreadId}
139-
140137
data NtfPushServer = NtfPushServer
141138
{ pushQ :: TBQueue (NtfTknRec, PushNotification),
142139
pushClients :: TMap PushProvider PushProviderClient,

0 commit comments

Comments
 (0)