Skip to content

Commit a90601c

Browse files
committed
agent: always remove service when disabled, fix service subscriptions
1 parent 197e3c8 commit a90601c

File tree

9 files changed

+116
-110
lines changed

9 files changed

+116
-110
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 34 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,10 +1042,10 @@ newRcvConnSrv c nm userId connId enableNtfs cMode userLinkData_ clientData pqIni
10421042
createRcvQueue nonce_ qd e2eKeys = do
10431043
AgentConfig {smpClientVRange = vr} <- asks config
10441044
ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing
1045-
(rq, qUri, tSess, sessId) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAllErrors` \e -> liftIO (print e) >> throwE e
1045+
(rq, qUri, tSess, sessId, serviceId_) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAllErrors` \e -> liftIO (print e) >> throwE e
10461046
atomically $ incSMPServerStat c userId srv connCreated
10471047
rq' <- withStore c $ \db -> updateNewConnRcv db connId rq subMode
1048-
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId
1048+
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId serviceId_
10491049
mapM_ (newQueueNtfSubscription c rq') ntfServer_
10501050
pure (rq', qUri)
10511051
createConnReq :: SMPQueueUri -> AM (ConnectionRequestUri c)
@@ -1293,11 +1293,11 @@ joinConnSrvAsync _c _userId _connId _enableNtfs (CRContactUri _) _cInfo _subMode
12931293
createReplyQueue :: AgentClient -> NetworkRequestMode -> ConnData -> SndQueue -> SubscriptionMode -> SMPServerWithAuth -> AM SMPQueueInfo
12941294
createReplyQueue c nm ConnData {userId, connId, enableNtfs} SndQueue {smpClientVersion} subMode srv = do
12951295
ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing
1296-
(rq, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv (versionToRange smpClientVersion) SCMInvitation (isJust ntfServer_) subMode
1296+
(rq, qUri, tSess, sessId, serviceId_) <- newRcvQueue c nm userId connId srv (versionToRange smpClientVersion) SCMInvitation (isJust ntfServer_) subMode
12971297
atomically $ incSMPServerStat c userId (qServer rq) connCreated
12981298
let qInfo = toVersionT qUri smpClientVersion
12991299
rq' <- withStore c $ \db -> upgradeSndConnToDuplex db connId rq subMode
1300-
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId
1300+
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId serviceId_
13011301
mapM_ (newQueueNtfSubscription c rq') ntfServer_
13021302
pure qInfo
13031303

@@ -1453,22 +1453,14 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
14531453
Just activeUserId -> sortOn (\(uId, _) -> if uId == activeUserId then 0 else 1 :: Int) userSrvs
14541454
Nothing -> userSrvs
14551455
useServices <- readTVarIO $ useClientServices c
1456-
-- These options are possible below:
1457-
-- 1) services fully disabled:
1458-
-- No service subscriptions will be attempted, and existing services and association will remain in in the database,
1459-
-- but they will be ignored because of hasService parameter set to False.
1460-
-- This approach preserves performance for all clients that do not use services.
1461-
-- 2) at least one user ID has services enabled:
1462-
-- Service will be loaded for all user/server combinations:
1463-
-- a) service is enabled for user ID and service record exists: subscription will be attempted,
1464-
-- b) service is disabled and record exists: service record and all associations will be removed,
1465-
-- c) service is disabled or no record: no subscription attempt.
1456+
-- Service will be loaded for all user/server combinations:
1457+
-- a) service is enabled for user ID and service record exists: subscription will be attempted,
1458+
-- b) service is disabled and record exists: service record and all associations will be removed,
1459+
-- c) service is disabled or no record: no subscription attempt.
14661460
-- On successful service subscription, only unassociated queues will be subscribed.
1467-
userSrvs'' <-
1468-
if any id useServices
1469-
then lift $ mapConcurrently (subscribeService useServices) userSrvs'
1470-
else pure $ map (,False) userSrvs'
1471-
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs''
1461+
userSrvs2 <- withStore' c $ \db -> mapM (getService db useServices) userSrvs'
1462+
userSrvs3 <- lift $ mapConcurrently subscribeService userSrvs2
1463+
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs3
14721464
let (errs, oks) = partitionEithers rs
14731465
logInfo $ "subscribed " <> tshow (sum oks) <> " queues"
14741466
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",)
@@ -1477,23 +1469,27 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
14771469
resumeAllCommands c
14781470
where
14791471
handleErr = (`catchAllErrors` \e -> notifySub' c "" (ERR e) >> throwE e)
1480-
subscribeService :: Map UserId Bool -> (UserId, SMPServer) -> AM' ((UserId, SMPServer), ServiceAssoc)
1481-
subscribeService useServices us@(userId, srv) = fmap ((us,) . fromRight False) $ tryAllErrors' $ do
1482-
withStore' c (\db -> getSubscriptionService db userId srv) >>= \case
1472+
getService :: DB.Connection -> Map UserId Bool -> (UserId, SMPServer) -> IO ((UserId, SMPServer), Maybe ServiceSub)
1473+
getService db useServices us@(userId, srv) =
1474+
fmap (us,) $ getSubscriptionService db userId srv >>= \case
14831475
Just serviceSub -> case M.lookup userId useServices of
1484-
Just True -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case
1485-
Right (ServiceSubResult e _) -> case e of
1486-
Just SSErrorServiceId {} -> unassocQueues
1487-
-- Below would resubscribe all queues after service was disabled and re-enabled
1488-
-- Possibly, we should always resubscribe all with expected is greated than subscribed
1489-
Just SSErrorQueueCount {expectedQueueCount = n, subscribedQueueCount = n'} | n > 0 && n' == 0 -> unassocQueues
1490-
_ -> pure True
1491-
Left e -> do
1492-
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e)
1493-
if clientServiceError e
1494-
then unassocQueues
1495-
else pure True
1496-
_ -> unassocQueues
1476+
Just True -> pure $ Just serviceSub
1477+
_ -> Nothing <$ unassocUserServerRcvQueueSubs' db userId srv
1478+
_ -> pure Nothing
1479+
subscribeService :: ((UserId, SMPServer), Maybe ServiceSub) -> AM' ((UserId, SMPServer), ServiceAssoc)
1480+
subscribeService (us@(userId, srv), serviceSub_) = fmap ((us,) . fromRight False) $ tryAllErrors' $
1481+
case serviceSub_ of
1482+
Just serviceSub -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case
1483+
Right (ServiceSubResult e _) -> case e of
1484+
Just SSErrorServiceId {} -> unassocQueues
1485+
-- Possibly, we should always resubscribe all when expected is greater than subscribed
1486+
Just SSErrorQueueCount {expectedQueueCount = n, subscribedQueueCount = n'} | n > 0 && n' == 0 -> unassocQueues
1487+
_ -> pure True
1488+
Left e -> do
1489+
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e)
1490+
if clientServiceError e
1491+
then unassocQueues
1492+
else pure True
14971493
where
14981494
unassocQueues :: AM Bool
14991495
unassocQueues = False <$ withStore' c (\db -> unassocUserServerRcvQueueSubs' db userId srv)
@@ -2231,10 +2227,10 @@ switchDuplexConnection c nm (DuplexConnection cData@ConnData {connId, userId} rq
22312227
srv' <- if srv == server then getNextSMPServer c userId [server] else pure srvAuth
22322228
-- TODO [notications] possible improvement would be to create ntf credentials here, to avoid creating them after rotation completes.
22332229
-- The problem is that currently subscription already exists, and we do not support queues with credentials but without subscriptions.
2234-
(q, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv' clientVRange SCMInvitation False SMSubscribe
2230+
(q, qUri, tSess, sessId, serviceId_) <- newRcvQueue c nm userId connId srv' clientVRange SCMInvitation False SMSubscribe
22352231
let rq' = (q :: NewRcvQueue) {primary = True, dbReplaceQueueId = Just dbQueueId}
22362232
rq'' <- withStore c $ \db -> addConnRcvQueue db connId rq' SMSubscribe
2237-
lift $ addNewQueueSubscription c rq'' tSess sessId
2233+
lift $ addNewQueueSubscription c rq'' tSess sessId serviceId_
22382234
void . enqueueMessages c cData sqs SMP.noMsgFlags $ QADD [(qUri, Just (server, sndId))]
22392235
rq1 <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSendingQADD
22402236
let rqs' = updatedQs rq1 rqs <> [rq'']
@@ -2920,7 +2916,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar
29202916
processSubOk :: RcvQueue -> TVar [ConnId] -> TVar [RcvQueue] -> Maybe SMP.ServiceId -> IO ()
29212917
processSubOk rq@RcvQueue {connId} upConnIds serviceRQs serviceId_ =
29222918
atomically . whenM (isPendingSub rq) $ do
2923-
SS.addActiveSub tSess sessId rq $ currentSubs c
2919+
SS.addActiveSub tSess sessId serviceId_ rq $ currentSubs c
29242920
modifyTVar' upConnIds (connId :)
29252921
when (isJust serviceId_ && serviceId_ == clientServiceId_) $ modifyTVar' serviceRQs (rq :)
29262922
clientServiceId_ = (\THClientService {serviceId} -> serviceId) <$> (clientService =<< thAuth)

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1420,7 +1420,7 @@ getSessionMode :: AgentClient -> STM TransportSessionMode
14201420
getSessionMode = fmap (sessionMode . snd) . readTVar . useNetworkConfig
14211421
{-# INLINE getSessionMode #-}
14221422

1423-
newRcvQueue :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> SConnectionMode c -> Bool -> SubscriptionMode -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId)
1423+
newRcvQueue :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> SConnectionMode c -> Bool -> SubscriptionMode -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId, Maybe ServiceId)
14241424
newRcvQueue c nm userId connId srv vRange cMode enableNtfs subMode = do
14251425
let qrd = case cMode of SCMInvitation -> CQRMessaging Nothing; SCMContact -> CQRContact Nothing
14261426
e2eKeys <- atomically . C.generateKeyPair =<< asks random
@@ -1441,7 +1441,7 @@ queueReqData = \case
14411441
CQRMessaging d -> QRMessaging $ srvReq <$> d
14421442
CQRContact d -> QRContact $ srvReq <$> d
14431443

1444-
newRcvQueue_ :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> ClntQueueReqData -> Bool -> SubscriptionMode -> Maybe C.CbNonce -> C.KeyPairX25519 -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId)
1444+
newRcvQueue_ :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> ClntQueueReqData -> Bool -> SubscriptionMode -> Maybe C.CbNonce -> C.KeyPairX25519 -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId, Maybe ServiceId)
14451445
newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enableNtfs subMode nonce_ (e2eDhKey, e2ePrivKey) = do
14461446
C.AuthAlg a <- asks (rcvAuthAlg . config)
14471447
g <- asks random
@@ -1483,7 +1483,7 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
14831483
deleteErrors = 0
14841484
}
14851485
qUri = SMPQueueUri vRange $ SMPQueueAddress srv sndId e2eDhKey queueMode
1486-
pure (rq, qUri, tSess, sessionId thParams')
1486+
pure (rq, qUri, tSess, sessionId thParams', sessServiceId)
14871487
where
14881488
mkNtfCreds :: (C.AlgorithmI a, C.AuthAlgorithm a) => C.SAlgorithm a -> TVar ChaChaDRG -> SMPClient -> IO (Maybe (C.AAuthKeyPair, C.PrivateKeyX25519), Maybe NewNtfCreds)
14891489
mkNtfCreds a g smp
@@ -1828,14 +1828,14 @@ getRemovedSubs AgentClient {removedSubs} k = TM.lookup k removedSubs >>= maybe n
18281828
TM.insert k s removedSubs
18291829
pure s
18301830

1831-
addNewQueueSubscription :: AgentClient -> RcvQueue -> SMPTransportSession -> SessionId -> AM' ()
1832-
addNewQueueSubscription c rq' tSess sessId = do
1831+
addNewQueueSubscription :: AgentClient -> RcvQueue -> SMPTransportSession -> SessionId -> Maybe ServiceId -> AM' ()
1832+
addNewQueueSubscription c rq' tSess sessId serviceId_ = do
18331833
let rq = rcvQueueSub rq'
18341834
same <- atomically $ do
18351835
modifyTVar' (subscrConns c) $ S.insert $ qConnId rq
18361836
active <- activeClientSession c tSess sessId
18371837
if active
1838-
then SS.addActiveSub tSess sessId rq' $ currentSubs c
1838+
then SS.addActiveSub tSess sessId serviceId_ rq' $ currentSubs c
18391839
else SS.addPendingSub tSess rq $ currentSubs c
18401840
pure active
18411841
unless same $ resubscribeSMPSession c tSess

src/Simplex/Messaging/Agent/Store/AgentStore.hs

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ module Simplex.Messaging.Agent.Store.AgentStore
3838
-- * Client services
3939
createClientService,
4040
getClientServiceCredentials,
41-
getSubscriptionServices,
4241
getSubscriptionService,
4342
getClientServiceServers,
4443
setClientServiceId,
@@ -345,7 +344,7 @@ handleSQLError err e = case constraintViolation e of
345344
handleSQLError :: StoreError -> SQLError -> StoreError
346345
handleSQLError err e
347346
| SQL.sqlError e == SQL.ErrorConstraint = err
348-
| otherwise = SEInternal $ bshow e
347+
| otherwise = SEInternal $ encodeUtf8 $ tshow e <> ": " <> SQL.sqlErrorDetails e <> ", " <> SQL.sqlErrorContext e
349348
#endif
350349

351350
createUserRecord :: DB.Connection -> IO UserId
@@ -440,11 +439,6 @@ getClientServiceCredentials db userId srv =
440439
where
441440
toService (kh, cert, pk, serviceId_) = ((kh, (cert, pk)), serviceId_)
442441

443-
getSubscriptionServices :: DB.Connection -> IO [(UserId, (SMPServer, ServiceSub))]
444-
getSubscriptionServices db = map toUserService <$> DB.query_ db clientServiceQuery
445-
where
446-
toUserService (Only userId :. serviceRow) = (userId, toServerService serviceRow)
447-
448442
getSubscriptionService :: DB.Connection -> UserId -> SMPServer -> IO (Maybe ServiceSub)
449443
getSubscriptionService db userId (SMPServer h p kh) =
450444
maybeFirstRow toService $
@@ -454,23 +448,24 @@ getSubscriptionService db userId (SMPServer h p kh) =
454448
SELECT c.service_id, c.service_queue_count, c.service_queue_ids_hash
455449
FROM client_services c
456450
JOIN servers s ON s.host = c.host AND s.port = c.port
457-
WHERE c.user_id = ? AND c.host = ? AND c.port = ? AND COALESCE(c.server_key_hash, s.key_hash) = ?
451+
WHERE c.user_id = ? AND c.host = ? AND c.port = ? AND COALESCE(c.server_key_hash, s.key_hash) = ? AND service_id IS NOT NULL
458452
|]
459453
(userId, h, p, kh)
460454
where
461455
toService (serviceId, qCnt, idsHash) = ServiceSub serviceId qCnt idsHash
462456

463457
getClientServiceServers :: DB.Connection -> UserId -> IO [(SMPServer, ServiceSub)]
464458
getClientServiceServers db userId =
465-
map toServerService <$> DB.query db (clientServiceQuery <> " WHERE c.user_id = ?") (Only userId)
466-
467-
clientServiceQuery :: Query
468-
clientServiceQuery =
469-
[sql|
470-
SELECT c.host, c.port, COALESCE(c.server_key_hash, s.key_hash), c.service_id, c.service_queue_count, c.service_queue_ids_hash
471-
FROM client_services c
472-
JOIN servers s ON s.host = c.host AND s.port = c.port
473-
|]
459+
map toServerService <$>
460+
DB.query
461+
db
462+
[sql|
463+
SELECT c.host, c.port, COALESCE(c.server_key_hash, s.key_hash), c.service_id, c.service_queue_count, c.service_queue_ids_hash
464+
FROM client_services c
465+
JOIN servers s ON s.host = c.host AND s.port = c.port
466+
WHERE c.user_id = ? AND service_id IS NOT NULL
467+
|]
468+
(Only userId)
474469

475470
toServerService :: (NonEmpty TransportHost, ServiceName, C.KeyHash, ServiceId, Int64, Binary ByteString) -> (ProtocolServer 'PSMP, ServiceSub)
476471
toServerService (host, port, kh, serviceId, n, Binary idsHash) =
@@ -488,14 +483,20 @@ setClientServiceId db userId srv serviceId =
488483
(serviceId, userId, host srv, port srv)
489484

490485
deleteClientService :: DB.Connection -> UserId -> SMPServer -> IO ()
491-
deleteClientService db userId srv =
486+
deleteClientService db userId (SMPServer h p kh) =
492487
DB.execute
493488
db
494489
[sql|
495490
DELETE FROM client_services
496491
WHERE user_id = ? AND host = ? AND port = ?
492+
AND EXISTS (
493+
SELECT 1 FROM servers s
494+
WHERE s.host = client_services.host
495+
AND s.port = client_services.port
496+
AND COALESCE(client_services.server_key_hash, s.key_hash) = ?
497+
);
497498
|]
498-
(userId, host srv, port srv)
499+
(userId, h, p, Just kh)
499500

500501
deleteClientServices :: DB.Connection -> UserId -> IO ()
501502
deleteClientServices db userId = do
@@ -2280,7 +2281,8 @@ getUserServerRcvQueueSubs db userId (SMPServer h p kh) onlyNeeded hasService =
22802281
| otherwise = ""
22812282

22822283
unassocUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> IO [RcvQueueSub]
2283-
unassocUserServerRcvQueueSubs db userId (SMPServer h p kh) =
2284+
unassocUserServerRcvQueueSubs db userId srv@(SMPServer h p kh) = do
2285+
deleteClientService db userId srv
22842286
map toRcvQueueSub
22852287
<$> DB.query
22862288
db
@@ -2295,7 +2297,9 @@ unassocUserServerRcvQueueSubs db userId (SMPServer h p kh) =
22952297
|]
22962298

22972299
unassocUserServerRcvQueueSubs' :: DB.Connection -> UserId -> SMPServer -> IO ()
2298-
unassocUserServerRcvQueueSubs' db userId (SMPServer h p kh) = DB.execute db removeRcvAssocsQuery (h, p, userId, kh)
2300+
unassocUserServerRcvQueueSubs' db userId srv@(SMPServer h p kh) = do
2301+
deleteClientService db userId srv
2302+
DB.execute db removeRcvAssocsQuery (h, p, userId, kh)
22992303

23002304
unsetQueuesToSubscribe :: DB.Connection -> IO ()
23012305
unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1"

0 commit comments

Comments
 (0)