Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ import Data.Bifunctor (bimap, first)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Composition
import Data.Either (isRight, partitionEithers, rights)
import Data.Either (fromRight, isRight, partitionEithers, rights)
import Data.Foldable (foldl', toList)
import Data.Functor (($>))
import Data.Functor.Identity
Expand Down Expand Up @@ -221,7 +221,6 @@ import Simplex.Messaging.Protocol
SMPMsgMeta,
SParty (..),
SProtocolType (..),
ServiceSub (..),
ServiceSubResult,
SndPublicAuthKey,
SubscriptionMode (..),
Expand Down Expand Up @@ -1451,7 +1450,23 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
let userSrvs' = case activeUserId_ of
Just activeUserId -> sortOn (\(uId, _) -> if uId == activeUserId then 0 else 1 :: Int) userSrvs
Nothing -> userSrvs
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs'
useServices <- readTVarIO $ useClientServices c
-- These options are possible below:
-- 1) services fully disabled:
-- No service subscriptions will be attempted, and existing services and association will remain in in the database,
-- but they will be ignored because of hasService parameter set to False.
-- This approach preserves performance for all clients that do not use services.
-- 2) at least one user ID has services enabled:
-- Service will be loaded for all user/server combinations:
-- a) service is enabled for user ID and service record exists: subscription will be attempted,
-- b) service is disabled and record exists: service record and all associations will be removed,
-- c) service is disabled or no record: no subscription attempt.
-- On successful service subscription, only unassociated queues will be subscribed.
userSrvs'' <-
if any id useServices
then lift $ mapConcurrently (subscribeService useServices) userSrvs'
else pure $ map (,False) userSrvs'
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs''
let (errs, oks) = partitionEithers rs
logInfo $ "subscribed " <> tshow (sum oks) <> " queues"
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",)
Expand All @@ -1460,21 +1475,31 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
resumeAllCommands c
where
handleErr = (`catchAllErrors` \e -> notifySub' c "" (ERR e) >> throwE e)
subscribeUserServer :: Int -> TVar Int -> (UserId, SMPServer) -> AM' (Either AgentErrorType Int)
subscribeUserServer maxPending currPending (userId, srv) = do
subscribeService :: Map UserId Bool -> (UserId, SMPServer) -> AM' ((UserId, SMPServer), ServiceAssoc)
subscribeService useServices us@(userId, srv) = fmap ((us,) . fromRight False) $ tryAllErrors' $ do
withStore' c (\db -> getSubscriptionService db userId srv) >>= \case
Just serviceSub -> case M.lookup userId useServices of
Just True -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case
Left e | clientServiceError e -> unassocQueues $> False
_ -> pure True
_ -> unassocQueues $> False
where
unassocQueues = withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv
_ -> pure False
subscribeUserServer :: Int -> TVar Int -> ((UserId, SMPServer), ServiceAssoc) -> AM' (Either AgentErrorType Int)
subscribeUserServer maxPending currPending ((userId, srv), hasService) = do
atomically $ whenM ((maxPending <=) <$> readTVar currPending) retry
tryAllErrors' $ do
qs <- withStore' c $ \db -> do
qs <- getUserServerRcvQueueSubs db userId srv onlyNeeded
atomically $ modifyTVar' currPending (+ length qs) -- update before leaving transaction
qs <- getUserServerRcvQueueSubs db userId srv onlyNeeded hasService
unless (null qs) $ atomically $ modifyTVar' currPending (+ length qs) -- update before leaving transaction
pure qs
let n = length qs
lift $ subscribe qs `E.finally` atomically (modifyTVar' currPending $ subtract n)
unless (null qs) $ lift $ subscribe qs `E.finally` atomically (modifyTVar' currPending $ subtract n)
pure n
where
subscribe qs = do
rs <- subscribeUserServerQueues c userId srv qs
-- TODO [certs rcv] storeClientServiceAssocs store associations of queues with client service ID
ns <- asks ntfSupervisor
whenM (liftIO $ hasInstantNotifications ns) $ sendNtfCreate ns rs
sendNtfCreate :: NtfSupervisor -> [(RcvQueueSub, Either AgentErrorType (Maybe SMP.ServiceId))] -> AM' ()
Expand Down Expand Up @@ -1522,7 +1547,7 @@ subscribeClientServices' c userId =
useService = liftIO $ (Just True ==) <$> TM.lookupIO userId (useClientServices c)
subscribe = do
srvs <- withStore' c (`getClientServiceServers` userId)
lift $ M.fromList <$> mapConcurrently (\(srv, ServiceSub _ n idsHash) -> fmap (srv,) $ tryAllErrors' $ subscribeClientService c False userId srv n idsHash) srvs
lift $ M.fromList <$> mapConcurrently (\(srv, serviceSub) -> fmap (srv,) $ tryAllErrors' $ subscribeClientService c False userId srv serviceSub) srvs

-- requesting messages sequentially, to reduce memory usage
getConnectionMessages' :: AgentClient -> NonEmpty ConnMsgReq -> AM' (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta)))
Expand Down
52 changes: 34 additions & 18 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ module Simplex.Messaging.Agent.Client
getAgentSubscriptions,
slowNetworkConfig,
protocolClientError,
clientServiceError,
Worker (..),
SessionVar (..),
SubscriptionsInfo (..),
Expand Down Expand Up @@ -303,7 +304,7 @@ import Simplex.Messaging.Session
import Simplex.Messaging.SystemTime
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPServiceRole (..), SMPVersion, ServiceCredentials (..), SessionId, THClientService' (..), THandleParams (sessionId, thVersion), TransportError (..), TransportPeer (..), sndAuthKeySMPVersion, shortLinksSMPVersion, newNtfCredsSMPVersion)
import Simplex.Messaging.Transport (HandshakeError (..), SMPServiceRole (..), SMPVersion, ServiceCredentials (..), SessionId, THClientService' (..), THandleAuth (..), THandleParams (sessionId, thAuth, thVersion), TransportError (..), TransportPeer (..), sndAuthKeySMPVersion, shortLinksSMPVersion, newNtfCredsSMPVersion)
import Simplex.Messaging.Transport.Client (TransportHost (..))
import Simplex.Messaging.Transport.Credentials
import Simplex.Messaging.Util
Expand Down Expand Up @@ -619,7 +620,7 @@ getServiceCredentials c userId srv =
let g = agentDRG c
((C.KeyHash kh, serviceCreds), serviceId_) <-
withStore' c $ \db ->
getClientService db userId srv >>= \case
getClientServiceCredentials db userId srv >>= \case
Just service -> pure service
Nothing -> do
cred <- genCredentials g Nothing (25, 24 * 999999) "simplex"
Expand Down Expand Up @@ -747,15 +748,13 @@ smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm
smp <- liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do
ts <- readTVarIO proxySessTs
ExceptT $ getProtocolClient g nm tSess cfg' presetDomains (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs
-- TODO [certs rcv] add service to SS, possibly combine with SS.setSessionId
atomically $ SS.setSessionId tSess (sessionId $ thParams smp) $ currentSubs c
updateClientService service smp
pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs}
-- TODO [certs rcv] this should differentiate between service ID just set and service ID changed, and in the latter case disassociate the queues
updateClientService service smp = case (service, smpClientService smp) of
(Just (_, serviceId_), Just THClientService {serviceId})
| serviceId_ /= Just serviceId -> withStore' c $ \db -> setClientServiceId db userId srv serviceId
| otherwise -> pure ()
(Just (_, serviceId_), Just THClientService {serviceId}) -> withStore' c $ \db -> do
setClientServiceId db userId srv serviceId
forM_ serviceId_ $ \sId -> when (sId /= serviceId) $ removeRcvServiceAssocs db userId srv
(Just _, Nothing) -> withStore' c $ \db -> deleteClientService db userId srv -- e.g., server version downgrade
(Nothing, Just _) -> logError "server returned serviceId without service credentials in request"
(Nothing, Nothing) -> pure ()
Expand Down Expand Up @@ -1258,6 +1257,14 @@ protocolClientError protocolError_ host = \case
PCEServiceUnavailable {} -> BROKER host NO_SERVICE
PCEIOError e -> BROKER host $ NETWORK $ NEConnectError $ E.displayException e

-- it is consistent with smpClientServiceError
clientServiceError :: AgentErrorType -> Bool
clientServiceError = \case
BROKER _ NO_SERVICE -> True
SMP _ SMP.SERVICE -> True
SMP _ (SMP.PROXY (SMP.BROKER NO_SERVICE)) -> True -- for completeness, it cannot happen.
_ -> False

data ProtocolTestStep
= TSConnect
| TSDisconnect
Expand Down Expand Up @@ -1446,8 +1453,8 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
withClient c nm tSess $ \(SMPConnectedClient smp _) -> do
(ntfKeys, ntfCreds) <- liftIO $ mkNtfCreds a g smp
(thParams smp,ntfKeys,) <$> createSMPQueue smp nm nonce_ rKeys dhKey auth subMode (queueReqData cqrd) ntfCreds
-- TODO [certs rcv] validate that serviceId is the same as in the client session, fail otherwise
-- possibly, it should allow returning Nothing - it would indicate incorrect old version
let sessServiceId = (\THClientService {serviceId = sId} -> sId) <$> (clientService =<< thAuth thParams')
when (isJust serviceId && serviceId /= sessServiceId) $ logError "incorrect service ID in NEW response"
liftIO . logServer "<--" c srv NoEntity $ B.unwords ["IDS", logSecret rcvId, logSecret sndId]
shortLink <- mkShortLinkCreds thParams' qik
let rq =
Expand All @@ -1463,7 +1470,7 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
sndId,
queueMode,
shortLink,
rcvServiceAssoc = isJust serviceId,
rcvServiceAssoc = isJust serviceId && serviceId == sessServiceId,
status = New,
enableNtfs,
clientNoticeId = Nothing,
Expand Down Expand Up @@ -1559,6 +1566,8 @@ temporaryAgentError :: AgentErrorType -> Bool
temporaryAgentError = \case
BROKER _ e -> tempBrokerError e
SMP _ (SMP.PROXY (SMP.BROKER e)) -> tempBrokerError e
SMP _ (SMP.STORE _) -> True
NTF _ (SMP.STORE _) -> True
XFTP _ XFTP.TIMEOUT -> True
PROXY _ _ (ProxyProtocolError (SMP.PROXY (SMP.BROKER e))) -> tempBrokerError e
PROXY _ _ (ProxyProtocolError (SMP.PROXY SMP.NO_SESSION)) -> True
Expand All @@ -1569,6 +1578,7 @@ temporaryAgentError = \case
tempBrokerError = \case
NETWORK _ -> True
TIMEOUT -> True
TRANSPORT (TEHandshake BAD_SERVICE) -> True -- this error is considered temporary because it is DB error
_ -> False

temporaryOrHostError :: AgentErrorType -> Bool
Expand Down Expand Up @@ -1715,11 +1725,16 @@ processClientNotices c@AgentClient {presetServers} tSess notices = do
notifySub' c "" $ ERR e

resubscribeClientService :: AgentClient -> SMPTransportSession -> ServiceSub -> AM ServiceSubResult
resubscribeClientService c tSess serviceSub =
withServiceClient c tSess $ \smp _ -> subscribeClientService_ c True tSess smp serviceSub

subscribeClientService :: AgentClient -> Bool -> UserId -> SMPServer -> Int64 -> IdsHash -> AM ServiceSubResult
subscribeClientService c withEvent userId srv n idsHash =
resubscribeClientService c tSess@(userId, srv, _) serviceSub =
withServiceClient c tSess (\smp _ -> subscribeClientService_ c True tSess smp serviceSub) `catchE` \e -> do
when (clientServiceError e) $ do
qs <- withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv
void $ lift $ subscribeUserServerQueues c userId srv qs
throwE e

-- TODO [certs rcv] update service in the database if it has different ID and re-associate queues, and send event
subscribeClientService :: AgentClient -> Bool -> UserId -> SMPServer -> ServiceSub -> AM ServiceSubResult
subscribeClientService c withEvent userId srv (ServiceSub _ n idsHash) =
withServiceClient c tSess $ \smp smpServiceId -> do
let serviceSub = ServiceSub smpServiceId n idsHash
atomically $ SS.setPendingServiceSub tSess serviceSub $ currentSubs c
Expand All @@ -1728,14 +1743,15 @@ subscribeClientService c withEvent userId srv n idsHash =
tSess = (userId, srv, Nothing)

withServiceClient :: AgentClient -> SMPTransportSession -> (SMPClient -> ServiceId -> ExceptT SMPClientError IO a) -> AM a
withServiceClient c tSess action =
withServiceClient c tSess subscribe =
withLogClient c NRMBackground tSess B.empty "SUBS" $ \(SMPConnectedClient smp _) ->
case (\THClientService {serviceId} -> serviceId) <$> smpClientService smp of
Just smpServiceId -> action smp smpServiceId
Just smpServiceId -> subscribe smp smpServiceId
Nothing -> throwE PCEServiceUnavailable

-- TODO [certs rcv] send subscription error event?
subscribeClientService_ :: AgentClient -> Bool -> SMPTransportSession -> SMPClient -> ServiceSub -> ExceptT SMPClientError IO ServiceSubResult
subscribeClientService_ c withEvent tSess@(_, srv, _) smp expected@(ServiceSub _ n idsHash) = do
subscribeClientService_ c withEvent tSess@(userId, srv, _) smp expected@(ServiceSub _ n idsHash) = do
subscribed <- subscribeService smp SMP.SRecipientService n idsHash
let sessId = sessionId $ thParams smp
r = serviceSubResult expected subscribed
Expand Down
Loading