Skip to content

Commit 9cf6c97

Browse files
ntf: batch ntf worker command processing (#1337)
* ntf: batch ntf worker command processing * remove comment * change batch size * wip * catch * refactor * refactor * batch check * refactor * reschedule * increase ntfSubCheckInterval * first check interval * check more statuses * refactor, remove foldr' * refactor 2 * refactor client * refactor 3 * ntf server: improve support for batched commands (#1340) * re-create ntf subscriptions on NTF AUTH errors * name --------- Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
1 parent 0739f7b commit 9cf6c97

File tree

8 files changed

+294
-157
lines changed

8 files changed

+294
-157
lines changed

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ module Simplex.Messaging.Agent.Client
7070
agentNtfDeleteToken,
7171
agentNtfEnableCron,
7272
agentNtfCreateSubscription,
73+
agentNtfCreateSubscriptions,
7374
agentNtfCheckSubscription,
75+
agentNtfCheckSubscriptions,
7476
agentNtfDeleteSubscription,
7577
agentXFTPDownloadChunk,
7678
agentXFTPNewChunk,
@@ -153,6 +155,7 @@ module Simplex.Messaging.Agent.Client
153155
incXFTPServerStat',
154156
incXFTPServerSizeStat,
155157
incNtfServerStat,
158+
incNtfServerStat',
156159
AgentWorkersDetails (..),
157160
getAgentWorkersDetails,
158161
AgentWorkersSummary (..),
@@ -1730,10 +1733,34 @@ agentNtfCreateSubscription :: AgentClient -> NtfTokenId -> NtfToken -> SMPQueueN
17301733
agentNtfCreateSubscription c tknId NtfToken {ntfServer, ntfPrivKey} smpQueue nKey =
17311734
withNtfClient c ntfServer tknId "SNEW" $ \ntf -> ntfCreateSubscription ntf ntfPrivKey (NewNtfSub tknId smpQueue nKey)
17321735

1733-
agentNtfCheckSubscription :: AgentClient -> NtfSubscriptionId -> NtfToken -> AM NtfSubStatus
1734-
agentNtfCheckSubscription c subId NtfToken {ntfServer, ntfPrivKey} =
1736+
agentNtfCreateSubscriptions :: AgentClient -> NtfToken -> NonEmpty (NewNtfEntity 'Subscription) -> AM' (NonEmpty (Either AgentErrorType NtfSubscriptionId))
1737+
agentNtfCreateSubscriptions = withNtfBatch "SNEW" ntfCreateSubscriptions
1738+
1739+
agentNtfCheckSubscription :: AgentClient -> NtfToken -> NtfSubscriptionId -> AM NtfSubStatus
1740+
agentNtfCheckSubscription c NtfToken {ntfServer, ntfPrivKey} subId =
17351741
withNtfClient c ntfServer subId "SCHK" $ \ntf -> ntfCheckSubscription ntf ntfPrivKey subId
17361742

1743+
agentNtfCheckSubscriptions :: AgentClient -> NtfToken -> NonEmpty NtfSubscriptionId -> AM' (NonEmpty (Either AgentErrorType NtfSubStatus))
1744+
agentNtfCheckSubscriptions = withNtfBatch "SCHK" ntfCheckSubscriptions
1745+
1746+
-- This batch sends all commands to one ntf server (client can only use one server at a time)
1747+
withNtfBatch ::
1748+
ByteString ->
1749+
(NtfClient -> C.APrivateAuthKey -> NonEmpty a -> IO (NonEmpty (Either NtfClientError r))) ->
1750+
AgentClient ->
1751+
NtfToken ->
1752+
NonEmpty a ->
1753+
AM' (NonEmpty (Either AgentErrorType r))
1754+
withNtfBatch cmdStr action c NtfToken {ntfServer, ntfPrivKey} subs = do
1755+
let tSess = (0, ntfServer, Nothing)
1756+
tryAgentError' (getNtfServerClient c tSess) >>= \case
1757+
Left e -> pure $ L.map (\_ -> Left e) subs
1758+
Right ntf -> liftIO $ do
1759+
logServer' "-->" c ntfServer (bshow (length subs) <> " subscriptions") cmdStr
1760+
L.map agentError <$> action ntf ntfPrivKey subs
1761+
where
1762+
agentError = first $ protocolClientError NTF $ clientServer ntf
1763+
17371764
agentNtfDeleteSubscription :: AgentClient -> NtfSubscriptionId -> NtfToken -> AM ()
17381765
agentNtfDeleteSubscription c subId NtfToken {ntfServer, ntfPrivKey} =
17391766
withNtfClient c ntfServer subId "SDEL" $ \ntf -> ntfDeleteSubscription ntf ntfPrivKey subId
@@ -2058,9 +2085,13 @@ incXFTPServerStat_ = incServerStat (\AgentClient {xftpServersStats = s} -> s) ne
20582085
{-# INLINE incXFTPServerStat_ #-}
20592086

20602087
incNtfServerStat :: AgentClient -> UserId -> NtfServer -> (AgentNtfServerStats -> TVar Int) -> STM ()
2061-
incNtfServerStat c userId srv sel = incServerStat (\AgentClient {ntfServersStats = s} -> s) newAgentNtfServerStats c userId srv sel 1
2088+
incNtfServerStat c userId srv sel = incNtfServerStat' c userId srv sel 1
20622089
{-# INLINE incNtfServerStat #-}
20632090

2091+
incNtfServerStat' :: AgentClient -> UserId -> NtfServer -> (AgentNtfServerStats -> TVar Int) -> Int -> STM ()
2092+
incNtfServerStat' = incServerStat (\AgentClient {ntfServersStats = s} -> s) newAgentNtfServerStats
2093+
{-# INLINE incNtfServerStat' #-}
2094+
20642095
incServerStat :: Num n => (AgentClient -> TMap (UserId, ProtocolServer p) s) -> STM s -> AgentClient -> UserId -> ProtocolServer p -> (s -> TVar n) -> n -> STM ()
20652096
incServerStat statsSel mkNewStats c userId srv sel n = do
20662097
TM.lookup (userId, srv) (statsSel c) >>= \case

src/Simplex/Messaging/Agent/Env/SQLite.hs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ data AgentConfig = AgentConfig
150150
deleteErrorCount :: Int,
151151
ntfCron :: Word16,
152152
ntfBatchSize :: Int,
153+
ntfSubFirstCheckInterval :: NominalDiffTime,
153154
ntfSubCheckInterval :: NominalDiffTime,
154155
caCertificateFile :: FilePath,
155156
privateKeyFile :: FilePath,
@@ -219,8 +220,9 @@ defaultAgentConfig =
219220
xftpMaxRecipientsPerRequest = 200,
220221
deleteErrorCount = 10,
221222
ntfCron = 20, -- minutes
222-
ntfBatchSize = 200,
223-
ntfSubCheckInterval = nominalDay,
223+
ntfBatchSize = 150,
224+
ntfSubFirstCheckInterval = nominalDay,
225+
ntfSubCheckInterval = 3 * nominalDay,
224226
-- CA certificate private key is not needed for initialization
225227
-- ! we do not generate these
226228
caCertificateFile = "/etc/opt/simplex-agent/ca.crt",

0 commit comments

Comments
 (0)