@@ -147,7 +147,6 @@ module Simplex.Messaging.Agent.Client
147147 userServers ,
148148 pickServer ,
149149 getNextServer ,
150- withUserServers ,
151150 withNextSrv ,
152151 incSMPServerStat ,
153152 incSMPServerStat' ,
@@ -191,12 +190,12 @@ import qualified Data.ByteString.Char8 as B
191190import Data.Either (isRight , partitionEithers )
192191import Data.Functor (($>) )
193192import Data.Int (Int64 )
194- import Data.List (deleteFirstsBy , find , foldl' , partition , (\\) )
193+ import Data.List (find , foldl' , partition )
195194import Data.List.NonEmpty (NonEmpty (.. ), (<|) )
196195import qualified Data.List.NonEmpty as L
197196import Data.Map.Strict (Map )
198197import qualified Data.Map.Strict as M
199- import Data.Maybe (catMaybes , fromMaybe , isJust , isNothing , listToMaybe )
198+ import Data.Maybe (catMaybes , fromMaybe , isJust , isNothing , listToMaybe , mapMaybe )
200199import Data.Set (Set )
201200import qualified Data.Set as S
202201import Data.Text (Text )
@@ -262,7 +261,6 @@ import Simplex.Messaging.Protocol
262261 VersionSMPC ,
263262 XFTPServer ,
264263 XFTPServerWithAuth ,
265- sameSrvAddr' ,
266264 pattern NoEntity ,
267265 )
268266import qualified Simplex.Messaging.Protocol as SMP
@@ -617,7 +615,7 @@ getSMPServerClient c@AgentClient {active, smpClients, workerSeq} tSess = do
617615getSMPProxyClient :: AgentClient -> Maybe SMPServerWithAuth -> SMPTransportSession -> AM (SMPConnectedClient , Either AgentErrorType ProxiedRelay )
618616getSMPProxyClient c@ AgentClient {active, smpClients, smpProxiedRelays, workerSeq} proxySrv_ destSess@ (userId, destSrv, qId) = do
619617 unlessM (readTVarIO active) $ throwE INACTIVE
620- proxySrv <- maybe (getNextServer c userId [destSrv]) pure proxySrv_
618+ proxySrv <- maybe (getNextServer c userId proxySrvs [destSrv]) pure proxySrv_
621619 ts <- liftIO getCurrentTime
622620 atomically (getClientVar proxySrv ts) >>= \ (tSess, auth, v) ->
623621 either (newProxyClient tSess auth ts) (waitForProxyClient tSess auth) v
@@ -1072,7 +1070,7 @@ sendOrProxySMPCommand ::
10721070 (SMPClient -> ProxiedRelay -> ExceptT SMPClientError IO (Either ProxyClientError () )) ->
10731071 (SMPClient -> ExceptT SMPClientError IO () ) ->
10741072 AM (Maybe SMPServer )
1075- sendOrProxySMPCommand c userId destSrv connId cmdStr senderId sendCmdViaProxy sendCmdDirectly = do
1073+ sendOrProxySMPCommand c userId destSrv@ ProtocolServer {host = destHosts} connId cmdStr senderId sendCmdViaProxy sendCmdDirectly = do
10761074 tSess <- mkTransportSession c userId destSrv connId
10771075 ifM shouldUseProxy (sendViaProxy Nothing tSess) (sendDirectly tSess $> Nothing )
10781076 where
@@ -1091,7 +1089,7 @@ sendOrProxySMPCommand c userId destSrv connId cmdStr senderId sendCmdViaProxy se
10911089 SPFAllow -> True
10921090 SPFAllowProtected -> ipAddressProtected cfg destSrv
10931091 SPFProhibit -> False
1094- unknownServer = liftIO $ maybe True (notElem destSrv . knownSrvs ) <$> TM. lookupIO userId (smpServers c)
1092+ unknownServer = liftIO $ maybe True (\ srvs -> all ( `S.notMember` knownHosts srvs) destHosts ) <$> TM. lookupIO userId (smpServers c)
10951093 sendViaProxy :: Maybe SMPServerWithAuth -> SMPTransportSession -> AM (Maybe SMPServer )
10961094 sendViaProxy proxySrv_ destSess@ (_, _, connId_) = do
10971095 r <- tryAgentError . withProxySession c proxySrv_ destSess senderId (" PFWD " <> cmdStr) $ \ (SMPConnectedClient smp _, proxySess@ ProxiedRelay {prBasicAuth}) -> do
@@ -2033,33 +2031,82 @@ userServers c = case protocolTypeI @p of
20332031 SPXFTP -> xftpServers c
20342032{-# INLINE userServers #-}
20352033
2036- pickServer :: forall p . NonEmpty (ProtoServerWithAuth p ) -> AM (ProtoServerWithAuth p )
2034+ pickServer :: NonEmpty (OperatorId , ProtoServerWithAuth p ) -> AM (ProtoServerWithAuth p )
20372035pickServer = \ case
2038- srv :| [] -> pure srv
2036+ (_, srv) :| [] -> pure srv
20392037 servers -> do
20402038 gen <- asks randomServer
2041- atomically $ (servers L. !! ) <$> stateTVar gen (randomR (0 , L. length servers - 1 ))
2039+ atomically $ snd . (servers L. !! ) <$> stateTVar gen (randomR (0 , L. length servers - 1 ))
20422040
2043- getNextServer :: forall p . (ProtocolTypeI p , UserProtocol p ) => AgentClient -> UserId -> [ProtocolServer p ] -> AM (ProtoServerWithAuth p )
2044- getNextServer c userId usedSrvs = withUserServers c userId $ \ srvs ->
2045- case L. nonEmpty $ deleteFirstsBy sameSrvAddr' (L. toList srvs) (map noAuthSrv usedSrvs) of
2046- Just srvs' -> pickServer srvs'
2047- _ -> pickServer srvs
2041+ getNextServer ::
2042+ (ProtocolTypeI p , UserProtocol p ) =>
2043+ AgentClient ->
2044+ UserId ->
2045+ (UserServers p -> NonEmpty (OperatorId , ProtoServerWithAuth p )) ->
2046+ [ProtocolServer p ] ->
2047+ AM (ProtoServerWithAuth p )
2048+ getNextServer c userId srvsSel usedSrvs = do
2049+ srvs <- getUserServers_ c userId srvsSel
2050+ snd <$> getNextServer_ srvs (usedOperatorsHosts srvs usedSrvs)
2051+
2052+ usedOperatorsHosts :: NonEmpty (OperatorId , ProtoServerWithAuth p ) -> [ProtocolServer p ] -> (Set OperatorId , Set TransportHost )
2053+ usedOperatorsHosts srvs usedSrvs = (usedOperators, usedHosts)
2054+ where
2055+ usedHosts = S. unions $ map serverHosts usedSrvs
2056+ usedOperators = S. fromList $ mapMaybe usedOp $ L. toList srvs
2057+ usedOp (op, srv) = if hasUsedHost srv then Just op else Nothing
2058+ hasUsedHost (ProtoServerWithAuth srv _) = any (`S.member` usedHosts) $ serverHosts srv
2059+
2060+ getNextServer_ ::
2061+ (ProtocolTypeI p , UserProtocol p ) =>
2062+ NonEmpty (OperatorId , ProtoServerWithAuth p ) ->
2063+ (Set OperatorId , Set TransportHost ) ->
2064+ AM (NonEmpty (OperatorId , ProtoServerWithAuth p ), ProtoServerWithAuth p )
2065+ getNextServer_ servers (usedOperators, usedHosts) = do
2066+ -- choose from servers of unused operators, when possible
2067+ let otherOpsSrvs = filterOrAll ((`S.notMember` usedOperators) . fst ) servers
2068+ -- choose from servers with unused hosts when possible
2069+ unusedSrvs = filterOrAll (isUnusedServer usedHosts) otherOpsSrvs
2070+ (otherOpsSrvs,) <$> pickServer unusedSrvs
2071+ where
2072+ filterOrAll p srvs = fromMaybe srvs $ L. nonEmpty $ L. filter p srvs
2073+
2074+ isUnusedServer :: Set TransportHost -> (OperatorId , ProtoServerWithAuth p ) -> Bool
2075+ isUnusedServer usedHosts (_, ProtoServerWithAuth ProtocolServer {host} _) = all (`S.notMember` usedHosts) host
20482076
2049- withUserServers :: forall p a . (ProtocolTypeI p , UserProtocol p ) => AgentClient -> UserId -> (NonEmpty (ProtoServerWithAuth p ) -> AM a ) -> AM a
2050- withUserServers c userId action =
2077+ getUserServers_ ::
2078+ (ProtocolTypeI p , UserProtocol p ) =>
2079+ AgentClient ->
2080+ UserId ->
2081+ (UserServers p -> NonEmpty (OperatorId , ProtoServerWithAuth p )) ->
2082+ AM (NonEmpty (OperatorId , ProtoServerWithAuth p ))
2083+ getUserServers_ c userId srvsSel =
20512084 liftIO (TM. lookupIO userId $ userServers c) >>= \ case
2052- Just srvs -> action $ enabledSrvs srvs
2085+ Just srvs -> pure $ srvsSel srvs
20532086 _ -> throwE $ INTERNAL " unknown userId - no user servers"
20542087
2055- withNextSrv :: forall p a . (ProtocolTypeI p , UserProtocol p ) => AgentClient -> UserId -> TVar [ProtocolServer p ] -> [ProtocolServer p ] -> (ProtoServerWithAuth p -> AM a ) -> AM a
2056- withNextSrv c userId usedSrvs initUsed action = do
2057- used <- readTVarIO usedSrvs
2058- srvAuth@ (ProtoServerWithAuth srv _) <- getNextServer c userId used
2059- srvs_ <- liftIO $ TM. lookupIO userId $ userServers c
2060- let unused = maybe [] ((\\ used) . map protoServer . L. toList . enabledSrvs) srvs_
2061- used' = if null unused then initUsed else srv : used
2062- atomically $ writeTVar usedSrvs $! used'
2088+ -- This function checks used servers and operators every time to allow
2089+ -- changing configuration while retry look is executing.
2090+ -- This function is not thread safe.
2091+ withNextSrv ::
2092+ (ProtocolTypeI p , UserProtocol p ) =>
2093+ AgentClient ->
2094+ UserId ->
2095+ (UserServers p -> NonEmpty (OperatorId , ProtoServerWithAuth p )) ->
2096+ TVar (Set TransportHost ) ->
2097+ [ProtocolServer p ] ->
2098+ (ProtoServerWithAuth p -> AM a ) ->
2099+ AM a
2100+ withNextSrv c userId srvsSel triedHosts usedSrvs action = do
2101+ srvs <- getUserServers_ c userId srvsSel
2102+ let (usedOperators, usedHosts) = usedOperatorsHosts srvs usedSrvs
2103+ tried <- readTVarIO triedHosts
2104+ let triedOrUsed = S. union tried usedHosts
2105+ (otherOpsSrvs, srvAuth@ (ProtoServerWithAuth srv _)) <- getNextServer_ srvs (usedOperators, triedOrUsed)
2106+ let newHosts = serverHosts srv
2107+ unusedSrvs = L. filter (isUnusedServer $ S. union triedOrUsed newHosts) otherOpsSrvs
2108+ ! tried' = if null unusedSrvs then S. empty else S. union tried newHosts
2109+ atomically $ writeTVar triedHosts tried'
20632110 action srvAuth
20642111
20652112incSMPServerStat :: AgentClient -> UserId -> SMPServer -> (AgentSMPServerStats -> TVar Int ) -> STM ()
0 commit comments