Skip to content

Commit 7d0115d

Browse files
authored
ntf server, agent: send all periodic notifications from one thread, only to old active clients or new clients with periodic notification mode (#1528)
* ntf server, agent: send all periodic notifications from one thread, only to old active clients or new clients with periodic notification mode * send different type via subscription queues * option to compact store log on start
1 parent f024ab1 commit 7d0115d

File tree

11 files changed

+137
-131
lines changed

11 files changed

+137
-131
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2197,10 +2197,9 @@ registerNtfToken' c suppliedDeviceToken suppliedNtfMode =
21972197
atomically $ nsUpdateToken ns tkn'
21982198
agentNtfCheckToken c tknId tkn' >>= \case
21992199
NTActive -> do
2200-
cron <- asks $ ntfCron . config
2201-
agentNtfEnableCron c tknId tkn cron
22022200
when (suppliedNtfMode == NMInstant) $ initializeNtfSubs c
22032201
when (suppliedNtfMode == NMPeriodic && savedNtfMode == NMInstant) $ deleteNtfSubs c NSCSmpDelete
2202+
lift $ setCronInterval c tknId tkn
22042203
t tkn' (NTActive, Just NTACheck) $ pure ()
22052204
status -> t tkn' (status, Nothing) $ pure ()
22062205
| otherwise -> replaceToken tknId
@@ -2261,11 +2260,17 @@ verifyNtfToken' c deviceToken nonce code =
22612260
withToken c tkn (Just (NTConfirmed, NTAVerify code')) (NTActive, Just NTACheck) $
22622261
agentNtfVerifyToken c tknId tkn code'
22632262
when (toStatus == NTActive) $ do
2264-
cron <- asks $ ntfCron . config
2265-
agentNtfEnableCron c tknId tkn cron
2263+
lift $ setCronInterval c tknId tkn
22662264
when (ntfMode == NMInstant) $ initializeNtfSubs c
22672265
_ -> throwE $ CMD PROHIBITED "verifyNtfToken: no token"
22682266

2267+
setCronInterval :: AgentClient -> NtfTokenId -> NtfToken -> AM' ()
2268+
setCronInterval c tknId tkn@NtfToken {ntfMode} = do
2269+
cron <- case ntfMode of
2270+
NMPeriodic -> asks $ ntfCron . config
2271+
_ -> pure 0
2272+
void $ forkIO $ void $ runExceptT $ agentNtfSetCronInterval c tknId tkn cron
2273+
22692274
checkNtfToken' :: AgentClient -> DeviceToken -> AM NtfTknStatus
22702275
checkNtfToken' c deviceToken =
22712276
withStore' c getSavedNtfToken >>= \case

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ module Simplex.Messaging.Agent.Client
7777
agentNtfCheckToken,
7878
agentNtfReplaceToken,
7979
agentNtfDeleteToken,
80-
agentNtfEnableCron,
80+
agentNtfSetCronInterval,
8181
agentNtfCreateSubscription,
8282
agentNtfCreateSubscriptions,
8383
agentNtfCheckSubscription,
@@ -1812,9 +1812,10 @@ agentNtfDeleteToken :: AgentClient -> NtfServer -> C.APrivateAuthKey -> NtfToken
18121812
agentNtfDeleteToken c ntfServer ntfPrivKey tknId =
18131813
withNtfClient c ntfServer tknId "TDEL" $ \ntf -> ntfDeleteToken ntf ntfPrivKey tknId
18141814

1815-
agentNtfEnableCron :: AgentClient -> NtfTokenId -> NtfToken -> Word16 -> AM ()
1816-
agentNtfEnableCron c tknId NtfToken {ntfServer, ntfPrivKey} interval =
1817-
withNtfClient c ntfServer tknId "TCRN" $ \ntf -> ntfEnableCron ntf ntfPrivKey tknId interval
1815+
-- set to 0 to disable
1816+
agentNtfSetCronInterval :: AgentClient -> NtfTokenId -> NtfToken -> Word16 -> AM ()
1817+
agentNtfSetCronInterval c tknId NtfToken {ntfServer, ntfPrivKey} interval =
1818+
withNtfClient c ntfServer tknId "TCRN" $ \ntf -> ntfSetCronInterval ntf ntfPrivKey tknId interval
18181819

18191820
agentNtfCreateSubscription :: AgentClient -> NtfTokenId -> NtfToken -> SMPQueueNtf -> SMP.NtfPrivateAuthKey -> AM NtfSubscriptionId
18201821
agentNtfCreateSubscription c tknId NtfToken {ntfServer, ntfPrivKey} smpQueue nKey =

src/Simplex/Messaging/Notifications/Client.hs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ ntfReplaceToken c pKey tknId token = okNtfCommand (TRPL token) c pKey tknId
4949
ntfDeleteToken :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> ExceptT NtfClientError IO ()
5050
ntfDeleteToken = okNtfCommand TDEL
5151

52-
ntfEnableCron :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> Word16 -> ExceptT NtfClientError IO ()
53-
ntfEnableCron c pKey tknId int = okNtfCommand (TCRN int) c pKey tknId
52+
-- set to 0 to disable
53+
ntfSetCronInterval :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> Word16 -> ExceptT NtfClientError IO ()
54+
ntfSetCronInterval c pKey tknId int = okNtfCommand (TCRN int) c pKey tknId
5455

5556
ntfCreateSubscription :: NtfClient -> C.APrivateAuthKey -> NewNtfEntity 'Subscription -> ExceptT NtfClientError IO NtfSubscriptionId
5657
ntfCreateSubscription c pKey newSub =

src/Simplex/Messaging/Notifications/Server.hs

Lines changed: 44 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import qualified Data.Text as T
4040
import qualified Data.Text.IO as T
4141
import Data.Text.Encoding (decodeLatin1)
4242
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
43-
import Data.Time.Clock.System (getSystemTime)
43+
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
4444
import Data.Time.Format.ISO8601 (iso8601Show)
4545
import GHC.IORef (atomicSwapIORef)
4646
import GHC.Stats (getRTSStats)
@@ -76,7 +76,7 @@ import System.Environment (lookupEnv)
7676
import System.Exit (exitFailure, exitSuccess)
7777
import System.IO (BufferMode (..), hClose, hPrint, hPutStrLn, hSetBuffering, hSetNewlineMode, universalNewlineMode)
7878
import System.Mem.Weak (deRefWeak)
79-
import UnliftIO (IOMode (..), UnliftIO, askUnliftIO, async, uninterruptibleCancel, unliftIO, withFile)
79+
import UnliftIO (IOMode (..), UnliftIO, askUnliftIO, unliftIO, withFile)
8080
import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId)
8181
import UnliftIO.Directory (doesFileExist, renameFile)
8282
import UnliftIO.Exception
@@ -108,6 +108,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
108108
raceAny_
109109
( ntfSubscriber s
110110
: ntfPush ps
111+
: periodicNtfsThread ps
111112
: map runServer transports
112113
<> serverStatsThread_ cfg
113114
<> prometheusMetricsThread_ cfg
@@ -252,7 +253,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
252253
ntfActiveSubs <- getSMPSubMetrics a srvSubs
253254
ntfPendingSubs <- getSMPSubMetrics a pendingSrvSubs
254255
smpSessionCount <- M.size <$> readTVarIO smpSessions
255-
apnsPushQLength <- fromIntegral <$> atomically (lengthTBQueue pushQ)
256+
apnsPushQLength <- atomically $ lengthTBQueue pushQ
256257
pure NtfRealTimeMetrics {threadsCount, srvSubscribers, srvClients, srvSubWorkers, ntfActiveSubs, ntfPendingSubs, smpSessionCount, apnsPushQLength}
257258
where
258259
getSMPSubMetrics :: SMPClientAgent -> TMap SMPServer (TMap SMPSub a) -> IO NtfSMPSubMetrics
@@ -463,16 +464,12 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
463464
-- TODO [ntfdb] possibly, the subscriptions can be batched here and sent every say 5 seconds
464465
-- this should be analysed once we have prometheus stats
465466
subs <- atomically $ readTQueue subscriberSubQ
466-
-- TODO [ntfdb] validate/partition that SMP server matches and log internal error if not
467467
updated <- liftIO $ batchUpdateSubStatus st subs NSPending
468468
logSubStatus smpServer "subscribing" (L.length subs) updated
469469
liftIO $ subscribeQueues smpServer subs
470470

471-
-- \| Subscribe to queues. The list of results can have a different order.
472-
subscribeQueues :: SMPServer -> NonEmpty NtfSubRec -> IO ()
473-
subscribeQueues srv subs = subscribeQueuesNtfs ca srv (L.map sub subs)
474-
where
475-
sub NtfSubRec {smpQueue = SMPQueueNtf {notifierId}, notifierKey} = (notifierId, notifierKey)
471+
subscribeQueues :: SMPServer -> NonEmpty ServerNtfSub -> IO ()
472+
subscribeQueues srv subs = subscribeQueuesNtfs ca srv (L.map snd subs)
476473

477474
receiveSMP :: M ()
478475
receiveSMP = forever $ do
@@ -492,7 +489,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
492489
let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}
493490
ntfs_ <- liftIO $ addTokenLastNtf st newNtf
494491
forM_ ntfs_ $ \(tkn, lastNtfs) -> atomically $ writeTBQueue pushQ (tkn, PNMessage lastNtfs)
495-
-- TODO [ntfdb] track queued notifications separately?
492+
-- TODO [ntfdb] count queued notifications separately?
496493
incNtfStat ntfReceived
497494
Right SMP.END -> do
498495
whenM (atomically $ activeClientSession' ca sessionId srv) $ do
@@ -554,56 +551,70 @@ ntfPush :: NtfPushServer -> M ()
554551
ntfPush s@NtfPushServer {pushQ} = forever $ do
555552
(tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ)
556553
liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp)
554+
st <- asks store
557555
case ntf of
558556
PNVerification _ ->
559-
deliverNotification pp tkn ntf >>= \case
557+
liftIO (deliverNotification st pp tkn ntf) >>= \case
560558
Right _ -> do
561-
st <- asks store
562559
void $ liftIO $ setTknStatusConfirmed st tkn
563560
incNtfStatT t ntfVrfDelivered
564561
Left _ -> incNtfStatT t ntfVrfFailed
565-
PNCheckMessages -> checkActiveTkn tknStatus $ do
566-
deliverNotification pp tkn ntf
567-
>>= incNtfStatT t . (\case Left _ -> ntfCronFailed; Right () -> ntfCronDelivered)
562+
PNCheckMessages -> do
563+
liftIO (deliverNotification st pp tkn ntf) >>= \case
564+
Right _ -> do
565+
void $ liftIO $ updateTokenCronSentAt st ntfTknId . systemSeconds =<< getSystemTime
566+
incNtfStatT t ntfCronDelivered
567+
Left _ -> incNtfStatT t ntfCronFailed
568568
PNMessage {} -> checkActiveTkn tknStatus $ do
569569
stats <- asks serverStats
570570
liftIO $ updatePeriodStats (activeTokens stats) ntfTknId
571-
deliverNotification pp tkn ntf
571+
liftIO (deliverNotification st pp tkn ntf)
572572
>>= incNtfStatT t . (\case Left _ -> ntfFailed; Right () -> ntfDelivered)
573573
where
574574
checkActiveTkn :: NtfTknStatus -> M () -> M ()
575575
checkActiveTkn status action
576576
| status == NTActive = action
577577
| otherwise = liftIO $ logError "bad notification token status"
578-
deliverNotification :: PushProvider -> NtfTknRec -> PushNotification -> M (Either PushProviderError ())
579-
deliverNotification pp tkn@NtfTknRec {ntfTknId} ntf = do
580-
deliver <- liftIO $ getPushClient s pp
581-
liftIO (runExceptT $ deliver tkn ntf) >>= \case
578+
deliverNotification :: NtfPostgresStore -> PushProvider -> NtfTknRec -> PushNotification -> IO (Either PushProviderError ())
579+
deliverNotification st pp tkn@NtfTknRec {ntfTknId} ntf = do
580+
deliver <- getPushClient s pp
581+
runExceptT (deliver tkn ntf) >>= \case
582582
Right _ -> pure $ Right ()
583583
Left e -> case e of
584584
PPConnection _ -> retryDeliver
585585
PPRetryLater -> retryDeliver
586586
PPCryptoError _ -> err e
587587
PPResponseError {} -> err e
588588
PPTokenInvalid r -> do
589-
st <- asks store
590-
void $ liftIO $ updateTknStatus st tkn $ NTInvalid $ Just r
589+
void $ updateTknStatus st tkn $ NTInvalid $ Just r
591590
err e
592591
PPPermanentError -> err e
593592
where
594-
retryDeliver :: M (Either PushProviderError ())
593+
retryDeliver :: IO (Either PushProviderError ())
595594
retryDeliver = do
596-
deliver <- liftIO $ newPushClient s pp
597-
liftIO (runExceptT $ deliver tkn ntf) >>= \case
595+
deliver <- newPushClient s pp
596+
runExceptT (deliver tkn ntf) >>= \case
598597
Right _ -> pure $ Right ()
599598
Left e -> case e of
600599
PPTokenInvalid r -> do
601-
st <- asks store
602-
void $ liftIO $ updateTknStatus st tkn $ NTInvalid $ Just r
600+
void $ updateTknStatus st tkn $ NTInvalid $ Just r
603601
err e
604602
_ -> err e
605603
err e = logError ("Push provider error (" <> tshow pp <> ", " <> tshow ntfTknId <> "): " <> tshow e) $> Left e
606604

605+
-- TODO [ntfdb] this could be further improved by sending periodic notifications directly from this thread,
606+
-- without any queue
607+
periodicNtfsThread :: NtfPushServer -> M ()
608+
periodicNtfsThread NtfPushServer {pushQ} = do
609+
st <- asks store
610+
ntfsInterval <- asks $ periodicNtfsInterval . config
611+
let interval = 1000000 * ntfsInterval
612+
liftIO $ forever $ do
613+
threadDelay interval
614+
now <- systemSeconds <$> getSystemTime
615+
cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue pushQ (tkn, PNCheckMessages)
616+
logInfo $ "Scheduled periodic notifications: " <> tshow cnt
617+
607618
runNtfClientTransport :: Transport c => THandleNTF c 'TServer -> M ()
608619
runNtfClientTransport th@THandle {params} = do
609620
qSize <- asks $ clientQSize . config
@@ -692,7 +703,7 @@ verifyNtfTransmission st auth_ (tAuth, authorized, (corrId, entId, _)) = \case
692703
e -> VRFailed e
693704

694705
client :: NtfServerClient -> NtfSubscriber -> NtfPushServer -> M ()
695-
client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPushServer {pushQ, intervalNotifiers} =
706+
client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPushServer {pushQ} =
696707
forever $
697708
atomically (readTBQueue rcvQ)
698709
>>= mapM processCommand
@@ -728,11 +739,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
728739
TVFY code -- this allows repeated verification for cases when client connection dropped before server response
729740
| (tknStatus == NTRegistered || tknStatus == NTConfirmed || tknStatus == NTActive) && tknRegCode == code -> do
730741
logDebug "TVFY - token verified"
731-
withNtfStore (`setTokenActive` tkn) $ \tIds -> do
732-
-- TODO [ntfdb] this will be unnecessary if all cron notifications move to one thread
733-
forM_ tIds cancelInvervalNotifications
734-
incNtfStatT token tknVerified
735-
pure NROk
742+
withNtfStore (`setTokenActive` tkn) $ \_ -> NROk <$ incNtfStatT token tknVerified
736743
| otherwise -> do
737744
logDebug "TVFY - incorrect code or token status"
738745
pure $ NRErr AUTH
@@ -754,45 +761,24 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
754761
forM_ ss $ \(smpServer, nIds) -> do
755762
atomically $ removeSubscriptions ca smpServer SPNotifier nIds
756763
atomically $ removePendingSubs ca smpServer SPNotifier nIds
757-
cancelInvervalNotifications tknId
758764
incNtfStatT token tknDeleted
759765
pure NROk
760766
TCRN 0 -> do
761767
logDebug "TCRN 0"
762-
withNtfStore (\st -> updateTknCronInterval st ntfTknId 0) $ \_ -> do
763-
-- TODO [ntfdb] move cron intervals to one thread
764-
cancelInvervalNotifications tknId
765-
pure NROk
768+
withNtfStore (\st -> updateTknCronInterval st ntfTknId 0) $ \_ -> pure NROk
766769
TCRN int
767770
| int < 20 -> pure $ NRErr QUOTA
768771
| otherwise -> do
769772
logDebug "TCRN"
770-
withNtfStore (\st -> updateTknCronInterval st ntfTknId int) $ \_ -> do
771-
-- TODO [ntfdb] move cron intervals to one thread
772-
liftIO (TM.lookupIO tknId intervalNotifiers) >>= \case
773-
Nothing -> runIntervalNotifier int
774-
Just IntervalNotifier {interval, action} ->
775-
unless (interval == int) $ do
776-
uninterruptibleCancel action
777-
runIntervalNotifier int
778-
pure NROk
779-
where
780-
runIntervalNotifier interval = do
781-
action <- async . intervalNotifier $ fromIntegral interval * 1000000 * 60
782-
let notifier = IntervalNotifier {action, token = tkn, interval}
783-
atomically $ TM.insert tknId notifier intervalNotifiers
784-
where
785-
intervalNotifier delay = forever $ do
786-
liftIO $ threadDelay' delay
787-
atomically $ writeTBQueue pushQ (tkn, PNCheckMessages)
788-
NtfReqNew corrId (ANE SSubscription newSub@(NewNtfSub _ (SMPQueueNtf srv _) _)) -> do
773+
withNtfStore (\st -> updateTknCronInterval st ntfTknId int) $ \_ -> pure NROk
774+
NtfReqNew corrId (ANE SSubscription newSub@(NewNtfSub _ (SMPQueueNtf srv nId) nKey)) -> do
789775
logDebug "SNEW - new subscription"
790776
subId <- getId
791777
let sub = mkNtfSubRec subId newSub
792778
resp <-
793779
withNtfStore (`addNtfSubscription` sub) $ \case
794780
True -> do
795-
atomically $ writeTBQueue newSubQ (srv, [sub])
781+
atomically $ writeTBQueue newSubQ (srv, [(subId, (nId, nKey))])
796782
incNtfStat subCreated
797783
pure $ NRSubId subId
798784
False -> pure $ NRErr AUTH
@@ -823,10 +809,6 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
823809
getRegCode = NtfRegCode <$> (randomBytes =<< asks (regCodeBytes . config))
824810
randomBytes :: Int -> M ByteString
825811
randomBytes n = atomically . C.randomBytes n =<< asks random
826-
cancelInvervalNotifications :: NtfTokenId -> M ()
827-
cancelInvervalNotifications tknId =
828-
atomically (TM.lookupDelete tknId intervalNotifiers)
829-
>>= mapM_ (uninterruptibleCancel . action)
830812

831813
withNtfStore :: (NtfPostgresStore -> IO (Either ErrorType a)) -> (a -> M NtfResponse) -> M NtfResponse
832814
withNtfStore stAction continue = do

0 commit comments

Comments
 (0)