Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 34 additions & 33 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ import Simplex.Messaging.Agent.Store.Entity
import Simplex.Messaging.Agent.Store.Interface (closeDBStore, execSQL, getCurrentMigrations)
import Simplex.Messaging.Agent.Store.Shared (UpMigration (..), upMigration)
import qualified Simplex.Messaging.Agent.TSessionSubs as SS
import Simplex.Messaging.Client (NetworkRequestMode (..), SMPClientError, ServerTransmission (..), ServerTransmissionBatch, TransportSessionMode (..), nonBlockingWriteTBQueue, smpErrorClientNotice, temporaryClientError, unexpectedResponse)
import Simplex.Messaging.Client (NetworkRequestMode (..), ProtocolClientError (..), SMPClientError, ServerTransmission (..), ServerTransmissionBatch, TransportSessionMode (..), nonBlockingWriteTBQueue, smpErrorClientNotice, temporaryClientError, unexpectedResponse)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile, CryptoFileArgs)
import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport (..), pattern PQEncOff, pattern PQEncOn, pattern PQSupportOff, pattern PQSupportOn)
Expand Down Expand Up @@ -222,6 +222,7 @@ import Simplex.Messaging.Protocol
SParty (..),
SProtocolType (..),
ServiceSub (..),
ServiceSubResult,
SndPublicAuthKey,
SubscriptionMode (..),
UserProtocol,
Expand All @@ -232,7 +233,7 @@ import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.ServiceScheme (ServiceScheme (..))
import Simplex.Messaging.SystemTime
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPVersion)
import Simplex.Messaging.Transport (SMPVersion, THClientService' (..), THandleAuth (..), THandleParams (..))
import Simplex.Messaging.Util
import Simplex.Messaging.Version
import Simplex.RemoteControl.Client
Expand Down Expand Up @@ -502,7 +503,7 @@ resubscribeConnections :: AgentClient -> [ConnId] -> AE (Map ConnId (Either Agen
resubscribeConnections c = withAgentEnv c . resubscribeConnections' c
{-# INLINE resubscribeConnections #-}

subscribeClientServices :: AgentClient -> UserId -> AE (Map SMPServer (Either AgentErrorType ServiceSub))
subscribeClientServices :: AgentClient -> UserId -> AE (Map SMPServer (Either AgentErrorType ServiceSubResult))
subscribeClientServices c = withAgentEnv c . subscribeClientServices' c
{-# INLINE subscribeClientServices #-}

Expand Down Expand Up @@ -1355,28 +1356,23 @@ toConnResult connId rs = case M.lookup connId rs of
Just (Left e) -> throwE e
_ -> throwE $ INTERNAL $ "no result for connection " <> B.unpack connId

type QCmdResult a = (QueueStatus, Either AgentErrorType a)

type QDelResult = QCmdResult ()

type QSubResult = QCmdResult (Maybe SMP.ServiceId)
type QCmdResult = (QueueStatus, Either AgentErrorType ())

subscribeConnections' :: AgentClient -> [ConnId] -> AM (Map ConnId (Either AgentErrorType ()))
subscribeConnections' _ [] = pure M.empty
subscribeConnections' c connIds = subscribeConnections_ c . zip connIds =<< withStore' c (`getConnSubs` connIds)

subscribeConnections_ :: AgentClient -> [(ConnId, Either StoreError SomeConnSub)] -> AM (Map ConnId (Either AgentErrorType ()))
subscribeConnections_ c conns = do
-- TODO [certs rcv] - it should exclude connections already associated, and then if some don't deliver any response they may be unassociated
let (subRs, cs) = foldr partitionResultsConns ([], []) conns
resumeDelivery cs
resumeConnCmds c $ map fst cs
-- queue/service association is handled in the client
rcvRs <- lift $ connResults <$> subscribeQueues c False (concatMap rcvQueues cs)
rcvRs' <- storeClientServiceAssocs rcvRs
ns <- asks ntfSupervisor
lift $ whenM (liftIO $ hasInstantNotifications ns) . void . forkIO . void $ sendNtfCreate ns rcvRs' cs
lift $ whenM (liftIO $ hasInstantNotifications ns) . void . forkIO . void $ sendNtfCreate ns rcvRs cs
-- union is left-biased
let rs = rcvRs' `M.union` subRs
let rs = rcvRs `M.union` subRs
notifyResultError rs
pure rs
where
Expand All @@ -1400,24 +1396,21 @@ subscribeConnections_ c conns = do
_ -> Left $ INTERNAL "unexpected queue status"
rcvQueues :: (ConnId, SomeConnSub) -> [RcvQueueSub]
rcvQueues (_, SomeConn _ conn) = connRcvQueues conn
connResults :: [(RcvQueueSub, Either AgentErrorType (Maybe SMP.ServiceId))] -> Map ConnId (Either AgentErrorType (Maybe SMP.ServiceId))
connResults :: [(RcvQueueSub, Either AgentErrorType (Maybe SMP.ServiceId))] -> Map ConnId (Either AgentErrorType ())
connResults = M.map snd . foldl' addResult M.empty
where
-- collects results by connection ID
addResult :: Map ConnId QSubResult -> (RcvQueueSub, Either AgentErrorType (Maybe SMP.ServiceId)) -> Map ConnId QSubResult
addResult rs (RcvQueueSub {connId, status}, r) = M.alter (combineRes (status, r)) connId rs
addResult :: Map ConnId QCmdResult -> (RcvQueueSub, Either AgentErrorType (Maybe SMP.ServiceId)) -> Map ConnId QCmdResult
addResult rs (RcvQueueSub {connId, status}, r) = M.alter (combineRes (status, () <$ r)) connId rs
-- combines two results for one connection, by using only Active queues (if there is at least one Active queue)
combineRes :: QSubResult -> Maybe QSubResult -> Maybe QSubResult
combineRes :: QCmdResult -> Maybe QCmdResult -> Maybe QCmdResult
combineRes r' (Just r) = Just $ if order r <= order r' then r else r'
combineRes r' _ = Just r'
order :: QSubResult -> Int
order :: QCmdResult -> Int
order (Active, Right _) = 1
order (Active, _) = 2
order (_, Right _) = 3
order _ = 4
-- TODO [certs rcv] store associations of queues with client service ID
storeClientServiceAssocs :: Map ConnId (Either AgentErrorType (Maybe SMP.ServiceId)) -> AM (Map ConnId (Either AgentErrorType ()))
storeClientServiceAssocs = pure . M.map (() <$)
sendNtfCreate :: NtfSupervisor -> Map ConnId (Either AgentErrorType ()) -> [(ConnId, SomeConnSub)] -> AM' ()
sendNtfCreate ns rcvRs cs = do
let oks = M.keysSet $ M.filter (either temporaryAgentError $ const True) rcvRs
Expand Down Expand Up @@ -1522,14 +1515,14 @@ resubscribeConnections' c connIds = do
rqs' -> anyM $ map (atomically . hasActiveSubscription c) rqs'

-- TODO [certs rcv] compare hash. possibly, it should return both expected and returned counts
subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType ServiceSub))
subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType ServiceSubResult))
subscribeClientServices' c userId =
ifM useService subscribe $ throwError $ CMD PROHIBITED "no user service allowed"
where
useService = liftIO $ (Just True ==) <$> TM.lookupIO userId (useClientServices c)
subscribe = do
srvs <- withStore' c (`getClientServiceServers` userId)
lift $ M.fromList <$> mapConcurrently (\(srv, ServiceSub _ n idsHash) -> fmap (srv,) $ tryAllErrors' $ subscribeClientService c userId srv n idsHash) srvs
lift $ M.fromList <$> mapConcurrently (\(srv, ServiceSub _ n idsHash) -> fmap (srv,) $ tryAllErrors' $ subscribeClientService c False userId srv n idsHash) srvs

-- requesting messages sequentially, to reduce memory usage
getConnectionMessages' :: AgentClient -> NonEmpty ConnMsgReq -> AM' (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta)))
Expand Down Expand Up @@ -2383,13 +2376,13 @@ deleteConnQueues c nm waitDelivery ntf rqs = do
connResults = M.map snd . foldl' addResult M.empty
where
-- collects results by connection ID
addResult :: Map ConnId QDelResult -> (RcvQueue, Either AgentErrorType ()) -> Map ConnId QDelResult
addResult :: Map ConnId QCmdResult -> (RcvQueue, Either AgentErrorType ()) -> Map ConnId QCmdResult
addResult rs (RcvQueue {connId, status}, r) = M.alter (combineRes (status, r)) connId rs
-- combines two results for one connection, by prioritizing errors in Active queues
combineRes :: QDelResult -> Maybe QDelResult -> Maybe QDelResult
combineRes :: QCmdResult -> Maybe QCmdResult -> Maybe QCmdResult
combineRes r' (Just r) = Just $ if order r <= order r' then r else r'
combineRes r' _ = Just r'
order :: QDelResult -> Int
order :: QCmdResult -> Int
order (Active, Left _) = 1
order (_, Left _) = 2
order _ = 3
Expand Down Expand Up @@ -2840,11 +2833,17 @@ data ACKd = ACKd | ACKPending
-- It cannot be finally, as sometimes it needs to be ACK+DEL,
-- and sometimes ACK has to be sent from the consumer.
processSMPTransmissions :: AgentClient -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> AM' ()
processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId, ts) = do
processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandleParams {thAuth, sessionId = sessId}, ts) = do
upConnIds <- newTVarIO []
serviceRQs <- newTVarIO ([] :: [RcvQueue])
forM_ ts $ \(entId, t) -> case t of
STEvent msgOrErr
| entId == SMP.NoEntity -> pure () -- TODO [certs rcv] process SALL
| entId == SMP.NoEntity -> case msgOrErr of
Right msg -> case msg of
SMP.ALLS -> notifySub c $ SERVICE_ALL srv
SMP.ERR e -> notifyErr "" $ PCEProtocolError e
_ -> logError $ "unexpected event: " <> tshow msg
Left e -> notifyErr "" e
| otherwise -> withRcvConn entId $ \rq@RcvQueue {connId} conn -> case msgOrErr of
Right msg -> runProcessSMP rq conn (toConnData conn) msg
Left e -> lift $ do
Expand All @@ -2853,11 +2852,10 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
STResponse (Cmd SRecipient cmd) respOrErr ->
withRcvConn entId $ \rq conn -> case cmd of
SMP.SUB -> case respOrErr of
Right SMP.OK -> liftIO $ processSubOk rq upConnIds
-- TODO [certs rcv] associate queue with the service
Right (SMP.SOK _serviceId_) -> liftIO $ processSubOk rq upConnIds
Right SMP.OK -> liftIO $ processSubOk rq upConnIds serviceRQs Nothing
Right (SMP.SOK serviceId_) -> liftIO $ processSubOk rq upConnIds serviceRQs serviceId_
Right [email protected] {} -> do
liftIO $ processSubOk rq upConnIds -- the connection is UP even when processing this particular message fails
liftIO $ processSubOk rq upConnIds serviceRQs Nothing -- the connection is UP even when processing this particular message fails
runProcessSMP rq conn (toConnData conn) msg
Right r -> lift $ processSubErr rq $ unexpectedResponse r
Left e -> lift $ unless (temporaryClientError e) $ processSubErr rq e -- timeout/network was already reported
Expand All @@ -2873,6 +2871,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
unless (null connIds) $ do
notify' "" $ UP srv connIds
atomically $ incSMPServerStat' c userId srv connSubscribed $ length connIds
readTVarIO serviceRQs >>= processRcvServiceAssocs c
where
withRcvConn :: SMP.RecipientId -> (forall c. RcvQueue -> Connection c -> AM ()) -> AM' ()
withRcvConn rId a = do
Expand All @@ -2882,11 +2881,13 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
tryAllErrors' (a rq conn) >>= \case
Left e -> notify' connId (ERR e)
Right () -> pure ()
processSubOk :: RcvQueue -> TVar [ConnId] -> IO ()
processSubOk rq@RcvQueue {connId} upConnIds =
processSubOk :: RcvQueue -> TVar [ConnId] -> TVar [RcvQueue] -> Maybe SMP.ServiceId -> IO ()
processSubOk rq@RcvQueue {connId} upConnIds serviceRQs serviceId_ =
atomically . whenM (isPendingSub rq) $ do
SS.addActiveSub tSess sessId rq $ currentSubs c
modifyTVar' upConnIds (connId :)
when (isJust serviceId_ && serviceId_ == clientServiceId_) $ modifyTVar' serviceRQs (rq :)
clientServiceId_ = (\THClientService {serviceId} -> serviceId) <$> (clientService =<< thAuth)
processSubErr :: RcvQueue -> SMPClientError -> AM' ()
processSubErr rq@RcvQueue {connId} e = do
atomically . whenM (isPendingSub rq) $
Expand Down
41 changes: 23 additions & 18 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ module Simplex.Messaging.Agent.Client
subscribeQueues,
subscribeUserServerQueues,
subscribeClientService,
processRcvServiceAssocs,
processClientNotices,
getQueueMessage,
decryptSMPMessage,
Expand Down Expand Up @@ -280,6 +281,7 @@ import Simplex.Messaging.Protocol
SMPMsgMeta (..),
SProtocolType (..),
ServiceSub (..),
ServiceSubResult (..),
SndPublicAuthKey,
SubscriptionMode (..),
NewNtfCreds (..),
Expand All @@ -292,6 +294,7 @@ import Simplex.Messaging.Protocol
XFTPServerWithAuth,
pattern NoEntity,
senderCanSecure,
serviceSubResult,
)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Protocol.Types
Expand Down Expand Up @@ -785,6 +788,7 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess
serverDown (qs, conns, serviceSub_) = whenM (readTVarIO active) $ do
notifySub c $ hostEvent' DISCONNECT client
unless (null conns) $ notifySub c $ DOWN srv conns
mapM_ (notifySub c . SERVICE_DOWN srv) serviceSub_
unless (null qs && isNothing serviceSub_) $ do
releaseGetLocksIO c qs
mode <- getSessionModeIO c
Expand Down Expand Up @@ -1514,7 +1518,7 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
newErr = throwE . BROKER (B.unpack $ strEncode srv) . UNEXPECTED . ("Create queue: " <>)

processSubResults :: AgentClient -> SMPTransportSession -> SessionId -> Maybe ServiceId -> NonEmpty (RcvQueueSub, Either SMPClientError (Maybe ServiceId)) -> STM ([RcvQueueSub], [(RcvQueueSub, Maybe ClientNotice)])
processSubResults c tSess@(userId, srv, _) sessId smpServiceId rs = do
processSubResults c tSess@(userId, srv, _) sessId serviceId_ rs = do
pending <- SS.getPendingSubs tSess $ currentSubs c
let (failed, subscribed@(qs, sQs), notices, ignored) = foldr (partitionResults pending) (M.empty, ([], []), [], 0) rs
unless (M.null failed) $ do
Expand All @@ -1541,10 +1545,10 @@ processSubResults c tSess@(userId, srv, _) sessId smpServiceId rs = do
| otherwise -> (failed', subscribed, notices, ignored)
where
failed' = M.insert rcvId e failed
Right serviceId_
Right serviceId_'
| rcvId `M.member` pendingSubs ->
let subscribed' = case (smpServiceId, serviceId_, pendingSS) of
(Just sId, Just sId', Just ServiceSub {serviceId}) | sId == sId' && sId == serviceId -> (qs, rq : sQs)
let subscribed' = case (serviceId_, serviceId_', pendingSS) of
(Just sId, Just sId', Just ServiceSub {smpServiceId}) | sId == sId' && sId == smpServiceId -> (qs, rq : sQs)
_ -> (rq : qs, sQs)
in (failed, subscribed', notices', ignored)
| otherwise -> (failed, subscribed, notices', ignored + 1)
Expand Down Expand Up @@ -1692,7 +1696,8 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c
sessId = sessionId $ thParams smp
smpServiceId = (\THClientService {serviceId} -> serviceId) <$> smpClientService smp

processRcvServiceAssocs :: AgentClient -> [RcvQueueSub] -> AM' ()
processRcvServiceAssocs :: SMPQueue q => AgentClient -> [q] -> AM' ()
processRcvServiceAssocs _ [] = pure ()
processRcvServiceAssocs c serviceQs =
withStore' c (`setRcvServiceAssocs` serviceQs) `catchAllErrors'` \e -> do
logError $ "processClientNotices error: " <> tshow e
Expand All @@ -1709,17 +1714,16 @@ processClientNotices c@AgentClient {presetServers} tSess notices = do
logError $ "processClientNotices error: " <> tshow e
notifySub' c "" $ ERR e

resubscribeClientService :: AgentClient -> SMPTransportSession -> ServiceSub -> AM ServiceSub
resubscribeClientService c tSess (ServiceSub _ n idsHash) =
withServiceClient c tSess $ \smp _ -> do
subscribeClientService_ c tSess smp n idsHash
resubscribeClientService :: AgentClient -> SMPTransportSession -> ServiceSub -> AM ServiceSubResult
resubscribeClientService c tSess serviceSub =
withServiceClient c tSess $ \smp _ -> subscribeClientService_ c True tSess smp serviceSub

subscribeClientService :: AgentClient -> UserId -> SMPServer -> Int64 -> IdsHash -> AM ServiceSub
subscribeClientService c userId srv n idsHash =
subscribeClientService :: AgentClient -> Bool -> UserId -> SMPServer -> Int64 -> IdsHash -> AM ServiceSubResult
subscribeClientService c withEvent userId srv n idsHash =
withServiceClient c tSess $ \smp smpServiceId -> do
let serviceSub = ServiceSub smpServiceId n idsHash
atomically $ SS.setPendingServiceSub tSess serviceSub $ currentSubs c
subscribeClientService_ c tSess smp n idsHash
subscribeClientService_ c withEvent tSess smp serviceSub
where
tSess = (userId, srv, Nothing)

Expand All @@ -1730,14 +1734,15 @@ withServiceClient c tSess action =
Just smpServiceId -> action smp smpServiceId
Nothing -> throwE PCEServiceUnavailable

subscribeClientService_ :: AgentClient -> SMPTransportSession -> SMPClient -> Int64 -> IdsHash -> ExceptT SMPClientError IO ServiceSub
subscribeClientService_ c tSess smp n idsHash = do
-- TODO [certs rcv] handle error
serviceSub' <- subscribeService smp SMP.SRecipientService n idsHash
subscribeClientService_ :: AgentClient -> Bool -> SMPTransportSession -> SMPClient -> ServiceSub -> ExceptT SMPClientError IO ServiceSubResult
subscribeClientService_ c withEvent tSess@(_, srv, _) smp expected@(ServiceSub _ n idsHash) = do
subscribed <- subscribeService smp SMP.SRecipientService n idsHash
let sessId = sessionId $ thParams smp
r = serviceSubResult expected subscribed
atomically $ whenM (activeClientSession c tSess sessId) $
SS.setActiveServiceSub tSess sessId serviceSub' $ currentSubs c
pure serviceSub'
SS.setActiveServiceSub tSess sessId subscribed $ currentSubs c
when withEvent $ notifySub c $ SERVICE_UP srv r
pure r

activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool
activeClientSession c tSess sessId = sameSess <$> tryReadSessVar tSess (smpClients c)
Expand Down
11 changes: 11 additions & 0 deletions src/Simplex/Messaging/Agent/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ import Simplex.Messaging.Protocol
NMsgMeta,
ProtocolServer (..),
QueueMode (..),
ServiceSub,
ServiceSubResult,
SMPClientVersion,
SMPServer,
SMPServerWithAuth,
Expand Down Expand Up @@ -388,6 +390,9 @@ data AEvent (e :: AEntity) where
DISCONNECT :: AProtocolType -> TransportHost -> AEvent AENone
DOWN :: SMPServer -> [ConnId] -> AEvent AENone
UP :: SMPServer -> [ConnId] -> AEvent AENone
SERVICE_ALL :: SMPServer -> AEvent AENone -- all service messages are delivered
SERVICE_DOWN :: SMPServer -> ServiceSub -> AEvent AENone
SERVICE_UP :: SMPServer -> ServiceSubResult -> AEvent AENone
SWITCH :: QueueDirection -> SwitchPhase -> ConnectionStats -> AEvent AEConn
RSYNC :: RatchetSyncState -> Maybe AgentCryptoError -> ConnectionStats -> AEvent AEConn
SENT :: AgentMsgId -> Maybe SMPServer -> AEvent AEConn
Expand Down Expand Up @@ -459,6 +464,9 @@ data AEventTag (e :: AEntity) where
DISCONNECT_ :: AEventTag AENone
DOWN_ :: AEventTag AENone
UP_ :: AEventTag AENone
SERVICE_ALL_ :: AEventTag AENone
SERVICE_DOWN_ :: AEventTag AENone
SERVICE_UP_ :: AEventTag AENone
SWITCH_ :: AEventTag AEConn
RSYNC_ :: AEventTag AEConn
SENT_ :: AEventTag AEConn
Expand Down Expand Up @@ -514,6 +522,9 @@ aEventTag = \case
DISCONNECT {} -> DISCONNECT_
DOWN {} -> DOWN_
UP {} -> UP_
SERVICE_ALL _ -> SERVICE_ALL_
SERVICE_DOWN {} -> SERVICE_DOWN_
SERVICE_UP {} -> SERVICE_UP_
SWITCH {} -> SWITCH_
RSYNC {} -> RSYNC_
SENT {} -> SENT_
Expand Down
Loading
Loading