@@ -194,7 +194,7 @@ import Simplex.Messaging.Agent.Store.Entity
194194import Simplex.Messaging.Agent.Store.Interface (closeDBStore , execSQL , getCurrentMigrations )
195195import Simplex.Messaging.Agent.Store.Shared (UpMigration (.. ), upMigration )
196196import qualified Simplex.Messaging.Agent.TSessionSubs as SS
197- import Simplex.Messaging.Client (NetworkRequestMode (.. ), SMPClientError , ServerTransmission (.. ), ServerTransmissionBatch , TransportSessionMode (.. ), nonBlockingWriteTBQueue , smpErrorClientNotice , temporaryClientError , unexpectedResponse )
197+ import Simplex.Messaging.Client (NetworkRequestMode (.. ), ProtocolClientError ( .. ), SMPClientError , ServerTransmission (.. ), ServerTransmissionBatch , TransportSessionMode (.. ), nonBlockingWriteTBQueue , smpErrorClientNotice , temporaryClientError , unexpectedResponse )
198198import qualified Simplex.Messaging.Crypto as C
199199import Simplex.Messaging.Crypto.File (CryptoFile , CryptoFileArgs )
200200import Simplex.Messaging.Crypto.Ratchet (PQEncryption , PQSupport (.. ), pattern PQEncOff , pattern PQEncOn , pattern PQSupportOff , pattern PQSupportOn )
@@ -222,6 +222,7 @@ import Simplex.Messaging.Protocol
222222 SParty (.. ),
223223 SProtocolType (.. ),
224224 ServiceSub (.. ),
225+ ServiceSubResult ,
225226 SndPublicAuthKey ,
226227 SubscriptionMode (.. ),
227228 UserProtocol ,
@@ -232,7 +233,7 @@ import qualified Simplex.Messaging.Protocol as SMP
232233import Simplex.Messaging.ServiceScheme (ServiceScheme (.. ))
233234import Simplex.Messaging.SystemTime
234235import qualified Simplex.Messaging.TMap as TM
235- import Simplex.Messaging.Transport (SMPVersion )
236+ import Simplex.Messaging.Transport (SMPVersion , THClientService' ( .. ), THandleAuth ( .. ), THandleParams ( .. ) )
236237import Simplex.Messaging.Util
237238import Simplex.Messaging.Version
238239import Simplex.RemoteControl.Client
@@ -502,7 +503,7 @@ resubscribeConnections :: AgentClient -> [ConnId] -> AE (Map ConnId (Either Agen
502503resubscribeConnections c = withAgentEnv c . resubscribeConnections' c
503504{-# INLINE resubscribeConnections #-}
504505
505- subscribeClientServices :: AgentClient -> UserId -> AE (Map SMPServer (Either AgentErrorType ServiceSub ))
506+ subscribeClientServices :: AgentClient -> UserId -> AE (Map SMPServer (Either AgentErrorType ServiceSubResult ))
506507subscribeClientServices c = withAgentEnv c . subscribeClientServices' c
507508{-# INLINE subscribeClientServices #-}
508509
@@ -1355,28 +1356,23 @@ toConnResult connId rs = case M.lookup connId rs of
13551356 Just (Left e) -> throwE e
13561357 _ -> throwE $ INTERNAL $ " no result for connection " <> B. unpack connId
13571358
1358- type QCmdResult a = (QueueStatus , Either AgentErrorType a )
1359-
1360- type QDelResult = QCmdResult ()
1361-
1362- type QSubResult = QCmdResult (Maybe SMP. ServiceId )
1359+ type QCmdResult = (QueueStatus , Either AgentErrorType () )
13631360
13641361subscribeConnections' :: AgentClient -> [ConnId ] -> AM (Map ConnId (Either AgentErrorType () ))
13651362subscribeConnections' _ [] = pure M. empty
13661363subscribeConnections' c connIds = subscribeConnections_ c . zip connIds =<< withStore' c (`getConnSubs` connIds)
13671364
13681365subscribeConnections_ :: AgentClient -> [(ConnId , Either StoreError SomeConnSub )] -> AM (Map ConnId (Either AgentErrorType () ))
13691366subscribeConnections_ c conns = do
1370- -- TODO [certs rcv] - it should exclude connections already associated, and then if some don't deliver any response they may be unassociated
13711367 let (subRs, cs) = foldr partitionResultsConns ([] , [] ) conns
13721368 resumeDelivery cs
13731369 resumeConnCmds c $ map fst cs
1370+ -- queue/service association is handled in the client
13741371 rcvRs <- lift $ connResults <$> subscribeQueues c False (concatMap rcvQueues cs)
1375- rcvRs' <- storeClientServiceAssocs rcvRs
13761372 ns <- asks ntfSupervisor
1377- lift $ whenM (liftIO $ hasInstantNotifications ns) . void . forkIO . void $ sendNtfCreate ns rcvRs' cs
1373+ lift $ whenM (liftIO $ hasInstantNotifications ns) . void . forkIO . void $ sendNtfCreate ns rcvRs cs
13781374 -- union is left-biased
1379- let rs = rcvRs' `M.union` subRs
1375+ let rs = rcvRs `M.union` subRs
13801376 notifyResultError rs
13811377 pure rs
13821378 where
@@ -1400,24 +1396,21 @@ subscribeConnections_ c conns = do
14001396 _ -> Left $ INTERNAL " unexpected queue status"
14011397 rcvQueues :: (ConnId , SomeConnSub ) -> [RcvQueueSub ]
14021398 rcvQueues (_, SomeConn _ conn) = connRcvQueues conn
1403- connResults :: [(RcvQueueSub , Either AgentErrorType (Maybe SMP. ServiceId ))] -> Map ConnId (Either AgentErrorType (Maybe SMP. ServiceId ))
1399+ connResults :: [(RcvQueueSub , Either AgentErrorType (Maybe SMP. ServiceId ))] -> Map ConnId (Either AgentErrorType () )
14041400 connResults = M. map snd . foldl' addResult M. empty
14051401 where
14061402 -- collects results by connection ID
1407- addResult :: Map ConnId QSubResult -> (RcvQueueSub , Either AgentErrorType (Maybe SMP. ServiceId )) -> Map ConnId QSubResult
1408- addResult rs (RcvQueueSub {connId, status}, r) = M. alter (combineRes (status, r)) connId rs
1403+ addResult :: Map ConnId QCmdResult -> (RcvQueueSub , Either AgentErrorType (Maybe SMP. ServiceId )) -> Map ConnId QCmdResult
1404+ addResult rs (RcvQueueSub {connId, status}, r) = M. alter (combineRes (status, () <$ r)) connId rs
14091405 -- combines two results for one connection, by using only Active queues (if there is at least one Active queue)
1410- combineRes :: QSubResult -> Maybe QSubResult -> Maybe QSubResult
1406+ combineRes :: QCmdResult -> Maybe QCmdResult -> Maybe QCmdResult
14111407 combineRes r' (Just r) = Just $ if order r <= order r' then r else r'
14121408 combineRes r' _ = Just r'
1413- order :: QSubResult -> Int
1409+ order :: QCmdResult -> Int
14141410 order (Active , Right _) = 1
14151411 order (Active , _) = 2
14161412 order (_, Right _) = 3
14171413 order _ = 4
1418- -- TODO [certs rcv] store associations of queues with client service ID
1419- storeClientServiceAssocs :: Map ConnId (Either AgentErrorType (Maybe SMP. ServiceId )) -> AM (Map ConnId (Either AgentErrorType () ))
1420- storeClientServiceAssocs = pure . M. map (() <$ )
14211414 sendNtfCreate :: NtfSupervisor -> Map ConnId (Either AgentErrorType () ) -> [(ConnId , SomeConnSub )] -> AM' ()
14221415 sendNtfCreate ns rcvRs cs = do
14231416 let oks = M. keysSet $ M. filter (either temporaryAgentError $ const True ) rcvRs
@@ -1522,14 +1515,14 @@ resubscribeConnections' c connIds = do
15221515 rqs' -> anyM $ map (atomically . hasActiveSubscription c) rqs'
15231516
15241517-- TODO [certs rcv] compare hash. possibly, it should return both expected and returned counts
1525- subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType ServiceSub ))
1518+ subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType ServiceSubResult ))
15261519subscribeClientServices' c userId =
15271520 ifM useService subscribe $ throwError $ CMD PROHIBITED " no user service allowed"
15281521 where
15291522 useService = liftIO $ (Just True == ) <$> TM. lookupIO userId (useClientServices c)
15301523 subscribe = do
15311524 srvs <- withStore' c (`getClientServiceServers` userId)
1532- lift $ M. fromList <$> mapConcurrently (\ (srv, ServiceSub _ n idsHash) -> fmap (srv,) $ tryAllErrors' $ subscribeClientService c userId srv n idsHash) srvs
1525+ lift $ M. fromList <$> mapConcurrently (\ (srv, ServiceSub _ n idsHash) -> fmap (srv,) $ tryAllErrors' $ subscribeClientService c False userId srv n idsHash) srvs
15331526
15341527-- requesting messages sequentially, to reduce memory usage
15351528getConnectionMessages' :: AgentClient -> NonEmpty ConnMsgReq -> AM' (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta )))
@@ -2383,13 +2376,13 @@ deleteConnQueues c nm waitDelivery ntf rqs = do
23832376 connResults = M. map snd . foldl' addResult M. empty
23842377 where
23852378 -- collects results by connection ID
2386- addResult :: Map ConnId QDelResult -> (RcvQueue , Either AgentErrorType () ) -> Map ConnId QDelResult
2379+ addResult :: Map ConnId QCmdResult -> (RcvQueue , Either AgentErrorType () ) -> Map ConnId QCmdResult
23872380 addResult rs (RcvQueue {connId, status}, r) = M. alter (combineRes (status, r)) connId rs
23882381 -- combines two results for one connection, by prioritizing errors in Active queues
2389- combineRes :: QDelResult -> Maybe QDelResult -> Maybe QDelResult
2382+ combineRes :: QCmdResult -> Maybe QCmdResult -> Maybe QCmdResult
23902383 combineRes r' (Just r) = Just $ if order r <= order r' then r else r'
23912384 combineRes r' _ = Just r'
2392- order :: QDelResult -> Int
2385+ order :: QCmdResult -> Int
23932386 order (Active , Left _) = 1
23942387 order (_, Left _) = 2
23952388 order _ = 3
@@ -2840,11 +2833,17 @@ data ACKd = ACKd | ACKPending
28402833-- It cannot be finally, as sometimes it needs to be ACK+DEL,
28412834-- and sometimes ACK has to be sent from the consumer.
28422835processSMPTransmissions :: AgentClient -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> AM' ()
2843- processSMPTransmissions c@ AgentClient {subQ} (tSess@ (userId, srv, _), _v, sessId, ts) = do
2836+ processSMPTransmissions c@ AgentClient {subQ} (tSess@ (userId, srv, _), THandleParams {thAuth, sessionId = sessId} , ts) = do
28442837 upConnIds <- newTVarIO []
2838+ serviceRQs <- newTVarIO ([] :: [RcvQueue ])
28452839 forM_ ts $ \ (entId, t) -> case t of
28462840 STEvent msgOrErr
2847- | entId == SMP. NoEntity -> pure () -- TODO [certs rcv] process SALL
2841+ | entId == SMP. NoEntity -> case msgOrErr of
2842+ Right msg -> case msg of
2843+ SMP. ALLS -> notifySub c $ SERVICE_ALL srv
2844+ SMP. ERR e -> notifyErr " " $ PCEProtocolError e
2845+ _ -> logError $ " unexpected event: " <> tshow msg
2846+ Left e -> notifyErr " " e
28482847 | otherwise -> withRcvConn entId $ \ rq@ RcvQueue {connId} conn -> case msgOrErr of
28492848 Right msg -> runProcessSMP rq conn (toConnData conn) msg
28502849 Left e -> lift $ do
@@ -2853,11 +2852,10 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
28532852 STResponse (Cmd SRecipient cmd) respOrErr ->
28542853 withRcvConn entId $ \ rq conn -> case cmd of
28552854 SMP. SUB -> case respOrErr of
2856- Right SMP. OK -> liftIO $ processSubOk rq upConnIds
2857- -- TODO [certs rcv] associate queue with the service
2858- Right (SMP. SOK _serviceId_) -> liftIO $ processSubOk rq upConnIds
2855+ Right SMP. OK -> liftIO $ processSubOk rq upConnIds serviceRQs Nothing
2856+ Right (SMP. SOK serviceId_) -> liftIO $ processSubOk rq upConnIds serviceRQs serviceId_
28592857 Right msg@ SMP. MSG {} -> do
2860- liftIO $ processSubOk rq upConnIds -- the connection is UP even when processing this particular message fails
2858+ liftIO $ processSubOk rq upConnIds serviceRQs Nothing -- the connection is UP even when processing this particular message fails
28612859 runProcessSMP rq conn (toConnData conn) msg
28622860 Right r -> lift $ processSubErr rq $ unexpectedResponse r
28632861 Left e -> lift $ unless (temporaryClientError e) $ processSubErr rq e -- timeout/network was already reported
@@ -2873,6 +2871,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
28732871 unless (null connIds) $ do
28742872 notify' " " $ UP srv connIds
28752873 atomically $ incSMPServerStat' c userId srv connSubscribed $ length connIds
2874+ readTVarIO serviceRQs >>= processRcvServiceAssocs c
28762875 where
28772876 withRcvConn :: SMP. RecipientId -> (forall c . RcvQueue -> Connection c -> AM () ) -> AM' ()
28782877 withRcvConn rId a = do
@@ -2882,11 +2881,13 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
28822881 tryAllErrors' (a rq conn) >>= \ case
28832882 Left e -> notify' connId (ERR e)
28842883 Right () -> pure ()
2885- processSubOk :: RcvQueue -> TVar [ConnId ] -> IO ()
2886- processSubOk rq@ RcvQueue {connId} upConnIds =
2884+ processSubOk :: RcvQueue -> TVar [ConnId ] -> TVar [ RcvQueue ] -> Maybe SMP. ServiceId -> IO ()
2885+ processSubOk rq@ RcvQueue {connId} upConnIds serviceRQs serviceId_ =
28872886 atomically . whenM (isPendingSub rq) $ do
28882887 SS. addActiveSub tSess sessId rq $ currentSubs c
28892888 modifyTVar' upConnIds (connId : )
2889+ when (isJust serviceId_ && serviceId_ == clientServiceId_) $ modifyTVar' serviceRQs (rq : )
2890+ clientServiceId_ = (\ THClientService {serviceId} -> serviceId) <$> (clientService =<< thAuth)
28902891 processSubErr :: RcvQueue -> SMPClientError -> AM' ()
28912892 processSubErr rq@ RcvQueue {connId} e = do
28922893 atomically . whenM (isPendingSub rq) $
0 commit comments