Skip to content

Commit 9854cac

Browse files
authored
agent: optimize subscriptions (#1645)
* agent: optimize subscriptions * simplify * clean up
1 parent c8b551d commit 9854cac

File tree

1 file changed

+37
-22
lines changed

1 file changed

+37
-22
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1264,32 +1264,43 @@ type QSubResult = QCmdResult (Maybe SMP.ServiceId)
12641264
subscribeConnections' :: AgentClient -> [ConnId] -> AM (Map ConnId (Either AgentErrorType (Maybe ClientServiceId)))
12651265
subscribeConnections' _ [] = pure M.empty
12661266
subscribeConnections' c connIds = do
1267-
conns :: Map ConnId (Either StoreError SomeConn) <- M.fromList . zip connIds <$> withStore' c (`getConns` connIds)
1268-
let (errs, cs) = M.mapEither id conns
1269-
errs' = M.map (Left . storeError) errs
1270-
(subRs, rcvQs) = M.mapEither rcvQueueOrResult cs
1267+
conns <- withStore' c (`getConns` connIds)
1268+
let (subRs, cs) = foldr partitionResultsConns ([], []) $ zip connIds conns
12711269
resumeDelivery cs
1272-
resumeConnCmds c $ M.keys cs
1273-
rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concat $ M.elems rcvQs)
1270+
resumeConnCmds c $ map fst cs
1271+
rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concatMap rcvQueues cs)
12741272
rcvRs' <- storeClientServiceAssocs rcvRs
12751273
ns <- asks ntfSupervisor
12761274
lift $ whenM (liftIO $ hasInstantNotifications ns) . void . forkIO . void $ sendNtfCreate ns rcvRs' cs
1277-
let rs = M.unions ([errs', subRs, rcvRs'] :: [Map ConnId (Either AgentErrorType (Maybe ClientServiceId))])
1275+
let rs = M.fromList subRs `M.union` rcvRs'
12781276
notifyResultError rs
12791277
pure rs
12801278
where
1281-
rcvQueueOrResult :: SomeConn -> Either (Either AgentErrorType (Maybe ClientServiceId)) [RcvQueue]
1282-
rcvQueueOrResult (SomeConn _ conn) = case conn of
1283-
DuplexConnection _ rqs _ -> Right $ L.toList rqs
1284-
SndConnection _ sq -> Left $ sndSubResult sq
1285-
RcvConnection _ rq -> Right [rq]
1286-
ContactConnection _ rq -> Right [rq]
1287-
NewConnection _ -> Left (Right Nothing)
1279+
partitionResultsConns :: (ConnId, Either StoreError SomeConn) ->
1280+
([(ConnId, Either AgentErrorType (Maybe ClientServiceId))], [(ConnId, SomeConn)]) ->
1281+
([(ConnId, Either AgentErrorType (Maybe ClientServiceId))], [(ConnId, SomeConn)])
1282+
partitionResultsConns (connId, conn_) (rs, cs) = case conn_ of
1283+
Left e -> ((connId, Left (storeError e)) : rs, cs)
1284+
Right c'@(SomeConn _ conn) -> case conn of
1285+
DuplexConnection {} -> (rs, cs')
1286+
SndConnection _ sq -> ((connId, sndSubResult sq) : rs, cs')
1287+
RcvConnection _ _ -> (rs, cs')
1288+
ContactConnection _ _ -> (rs, cs')
1289+
NewConnection _ -> ((connId, Right Nothing) : rs, cs')
1290+
where
1291+
cs' = (connId, c') : cs
12881292
sndSubResult :: SndQueue -> Either AgentErrorType (Maybe ClientServiceId)
12891293
sndSubResult SndQueue {status} = case status of
12901294
Confirmed -> Right Nothing
12911295
Active -> Left $ CONN SIMPLEX "subscribeConnections"
12921296
_ -> Left $ INTERNAL "unexpected queue status"
1297+
rcvQueues :: (ConnId, SomeConn) -> [RcvQueue]
1298+
rcvQueues (_, SomeConn _ conn) = case conn of
1299+
DuplexConnection _ rqs _ -> L.toList rqs
1300+
SndConnection {} -> []
1301+
RcvConnection _ rq -> [rq]
1302+
ContactConnection _ rq -> [rq]
1303+
NewConnection _ -> []
12931304
connResults :: [(RcvQueue, Either AgentErrorType (Maybe SMP.ServiceId))] -> Map ConnId (Either AgentErrorType (Maybe SMP.ServiceId))
12941305
connResults = M.map snd . foldl' addResult M.empty
12951306
where
@@ -1308,21 +1319,25 @@ subscribeConnections' c connIds = do
13081319
-- TODO [certs rcv] store associations of queues with client service ID
13091320
storeClientServiceAssocs :: Map ConnId (Either AgentErrorType (Maybe SMP.ServiceId)) -> AM (Map ConnId (Either AgentErrorType (Maybe ClientServiceId)))
13101321
storeClientServiceAssocs = pure . M.map (Nothing <$)
1311-
sendNtfCreate :: NtfSupervisor -> Map ConnId (Either AgentErrorType (Maybe ClientServiceId)) -> Map ConnId SomeConn -> AM' ()
1322+
sendNtfCreate :: NtfSupervisor -> Map ConnId (Either AgentErrorType (Maybe ClientServiceId)) -> [(ConnId, SomeConn)] -> AM' ()
13121323
sendNtfCreate ns rcvRs cs = do
13131324
let oks = M.keysSet $ M.filter (either temporaryAgentError $ const True) rcvRs
1314-
cs' = M.restrictKeys cs oks
1315-
(csCreate, csDelete) = M.partition (\(SomeConn _ conn) -> enableNtfs $ toConnData conn) cs'
1325+
(csCreate, csDelete) = foldr (groupConnIds oks) ([], []) cs
13161326
sendNtfCmd NSCCreate csCreate
13171327
sendNtfCmd NSCSmpDelete csDelete
13181328
where
1319-
sendNtfCmd cmd cs' = forM_ (L.nonEmpty $ M.keys cs') $ \cids -> atomically $ writeTBQueue (ntfSubQ ns) (cmd, cids)
1320-
resumeDelivery :: Map ConnId SomeConn -> AM ()
1329+
groupConnIds oks (connId, SomeConn _ conn) acc@(csCreate, csDelete)
1330+
| connId `S.notMember` oks = acc
1331+
| enableNtfs (toConnData conn) = (connId : csCreate, csDelete)
1332+
| otherwise = (csCreate, connId : csDelete)
1333+
sendNtfCmd cmd = mapM_ (\cids -> atomically $ writeTBQueue (ntfSubQ ns) (cmd, cids)) . L.nonEmpty
1334+
resumeDelivery :: [(ConnId, SomeConn)] -> AM ()
13211335
resumeDelivery conns = do
1322-
conns' <- M.restrictKeys conns . S.fromList <$> withStore' c getConnectionsForDelivery
1336+
deliverTo <- S.fromList <$> withStore' c getConnectionsForDelivery
1337+
let conns' = filter ((`S.member` deliverTo) . fst) conns
13231338
lift $ mapM_ (mapM_ (\(cData, sqs) -> mapM_ (resumeMsgDelivery c cData) sqs) . sndQueue) conns'
1324-
sndQueue :: SomeConn -> Maybe (ConnData, NonEmpty SndQueue)
1325-
sndQueue (SomeConn _ conn) = case conn of
1339+
sndQueue :: (ConnId, SomeConn) -> Maybe (ConnData, NonEmpty SndQueue)
1340+
sndQueue (_, SomeConn _ conn) = case conn of
13261341
DuplexConnection cData _ sqs -> Just (cData, sqs)
13271342
SndConnection cData sq -> Just (cData, [sq])
13281343
_ -> Nothing

0 commit comments

Comments
 (0)