From 3af4b9b90482db9df55b2c85cf29daef42dd1e91 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Fri, 28 Nov 2025 20:30:56 +0000 Subject: [PATCH 01/10] agent: remove service/queue associations when service ID changes --- src/Simplex/Messaging/Agent/Client.hs | 8 +++---- .../Messaging/Agent/Store/AgentStore.hs | 23 ++++++++++++++++++- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 77d73027d..5b694a606 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -747,15 +747,13 @@ smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm smp <- liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do ts <- readTVarIO proxySessTs ExceptT $ getProtocolClient g nm tSess cfg' presetDomains (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs - -- TODO [certs rcv] add service to SS, possibly combine with SS.setSessionId atomically $ SS.setSessionId tSess (sessionId $ thParams smp) $ currentSubs c updateClientService service smp pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs} - -- TODO [certs rcv] this should differentiate between service ID just set and service ID changed, and in the latter case disassociate the queues updateClientService service smp = case (service, smpClientService smp) of - (Just (_, serviceId_), Just THClientService {serviceId}) - | serviceId_ /= Just serviceId -> withStore' c $ \db -> setClientServiceId db userId srv serviceId - | otherwise -> pure () + (Just (_, serviceId_), Just THClientService {serviceId}) -> withStore' c $ \db -> do + setClientServiceId db userId srv serviceId + forM_ serviceId_ $ \sId -> when (sId /= serviceId) $ removeRcvServiceAssocs db userId srv (Just _, Nothing) -> withStore' c $ \db -> deleteClientService db userId srv -- e.g., server version downgrade (Nothing, Just _) -> logError "server returned serviceId without service credentials in request" (Nothing, Nothing) -> pure () diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index a732d28d4..14c98fb30 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -54,6 +54,7 @@ module Simplex.Messaging.Agent.Store.AgentStore getUserServerRcvQueueSubs, unsetQueuesToSubscribe, setRcvServiceAssocs, + removeRcvServiceAssocs, getConnIds, getConn, getDeletedConn, @@ -2256,9 +2257,29 @@ setRcvServiceAssocs db rqs = #if defined(dbPostgres) DB.execute db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id IN " $ Only $ In (map queueId rqs) #else - DB.executeMany db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id = ?" $ map (Only . queueId) rqs + DB.executeMany db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_isd = ?" $ map (Only . queueId) rqs #endif +removeRcvServiceAssocs :: DB.Connection -> UserId -> SMPServer -> IO () +removeRcvServiceAssocs db userId (SMPServer h p kh) = + DB.execute + db + [sql| + UPDATE rcv_queues + SET rcv_service_assoc = 0 + WHERE EXISTS ( + SELECT 1 + FROM connections c + JOIN servers s ON rcv_queues.host = s.host AND rcv_queues.port = s.port + WHERE c.conn_id = rcv_queues.conn_id + AND c.user_id = ? + AND rcv_queues.host = ? + AND rcv_queues.port = ? + AND COALESCE(rcv_queues.server_key_hash, s.key_hash) = ? + ) + |] + (userId, h, p, kh) + -- * getConn helpers getConnIds :: DB.Connection -> IO [ConnId] From 29c6b7508728d80bd5232b0290bbe6e2e06104fc Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Fri, 28 Nov 2025 21:11:38 +0000 Subject: [PATCH 02/10] agent: check that service ID in NEW response matches session ID in transport session --- src/Simplex/Messaging/Agent/Client.hs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 5b694a606..4f02c022e 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -303,7 +303,7 @@ import Simplex.Messaging.Session import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Transport (SMPServiceRole (..), SMPVersion, ServiceCredentials (..), SessionId, THClientService' (..), THandleParams (sessionId, thVersion), TransportError (..), TransportPeer (..), sndAuthKeySMPVersion, shortLinksSMPVersion, newNtfCredsSMPVersion) +import Simplex.Messaging.Transport (SMPServiceRole (..), SMPVersion, ServiceCredentials (..), SessionId, THClientService' (..), THandleAuth (..), THandleParams (sessionId, thAuth, thVersion), TransportError (..), TransportPeer (..), sndAuthKeySMPVersion, shortLinksSMPVersion, newNtfCredsSMPVersion) import Simplex.Messaging.Transport.Client (TransportHost (..)) import Simplex.Messaging.Transport.Credentials import Simplex.Messaging.Util @@ -1444,8 +1444,8 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl withClient c nm tSess $ \(SMPConnectedClient smp _) -> do (ntfKeys, ntfCreds) <- liftIO $ mkNtfCreds a g smp (thParams smp,ntfKeys,) <$> createSMPQueue smp nm nonce_ rKeys dhKey auth subMode (queueReqData cqrd) ntfCreds - -- TODO [certs rcv] validate that serviceId is the same as in the client session, fail otherwise - -- possibly, it should allow returning Nothing - it would indicate incorrect old version + let sessServiceId = (\THClientService {serviceId = sId} -> sId) <$> (clientService =<< thAuth thParams') + when (isJust serviceId && serviceId /= sessServiceId) $ logError "incorrect service ID in NEW response" liftIO . logServer "<--" c srv NoEntity $ B.unwords ["IDS", logSecret rcvId, logSecret sndId] shortLink <- mkShortLinkCreds thParams' qik let rq = @@ -1461,7 +1461,7 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl sndId, queueMode, shortLink, - rcvServiceAssoc = isJust serviceId, + rcvServiceAssoc = isJust serviceId && serviceId == sessServiceId, status = New, enableNtfs, clientNoticeId = Nothing, From e220e1844d0a21e070a86cfb09fa4637974c2e8a Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Tue, 2 Dec 2025 12:24:03 +0000 Subject: [PATCH 03/10] agent subscription WIP --- src/Simplex/Messaging/Agent.hs | 29 ++++--- src/Simplex/Messaging/Agent/Client.hs | 33 ++++++-- .../Messaging/Agent/Store/AgentStore.hs | 79 +++++++++++++++---- src/Simplex/Messaging/Client.hs | 1 + 4 files changed, 109 insertions(+), 33 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 18e9d0465..eb35e32d4 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -153,7 +153,7 @@ import Data.Bifunctor (bimap, first) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Composition -import Data.Either (isRight, partitionEithers, rights) +import Data.Either (fromRight, isRight, partitionEithers, rights) import Data.Foldable (foldl', toList) import Data.Functor (($>)) import Data.Functor.Identity @@ -221,7 +221,6 @@ import Simplex.Messaging.Protocol SMPMsgMeta, SParty (..), SProtocolType (..), - ServiceSub (..), ServiceSubResult, SndPublicAuthKey, SubscriptionMode (..), @@ -1451,7 +1450,12 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do let userSrvs' = case activeUserId_ of Just activeUserId -> sortOn (\(uId, _) -> if uId == activeUserId then 0 else 1 :: Int) userSrvs Nothing -> userSrvs - rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs' + useServices <- readTVarIO $ useClientServices c + userSrvs'' <- + if any id useServices + then lift $ mapConcurrently (subscribeService useServices) userSrvs' + else pure $ map (,False) userSrvs' + rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs'' let (errs, oks) = partitionEithers rs logInfo $ "subscribed " <> tshow (sum oks) <> " queues" forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",) @@ -1460,16 +1464,23 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do resumeAllCommands c where handleErr = (`catchAllErrors` \e -> notifySub' c "" (ERR e) >> throwE e) - subscribeUserServer :: Int -> TVar Int -> (UserId, SMPServer) -> AM' (Either AgentErrorType Int) - subscribeUserServer maxPending currPending (userId, srv) = do + subscribeService :: Map UserId Bool -> (UserId, SMPServer) -> AM' ((UserId, SMPServer), ServiceAssoc) + subscribeService useServices us@(userId, srv) = fmap ((us,) . fromRight False) $ tryAllErrors' $ do + withStore' c (\db -> getSubscriptionService db userId srv) >>= \case + Just serviceSub -> case M.lookup userId useServices of + Just True -> isRight <$> tryAllErrors (subscribeClientService c True userId srv serviceSub) + _ -> False <$ withStore' c (\db -> unassocUserServerRcvQueueSubs db userId srv) + _ -> pure False + subscribeUserServer :: Int -> TVar Int -> ((UserId, SMPServer), ServiceAssoc) -> AM' (Either AgentErrorType Int) + subscribeUserServer maxPending currPending ((userId, srv), hasService) = do atomically $ whenM ((maxPending <=) <$> readTVar currPending) retry tryAllErrors' $ do qs <- withStore' c $ \db -> do - qs <- getUserServerRcvQueueSubs db userId srv onlyNeeded - atomically $ modifyTVar' currPending (+ length qs) -- update before leaving transaction + qs <- getUserServerRcvQueueSubs db userId srv onlyNeeded hasService + unless (null qs) $ atomically $ modifyTVar' currPending (+ length qs) -- update before leaving transaction pure qs let n = length qs - lift $ subscribe qs `E.finally` atomically (modifyTVar' currPending $ subtract n) + unless (null qs) $ lift $ subscribe qs `E.finally` atomically (modifyTVar' currPending $ subtract n) pure n where subscribe qs = do @@ -1522,7 +1533,7 @@ subscribeClientServices' c userId = useService = liftIO $ (Just True ==) <$> TM.lookupIO userId (useClientServices c) subscribe = do srvs <- withStore' c (`getClientServiceServers` userId) - lift $ M.fromList <$> mapConcurrently (\(srv, ServiceSub _ n idsHash) -> fmap (srv,) $ tryAllErrors' $ subscribeClientService c False userId srv n idsHash) srvs + lift $ M.fromList <$> mapConcurrently (\(srv, serviceSub) -> fmap (srv,) $ tryAllErrors' $ subscribeClientService c False userId srv serviceSub) srvs -- requesting messages sequentially, to reduce memory usage getConnectionMessages' :: AgentClient -> NonEmpty ConnMsgReq -> AM' (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta))) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 4f02c022e..43cb09443 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -303,7 +303,7 @@ import Simplex.Messaging.Session import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Transport (SMPServiceRole (..), SMPVersion, ServiceCredentials (..), SessionId, THClientService' (..), THandleAuth (..), THandleParams (sessionId, thAuth, thVersion), TransportError (..), TransportPeer (..), sndAuthKeySMPVersion, shortLinksSMPVersion, newNtfCredsSMPVersion) +import Simplex.Messaging.Transport (HandshakeError (..), SMPServiceRole (..), SMPVersion, ServiceCredentials (..), SessionId, THClientService' (..), THandleAuth (..), THandleParams (sessionId, thAuth, thVersion), TransportError (..), TransportPeer (..), sndAuthKeySMPVersion, shortLinksSMPVersion, newNtfCredsSMPVersion) import Simplex.Messaging.Transport.Client (TransportHost (..)) import Simplex.Messaging.Transport.Credentials import Simplex.Messaging.Util @@ -619,7 +619,7 @@ getServiceCredentials c userId srv = let g = agentDRG c ((C.KeyHash kh, serviceCreds), serviceId_) <- withStore' c $ \db -> - getClientService db userId srv >>= \case + getClientServiceCredentials db userId srv >>= \case Just service -> pure service Nothing -> do cred <- genCredentials g Nothing (25, 24 * 999999) "simplex" @@ -1256,6 +1256,15 @@ protocolClientError protocolError_ host = \case PCEServiceUnavailable {} -> BROKER host NO_SERVICE PCEIOError e -> BROKER host $ NETWORK $ NEConnectError $ E.displayException e +-- it is consistent with smpClientServiceError +clientServiceError :: AgentErrorType -> Bool +clientServiceError = \case + BROKER _ NO_SERVICE -> True + BROKER _ (TRANSPORT (TEHandshake BAD_SERVICE)) -> True -- TODO [certs rcv] this error may be temporary, so we should possibly resubscribe. + SMP _ SMP.SERVICE -> True + SMP _ (SMP.PROXY (SMP.BROKER NO_SERVICE)) -> True -- for completeness, it cannot happen. + _ -> False + data ProtocolTestStep = TSConnect | TSDisconnect @@ -1716,8 +1725,9 @@ resubscribeClientService :: AgentClient -> SMPTransportSession -> ServiceSub -> resubscribeClientService c tSess serviceSub = withServiceClient c tSess $ \smp _ -> subscribeClientService_ c True tSess smp serviceSub -subscribeClientService :: AgentClient -> Bool -> UserId -> SMPServer -> Int64 -> IdsHash -> AM ServiceSubResult -subscribeClientService c withEvent userId srv n idsHash = +-- TODO [certs rcv] update service in the database if it has different ID and re-associate queues, and send event +subscribeClientService :: AgentClient -> Bool -> UserId -> SMPServer -> ServiceSub -> AM ServiceSubResult +subscribeClientService c withEvent userId srv (ServiceSub _ n idsHash) = withServiceClient c tSess $ \smp smpServiceId -> do let serviceSub = ServiceSub smpServiceId n idsHash atomically $ SS.setPendingServiceSub tSess serviceSub $ currentSubs c @@ -1726,14 +1736,21 @@ subscribeClientService c withEvent userId srv n idsHash = tSess = (userId, srv, Nothing) withServiceClient :: AgentClient -> SMPTransportSession -> (SMPClient -> ServiceId -> ExceptT SMPClientError IO a) -> AM a -withServiceClient c tSess action = - withLogClient c NRMBackground tSess B.empty "SUBS" $ \(SMPConnectedClient smp _) -> +withServiceClient c tSess@(userId, srv, _) subscribe = + unassocOnError $ withLogClient c NRMBackground tSess B.empty "SUBS" $ \(SMPConnectedClient smp _) -> case (\THClientService {serviceId} -> serviceId) <$> smpClientService smp of - Just smpServiceId -> action smp smpServiceId + Just smpServiceId -> subscribe smp smpServiceId Nothing -> throwE PCEServiceUnavailable + where + unassocOnError a = a `catchE` \e -> do + when (clientServiceError e) $ do + qs <- withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv + void $ lift $ subscribeUserServerQueues c userId srv qs + throwE e +-- TODO [certs rcv] send subscription error event? subscribeClientService_ :: AgentClient -> Bool -> SMPTransportSession -> SMPClient -> ServiceSub -> ExceptT SMPClientError IO ServiceSubResult -subscribeClientService_ c withEvent tSess@(_, srv, _) smp expected@(ServiceSub _ n idsHash) = do +subscribeClientService_ c withEvent tSess@(userId, srv, _) smp expected@(ServiceSub _ n idsHash) = do subscribed <- subscribeService smp SMP.SRecipientService n idsHash let sessId = sessionId $ thParams smp r = serviceSubResult expected subscribed diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 14c98fb30..8e33f2741 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -37,7 +37,9 @@ module Simplex.Messaging.Agent.Store.AgentStore -- * Client services createClientService, - getClientService, + getClientServiceCredentials, + getSubscriptionServices, + getSubscriptionService, getClientServiceServers, setClientServiceId, deleteClientService, @@ -52,6 +54,7 @@ module Simplex.Messaging.Agent.Store.AgentStore updateClientNotices, getSubscriptionServers, getUserServerRcvQueueSubs, + unassocUserServerRcvQueueSubs, unsetQueuesToSubscribe, setRcvServiceAssocs, removeRcvServiceAssocs, @@ -420,8 +423,8 @@ createClientService db userId srv (kh, (cert, pk)) = do |] (userId, host srv, port srv, serverKeyHash_, kh, cert, pk) -getClientService :: DB.Connection -> UserId -> SMPServer -> IO (Maybe ((C.KeyHash, TLS.Credential), Maybe ServiceId)) -getClientService db userId srv = +getClientServiceCredentials :: DB.Connection -> UserId -> SMPServer -> IO (Maybe ((C.KeyHash, TLS.Credential), Maybe ServiceId)) +getClientServiceCredentials db userId srv = maybeFirstRow toService $ DB.query db @@ -436,21 +439,41 @@ getClientService db userId srv = where toService (kh, cert, pk, serviceId_) = ((kh, (cert, pk)), serviceId_) -getClientServiceServers :: DB.Connection -> UserId -> IO [(SMPServer, ServiceSub)] -getClientServiceServers db userId = - map toServer - <$> DB.query +getSubscriptionServices :: DB.Connection -> IO [(UserId, (SMPServer, ServiceSub))] +getSubscriptionServices db = map toUserService <$> DB.query_ db clientServiceQuery + where + toUserService (Only userId :. serviceRow) = (userId, toServerService serviceRow) + +getSubscriptionService :: DB.Connection -> UserId -> SMPServer -> IO (Maybe ServiceSub) +getSubscriptionService db userId (SMPServer h p kh) = + maybeFirstRow toService $ + DB.query db [sql| - SELECT c.host, c.port, s.key_hash, c.service_id, c.service_queue_count, c.service_queue_ids_hash + SELECT c.service_id, c.service_queue_count, c.service_queue_ids_hash FROM client_services c JOIN servers s ON s.host = c.host AND s.port = c.port - WHERE c.user_id = ? + WHERE c.user_id = ? AND c.host = ? AND c.port = ? AND COALESCE(c.server_key_hash, s.key_hash) = ? |] - (Only userId) + (userId, h, p, kh) where - toServer (host, port, kh, serviceId, n, Binary idsHash) = - (SMPServer host port kh, ServiceSub serviceId n (IdsHash idsHash)) + toService (serviceId, qCnt, idsHash) = ServiceSub serviceId qCnt idsHash + +getClientServiceServers :: DB.Connection -> UserId -> IO [(SMPServer, ServiceSub)] +getClientServiceServers db userId = + map toServerService <$> DB.query db (clientServiceQuery <> " WHERE c.user_id = ?") (Only userId) + +clientServiceQuery :: Query +clientServiceQuery = + [sql| + SELECT c.host, c.port, COALESCE(c.server_key_hash, s.key_hash), c.service_id, c.service_queue_count, c.service_queue_ids_hash + FROM client_services c + JOIN servers s ON s.host = c.host AND s.port = c.port + |] + +toServerService :: (NonEmpty TransportHost, ServiceName, C.KeyHash, ServiceId, Int64, Binary ByteString) -> (ProtocolServer 'PSMP, ServiceSub) +toServerService (host, port, kh, serviceId, n, Binary idsHash) = + (SMPServer host port kh, ServiceSub serviceId n (IdsHash idsHash)) setClientServiceId :: DB.Connection -> UserId -> SMPServer -> ServiceId -> IO () setClientServiceId db userId srv serviceId = @@ -474,7 +497,9 @@ deleteClientService db userId srv = (userId, host srv, port srv) deleteClientServices :: DB.Connection -> UserId -> IO () -deleteClientServices db userId = DB.execute db "DELETE FROM client_services WHERE user_id = ?" (Only userId) +deleteClientServices db userId = do + DB.execute db "DELETE FROM client_services WHERE user_id = ?" (Only userId) + removeUserRcvServiceAssocs db userId createConn_ :: TVar ChaChaDRG -> @@ -2237,17 +2262,24 @@ getSubscriptionServers db onlyNeeded = toUserServer :: (UserId, NonEmpty TransportHost, ServiceName, C.KeyHash) -> (UserId, SMPServer) toUserServer (userId, host, port, keyHash) = (userId, SMPServer host port keyHash) -getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> Bool -> IO [RcvQueueSub] -getUserServerRcvQueueSubs db userId srv onlyNeeded = +-- TODO [certs rcv] check index for getting queues with service present +getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> Bool -> ServiceAssoc -> IO [RcvQueueSub] +getUserServerRcvQueueSubs db userId srv onlyNeeded hasService = map toRcvQueueSub <$> DB.query db - (rcvQueueSubQuery <> toSubscribe <> " c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ?") + (rcvQueueSubQuery <> toSubscribe <> " c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ?" <> serviceCond) (userId, host srv, port srv) where toSubscribe | onlyNeeded = " WHERE q.to_subscribe = 1 AND " | otherwise = " WHERE " + serviceCond + | hasService = " AND q.rcv_service_assoc = 0" + | otherwise = "" + +unassocUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> IO [RcvQueueSub] +unassocUserServerRcvQueueSubs db userId srv = undefined unsetQueuesToSubscribe :: DB.Connection -> IO () unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1" @@ -2280,6 +2312,21 @@ removeRcvServiceAssocs db userId (SMPServer h p kh) = |] (userId, h, p, kh) +removeUserRcvServiceAssocs :: DB.Connection -> UserId -> IO () +removeUserRcvServiceAssocs db userId = + DB.execute + db + [sql| + UPDATE rcv_queues + SET rcv_service_assoc = 0 + WHERE EXISTS ( + SELECT 1 + FROM connections c + WHERE c.conn_id = rcv_queues.conn_id AND c.user_id = ? + ) + |] + (Only userId) + -- * getConn helpers getConnIds :: DB.Connection -> IO [ConnId] diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 81e9820a2..7284101f3 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -778,6 +778,7 @@ temporaryClientError = \case _ -> False {-# INLINE temporaryClientError #-} +-- it is consistent with clientServiceError smpClientServiceError :: SMPClientError -> Bool smpClientServiceError = \case PCEServiceUnavailable -> True From 9f0540c7f100054566679cc5361242daa9d8096f Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Wed, 3 Dec 2025 12:08:55 +0000 Subject: [PATCH 04/10] test --- tests/AgentTests/FunctionalAPITests.hs | 41 ++++++++++++++++++-------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index cb74bc0b6..7c8b7d39c 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -124,8 +124,6 @@ import Fixtures #endif #if defined(dbServerPostgres) import qualified Database.PostgreSQL.Simple as PSQL -import Simplex.Messaging.Agent.Store (Connection' (..), StoredRcvQueue (..), SomeConn' (..)) -import Simplex.Messaging.Agent.Store.AgentStore (getConn) import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue) import Simplex.Messaging.Server.MsgStore.Postgres (PostgresQueue) import Simplex.Messaging.Server.MsgStore.Types (QSType (..)) @@ -477,7 +475,7 @@ functionalAPITests ps = do it "should connect two users and switch session mode" $ withSmpServer ps testTwoUsers describe "Client service certificates" $ do - it "should connect, subscribe and reconnect as a service" $ testClientServiceConnection ps + fit "should connect, subscribe and reconnect as a service" $ testClientServiceConnection ps describe "Connection switch" $ do describe "should switch delivery to the new queue" $ testServerMatrix2 ps testSwitchConnection @@ -3679,26 +3677,43 @@ testClientServiceConnection ps = do subscribeConnection user sId exchangeGreetingsMsgId 4 service uId user sId pure (conns, qIdHash) - withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do + (uId', sId') <- withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do - [(_, Right (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 1 qIdHash')))] <- M.toList <$> subscribeClientServices service 1 - ("", "", SERVICE_ALL _) <- nGet service - liftIO $ qIdHash' `shouldBe` qIdHash + subscribeAllConnections service False Nothing + liftIO $ getInAnyOrder service + [ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 1 qIdHash')))) -> qIdHash' == qIdHash; _ -> False, + \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False + ] subscribeConnection user sId exchangeGreetingsMsgId 6 service uId user sId ("", "", DOWN _ [_]) <- nGet user ("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 1 qIdHash')) <- nGet service qIdHash' `shouldBe` qIdHash -- TODO [certs rcv] how to integrate service counts into stats - -- r <- nGet service -- TODO [certs rcv] some event when service disconnects with count - -- print r withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do ("", "", UP _ [_]) <- nGet user - ("", "", SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 1 qIdHash''))) <- nGet service - ("", "", SERVICE_ALL _) <- nGet service - liftIO $ qIdHash'' `shouldBe` qIdHash - -- r <- nGet service -- TODO [certs rcv] some event when service reconnects with count + -- Nothing in ServiceSubResult confirms that both counts and IDs hash match + -- SERVICE_ALL may be deliverd before SERVICE_UP event in case there are no messages to deliver + liftIO $ getInAnyOrder service + [ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 1 qIdHash'')))) -> qIdHash'' == qIdHash; _ -> False, + \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False + ] exchangeGreetingsMsgId 8 service uId user sId + conns'@(uId', sId') <- makeConnection user service -- opposite direction + exchangeGreetings user sId' service uId' + pure conns' + withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do + withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do + subscribeAllConnections service False Nothing + liftIO $ getInAnyOrder service + [ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 2 _)))) -> True; _ -> False, + \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False + ] + -- TODO [certs rcv] test message delivery during subscription + subscribeAllConnections user False Nothing + ("", "", UP _ [_, _]) <- nGet user + exchangeGreetingsMsgId 4 user sId' service uId' + exchangeGreetingsMsgId 10 service uId user sId getSMPAgentClient' :: Int -> AgentConfig -> InitialAgentServers -> String -> IO AgentClient getSMPAgentClient' clientId cfg' initServers dbPath = do From 9cb059d1ebf9b46ab90973fccf148d50a2aead3c Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Wed, 3 Dec 2025 12:19:43 +0000 Subject: [PATCH 05/10] comment --- src/Simplex/Messaging/Agent.hs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index eb35e32d4..e7206442a 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1451,6 +1451,17 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do Just activeUserId -> sortOn (\(uId, _) -> if uId == activeUserId then 0 else 1 :: Int) userSrvs Nothing -> userSrvs useServices <- readTVarIO $ useClientServices c + -- These options are possible below: + -- 1) services fully disabled: + -- No service subscriptions will be attempted, and existing services and association will remain in in the database, + -- but they will be ignored because of hasService parameter set to False. + -- This approach preserves performance for all clients that do not use services. + -- 2) at least one user ID has services enabled: + -- Service will be loaded for all user/server combinations: + -- a) service is enabled for and service record exists: subscription will be attempted, + -- b) service is disabled and record exists: service record and all associations will be removed, + -- c) service is disabled or no record: no subscription attempt. + -- On successful service subscription, only unassociated queues will be subscribed. userSrvs'' <- if any id useServices then lift $ mapConcurrently (subscribeService useServices) userSrvs' @@ -1468,6 +1479,8 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do subscribeService useServices us@(userId, srv) = fmap ((us,) . fromRight False) $ tryAllErrors' $ do withStore' c (\db -> getSubscriptionService db userId srv) >>= \case Just serviceSub -> case M.lookup userId useServices of + -- TODO [certs rcv] improve logic to differentiate between permanent and temporary service subscription errors, + -- as the current logic would fall back to per-queue subscriptions on ANY service subscription error (e.g., network connection error). Just True -> isRight <$> tryAllErrors (subscribeClientService c True userId srv serviceSub) _ -> False <$ withStore' c (\db -> unassocUserServerRcvQueueSubs db userId srv) _ -> pure False From f5b4ee89ba0ce801a00b46ac4c7a0ad1f556b524 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Wed, 3 Dec 2025 12:41:12 +0000 Subject: [PATCH 06/10] enable tests --- tests/AgentTests/FunctionalAPITests.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 7c8b7d39c..5b3f3506e 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -475,7 +475,7 @@ functionalAPITests ps = do it "should connect two users and switch session mode" $ withSmpServer ps testTwoUsers describe "Client service certificates" $ do - fit "should connect, subscribe and reconnect as a service" $ testClientServiceConnection ps + it "should connect, subscribe and reconnect as a service" $ testClientServiceConnection ps describe "Connection switch" $ do describe "should switch delivery to the new queue" $ testServerMatrix2 ps testSwitchConnection From c0e84e30fe0a82a8e152b3747b618641a096b3dc Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Wed, 3 Dec 2025 23:54:13 +0000 Subject: [PATCH 07/10] update queries --- src/Simplex/Messaging/Agent.hs | 2 +- .../Messaging/Agent/Store/AgentStore.hs | 59 +++++++++++-------- 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index e7206442a..4d9313f10 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1458,7 +1458,7 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do -- This approach preserves performance for all clients that do not use services. -- 2) at least one user ID has services enabled: -- Service will be loaded for all user/server combinations: - -- a) service is enabled for and service record exists: subscription will be attempted, + -- a) service is enabled for user ID and service record exists: subscription will be attempted, -- b) service is disabled and record exists: service record and all associations will be removed, -- c) service is disabled or no record: no subscription attempt. -- On successful service subscription, only unassociated queues will be subscribed. diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 8e33f2741..0d0b2af70 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -2278,8 +2278,20 @@ getUserServerRcvQueueSubs db userId srv onlyNeeded hasService = | hasService = " AND q.rcv_service_assoc = 0" | otherwise = "" -unassocUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> IO [RcvQueueSub] -unassocUserServerRcvQueueSubs db userId srv = undefined +unassocUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> IO [RcvQueueSub] +unassocUserServerRcvQueueSubs db userId (SMPServer h p kh) = + map toRcvQueueSub + <$> DB.query + db + (removeRcvAssocsQuery <> " " <> returningColums) + (h, p, userId, kh) + where + returningColums = + [sql| + RETURNING c.user_id, rcv_queues.conn_id, rcv_queues.host, rcv_queues.port, COALESCE(rcv_queues.server_key_hash, s.key_hash), + rcv_queues.rcv_id, rcv_queues.rcv_private_key, rcv_queues.status, c.enable_ntfs, rcv_queues.client_notice_id, + rcv_queues.rcv_queue_id, rcv_queues.rcv_primary, rcv_queues.replace_rcv_queue_id + |] unsetQueuesToSubscribe :: DB.Connection -> IO () unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1" @@ -2289,28 +2301,26 @@ setRcvServiceAssocs db rqs = #if defined(dbPostgres) DB.execute db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id IN " $ Only $ In (map queueId rqs) #else - DB.executeMany db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_isd = ?" $ map (Only . queueId) rqs + DB.executeMany db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id = ?" $ map (Only . queueId) rqs #endif removeRcvServiceAssocs :: DB.Connection -> UserId -> SMPServer -> IO () -removeRcvServiceAssocs db userId (SMPServer h p kh) = - DB.execute - db - [sql| - UPDATE rcv_queues - SET rcv_service_assoc = 0 - WHERE EXISTS ( - SELECT 1 - FROM connections c - JOIN servers s ON rcv_queues.host = s.host AND rcv_queues.port = s.port - WHERE c.conn_id = rcv_queues.conn_id - AND c.user_id = ? - AND rcv_queues.host = ? - AND rcv_queues.port = ? - AND COALESCE(rcv_queues.server_key_hash, s.key_hash) = ? - ) - |] - (userId, h, p, kh) +removeRcvServiceAssocs db userId (SMPServer h p kh) = DB.execute db removeRcvAssocsQuery (h, p, userId, kh) + +removeRcvAssocsQuery :: Query +removeRcvAssocsQuery = + [sql| + UPDATE rcv_queues + SET rcv_service_assoc = 0 + FROM connections c, servers s + WHERE rcv_queues.host = ? + AND rcv_queues.port = ? + AND c.conn_id = rcv_queues.conn_id + AND c.user_id = ? + AND s.host = rcv_queues.host + AND s.port = rcv_queues.port + AND COALESCE(rcv_queues.server_key_hash, s.key_hash) = ? + |] removeUserRcvServiceAssocs :: DB.Connection -> UserId -> IO () removeUserRcvServiceAssocs db userId = @@ -2319,11 +2329,8 @@ removeUserRcvServiceAssocs db userId = [sql| UPDATE rcv_queues SET rcv_service_assoc = 0 - WHERE EXISTS ( - SELECT 1 - FROM connections c - WHERE c.conn_id = rcv_queues.conn_id AND c.user_id = ? - ) + FROM connections c + WHERE c.conn_id = rcv_queues.conn_id AND c.user_id = ? |] (Only userId) From 92a9579e6958026e42a45e32c0e62fca243b4b0f Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Thu, 4 Dec 2025 08:58:14 +0000 Subject: [PATCH 08/10] agent: option to add SQLite aggregates to DB connection (#1673) * agent: add build_relations_vector function to sqlite * update aggregate * use static aggregate * remove relations --------- Co-authored-by: Evgeny Poberezkin --- src/Simplex/Messaging/Agent/Store/SQLite.hs | 11 +++-- .../Messaging/Agent/Store/SQLite/Common.hs | 11 +++-- .../Messaging/Agent/Store/SQLite/Util.hs | 48 +++++++++++++++++++ 3 files changed, 63 insertions(+), 7 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 0471a5cd7..6cc63c066 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -110,7 +110,6 @@ connectDB path functions key track = do pure db where prepare db = do - let db' = SQL.connectionHandle $ DB.conn db unless (BA.null key) . SQLite3.exec db' $ "PRAGMA key = " <> keyString key <> ";" SQLite3.exec db' . fromQuery $ [sql| @@ -120,9 +119,13 @@ connectDB path functions key track = do PRAGMA secure_delete = ON; PRAGMA auto_vacuum = FULL; |] - forM_ functions $ \SQLiteFuncDef {funcName, argCount, deterministic, funcPtr} -> - createStaticFunction db' funcName argCount deterministic funcPtr - >>= either (throwIO . userError . show) pure + mapM_ addFunction functions + where + db' = SQL.connectionHandle $ DB.conn db + addFunction SQLiteFuncDef {funcName, argCount, funcPtrs} = + either (throwIO . userError . show) pure =<< case funcPtrs of + SQLiteFuncPtr isDet funcPtr -> createStaticFunction db' funcName argCount isDet funcPtr + SQLiteAggrPtrs stepPtr finalPtr -> createStaticAggregate db' funcName argCount stepPtr finalPtr closeDBStore :: DBStore -> IO () closeDBStore st@DBStore {dbClosed} = diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Common.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Common.hs index 04e724749..aac5ee37e 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Common.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Common.hs @@ -7,6 +7,7 @@ module Simplex.Messaging.Agent.Store.SQLite.Common ( DBStore (..), DBOpts (..), SQLiteFuncDef (..), + SQLiteFuncPtrs (..), withConnection, withConnection', withTransaction, @@ -55,14 +56,18 @@ data DBOpts = DBOpts track :: DB.TrackQueries } --- e.g. `SQLiteFuncDef "name" 2 True f` +-- e.g. `SQLiteFuncDef "func_name" 2 (SQLiteFuncPtr True func)` +-- or `SQLiteFuncDef "aggr_name" 3 (SQLiteAggrPtrs step final)` data SQLiteFuncDef = SQLiteFuncDef { funcName :: ByteString, argCount :: CArgCount, - deterministic :: Bool, - funcPtr :: FunPtr SQLiteFunc + funcPtrs :: SQLiteFuncPtrs } +data SQLiteFuncPtrs + = SQLiteFuncPtr {deterministic :: Bool, funcPtr :: FunPtr SQLiteFunc} + | SQLiteAggrPtrs {stepPtr :: FunPtr SQLiteFunc, finalPtr :: FunPtr SQLiteFuncFinal} + withConnectionPriority :: DBStore -> Bool -> (DB.Connection -> IO a) -> IO a withConnectionPriority DBStore {dbSem, dbConnection} priority action | priority = E.bracket_ signal release $ withMVar dbConnection action diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Util.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Util.hs index a3c3b94ac..2cbd7ecff 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Util.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Util.hs @@ -3,16 +3,20 @@ module Simplex.Messaging.Agent.Store.SQLite.Util where import Control.Exception (SomeException, catch, mask_) import Data.ByteString (ByteString) import qualified Data.ByteString as B +import Data.IORef import Database.SQLite3.Direct (Database (..), FuncArgs (..), FuncContext (..)) import Database.SQLite3.Bindings import Foreign.C.String import Foreign.Ptr import Foreign.StablePtr +import Foreign.Storable data CFuncPtrs = CFuncPtrs (FunPtr CFunc) (FunPtr CFunc) (FunPtr CFuncFinal) type SQLiteFunc = Ptr CContext -> CArgCount -> Ptr (Ptr CValue) -> IO () +type SQLiteFuncFinal = Ptr CContext -> IO () + mkSQLiteFunc :: (FuncContext -> FuncArgs -> IO ()) -> SQLiteFunc mkSQLiteFunc f cxt nArgs cvals = catchAsResultError cxt $ f (FuncContext cxt) (FuncArgs nArgs cvals) {-# INLINE mkSQLiteFunc #-} @@ -25,6 +29,50 @@ createStaticFunction (Database db) name nArgs isDet funPtr = mask_ $ do B.useAsCString name $ \namePtr -> toResult () <$> c_sqlite3_create_function_v2 db namePtr nArgs flags (castStablePtrToPtr u) funPtr nullFunPtr nullFunPtr nullFunPtr +mkSQLiteAggStep :: a -> (FuncContext -> FuncArgs -> a -> IO a) -> SQLiteFunc +mkSQLiteAggStep initSt xStep cxt nArgs cvals = catchAsResultError cxt $ do + -- we store the aggregate state in the buffer returned by + -- c_sqlite3_aggregate_context as a StablePtr pointing to an IORef that + -- contains the actual aggregate state + aggCtx <- getAggregateContext cxt + aggStPtr <- peek aggCtx + aggStRef <- + if castStablePtrToPtr aggStPtr /= nullPtr + then deRefStablePtr aggStPtr + else do + aggStRef <- newIORef initSt + aggStPtr' <- newStablePtr aggStRef + poke aggCtx aggStPtr' + return aggStRef + aggSt <- readIORef aggStRef + aggSt' <- xStep (FuncContext cxt) (FuncArgs nArgs cvals) aggSt + writeIORef aggStRef aggSt' + +mkSQLiteAggFinal :: a -> (FuncContext -> a -> IO ()) -> SQLiteFuncFinal +mkSQLiteAggFinal initSt xFinal cxt = do + aggCtx <- getAggregateContext cxt + aggStPtr <- peek aggCtx + if castStablePtrToPtr aggStPtr == nullPtr + then catchAsResultError cxt $ xFinal (FuncContext cxt) initSt + else do + catchAsResultError cxt $ do + aggStRef <- deRefStablePtr aggStPtr + aggSt <- readIORef aggStRef + xFinal (FuncContext cxt) aggSt + freeStablePtr aggStPtr + +getAggregateContext :: Ptr CContext -> IO (Ptr a) +getAggregateContext cxt = c_sqlite3_aggregate_context cxt stPtrSize + where + stPtrSize = fromIntegral $ sizeOf (undefined :: StablePtr ()) + +-- Based on createAggregate from Database.SQLite3.Direct, but uses static function pointers to avoid dynamic wrappers that trigger DCL. +createStaticAggregate :: Database -> ByteString -> CArgCount -> FunPtr SQLiteFunc -> FunPtr SQLiteFuncFinal -> IO (Either Error ()) +createStaticAggregate (Database db) name nArgs stepPtr finalPtr = mask_ $ do + u <- newStablePtr $ CFuncPtrs nullFunPtr stepPtr finalPtr + B.useAsCString name $ \namePtr -> + toResult () <$> c_sqlite3_create_function_v2 db namePtr nArgs 0 (castStablePtrToPtr u) nullFunPtr stepPtr finalPtr nullFunPtr + -- Convert a 'CError' to a 'Either Error', in the common case where -- SQLITE_OK signals success and anything else signals an error. -- From 135e7fe5845f5fdfadd3ec294a35d5f0dd8946a0 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Fri, 5 Dec 2025 12:59:46 +0000 Subject: [PATCH 09/10] add test, treat BAD_SERVICE as temp error, only remove queue associations on service errors --- src/Simplex/Messaging/Agent.hs | 11 +++--- src/Simplex/Messaging/Agent/Client.hs | 21 +++++------ src/Simplex/Messaging/Client.hs | 1 - src/Simplex/Messaging/Protocol.hs | 1 + tests/AgentTests/FunctionalAPITests.hs | 49 ++++++++++++++++++++++++-- 5 files changed, 65 insertions(+), 18 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 4d9313f10..f155ce77b 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1479,10 +1479,12 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do subscribeService useServices us@(userId, srv) = fmap ((us,) . fromRight False) $ tryAllErrors' $ do withStore' c (\db -> getSubscriptionService db userId srv) >>= \case Just serviceSub -> case M.lookup userId useServices of - -- TODO [certs rcv] improve logic to differentiate between permanent and temporary service subscription errors, - -- as the current logic would fall back to per-queue subscriptions on ANY service subscription error (e.g., network connection error). - Just True -> isRight <$> tryAllErrors (subscribeClientService c True userId srv serviceSub) - _ -> False <$ withStore' c (\db -> unassocUserServerRcvQueueSubs db userId srv) + Just True -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case + Left e | clientServiceError e -> unassocQueues $> False + _ -> pure True + _ -> unassocQueues $> False + where + unassocQueues = withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv _ -> pure False subscribeUserServer :: Int -> TVar Int -> ((UserId, SMPServer), ServiceAssoc) -> AM' (Either AgentErrorType Int) subscribeUserServer maxPending currPending ((userId, srv), hasService) = do @@ -1498,7 +1500,6 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do where subscribe qs = do rs <- subscribeUserServerQueues c userId srv qs - -- TODO [certs rcv] storeClientServiceAssocs store associations of queues with client service ID ns <- asks ntfSupervisor whenM (liftIO $ hasInstantNotifications ns) $ sendNtfCreate ns rs sendNtfCreate :: NtfSupervisor -> [(RcvQueueSub, Either AgentErrorType (Maybe SMP.ServiceId))] -> AM' () diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 43cb09443..8a4d38f3a 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -120,6 +120,7 @@ module Simplex.Messaging.Agent.Client getAgentSubscriptions, slowNetworkConfig, protocolClientError, + clientServiceError, Worker (..), SessionVar (..), SubscriptionsInfo (..), @@ -1260,7 +1261,6 @@ protocolClientError protocolError_ host = \case clientServiceError :: AgentErrorType -> Bool clientServiceError = \case BROKER _ NO_SERVICE -> True - BROKER _ (TRANSPORT (TEHandshake BAD_SERVICE)) -> True -- TODO [certs rcv] this error may be temporary, so we should possibly resubscribe. SMP _ SMP.SERVICE -> True SMP _ (SMP.PROXY (SMP.BROKER NO_SERVICE)) -> True -- for completeness, it cannot happen. _ -> False @@ -1566,6 +1566,8 @@ temporaryAgentError :: AgentErrorType -> Bool temporaryAgentError = \case BROKER _ e -> tempBrokerError e SMP _ (SMP.PROXY (SMP.BROKER e)) -> tempBrokerError e + SMP _ (SMP.STORE _) -> True + NTF _ (SMP.STORE _) -> True XFTP _ XFTP.TIMEOUT -> True PROXY _ _ (ProxyProtocolError (SMP.PROXY (SMP.BROKER e))) -> tempBrokerError e PROXY _ _ (ProxyProtocolError (SMP.PROXY SMP.NO_SESSION)) -> True @@ -1576,6 +1578,7 @@ temporaryAgentError = \case tempBrokerError = \case NETWORK _ -> True TIMEOUT -> True + TRANSPORT (TEHandshake BAD_SERVICE) -> True -- this error is considered temporary because it is DB error _ -> False temporaryOrHostError :: AgentErrorType -> Bool @@ -1722,8 +1725,12 @@ processClientNotices c@AgentClient {presetServers} tSess notices = do notifySub' c "" $ ERR e resubscribeClientService :: AgentClient -> SMPTransportSession -> ServiceSub -> AM ServiceSubResult -resubscribeClientService c tSess serviceSub = - withServiceClient c tSess $ \smp _ -> subscribeClientService_ c True tSess smp serviceSub +resubscribeClientService c tSess@(userId, srv, _) serviceSub = + withServiceClient c tSess (\smp _ -> subscribeClientService_ c True tSess smp serviceSub) `catchE` \e -> do + when (clientServiceError e) $ do + qs <- withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv + void $ lift $ subscribeUserServerQueues c userId srv qs + throwE e -- TODO [certs rcv] update service in the database if it has different ID and re-associate queues, and send event subscribeClientService :: AgentClient -> Bool -> UserId -> SMPServer -> ServiceSub -> AM ServiceSubResult @@ -1737,16 +1744,10 @@ subscribeClientService c withEvent userId srv (ServiceSub _ n idsHash) = withServiceClient :: AgentClient -> SMPTransportSession -> (SMPClient -> ServiceId -> ExceptT SMPClientError IO a) -> AM a withServiceClient c tSess@(userId, srv, _) subscribe = - unassocOnError $ withLogClient c NRMBackground tSess B.empty "SUBS" $ \(SMPConnectedClient smp _) -> + withLogClient c NRMBackground tSess B.empty "SUBS" $ \(SMPConnectedClient smp _) -> case (\THClientService {serviceId} -> serviceId) <$> smpClientService smp of Just smpServiceId -> subscribe smp smpServiceId Nothing -> throwE PCEServiceUnavailable - where - unassocOnError a = a `catchE` \e -> do - when (clientServiceError e) $ do - qs <- withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv - void $ lift $ subscribeUserServerQueues c userId srv qs - throwE e -- TODO [certs rcv] send subscription error event? subscribeClientService_ :: AgentClient -> Bool -> SMPTransportSession -> SMPClient -> ServiceSub -> ExceptT SMPClientError IO ServiceSubResult diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 7284101f3..ac2dc9a9d 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -782,7 +782,6 @@ temporaryClientError = \case smpClientServiceError :: SMPClientError -> Bool smpClientServiceError = \case PCEServiceUnavailable -> True - PCETransportError (TEHandshake BAD_SERVICE) -> True -- TODO [certs rcv] this error may be temporary, so we should possibly resubscribe. PCEProtocolError SERVICE -> True PCEProtocolError (PROXY (BROKER NO_SERVICE)) -> True -- for completeness, it cannot happen. _ -> False diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index a5f94960e..6b232f12b 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -143,6 +143,7 @@ module Simplex.Messaging.Protocol IdsHash (..), ServiceSub (..), ServiceSubResult (..), + ServiceSubError (..), serviceSubResult, queueIdsHash, queueIdHash, diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index f9cfbb577..31967917a 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -66,7 +66,7 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Either (isRight) import Data.Int (Int64) -import Data.List (find, isSuffixOf, nub) +import Data.List (find, isPrefixOf, isSuffixOf, nub) import Data.List.NonEmpty (NonEmpty) import qualified Data.Map as M import Data.Maybe (isJust, isNothing) @@ -113,7 +113,7 @@ import Simplex.Messaging.Util (bshow, diffToMicroseconds) import Simplex.Messaging.Version (VersionRange (..)) import qualified Simplex.Messaging.Version as V import Simplex.Messaging.Version.Internal (Version (..)) -import System.Directory (copyFile, renameFile) +import System.Directory (copyFile, removeFile, renameFile) import Test.Hspec hiding (fit, it) import UnliftIO import Util @@ -124,10 +124,13 @@ import Fixtures #endif #if defined(dbServerPostgres) import qualified Database.PostgreSQL.Simple as PSQL +import qualified Simplex.Messaging.Agent.Store.Postgres as Postgres +import qualified Simplex.Messaging.Agent.Store.Postgres.Common as Postgres import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue) import Simplex.Messaging.Server.MsgStore.Postgres (PostgresQueue) import Simplex.Messaging.Server.MsgStore.Types (QSType (..)) import Simplex.Messaging.Server.QueueStore.Postgres +import Simplex.Messaging.Server.QueueStore.Postgres.Migrations import Simplex.Messaging.Server.QueueStore.Types (QueueStoreClass (..)) #endif @@ -476,6 +479,7 @@ functionalAPITests ps = do withSmpServer ps testTwoUsers describe "Client service certificates" $ do it "should connect, subscribe and reconnect as a service" $ testClientServiceConnection ps + it "should re-subscribe when service ID changed" $ testClientServiceIDChange ps describe "Connection switch" $ do describe "should switch delivery to the new queue" $ testServerMatrix2 ps testSwitchConnection @@ -3715,6 +3719,47 @@ testClientServiceConnection ps = do exchangeGreetingsMsgId 4 user sId' service uId' exchangeGreetingsMsgId 10 service uId user sId +testClientServiceIDChange :: HasCallStack => (ASrvTransport, AStoreType) -> IO () +testClientServiceIDChange ps@(_, ASType qs _) = do + (sId, uId) <- withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do + withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do + conns@(sId, uId) <- makeConnection service user + exchangeGreetings service uId user sId + pure conns + _ :: () <- case qs of + SQSPostgres -> do +#if defined(dbServerPostgres) + st <- either (error . show) pure =<< Postgres.createDBStore testStoreDBOpts serverMigrations (MigrationConfig MCError Nothing) + void $ Postgres.withTransaction st (`PSQL.execute_` "DELETE FROM services") +#else + pure () +#endif + SQSMemory -> do + s <- readFile testStoreLogFile + removeFile testStoreLogFile + writeFile testStoreLogFile $ unlines $ filter (not . ("NEW_SERVICE" `isPrefixOf`)) $ lines s + withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do + withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do + subscribeAllConnections service False Nothing + liftIO $ getInAnyOrder service + [ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult (Just (SMP.SSErrorQueueCount 1 0)) (SMP.ServiceSub _ 0 _)))) -> True; _ -> False, + \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False, + \case ("", "", AEvt SAENone (UP _ _)) -> True; _ -> False + ] + subscribeAllConnections user False Nothing + ("", "", UP _ [_]) <- nGet user + exchangeGreetingsMsgId 4 service uId user sId + -- disable service in the client + -- The test uses True for non-existing user to make sure it's removed for user 1, + -- because if no users use services, then it won't be checking them to optimize for most clients. + withAgentClientsServers2 (agentCfg, initAgentServers {useServices = M.fromList [(100, True)]}) (agentCfg, initAgentServers) $ \notService user -> do + withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do + subscribeAllConnections notService False Nothing + ("", "", UP _ [_]) <- nGet notService + subscribeAllConnections user False Nothing + ("", "", UP _ [_]) <- nGet user + exchangeGreetingsMsgId 6 notService uId user sId + getSMPAgentClient' :: Int -> AgentConfig -> InitialAgentServers -> String -> IO AgentClient getSMPAgentClient' clientId cfg' initServers dbPath = do Right st <- liftIO $ createStore dbPath From d5a802a61357c1913fa4f0ef8eff092beff710cf Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Fri, 5 Dec 2025 17:52:04 +0000 Subject: [PATCH 10/10] add packZipWith for backward compatibility with GHC 8.10.7 --- src/Simplex/Messaging/Agent/Client.hs | 2 +- src/Simplex/Messaging/Agent/Store/SQLite.hs | 5 ++-- src/Simplex/Messaging/Util.hs | 26 +++++++++++++++++++++ 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 8a4d38f3a..7acfb0b49 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -1743,7 +1743,7 @@ subscribeClientService c withEvent userId srv (ServiceSub _ n idsHash) = tSess = (userId, srv, Nothing) withServiceClient :: AgentClient -> SMPTransportSession -> (SMPClient -> ServiceId -> ExceptT SMPClientError IO a) -> AM a -withServiceClient c tSess@(userId, srv, _) subscribe = +withServiceClient c tSess subscribe = withLogClient c NRMBackground tSess B.empty "SUBS" $ \(SMPConnectedClient smp _) -> case (\THClientService {serviceId} -> serviceId) <$> smpClientService smp of Just smpServiceId -> subscribe smp smpServiceId diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 3585b7d63..a670dd3e2 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -70,7 +70,7 @@ import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB import Simplex.Messaging.Agent.Store.SQLite.Util import Simplex.Messaging.Agent.Store.Shared (Migration (..), MigrationConfig (..), MigrationError (..)) import qualified Simplex.Messaging.Crypto as C -import Simplex.Messaging.Util (ifM, safeDecodeUtf8) +import Simplex.Messaging.Util (ifM, packZipWith, safeDecodeUtf8) import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist) import System.FilePath (takeDirectory, takeFileName, ()) @@ -146,7 +146,8 @@ sqliteXorMd5Combine = mkSQLiteFunc $ \cxt args -> do SQLite3.funcResultBlob cxt $ xorMd5Combine idsHash rId xorMd5Combine :: ByteString -> ByteString -> ByteString -xorMd5Combine idsHash rId = B.packZipWith xor idsHash $ C.md5Hash rId +xorMd5Combine idsHash rId = packZipWith xor idsHash $ C.md5Hash rId +{-# INLINE xorMd5Combine #-} closeDBStore :: DBStore -> IO () closeDBStore st@DBStore {dbClosed} = diff --git a/src/Simplex/Messaging/Util.hs b/src/Simplex/Messaging/Util.hs index e9f37b1ae..83a911452 100644 --- a/src/Simplex/Messaging/Util.hs +++ b/src/Simplex/Messaging/Util.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE BangPatterns #-} {-# LANGUAGE MonadComprehensions #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -15,6 +16,7 @@ import qualified Data.Aeson as J import Data.Bifunctor (first, second) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import Data.ByteString.Internal (toForeignPtr, unsafeCreate) import qualified Data.ByteString.Lazy.Char8 as LB import Data.IORef import Data.Int (Int64) @@ -29,6 +31,9 @@ import qualified Data.Text as T import Data.Text.Encoding (decodeUtf8With, encodeUtf8) import Data.Time (NominalDiffTime) import Data.Tuple (swap) +import Data.Word (Word8) +import Foreign.ForeignPtr (withForeignPtr) +import Foreign.Storable (peekByteOff, pokeByteOff) import GHC.Conc (labelThread, myThreadId, threadDelay) import UnliftIO hiding (atomicModifyIORef') import qualified UnliftIO.Exception as UE @@ -156,6 +161,27 @@ mapAccumLM_NonEmpty mapAccumLM_NonEmpty f s (x :| xs) = [(s2, x' :| xs') | (s1, x') <- f s x, (s2, xs') <- mapAccumLM_List f s1 xs] +-- | Optimized from bytestring package for GHC 8.10.7 compatibility +packZipWith :: (Word8 -> Word8 -> Word8) -> ByteString -> ByteString -> ByteString +packZipWith f s1 s2 = + unsafeCreate len $ \r -> + withForeignPtr fp1 $ \p1 -> + withForeignPtr fp2 $ \p2 -> zipWith_ p1 p2 r + where + zipWith_ p1 p2 r = go 0 + where + go :: Int -> IO () + go !n + | n >= len = pure () + | otherwise = do + x <- peekByteOff p1 (off1 + n) + y <- peekByteOff p2 (off2 + n) + pokeByteOff r n (f x y) + go (n + 1) + (fp1, off1, l1) = toForeignPtr s1 + (fp2, off2, l2) = toForeignPtr s2 + len = min l1 l2 + tryWriteTBQueue :: TBQueue a -> a -> STM Bool tryWriteTBQueue q a = do full <- isFullTBQueue q