Skip to content

Commit 2ea9a9a

Browse files
agent: finalize initial service subscriptions, remove associations on service ID changes (#1672)
* agent: remove service/queue associations when service ID changes * agent: check that service ID in NEW response matches session ID in transport session * agent subscription WIP * test * comment * enable tests * update queries * agent: option to add SQLite aggregates to DB connection (#1673) * agent: add build_relations_vector function to sqlite * update aggregate * use static aggregate * remove relations --------- Co-authored-by: Evgeny Poberezkin <[email protected]> * add test, treat BAD_SERVICE as temp error, only remove queue associations on service errors * add packZipWith for backward compatibility with GHC 8.10.7 --------- Co-authored-by: spaced4ndy <[email protected]>
1 parent ff7bdbc commit 2ea9a9a

File tree

10 files changed

+330
-70
lines changed

10 files changed

+330
-70
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ import Data.Bifunctor (bimap, first)
153153
import Data.ByteString.Char8 (ByteString)
154154
import qualified Data.ByteString.Char8 as B
155155
import Data.Composition
156-
import Data.Either (isRight, partitionEithers, rights)
156+
import Data.Either (fromRight, isRight, partitionEithers, rights)
157157
import Data.Foldable (foldl', toList)
158158
import Data.Functor (($>))
159159
import Data.Functor.Identity
@@ -221,7 +221,6 @@ import Simplex.Messaging.Protocol
221221
SMPMsgMeta,
222222
SParty (..),
223223
SProtocolType (..),
224-
ServiceSub (..),
225224
ServiceSubResult,
226225
SndPublicAuthKey,
227226
SubscriptionMode (..),
@@ -1451,7 +1450,23 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
14511450
let userSrvs' = case activeUserId_ of
14521451
Just activeUserId -> sortOn (\(uId, _) -> if uId == activeUserId then 0 else 1 :: Int) userSrvs
14531452
Nothing -> userSrvs
1454-
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs'
1453+
useServices <- readTVarIO $ useClientServices c
1454+
-- These options are possible below:
1455+
-- 1) services fully disabled:
1456+
-- No service subscriptions will be attempted, and existing services and association will remain in in the database,
1457+
-- but they will be ignored because of hasService parameter set to False.
1458+
-- This approach preserves performance for all clients that do not use services.
1459+
-- 2) at least one user ID has services enabled:
1460+
-- Service will be loaded for all user/server combinations:
1461+
-- a) service is enabled for user ID and service record exists: subscription will be attempted,
1462+
-- b) service is disabled and record exists: service record and all associations will be removed,
1463+
-- c) service is disabled or no record: no subscription attempt.
1464+
-- On successful service subscription, only unassociated queues will be subscribed.
1465+
userSrvs'' <-
1466+
if any id useServices
1467+
then lift $ mapConcurrently (subscribeService useServices) userSrvs'
1468+
else pure $ map (,False) userSrvs'
1469+
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs''
14551470
let (errs, oks) = partitionEithers rs
14561471
logInfo $ "subscribed " <> tshow (sum oks) <> " queues"
14571472
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",)
@@ -1460,21 +1475,31 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
14601475
resumeAllCommands c
14611476
where
14621477
handleErr = (`catchAllErrors` \e -> notifySub' c "" (ERR e) >> throwE e)
1463-
subscribeUserServer :: Int -> TVar Int -> (UserId, SMPServer) -> AM' (Either AgentErrorType Int)
1464-
subscribeUserServer maxPending currPending (userId, srv) = do
1478+
subscribeService :: Map UserId Bool -> (UserId, SMPServer) -> AM' ((UserId, SMPServer), ServiceAssoc)
1479+
subscribeService useServices us@(userId, srv) = fmap ((us,) . fromRight False) $ tryAllErrors' $ do
1480+
withStore' c (\db -> getSubscriptionService db userId srv) >>= \case
1481+
Just serviceSub -> case M.lookup userId useServices of
1482+
Just True -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case
1483+
Left e | clientServiceError e -> unassocQueues $> False
1484+
_ -> pure True
1485+
_ -> unassocQueues $> False
1486+
where
1487+
unassocQueues = withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv
1488+
_ -> pure False
1489+
subscribeUserServer :: Int -> TVar Int -> ((UserId, SMPServer), ServiceAssoc) -> AM' (Either AgentErrorType Int)
1490+
subscribeUserServer maxPending currPending ((userId, srv), hasService) = do
14651491
atomically $ whenM ((maxPending <=) <$> readTVar currPending) retry
14661492
tryAllErrors' $ do
14671493
qs <- withStore' c $ \db -> do
1468-
qs <- getUserServerRcvQueueSubs db userId srv onlyNeeded
1469-
atomically $ modifyTVar' currPending (+ length qs) -- update before leaving transaction
1494+
qs <- getUserServerRcvQueueSubs db userId srv onlyNeeded hasService
1495+
unless (null qs) $ atomically $ modifyTVar' currPending (+ length qs) -- update before leaving transaction
14701496
pure qs
14711497
let n = length qs
1472-
lift $ subscribe qs `E.finally` atomically (modifyTVar' currPending $ subtract n)
1498+
unless (null qs) $ lift $ subscribe qs `E.finally` atomically (modifyTVar' currPending $ subtract n)
14731499
pure n
14741500
where
14751501
subscribe qs = do
14761502
rs <- subscribeUserServerQueues c userId srv qs
1477-
-- TODO [certs rcv] storeClientServiceAssocs store associations of queues with client service ID
14781503
ns <- asks ntfSupervisor
14791504
whenM (liftIO $ hasInstantNotifications ns) $ sendNtfCreate ns rs
14801505
sendNtfCreate :: NtfSupervisor -> [(RcvQueueSub, Either AgentErrorType (Maybe SMP.ServiceId))] -> AM' ()
@@ -1522,7 +1547,7 @@ subscribeClientServices' c userId =
15221547
useService = liftIO $ (Just True ==) <$> TM.lookupIO userId (useClientServices c)
15231548
subscribe = do
15241549
srvs <- withStore' c (`getClientServiceServers` userId)
1525-
lift $ M.fromList <$> mapConcurrently (\(srv, ServiceSub _ n idsHash) -> fmap (srv,) $ tryAllErrors' $ subscribeClientService c False userId srv n idsHash) srvs
1550+
lift $ M.fromList <$> mapConcurrently (\(srv, serviceSub) -> fmap (srv,) $ tryAllErrors' $ subscribeClientService c False userId srv serviceSub) srvs
15261551

15271552
-- requesting messages sequentially, to reduce memory usage
15281553
getConnectionMessages' :: AgentClient -> NonEmpty ConnMsgReq -> AM' (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta)))

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ module Simplex.Messaging.Agent.Client
120120
getAgentSubscriptions,
121121
slowNetworkConfig,
122122
protocolClientError,
123+
clientServiceError,
123124
Worker (..),
124125
SessionVar (..),
125126
SubscriptionsInfo (..),
@@ -303,7 +304,7 @@ import Simplex.Messaging.Session
303304
import Simplex.Messaging.SystemTime
304305
import Simplex.Messaging.TMap (TMap)
305306
import qualified Simplex.Messaging.TMap as TM
306-
import Simplex.Messaging.Transport (SMPServiceRole (..), SMPVersion, ServiceCredentials (..), SessionId, THClientService' (..), THandleParams (sessionId, thVersion), TransportError (..), TransportPeer (..), sndAuthKeySMPVersion, shortLinksSMPVersion, newNtfCredsSMPVersion)
307+
import Simplex.Messaging.Transport (HandshakeError (..), SMPServiceRole (..), SMPVersion, ServiceCredentials (..), SessionId, THClientService' (..), THandleAuth (..), THandleParams (sessionId, thAuth, thVersion), TransportError (..), TransportPeer (..), sndAuthKeySMPVersion, shortLinksSMPVersion, newNtfCredsSMPVersion)
307308
import Simplex.Messaging.Transport.Client (TransportHost (..))
308309
import Simplex.Messaging.Transport.Credentials
309310
import Simplex.Messaging.Util
@@ -619,7 +620,7 @@ getServiceCredentials c userId srv =
619620
let g = agentDRG c
620621
((C.KeyHash kh, serviceCreds), serviceId_) <-
621622
withStore' c $ \db ->
622-
getClientService db userId srv >>= \case
623+
getClientServiceCredentials db userId srv >>= \case
623624
Just service -> pure service
624625
Nothing -> do
625626
cred <- genCredentials g Nothing (25, 24 * 999999) "simplex"
@@ -747,15 +748,13 @@ smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm
747748
smp <- liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do
748749
ts <- readTVarIO proxySessTs
749750
ExceptT $ getProtocolClient g nm tSess cfg' presetDomains (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs
750-
-- TODO [certs rcv] add service to SS, possibly combine with SS.setSessionId
751751
atomically $ SS.setSessionId tSess (sessionId $ thParams smp) $ currentSubs c
752752
updateClientService service smp
753753
pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs}
754-
-- TODO [certs rcv] this should differentiate between service ID just set and service ID changed, and in the latter case disassociate the queues
755754
updateClientService service smp = case (service, smpClientService smp) of
756-
(Just (_, serviceId_), Just THClientService {serviceId})
757-
| serviceId_ /= Just serviceId -> withStore' c $ \db -> setClientServiceId db userId srv serviceId
758-
| otherwise -> pure ()
755+
(Just (_, serviceId_), Just THClientService {serviceId}) -> withStore' c $ \db -> do
756+
setClientServiceId db userId srv serviceId
757+
forM_ serviceId_ $ \sId -> when (sId /= serviceId) $ removeRcvServiceAssocs db userId srv
759758
(Just _, Nothing) -> withStore' c $ \db -> deleteClientService db userId srv -- e.g., server version downgrade
760759
(Nothing, Just _) -> logError "server returned serviceId without service credentials in request"
761760
(Nothing, Nothing) -> pure ()
@@ -1258,6 +1257,14 @@ protocolClientError protocolError_ host = \case
12581257
PCEServiceUnavailable {} -> BROKER host NO_SERVICE
12591258
PCEIOError e -> BROKER host $ NETWORK $ NEConnectError $ E.displayException e
12601259

1260+
-- it is consistent with smpClientServiceError
1261+
clientServiceError :: AgentErrorType -> Bool
1262+
clientServiceError = \case
1263+
BROKER _ NO_SERVICE -> True
1264+
SMP _ SMP.SERVICE -> True
1265+
SMP _ (SMP.PROXY (SMP.BROKER NO_SERVICE)) -> True -- for completeness, it cannot happen.
1266+
_ -> False
1267+
12611268
data ProtocolTestStep
12621269
= TSConnect
12631270
| TSDisconnect
@@ -1446,8 +1453,8 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
14461453
withClient c nm tSess $ \(SMPConnectedClient smp _) -> do
14471454
(ntfKeys, ntfCreds) <- liftIO $ mkNtfCreds a g smp
14481455
(thParams smp,ntfKeys,) <$> createSMPQueue smp nm nonce_ rKeys dhKey auth subMode (queueReqData cqrd) ntfCreds
1449-
-- TODO [certs rcv] validate that serviceId is the same as in the client session, fail otherwise
1450-
-- possibly, it should allow returning Nothing - it would indicate incorrect old version
1456+
let sessServiceId = (\THClientService {serviceId = sId} -> sId) <$> (clientService =<< thAuth thParams')
1457+
when (isJust serviceId && serviceId /= sessServiceId) $ logError "incorrect service ID in NEW response"
14511458
liftIO . logServer "<--" c srv NoEntity $ B.unwords ["IDS", logSecret rcvId, logSecret sndId]
14521459
shortLink <- mkShortLinkCreds thParams' qik
14531460
let rq =
@@ -1463,7 +1470,7 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
14631470
sndId,
14641471
queueMode,
14651472
shortLink,
1466-
rcvServiceAssoc = isJust serviceId,
1473+
rcvServiceAssoc = isJust serviceId && serviceId == sessServiceId,
14671474
status = New,
14681475
enableNtfs,
14691476
clientNoticeId = Nothing,
@@ -1559,6 +1566,8 @@ temporaryAgentError :: AgentErrorType -> Bool
15591566
temporaryAgentError = \case
15601567
BROKER _ e -> tempBrokerError e
15611568
SMP _ (SMP.PROXY (SMP.BROKER e)) -> tempBrokerError e
1569+
SMP _ (SMP.STORE _) -> True
1570+
NTF _ (SMP.STORE _) -> True
15621571
XFTP _ XFTP.TIMEOUT -> True
15631572
PROXY _ _ (ProxyProtocolError (SMP.PROXY (SMP.BROKER e))) -> tempBrokerError e
15641573
PROXY _ _ (ProxyProtocolError (SMP.PROXY SMP.NO_SESSION)) -> True
@@ -1569,6 +1578,7 @@ temporaryAgentError = \case
15691578
tempBrokerError = \case
15701579
NETWORK _ -> True
15711580
TIMEOUT -> True
1581+
TRANSPORT (TEHandshake BAD_SERVICE) -> True -- this error is considered temporary because it is DB error
15721582
_ -> False
15731583

15741584
temporaryOrHostError :: AgentErrorType -> Bool
@@ -1715,11 +1725,16 @@ processClientNotices c@AgentClient {presetServers} tSess notices = do
17151725
notifySub' c "" $ ERR e
17161726

17171727
resubscribeClientService :: AgentClient -> SMPTransportSession -> ServiceSub -> AM ServiceSubResult
1718-
resubscribeClientService c tSess serviceSub =
1719-
withServiceClient c tSess $ \smp _ -> subscribeClientService_ c True tSess smp serviceSub
1720-
1721-
subscribeClientService :: AgentClient -> Bool -> UserId -> SMPServer -> Int64 -> IdsHash -> AM ServiceSubResult
1722-
subscribeClientService c withEvent userId srv n idsHash =
1728+
resubscribeClientService c tSess@(userId, srv, _) serviceSub =
1729+
withServiceClient c tSess (\smp _ -> subscribeClientService_ c True tSess smp serviceSub) `catchE` \e -> do
1730+
when (clientServiceError e) $ do
1731+
qs <- withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv
1732+
void $ lift $ subscribeUserServerQueues c userId srv qs
1733+
throwE e
1734+
1735+
-- TODO [certs rcv] update service in the database if it has different ID and re-associate queues, and send event
1736+
subscribeClientService :: AgentClient -> Bool -> UserId -> SMPServer -> ServiceSub -> AM ServiceSubResult
1737+
subscribeClientService c withEvent userId srv (ServiceSub _ n idsHash) =
17231738
withServiceClient c tSess $ \smp smpServiceId -> do
17241739
let serviceSub = ServiceSub smpServiceId n idsHash
17251740
atomically $ SS.setPendingServiceSub tSess serviceSub $ currentSubs c
@@ -1728,14 +1743,15 @@ subscribeClientService c withEvent userId srv n idsHash =
17281743
tSess = (userId, srv, Nothing)
17291744

17301745
withServiceClient :: AgentClient -> SMPTransportSession -> (SMPClient -> ServiceId -> ExceptT SMPClientError IO a) -> AM a
1731-
withServiceClient c tSess action =
1746+
withServiceClient c tSess subscribe =
17321747
withLogClient c NRMBackground tSess B.empty "SUBS" $ \(SMPConnectedClient smp _) ->
17331748
case (\THClientService {serviceId} -> serviceId) <$> smpClientService smp of
1734-
Just smpServiceId -> action smp smpServiceId
1749+
Just smpServiceId -> subscribe smp smpServiceId
17351750
Nothing -> throwE PCEServiceUnavailable
17361751

1752+
-- TODO [certs rcv] send subscription error event?
17371753
subscribeClientService_ :: AgentClient -> Bool -> SMPTransportSession -> SMPClient -> ServiceSub -> ExceptT SMPClientError IO ServiceSubResult
1738-
subscribeClientService_ c withEvent tSess@(_, srv, _) smp expected@(ServiceSub _ n idsHash) = do
1754+
subscribeClientService_ c withEvent tSess@(userId, srv, _) smp expected@(ServiceSub _ n idsHash) = do
17391755
subscribed <- subscribeService smp SMP.SRecipientService n idsHash
17401756
let sessId = sessionId $ thParams smp
17411757
r = serviceSubResult expected subscribed

0 commit comments

Comments
 (0)