diff --git a/simplexmq.cabal b/simplexmq.cabal index 7cee47e0d..cbdc7d66d 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -384,6 +384,7 @@ test-suite simplexmq-test AgentTests.MigrationTests AgentTests.NotificationTests AgentTests.SchemaDump + AgentTests.ServerChoice AgentTests.SQLiteTests CLITests CoreTests.BatchingTests diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index aabe3ff28..9f9525154 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -461,14 +461,14 @@ runXFTPSndPrepareWorker c Worker {doWork} = do pure srv where tryCreate = do - usedSrvs <- newTVarIO ([] :: [XFTPServer]) + triedHosts <- newTVarIO S.empty let AgentClient {xftpServers} = c userSrvCount <- liftIO $ length <$> TM.lookupIO userId xftpServers withRetryIntervalCount (riFast ri) $ \n _ loop -> do liftIO $ waitWhileSuspended c liftIO $ waitForUserNetwork c let triedAllSrvs = n > userSrvCount - createWithNextSrv usedSrvs + createWithNextSrv triedHosts `catchAgentError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop triedAllSrvs e) (throwE e) e where -- we don't do closeXFTPServerClient here to not risk closing connection for concurrent chunk upload @@ -477,10 +477,10 @@ runXFTPSndPrepareWorker c Worker {doWork} = do when (triedAllSrvs && serverHostError e) $ notify c sndFileEntityId $ SFWARN e liftIO $ assertAgentForeground c loop - createWithNextSrv usedSrvs = do + createWithNextSrv triedHosts = do deleted <- withStore' c $ \db -> getSndFileDeleted db sndFileId when deleted $ throwE $ FILE NO_FILE - withNextSrv c userId usedSrvs [] $ \srvAuth -> do + withNextSrv c userId storageSrvs triedHosts [] $ \srvAuth -> do replica <- agentXFTPNewChunk c ch numRecipients' srvAuth pure (replica, srvAuth) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index dc6f4ad17..94db36e28 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -120,6 +120,8 @@ module Simplex.Messaging.Agent debugAgentLocks, getAgentSubscriptions, logConnection, + -- for tests + withAgentEnv, ) where @@ -815,11 +817,13 @@ newConnToAccept c connId enableNtfs invId pqSup = do joinConn :: AgentClient -> UserId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM SndQueueSecured joinConn c userId connId enableNtfs cReq cInfo pqSupport subMode = do - srv <- case cReq of - CRInvitationUri ConnReqUriData {crSmpQueues = q :| _} _ -> - getNextServer c userId [qServer q] - _ -> getSMPServer c userId + srv <- getNextSMPServer c userId [qServer cReqQueue] joinConnSrv c userId connId enableNtfs cReq cInfo pqSupport subMode srv + where + cReqQueue :: SMPQueueUri + cReqQueue = case cReq of + CRInvitationUri ConnReqUriData {crSmpQueues = q :| _} _ -> q + CRContactUri ConnReqUriData {crSmpQueues = q :| _} -> q startJoinInvitation :: AgentClient -> UserId -> ConnId -> Maybe SndQueue -> Bool -> ConnectionRequestUri 'CMInvitation -> PQSupport -> AM (ConnData, SndQueue, CR.SndE2ERatchetParams 'C.X448) startJoinInvitation c userId connId sq_ enableNtfs cReqUri pqSup = @@ -1194,14 +1198,13 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do processCmd ri PendingCommand {cmdId, corrId, userId, command} pendingCmds = case command of AClientCommand cmd -> case cmd of NEW enableNtfs (ACM cMode) pqEnc subMode -> noServer $ do - usedSrvs <- newTVarIO ([] :: [SMPServer]) - tryCommand . withNextSrv c userId usedSrvs [] $ \srv -> do + triedHosts <- newTVarIO S.empty + tryCommand . withNextSrv c userId storageSrvs triedHosts [] $ \srv -> do cReq <- newRcvConnSrv c userId connId enableNtfs cMode Nothing pqEnc subMode srv notify $ INV (ACR cMode cReq) JOIN enableNtfs (ACR _ cReq@(CRInvitationUri ConnReqUriData {crSmpQueues = q :| _} _)) pqEnc subMode connInfo -> noServer $ do - let initUsed = [qServer q] - usedSrvs <- newTVarIO initUsed - tryCommand . withNextSrv c userId usedSrvs initUsed $ \srv -> do + triedHosts <- newTVarIO S.empty + tryCommand . withNextSrv c userId storageSrvs triedHosts [qServer q] $ \srv -> do sqSecured <- joinConnSrvAsync c userId connId enableNtfs cReq connInfo pqEnc subMode srv notify $ JOINED sqSecured LET confId ownCInfo -> withServer' . tryCommand $ allowConnection' c connId confId ownCInfo >> notify OK @@ -1649,8 +1652,8 @@ switchDuplexConnection c (DuplexConnection cData@ConnData {connId, userId} rqs s checkRQSwchStatus rq RSSwitchStarted clientVRange <- asks $ smpClientVRange . config -- try to get the server that is different from all queues, or at least from the primary rcv queue - srvAuth@(ProtoServerWithAuth srv _) <- getNextServer c userId $ map qServer (L.toList rqs) <> map qServer (L.toList sqs) - srv' <- if srv == server then getNextServer c userId [server] else pure srvAuth + srvAuth@(ProtoServerWithAuth srv _) <- getNextSMPServer c userId $ map qServer (L.toList rqs) <> map qServer (L.toList sqs) + srv' <- if srv == server then getNextSMPServer c userId [server] else pure srvAuth (q, qUri, tSess, sessId) <- newRcvQueue c userId connId srv' clientVRange SMSubscribe False let rq' = (q :: NewRcvQueue) {primary = True, dbReplaceQueueId = Just dbQueueId} rq'' <- withStore c $ \db -> addConnRcvQueue db connId rq' @@ -2158,9 +2161,13 @@ debugAgentLocks AgentClient {connLocks = cs, invLocks = is, deleteLock = d} = do getLocks ls = atomically $ M.mapKeys (B.unpack . strEncode) . M.mapMaybe id <$> (mapM tryReadTMVar =<< readTVar ls) getSMPServer :: AgentClient -> UserId -> AM SMPServerWithAuth -getSMPServer c userId = withUserServers c userId pickServer +getSMPServer c userId = getNextSMPServer c userId [] {-# INLINE getSMPServer #-} +getNextSMPServer :: AgentClient -> UserId -> [SMPServer] -> AM SMPServerWithAuth +getNextSMPServer c userId = getNextServer c userId storageSrvs +{-# INLINE getNextSMPServer #-} + subscriber :: AgentClient -> AM' () subscriber c@AgentClient {msgQ} = forever $ do t <- atomically $ readTBQueue msgQ diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 33a3c2a10..af241c2f4 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -149,7 +149,6 @@ module Simplex.Messaging.Agent.Client userServers, pickServer, getNextServer, - withUserServers, withNextSrv, incSMPServerStat, incSMPServerStat', @@ -193,12 +192,12 @@ import qualified Data.ByteString.Char8 as B import Data.Either (isRight, partitionEithers) import Data.Functor (($>)) import Data.Int (Int64) -import Data.List (deleteFirstsBy, find, foldl', partition, (\\)) +import Data.List (find, foldl', partition) import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, listToMaybe) +import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, listToMaybe, mapMaybe) import Data.Set (Set) import qualified Data.Set as S import Data.Text (Text) @@ -264,7 +263,6 @@ import Simplex.Messaging.Protocol VersionSMPC, XFTPServer, XFTPServerWithAuth, - sameSrvAddr', pattern NoEntity, ) import qualified Simplex.Messaging.Protocol as SMP @@ -619,7 +617,7 @@ getSMPServerClient c@AgentClient {active, smpClients, workerSeq} tSess = do getSMPProxyClient :: AgentClient -> Maybe SMPServerWithAuth -> SMPTransportSession -> AM (SMPConnectedClient, Either AgentErrorType ProxiedRelay) getSMPProxyClient c@AgentClient {active, smpClients, smpProxiedRelays, workerSeq} proxySrv_ destSess@(userId, destSrv, qId) = do unlessM (readTVarIO active) $ throwE INACTIVE - proxySrv <- maybe (getNextServer c userId [destSrv]) pure proxySrv_ + proxySrv <- maybe (getNextServer c userId proxySrvs [destSrv]) pure proxySrv_ ts <- liftIO getCurrentTime atomically (getClientVar proxySrv ts) >>= \(tSess, auth, v) -> either (newProxyClient tSess auth ts) (waitForProxyClient tSess auth) v @@ -1074,7 +1072,7 @@ sendOrProxySMPCommand :: (SMPClient -> ProxiedRelay -> ExceptT SMPClientError IO (Either ProxyClientError ())) -> (SMPClient -> ExceptT SMPClientError IO ()) -> AM (Maybe SMPServer) -sendOrProxySMPCommand c userId destSrv connId cmdStr senderId sendCmdViaProxy sendCmdDirectly = do +sendOrProxySMPCommand c userId destSrv@ProtocolServer {host = destHosts} connId cmdStr senderId sendCmdViaProxy sendCmdDirectly = do tSess <- mkTransportSession c userId destSrv connId ifM shouldUseProxy (sendViaProxy Nothing tSess) (sendDirectly tSess $> Nothing) where @@ -1093,7 +1091,7 @@ sendOrProxySMPCommand c userId destSrv connId cmdStr senderId sendCmdViaProxy se SPFAllow -> True SPFAllowProtected -> ipAddressProtected cfg destSrv SPFProhibit -> False - unknownServer = liftIO $ maybe True (notElem destSrv . knownSrvs) <$> TM.lookupIO userId (smpServers c) + unknownServer = liftIO $ maybe True (\srvs -> all (`S.notMember` knownHosts srvs) destHosts) <$> TM.lookupIO userId (smpServers c) sendViaProxy :: Maybe SMPServerWithAuth -> SMPTransportSession -> AM (Maybe SMPServer) sendViaProxy proxySrv_ destSess@(_, _, connId_) = do r <- tryAgentError . withProxySession c proxySrv_ destSess senderId ("PFWD " <> cmdStr) $ \(SMPConnectedClient smp _, proxySess@ProxiedRelay {prBasicAuth}) -> do @@ -2036,33 +2034,82 @@ userServers c = case protocolTypeI @p of SPXFTP -> xftpServers c {-# INLINE userServers #-} -pickServer :: forall p. NonEmpty (ProtoServerWithAuth p) -> AM (ProtoServerWithAuth p) +pickServer :: NonEmpty (Maybe OperatorId, ProtoServerWithAuth p) -> AM (ProtoServerWithAuth p) pickServer = \case - srv :| [] -> pure srv + (_, srv) :| [] -> pure srv servers -> do gen <- asks randomServer - atomically $ (servers L.!!) <$> stateTVar gen (randomR (0, L.length servers - 1)) + atomically $ snd . (servers L.!!) <$> stateTVar gen (randomR (0, L.length servers - 1)) -getNextServer :: forall p. (ProtocolTypeI p, UserProtocol p) => AgentClient -> UserId -> [ProtocolServer p] -> AM (ProtoServerWithAuth p) -getNextServer c userId usedSrvs = withUserServers c userId $ \srvs -> - case L.nonEmpty $ deleteFirstsBy sameSrvAddr' (L.toList srvs) (map noAuthSrv usedSrvs) of - Just srvs' -> pickServer srvs' - _ -> pickServer srvs +getNextServer :: + (ProtocolTypeI p, UserProtocol p) => + AgentClient -> + UserId -> + (UserServers p -> NonEmpty (Maybe OperatorId, ProtoServerWithAuth p)) -> + [ProtocolServer p] -> + AM (ProtoServerWithAuth p) +getNextServer c userId srvsSel usedSrvs = do + srvs <- getUserServers_ c userId srvsSel + snd <$> getNextServer_ srvs (usedOperatorsHosts srvs usedSrvs) + +usedOperatorsHosts :: NonEmpty (Maybe OperatorId, ProtoServerWithAuth p) -> [ProtocolServer p] -> (Set (Maybe OperatorId), Set TransportHost) +usedOperatorsHosts srvs usedSrvs = (usedOperators, usedHosts) + where + usedHosts = S.unions $ map serverHosts usedSrvs + usedOperators = S.fromList $ mapMaybe usedOp $ L.toList srvs + usedOp (op, srv) = if hasUsedHost srv then Just op else Nothing + hasUsedHost (ProtoServerWithAuth srv _) = any (`S.member` usedHosts) $ serverHosts srv + +getNextServer_ :: + (ProtocolTypeI p, UserProtocol p) => + NonEmpty (Maybe OperatorId, ProtoServerWithAuth p) -> + (Set (Maybe OperatorId), Set TransportHost) -> + AM (NonEmpty (Maybe OperatorId, ProtoServerWithAuth p), ProtoServerWithAuth p) +getNextServer_ servers (usedOperators, usedHosts) = do + -- choose from servers of unused operators, when possible + let otherOpsSrvs = filterOrAll ((`S.notMember` usedOperators) . fst) servers + -- choose from servers with unused hosts when possible + unusedSrvs = filterOrAll (isUnusedServer usedHosts) otherOpsSrvs + (otherOpsSrvs,) <$> pickServer unusedSrvs + where + filterOrAll p srvs = fromMaybe srvs $ L.nonEmpty $ L.filter p srvs + +isUnusedServer :: Set TransportHost -> (Maybe OperatorId, ProtoServerWithAuth p) -> Bool +isUnusedServer usedHosts (_, ProtoServerWithAuth ProtocolServer {host} _) = all (`S.notMember` usedHosts) host -withUserServers :: forall p a. (ProtocolTypeI p, UserProtocol p) => AgentClient -> UserId -> (NonEmpty (ProtoServerWithAuth p) -> AM a) -> AM a -withUserServers c userId action = +getUserServers_ :: + (ProtocolTypeI p, UserProtocol p) => + AgentClient -> + UserId -> + (UserServers p -> NonEmpty (Maybe OperatorId, ProtoServerWithAuth p)) -> + AM (NonEmpty (Maybe OperatorId, ProtoServerWithAuth p)) +getUserServers_ c userId srvsSel = liftIO (TM.lookupIO userId $ userServers c) >>= \case - Just srvs -> action $ enabledSrvs srvs + Just srvs -> pure $ srvsSel srvs _ -> throwE $ INTERNAL "unknown userId - no user servers" -withNextSrv :: forall p a. (ProtocolTypeI p, UserProtocol p) => AgentClient -> UserId -> TVar [ProtocolServer p] -> [ProtocolServer p] -> (ProtoServerWithAuth p -> AM a) -> AM a -withNextSrv c userId usedSrvs initUsed action = do - used <- readTVarIO usedSrvs - srvAuth@(ProtoServerWithAuth srv _) <- getNextServer c userId used - srvs_ <- liftIO $ TM.lookupIO userId $ userServers c - let unused = maybe [] ((\\ used) . map protoServer . L.toList . enabledSrvs) srvs_ - used' = if null unused then initUsed else srv : used - atomically $ writeTVar usedSrvs $! used' +-- This function checks used servers and operators every time to allow +-- changing configuration while retry look is executing. +-- This function is not thread safe. +withNextSrv :: + (ProtocolTypeI p, UserProtocol p) => + AgentClient -> + UserId -> + (UserServers p -> NonEmpty (Maybe OperatorId, ProtoServerWithAuth p)) -> + TVar (Set TransportHost) -> + [ProtocolServer p] -> + (ProtoServerWithAuth p -> AM a) -> + AM a +withNextSrv c userId srvsSel triedHosts usedSrvs action = do + srvs <- getUserServers_ c userId srvsSel + let (usedOperators, usedHosts) = usedOperatorsHosts srvs usedSrvs + tried <- readTVarIO triedHosts + let triedOrUsed = S.union tried usedHosts + (otherOpsSrvs, srvAuth@(ProtoServerWithAuth srv _)) <- getNextServer_ srvs (usedOperators, triedOrUsed) + let newHosts = serverHosts srv + unusedSrvs = L.filter (isUnusedServer $ S.union triedOrUsed newHosts) otherOpsSrvs + !tried' = if null unusedSrvs then S.empty else S.union tried newHosts + atomically $ writeTVar triedHosts tried' action srvAuth incSMPServerStat :: AgentClient -> UserId -> SMPServer -> (AgentSMPServerStats -> TVar Int) -> STM () diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index c08ccef63..a78fb428e 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -17,11 +17,14 @@ module Simplex.Messaging.Agent.Env.SQLite AgentConfig (..), InitialAgentServers (..), ServerCfg (..), + ServerRoles (..), + OperatorId, UserServers (..), NetworkConfig (..), presetServerCfg, - enabledServerCfg, + allRoles, mkUserServers, + serverHosts, defaultAgentConfig, defaultReconnectInterval, tryAgentError, @@ -55,6 +58,8 @@ import Data.List.NonEmpty (NonEmpty) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import Data.Maybe (fromMaybe) +import Data.Set (Set) +import qualified Data.Set as S import Data.Time.Clock (NominalDiffTime, nominalDay) import Data.Time.Clock.System (SystemTime (..)) import Data.Word (Word16) @@ -72,10 +77,11 @@ import Simplex.Messaging.Notifications.Client (defaultNTFClientConfig) import Simplex.Messaging.Notifications.Transport (NTFVersion) import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Parsers (defaultJSON) -import Simplex.Messaging.Protocol (NtfServer, ProtoServerWithAuth, ProtocolServer, ProtocolType (..), ProtocolTypeI, VersionRangeSMPC, XFTPServer, supportedSMPClientVRange) +import Simplex.Messaging.Protocol (NtfServer, ProtoServerWithAuth (..), ProtocolServer (..), ProtocolType (..), ProtocolTypeI, VersionRangeSMPC, XFTPServer, supportedSMPClientVRange) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (SMPVersion) +import Simplex.Messaging.Transport.Client (TransportHost) import Simplex.Messaging.Util (allFinally, catchAllErrors, catchAllErrors', tryAllErrors, tryAllErrors') import System.Mem.Weak (Weak) import System.Random (StdGen, newStdGen) @@ -94,29 +100,42 @@ data InitialAgentServers = InitialAgentServers data ServerCfg p = ServerCfg { server :: ProtoServerWithAuth p, - preset :: Bool, - tested :: Maybe Bool, - enabled :: Bool + operator :: Maybe OperatorId, + enabled :: Bool, + roles :: ServerRoles } deriving (Show) -enabledServerCfg :: ProtoServerWithAuth p -> ServerCfg p -enabledServerCfg server = ServerCfg {server, preset = False, tested = Nothing, enabled = True} +data ServerRoles = ServerRoles + { storage :: Bool, + proxy :: Bool + } + deriving (Show) + +allRoles :: ServerRoles +allRoles = ServerRoles True True -presetServerCfg :: Bool -> ProtoServerWithAuth p -> ServerCfg p -presetServerCfg enabled server = ServerCfg {server, preset = True, tested = Nothing, enabled} +presetServerCfg :: Bool -> ServerRoles -> Maybe OperatorId -> ProtoServerWithAuth p -> ServerCfg p +presetServerCfg enabled roles operator server = + ServerCfg {server, operator, enabled, roles} data UserServers p = UserServers - { enabledSrvs :: NonEmpty (ProtoServerWithAuth p), - knownSrvs :: NonEmpty (ProtocolServer p) + { storageSrvs :: NonEmpty (Maybe OperatorId, ProtoServerWithAuth p), + proxySrvs :: NonEmpty (Maybe OperatorId, ProtoServerWithAuth p), + knownHosts :: Set TransportHost } +type OperatorId = Int64 + -- This function sets all servers as enabled in case all passed servers are disabled. mkUserServers :: NonEmpty (ServerCfg p) -> UserServers p -mkUserServers srvs = UserServers {enabledSrvs, knownSrvs} +mkUserServers srvs = UserServers {storageSrvs = filterSrvs storage, proxySrvs = filterSrvs proxy, knownHosts} where - enabledSrvs = L.map (\ServerCfg {server} -> server) $ fromMaybe srvs $ L.nonEmpty $ L.filter (\ServerCfg {enabled} -> enabled) srvs - knownSrvs = L.map (\ServerCfg {server = ProtoServerWithAuth srv _} -> srv) srvs + filterSrvs role = L.map (\ServerCfg {operator, server} -> (operator, server)) $ fromMaybe srvs $ L.nonEmpty $ L.filter (\ServerCfg {enabled, roles} -> enabled && role roles) srvs + knownHosts = S.unions $ L.map (\ServerCfg {server = ProtoServerWithAuth srv _} -> serverHosts srv) srvs + +serverHosts :: ProtocolServer p -> Set TransportHost +serverHosts ProtocolServer {host} = S.fromList $ L.toList host data AgentConfig = AgentConfig { tcpPort :: Maybe ServiceName, @@ -337,6 +356,8 @@ updateRestartCount t (RestartCount minute count) = do $(pure []) +$(JQ.deriveJSON defaultJSON ''ServerRoles) + instance ProtocolTypeI p => ToJSON (ServerCfg p) where toEncoding = $(JQ.mkToEncoding defaultJSON ''ServerCfg) toJSON = $(JQ.mkToJSON defaultJSON ''ServerCfg) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 51b35d0a3..988639f5c 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1821,9 +1821,9 @@ importMessages tty ms f old_ = do mergeQuotaMsgs >> writeMsg ms rId q False msg $> (stored, expired, M.insert rId q overQuota) where -- if the first message in queue head is "quota", remove it. - mergeQuotaMsgs = withPeekMsgQueue ms rId q "mergeQuotaMsgs" $ maybe (pure ()) $ \(mq, msg) -> - case msg of - MessageQuota {} -> tryDeleteMsg_ q mq False + mergeQuotaMsgs = + withPeekMsgQueue ms rId q "mergeQuotaMsgs" $ maybe (pure ()) $ \case + (mq, MessageQuota {}) -> tryDeleteMsg_ q mq False _ -> pure () msgErr :: Show e => String -> e -> String msgErr op e = op <> " error (" <> show e <> "): " <> B.unpack (B.take 100 s) diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index c9e11f296..56a7fef1f 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -16,6 +16,7 @@ import AgentTests.FunctionalAPITests (functionalAPITests) import AgentTests.MigrationTests (migrationTests) import AgentTests.NotificationTests (notificationTests) import AgentTests.SQLiteTests (storeTests) +import AgentTests.ServerChoice (serverChoiceTests) import Simplex.Messaging.Transport (ATransport (..)) import Test.Hspec @@ -26,4 +27,5 @@ agentTests (ATransport t) = do describe "Functional API" $ functionalAPITests (ATransport t) describe "Notification tests" $ notificationTests (ATransport t) describe "SQLite store" storeTests + describe "Chosen servers" serverChoiceTests describe "Migration tests" migrationTests diff --git a/tests/AgentTests/ServerChoice.hs b/tests/AgentTests/ServerChoice.hs new file mode 100644 index 000000000..0df995d08 --- /dev/null +++ b/tests/AgentTests/ServerChoice.hs @@ -0,0 +1,85 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE OverloadedLists #-} +{-# LANGUAGE OverloadedStrings #-} + +module AgentTests.ServerChoice where + +import AgentTests.FunctionalAPITests +import Control.Monad.IO.Class +import Data.List.NonEmpty (NonEmpty (..)) +import qualified Data.Map.Strict as M +import SMPAgentClient +import Simplex.Messaging.Agent (withAgentEnv) +import Simplex.Messaging.Agent.Client hiding (userServers) +import Simplex.Messaging.Agent.Env.SQLite +import Simplex.Messaging.Client (defaultNetworkConfig) +import Simplex.Messaging.Protocol +import Test.Hspec +import Test.QuickCheck +import XFTPClient (testXFTPServer) + +serverChoiceTests :: Spec +serverChoiceTests = do + describe "Server operators" $ do + it "should choose server of different operator" $ ioProperty $ testChooseDifferentOperator + +operatorSimpleX :: Maybe OperatorId +operatorSimpleX = Just 1 + +operator2 :: Maybe OperatorId +operator2 = Just 2 + +testOp1Srv1 :: ProtoServerWithAuth 'PSMP +testOp1Srv1 = "smp://LcJU@test1.simplex.im" + +testOp1Srv2 :: ProtoServerWithAuth 'PSMP +testOp1Srv2 = "smp://LcJU@test2.simplex.im" + +testOp2Srv1 :: ProtoServerWithAuth 'PSMP +testOp2Srv1 = "smp://LcJU@srv1.example.com" + +testOp2Srv2 :: ProtoServerWithAuth 'PSMP +testOp2Srv2 = "smp://LcJU@srv2.example.com" + +testSMPServers :: NonEmpty (ServerCfg 'PSMP) +testSMPServers = + [ presetServerCfg True allRoles operatorSimpleX testOp1Srv1, + presetServerCfg True allRoles operatorSimpleX testOp1Srv2, + presetServerCfg True proxyOnly operator2 testOp2Srv1, + presetServerCfg True proxyOnly operator2 testOp2Srv2 + ] + +storageOnly :: ServerRoles +storageOnly = ServerRoles {storage = True, proxy = False} + +proxyOnly :: ServerRoles +proxyOnly = ServerRoles {storage = False, proxy = True} + +initServers :: InitialAgentServers +initServers = + InitialAgentServers + { smp = M.fromList [(1, testSMPServers)], + ntf = [testNtfServer], + xftp = userServers [testXFTPServer], + netCfg = defaultNetworkConfig + } + +testChooseDifferentOperator :: IO () +testChooseDifferentOperator = do + c <- getSMPAgentClient' 1 agentCfg initServers testDB + runRight_ $ do + -- chooses the only operator with storage role + srv1 <- withAgentEnv c $ getNextServer c 1 storageSrvs [] + liftIO $ srv1 == testOp1Srv1 || srv1 == testOp1Srv2 `shouldBe` True + -- chooses another server for storage + srv2 <- withAgentEnv c $ getNextServer c 1 storageSrvs [protoServer testOp1Srv1] + liftIO $ srv2 `shouldBe` testOp1Srv2 + -- chooses another operator for proxy + srv3 <- withAgentEnv c $ getNextServer c 1 proxySrvs [protoServer srv1] + liftIO $ srv3 == testOp2Srv1 || srv3 == testOp2Srv2 `shouldBe` True + -- chooses another operator for proxy + srv3' <- withAgentEnv c $ getNextServer c 1 proxySrvs [protoServer testOp1Srv1, protoServer testOp1Srv2] + liftIO $ srv3' == testOp2Srv1 || srv3' == testOp2Srv2 `shouldBe` True + -- chooses any other server + srv4 <- withAgentEnv c $ getNextServer c 1 proxySrvs [protoServer testOp1Srv1, protoServer testOp2Srv1] + liftIO $ srv4 == testOp1Srv2 || srv4 == testOp2Srv2 `shouldBe` True diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index 0bb050cbe..5e5f91b09 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -38,7 +38,7 @@ testSMPServer :: SMPServer testSMPServer = "smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:5001" testSMPServer2 :: SMPServer -testSMPServer2 = "smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:5002" +testSMPServer2 = "smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@127.0.0.1:5002" testNtfServer :: NtfServer testNtfServer = "ntf://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:6001" @@ -92,7 +92,7 @@ userServers :: NonEmpty (ProtocolServer p) -> Map UserId (NonEmpty (ServerCfg p) userServers = userServers' . L.map noAuthSrv userServers' :: NonEmpty (ProtoServerWithAuth p) -> Map UserId (NonEmpty (ServerCfg p)) -userServers' srvs = M.fromList [(1, L.map (presetServerCfg True) srvs)] +userServers' srvs = M.fromList [(1, L.map (presetServerCfg True (ServerRoles True True) (Just 1)) srvs)] noAuthSrvCfg :: ProtocolServer p -> ServerCfg p -noAuthSrvCfg = presetServerCfg True . noAuthSrv +noAuthSrvCfg = presetServerCfg True (ServerRoles True True) (Just 1) . noAuthSrv diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 423b81074..d658c30a6 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -43,6 +43,9 @@ import Util testHost :: NonEmpty TransportHost testHost = "localhost" +testHost2 :: NonEmpty TransportHost +testHost2 = "127.0.0.1" + testPort :: ServiceName testPort = "5001" diff --git a/tests/SMPProxyTests.hs b/tests/SMPProxyTests.hs index b1bf469a3..b827edda2 100644 --- a/tests/SMPProxyTests.hs +++ b/tests/SMPProxyTests.hs @@ -61,7 +61,7 @@ smpProxyTests = do xit "batching proxy requests" todo describe "deliver message via SMP proxy" $ do let srv1 = SMPServer testHost testPort testKeyHash - srv2 = SMPServer testHost testPort2 testKeyHash + srv2 = SMPServer testHost2 testPort2 testKeyHash describe "client API" $ do let maxLen = maxMessageLength encryptedBlockSMPVersion describe "one server" $ do @@ -316,7 +316,7 @@ agentViaProxyVersionError :: IO () agentViaProxyVersionError = withAgent 1 agentCfg (servers [SMPServer testHost testPort testKeyHash]) testDB $ \alice -> do Left (A.BROKER _ (TRANSPORT TEVersion)) <- - withAgent 2 agentCfg (servers [SMPServer testHost testPort2 testKeyHash]) testDB2 $ \bob -> runExceptT $ do + withAgent 2 agentCfg (servers [SMPServer testHost2 testPort2 testKeyHash]) testDB2 $ \bob -> runExceptT $ do (_bobId, qInfo) <- A.createConnection alice 1 True SCMInvitation Nothing (CR.IKNoPQ PQSupportOn) SMSubscribe aliceId <- A.prepareConnectionToJoin bob 1 True qInfo PQSupportOn A.joinConnection bob 1 aliceId True qInfo "bob's connInfo" PQSupportOn SMSubscribe