Skip to content

Commit 11a4859

Browse files
authored
agent: batch processing of subscription results and errors (#1652)
* agent: batch processing of subscription results and errors * run agent tests with in-memory server storage * version * non empty errors * size
1 parent a3d1a72 commit 11a4859

File tree

7 files changed

+170
-130
lines changed

7 files changed

+170
-130
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ import Simplex.Messaging.Agent.Store.Common (DBStore)
182182
import qualified Simplex.Messaging.Agent.Store.DB as DB
183183
import Simplex.Messaging.Agent.Store.Interface (closeDBStore, execSQL, getCurrentMigrations)
184184
import Simplex.Messaging.Agent.Store.Shared (UpMigration (..), upMigration)
185+
import qualified Simplex.Messaging.Agent.TSessionSubs as SS
185186
import Simplex.Messaging.Client (NetworkRequestMode (..), SMPClientError, ServerTransmission (..), ServerTransmissionBatch, nonBlockingWriteTBQueue, temporaryClientError, unexpectedResponse)
186187
import qualified Simplex.Messaging.Crypto as C
187188
import Simplex.Messaging.Crypto.File (CryptoFile, CryptoFileArgs)
@@ -2472,10 +2473,10 @@ sendNtfConnCommands c cmd = do
24722473
ns <- asks ntfSupervisor
24732474
connIds <- liftIO $ S.toList <$> getSubscriptions c
24742475
rs <- withStore' c (`getConnsData` connIds)
2475-
let (connIds', cErrs) = enabledNtfConns (zip connIds rs)
2476+
let (connIds', errs) = enabledNtfConns (zip connIds rs)
24762477
forM_ (L.nonEmpty connIds') $ \connIds'' ->
24772478
atomically $ writeTBQueue (ntfSubQ ns) (cmd, connIds'')
2478-
unless (null cErrs) $ atomically $ writeTBQueue (subQ c) ("", "", AEvt SAENone $ ERRS cErrs)
2479+
forM_ (L.nonEmpty errs) $ notifySub c . ERRS
24792480
where
24802481
enabledNtfConns :: [(ConnId, Either StoreError (Maybe (ConnData, ConnectionMode)))] -> ([ConnId], [(ConnId, AgentErrorType)])
24812482
enabledNtfConns = foldr addEnabledConn ([], [])
@@ -2681,16 +2682,16 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
26812682
processSubOk :: RcvQueue -> TVar [ConnId] -> AM ()
26822683
processSubOk rq@RcvQueue {connId} upConnIds =
26832684
atomically . whenM (isPendingSub rq) $ do
2684-
addSubscription c sessId $ rcvQueueSub rq
2685+
SS.addActiveSub tSess sessId (rcvQueueSub rq) $ currentSubs c
26852686
modifyTVar' upConnIds (connId :)
26862687
processSubErr :: RcvQueue -> SMPClientError -> AM ()
26872688
processSubErr rq@RcvQueue {connId} e = do
26882689
atomically . whenM (isPendingSub rq) $
2689-
failSubscription c rq e >> incSMPServerStat c userId srv connSubErrs
2690+
failSubscription c tSess rq e >> incSMPServerStat c userId srv connSubErrs
26902691
lift $ notifyErr connId e
26912692
isPendingSub :: RcvQueue -> STM Bool
26922693
isPendingSub rq = do
2693-
pending <- (&&) <$> hasPendingSubscription c rq <*> activeClientSession c tSess sessId
2694+
pending <- (&&) <$> SS.hasPendingSub tSess (queueId rq) (currentSubs c) <*> activeClientSession c tSess sessId
26942695
unless pending $ incSMPServerStat c userId srv connSubIgnored
26952696
pure pending
26962697
notify' :: forall e m. (AEntityI e, MonadIO m) => ConnId -> AEvent e -> m ()
@@ -2871,14 +2872,14 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
28712872
handleNotifyAck :: AM ACKd -> AM ACKd
28722873
handleNotifyAck m = m `catchAllErrors` \e -> notify (ERR e) >> ack
28732874
SMP.END ->
2874-
atomically (ifM (activeClientSession c tSess sessId) (removeSubscription c connId rq $> True) (pure False))
2875+
atomically (ifM (activeClientSession c tSess sessId) (removeSubscription c tSess connId rq $> True) (pure False))
28752876
>>= notifyEnd
28762877
where
28772878
notifyEnd removed
28782879
| removed = notify END >> logServer "<--" c srv rId "END"
28792880
| otherwise = logServer "<--" c srv rId "END from disconnected client - ignored"
28802881
-- Possibly, we need to add some flag to connection that it was deleted
2881-
SMP.DELD -> atomically (removeSubscription c connId rq) >> notify DELD
2882+
SMP.DELD -> atomically (removeSubscription c tSess connId rq) >> notify DELD
28822883
SMP.ERR e -> notify $ ERR $ SMP (B.unpack $ strEncode srv) e
28832884
r -> unexpected r
28842885
where

0 commit comments

Comments
 (0)