@@ -149,7 +149,6 @@ module Simplex.Messaging.Agent.Client
149149 userServers ,
150150 pickServer ,
151151 getNextServer ,
152- withUserServers ,
153152 withNextSrv ,
154153 incSMPServerStat ,
155154 incSMPServerStat' ,
@@ -193,12 +192,12 @@ import qualified Data.ByteString.Char8 as B
193192import Data.Either (isRight , partitionEithers )
194193import Data.Functor (($>) )
195194import Data.Int (Int64 )
196- import Data.List (deleteFirstsBy , find , foldl' , partition , (\\) )
195+ import Data.List (find , foldl' , partition )
197196import Data.List.NonEmpty (NonEmpty (.. ), (<|) )
198197import qualified Data.List.NonEmpty as L
199198import Data.Map.Strict (Map )
200199import qualified Data.Map.Strict as M
201- import Data.Maybe (catMaybes , fromMaybe , isJust , isNothing , listToMaybe )
200+ import Data.Maybe (catMaybes , fromMaybe , isJust , isNothing , listToMaybe , mapMaybe )
202201import Data.Set (Set )
203202import qualified Data.Set as S
204203import Data.Text (Text )
@@ -264,7 +263,6 @@ import Simplex.Messaging.Protocol
264263 VersionSMPC ,
265264 XFTPServer ,
266265 XFTPServerWithAuth ,
267- sameSrvAddr' ,
268266 pattern NoEntity ,
269267 )
270268import qualified Simplex.Messaging.Protocol as SMP
@@ -619,7 +617,7 @@ getSMPServerClient c@AgentClient {active, smpClients, workerSeq} tSess = do
619617getSMPProxyClient :: AgentClient -> Maybe SMPServerWithAuth -> SMPTransportSession -> AM (SMPConnectedClient , Either AgentErrorType ProxiedRelay )
620618getSMPProxyClient c@ AgentClient {active, smpClients, smpProxiedRelays, workerSeq} proxySrv_ destSess@ (userId, destSrv, qId) = do
621619 unlessM (readTVarIO active) $ throwE INACTIVE
622- proxySrv <- maybe (getNextServer c userId [destSrv]) pure proxySrv_
620+ proxySrv <- maybe (getNextServer c userId proxySrvs [destSrv]) pure proxySrv_
623621 ts <- liftIO getCurrentTime
624622 atomically (getClientVar proxySrv ts) >>= \ (tSess, auth, v) ->
625623 either (newProxyClient tSess auth ts) (waitForProxyClient tSess auth) v
@@ -1074,7 +1072,7 @@ sendOrProxySMPCommand ::
10741072 (SMPClient -> ProxiedRelay -> ExceptT SMPClientError IO (Either ProxyClientError () )) ->
10751073 (SMPClient -> ExceptT SMPClientError IO () ) ->
10761074 AM (Maybe SMPServer )
1077- sendOrProxySMPCommand c userId destSrv connId cmdStr senderId sendCmdViaProxy sendCmdDirectly = do
1075+ sendOrProxySMPCommand c userId destSrv@ ProtocolServer {host = destHosts} connId cmdStr senderId sendCmdViaProxy sendCmdDirectly = do
10781076 tSess <- mkTransportSession c userId destSrv connId
10791077 ifM shouldUseProxy (sendViaProxy Nothing tSess) (sendDirectly tSess $> Nothing )
10801078 where
@@ -1093,7 +1091,7 @@ sendOrProxySMPCommand c userId destSrv connId cmdStr senderId sendCmdViaProxy se
10931091 SPFAllow -> True
10941092 SPFAllowProtected -> ipAddressProtected cfg destSrv
10951093 SPFProhibit -> False
1096- unknownServer = liftIO $ maybe True (notElem destSrv . knownSrvs ) <$> TM. lookupIO userId (smpServers c)
1094+ unknownServer = liftIO $ maybe True (\ srvs -> all ( `S.notMember` knownHosts srvs) destHosts ) <$> TM. lookupIO userId (smpServers c)
10971095 sendViaProxy :: Maybe SMPServerWithAuth -> SMPTransportSession -> AM (Maybe SMPServer )
10981096 sendViaProxy proxySrv_ destSess@ (_, _, connId_) = do
10991097 r <- tryAgentError . withProxySession c proxySrv_ destSess senderId (" PFWD " <> cmdStr) $ \ (SMPConnectedClient smp _, proxySess@ ProxiedRelay {prBasicAuth}) -> do
@@ -2036,33 +2034,82 @@ userServers c = case protocolTypeI @p of
20362034 SPXFTP -> xftpServers c
20372035{-# INLINE userServers #-}
20382036
2039- pickServer :: forall p . NonEmpty (ProtoServerWithAuth p ) -> AM (ProtoServerWithAuth p )
2037+ pickServer :: NonEmpty (Maybe OperatorId , ProtoServerWithAuth p ) -> AM (ProtoServerWithAuth p )
20402038pickServer = \ case
2041- srv :| [] -> pure srv
2039+ (_, srv) :| [] -> pure srv
20422040 servers -> do
20432041 gen <- asks randomServer
2044- atomically $ (servers L. !! ) <$> stateTVar gen (randomR (0 , L. length servers - 1 ))
2042+ atomically $ snd . (servers L. !! ) <$> stateTVar gen (randomR (0 , L. length servers - 1 ))
20452043
2046- getNextServer :: forall p . (ProtocolTypeI p , UserProtocol p ) => AgentClient -> UserId -> [ProtocolServer p ] -> AM (ProtoServerWithAuth p )
2047- getNextServer c userId usedSrvs = withUserServers c userId $ \ srvs ->
2048- case L. nonEmpty $ deleteFirstsBy sameSrvAddr' (L. toList srvs) (map noAuthSrv usedSrvs) of
2049- Just srvs' -> pickServer srvs'
2050- _ -> pickServer srvs
2044+ getNextServer ::
2045+ (ProtocolTypeI p , UserProtocol p ) =>
2046+ AgentClient ->
2047+ UserId ->
2048+ (UserServers p -> NonEmpty (Maybe OperatorId , ProtoServerWithAuth p )) ->
2049+ [ProtocolServer p ] ->
2050+ AM (ProtoServerWithAuth p )
2051+ getNextServer c userId srvsSel usedSrvs = do
2052+ srvs <- getUserServers_ c userId srvsSel
2053+ snd <$> getNextServer_ srvs (usedOperatorsHosts srvs usedSrvs)
2054+
2055+ usedOperatorsHosts :: NonEmpty (Maybe OperatorId , ProtoServerWithAuth p ) -> [ProtocolServer p ] -> (Set (Maybe OperatorId ), Set TransportHost )
2056+ usedOperatorsHosts srvs usedSrvs = (usedOperators, usedHosts)
2057+ where
2058+ usedHosts = S. unions $ map serverHosts usedSrvs
2059+ usedOperators = S. fromList $ mapMaybe usedOp $ L. toList srvs
2060+ usedOp (op, srv) = if hasUsedHost srv then Just op else Nothing
2061+ hasUsedHost (ProtoServerWithAuth srv _) = any (`S.member` usedHosts) $ serverHosts srv
2062+
2063+ getNextServer_ ::
2064+ (ProtocolTypeI p , UserProtocol p ) =>
2065+ NonEmpty (Maybe OperatorId , ProtoServerWithAuth p ) ->
2066+ (Set (Maybe OperatorId ), Set TransportHost ) ->
2067+ AM (NonEmpty (Maybe OperatorId , ProtoServerWithAuth p ), ProtoServerWithAuth p )
2068+ getNextServer_ servers (usedOperators, usedHosts) = do
2069+ -- choose from servers of unused operators, when possible
2070+ let otherOpsSrvs = filterOrAll ((`S.notMember` usedOperators) . fst ) servers
2071+ -- choose from servers with unused hosts when possible
2072+ unusedSrvs = filterOrAll (isUnusedServer usedHosts) otherOpsSrvs
2073+ (otherOpsSrvs,) <$> pickServer unusedSrvs
2074+ where
2075+ filterOrAll p srvs = fromMaybe srvs $ L. nonEmpty $ L. filter p srvs
2076+
2077+ isUnusedServer :: Set TransportHost -> (Maybe OperatorId , ProtoServerWithAuth p ) -> Bool
2078+ isUnusedServer usedHosts (_, ProtoServerWithAuth ProtocolServer {host} _) = all (`S.notMember` usedHosts) host
20512079
2052- withUserServers :: forall p a . (ProtocolTypeI p , UserProtocol p ) => AgentClient -> UserId -> (NonEmpty (ProtoServerWithAuth p ) -> AM a ) -> AM a
2053- withUserServers c userId action =
2080+ getUserServers_ ::
2081+ (ProtocolTypeI p , UserProtocol p ) =>
2082+ AgentClient ->
2083+ UserId ->
2084+ (UserServers p -> NonEmpty (Maybe OperatorId , ProtoServerWithAuth p )) ->
2085+ AM (NonEmpty (Maybe OperatorId , ProtoServerWithAuth p ))
2086+ getUserServers_ c userId srvsSel =
20542087 liftIO (TM. lookupIO userId $ userServers c) >>= \ case
2055- Just srvs -> action $ enabledSrvs srvs
2088+ Just srvs -> pure $ srvsSel srvs
20562089 _ -> throwE $ INTERNAL " unknown userId - no user servers"
20572090
2058- withNextSrv :: forall p a . (ProtocolTypeI p , UserProtocol p ) => AgentClient -> UserId -> TVar [ProtocolServer p ] -> [ProtocolServer p ] -> (ProtoServerWithAuth p -> AM a ) -> AM a
2059- withNextSrv c userId usedSrvs initUsed action = do
2060- used <- readTVarIO usedSrvs
2061- srvAuth@ (ProtoServerWithAuth srv _) <- getNextServer c userId used
2062- srvs_ <- liftIO $ TM. lookupIO userId $ userServers c
2063- let unused = maybe [] ((\\ used) . map protoServer . L. toList . enabledSrvs) srvs_
2064- used' = if null unused then initUsed else srv : used
2065- atomically $ writeTVar usedSrvs $! used'
2091+ -- This function checks used servers and operators every time to allow
2092+ -- changing configuration while retry look is executing.
2093+ -- This function is not thread safe.
2094+ withNextSrv ::
2095+ (ProtocolTypeI p , UserProtocol p ) =>
2096+ AgentClient ->
2097+ UserId ->
2098+ (UserServers p -> NonEmpty (Maybe OperatorId , ProtoServerWithAuth p )) ->
2099+ TVar (Set TransportHost ) ->
2100+ [ProtocolServer p ] ->
2101+ (ProtoServerWithAuth p -> AM a ) ->
2102+ AM a
2103+ withNextSrv c userId srvsSel triedHosts usedSrvs action = do
2104+ srvs <- getUserServers_ c userId srvsSel
2105+ let (usedOperators, usedHosts) = usedOperatorsHosts srvs usedSrvs
2106+ tried <- readTVarIO triedHosts
2107+ let triedOrUsed = S. union tried usedHosts
2108+ (otherOpsSrvs, srvAuth@ (ProtoServerWithAuth srv _)) <- getNextServer_ srvs (usedOperators, triedOrUsed)
2109+ let newHosts = serverHosts srv
2110+ unusedSrvs = L. filter (isUnusedServer $ S. union triedOrUsed newHosts) otherOpsSrvs
2111+ ! tried' = if null unusedSrvs then S. empty else S. union tried newHosts
2112+ atomically $ writeTVar triedHosts tried'
20662113 action srvAuth
20672114
20682115incSMPServerStat :: AgentClient -> UserId -> SMPServer -> (AgentSMPServerStats -> TVar Int ) -> STM ()
0 commit comments