Skip to content

Commit 135e7fe

Browse files
committed
add test, treat BAD_SERVICE as temp error, only remove queue associations on service errors
1 parent b26e814 commit 135e7fe

File tree

5 files changed

+65
-18
lines changed

5 files changed

+65
-18
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1479,10 +1479,12 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
14791479
subscribeService useServices us@(userId, srv) = fmap ((us,) . fromRight False) $ tryAllErrors' $ do
14801480
withStore' c (\db -> getSubscriptionService db userId srv) >>= \case
14811481
Just serviceSub -> case M.lookup userId useServices of
1482-
-- TODO [certs rcv] improve logic to differentiate between permanent and temporary service subscription errors,
1483-
-- as the current logic would fall back to per-queue subscriptions on ANY service subscription error (e.g., network connection error).
1484-
Just True -> isRight <$> tryAllErrors (subscribeClientService c True userId srv serviceSub)
1485-
_ -> False <$ withStore' c (\db -> unassocUserServerRcvQueueSubs db userId srv)
1482+
Just True -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case
1483+
Left e | clientServiceError e -> unassocQueues $> False
1484+
_ -> pure True
1485+
_ -> unassocQueues $> False
1486+
where
1487+
unassocQueues = withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv
14861488
_ -> pure False
14871489
subscribeUserServer :: Int -> TVar Int -> ((UserId, SMPServer), ServiceAssoc) -> AM' (Either AgentErrorType Int)
14881490
subscribeUserServer maxPending currPending ((userId, srv), hasService) = do
@@ -1498,7 +1500,6 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
14981500
where
14991501
subscribe qs = do
15001502
rs <- subscribeUserServerQueues c userId srv qs
1501-
-- TODO [certs rcv] storeClientServiceAssocs store associations of queues with client service ID
15021503
ns <- asks ntfSupervisor
15031504
whenM (liftIO $ hasInstantNotifications ns) $ sendNtfCreate ns rs
15041505
sendNtfCreate :: NtfSupervisor -> [(RcvQueueSub, Either AgentErrorType (Maybe SMP.ServiceId))] -> AM' ()

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ module Simplex.Messaging.Agent.Client
120120
getAgentSubscriptions,
121121
slowNetworkConfig,
122122
protocolClientError,
123+
clientServiceError,
123124
Worker (..),
124125
SessionVar (..),
125126
SubscriptionsInfo (..),
@@ -1260,7 +1261,6 @@ protocolClientError protocolError_ host = \case
12601261
clientServiceError :: AgentErrorType -> Bool
12611262
clientServiceError = \case
12621263
BROKER _ NO_SERVICE -> True
1263-
BROKER _ (TRANSPORT (TEHandshake BAD_SERVICE)) -> True -- TODO [certs rcv] this error may be temporary, so we should possibly resubscribe.
12641264
SMP _ SMP.SERVICE -> True
12651265
SMP _ (SMP.PROXY (SMP.BROKER NO_SERVICE)) -> True -- for completeness, it cannot happen.
12661266
_ -> False
@@ -1566,6 +1566,8 @@ temporaryAgentError :: AgentErrorType -> Bool
15661566
temporaryAgentError = \case
15671567
BROKER _ e -> tempBrokerError e
15681568
SMP _ (SMP.PROXY (SMP.BROKER e)) -> tempBrokerError e
1569+
SMP _ (SMP.STORE _) -> True
1570+
NTF _ (SMP.STORE _) -> True
15691571
XFTP _ XFTP.TIMEOUT -> True
15701572
PROXY _ _ (ProxyProtocolError (SMP.PROXY (SMP.BROKER e))) -> tempBrokerError e
15711573
PROXY _ _ (ProxyProtocolError (SMP.PROXY SMP.NO_SESSION)) -> True
@@ -1576,6 +1578,7 @@ temporaryAgentError = \case
15761578
tempBrokerError = \case
15771579
NETWORK _ -> True
15781580
TIMEOUT -> True
1581+
TRANSPORT (TEHandshake BAD_SERVICE) -> True -- this error is considered temporary because it is DB error
15791582
_ -> False
15801583

15811584
temporaryOrHostError :: AgentErrorType -> Bool
@@ -1722,8 +1725,12 @@ processClientNotices c@AgentClient {presetServers} tSess notices = do
17221725
notifySub' c "" $ ERR e
17231726

17241727
resubscribeClientService :: AgentClient -> SMPTransportSession -> ServiceSub -> AM ServiceSubResult
1725-
resubscribeClientService c tSess serviceSub =
1726-
withServiceClient c tSess $ \smp _ -> subscribeClientService_ c True tSess smp serviceSub
1728+
resubscribeClientService c tSess@(userId, srv, _) serviceSub =
1729+
withServiceClient c tSess (\smp _ -> subscribeClientService_ c True tSess smp serviceSub) `catchE` \e -> do
1730+
when (clientServiceError e) $ do
1731+
qs <- withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv
1732+
void $ lift $ subscribeUserServerQueues c userId srv qs
1733+
throwE e
17271734

17281735
-- TODO [certs rcv] update service in the database if it has different ID and re-associate queues, and send event
17291736
subscribeClientService :: AgentClient -> Bool -> UserId -> SMPServer -> ServiceSub -> AM ServiceSubResult
@@ -1737,16 +1744,10 @@ subscribeClientService c withEvent userId srv (ServiceSub _ n idsHash) =
17371744

17381745
withServiceClient :: AgentClient -> SMPTransportSession -> (SMPClient -> ServiceId -> ExceptT SMPClientError IO a) -> AM a
17391746
withServiceClient c tSess@(userId, srv, _) subscribe =
1740-
unassocOnError $ withLogClient c NRMBackground tSess B.empty "SUBS" $ \(SMPConnectedClient smp _) ->
1747+
withLogClient c NRMBackground tSess B.empty "SUBS" $ \(SMPConnectedClient smp _) ->
17411748
case (\THClientService {serviceId} -> serviceId) <$> smpClientService smp of
17421749
Just smpServiceId -> subscribe smp smpServiceId
17431750
Nothing -> throwE PCEServiceUnavailable
1744-
where
1745-
unassocOnError a = a `catchE` \e -> do
1746-
when (clientServiceError e) $ do
1747-
qs <- withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv
1748-
void $ lift $ subscribeUserServerQueues c userId srv qs
1749-
throwE e
17501751

17511752
-- TODO [certs rcv] send subscription error event?
17521753
subscribeClientService_ :: AgentClient -> Bool -> SMPTransportSession -> SMPClient -> ServiceSub -> ExceptT SMPClientError IO ServiceSubResult

src/Simplex/Messaging/Client.hs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -782,7 +782,6 @@ temporaryClientError = \case
782782
smpClientServiceError :: SMPClientError -> Bool
783783
smpClientServiceError = \case
784784
PCEServiceUnavailable -> True
785-
PCETransportError (TEHandshake BAD_SERVICE) -> True -- TODO [certs rcv] this error may be temporary, so we should possibly resubscribe.
786785
PCEProtocolError SERVICE -> True
787786
PCEProtocolError (PROXY (BROKER NO_SERVICE)) -> True -- for completeness, it cannot happen.
788787
_ -> False

src/Simplex/Messaging/Protocol.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ module Simplex.Messaging.Protocol
143143
IdsHash (..),
144144
ServiceSub (..),
145145
ServiceSubResult (..),
146+
ServiceSubError (..),
146147
serviceSubResult,
147148
queueIdsHash,
148149
queueIdHash,

tests/AgentTests/FunctionalAPITests.hs

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ import Data.ByteString.Char8 (ByteString)
6666
import qualified Data.ByteString.Char8 as B
6767
import Data.Either (isRight)
6868
import Data.Int (Int64)
69-
import Data.List (find, isSuffixOf, nub)
69+
import Data.List (find, isPrefixOf, isSuffixOf, nub)
7070
import Data.List.NonEmpty (NonEmpty)
7171
import qualified Data.Map as M
7272
import Data.Maybe (isJust, isNothing)
@@ -113,7 +113,7 @@ import Simplex.Messaging.Util (bshow, diffToMicroseconds)
113113
import Simplex.Messaging.Version (VersionRange (..))
114114
import qualified Simplex.Messaging.Version as V
115115
import Simplex.Messaging.Version.Internal (Version (..))
116-
import System.Directory (copyFile, renameFile)
116+
import System.Directory (copyFile, removeFile, renameFile)
117117
import Test.Hspec hiding (fit, it)
118118
import UnliftIO
119119
import Util
@@ -124,10 +124,13 @@ import Fixtures
124124
#endif
125125
#if defined(dbServerPostgres)
126126
import qualified Database.PostgreSQL.Simple as PSQL
127+
import qualified Simplex.Messaging.Agent.Store.Postgres as Postgres
128+
import qualified Simplex.Messaging.Agent.Store.Postgres.Common as Postgres
127129
import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue)
128130
import Simplex.Messaging.Server.MsgStore.Postgres (PostgresQueue)
129131
import Simplex.Messaging.Server.MsgStore.Types (QSType (..))
130132
import Simplex.Messaging.Server.QueueStore.Postgres
133+
import Simplex.Messaging.Server.QueueStore.Postgres.Migrations
131134
import Simplex.Messaging.Server.QueueStore.Types (QueueStoreClass (..))
132135
#endif
133136

@@ -476,6 +479,7 @@ functionalAPITests ps = do
476479
withSmpServer ps testTwoUsers
477480
describe "Client service certificates" $ do
478481
it "should connect, subscribe and reconnect as a service" $ testClientServiceConnection ps
482+
it "should re-subscribe when service ID changed" $ testClientServiceIDChange ps
479483
describe "Connection switch" $ do
480484
describe "should switch delivery to the new queue" $
481485
testServerMatrix2 ps testSwitchConnection
@@ -3715,6 +3719,47 @@ testClientServiceConnection ps = do
37153719
exchangeGreetingsMsgId 4 user sId' service uId'
37163720
exchangeGreetingsMsgId 10 service uId user sId
37173721

3722+
testClientServiceIDChange :: HasCallStack => (ASrvTransport, AStoreType) -> IO ()
3723+
testClientServiceIDChange ps@(_, ASType qs _) = do
3724+
(sId, uId) <- withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do
3725+
withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
3726+
conns@(sId, uId) <- makeConnection service user
3727+
exchangeGreetings service uId user sId
3728+
pure conns
3729+
_ :: () <- case qs of
3730+
SQSPostgres -> do
3731+
#if defined(dbServerPostgres)
3732+
st <- either (error . show) pure =<< Postgres.createDBStore testStoreDBOpts serverMigrations (MigrationConfig MCError Nothing)
3733+
void $ Postgres.withTransaction st (`PSQL.execute_` "DELETE FROM services")
3734+
#else
3735+
pure ()
3736+
#endif
3737+
SQSMemory -> do
3738+
s <- readFile testStoreLogFile
3739+
removeFile testStoreLogFile
3740+
writeFile testStoreLogFile $ unlines $ filter (not . ("NEW_SERVICE" `isPrefixOf`)) $ lines s
3741+
withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do
3742+
withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
3743+
subscribeAllConnections service False Nothing
3744+
liftIO $ getInAnyOrder service
3745+
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult (Just (SMP.SSErrorQueueCount 1 0)) (SMP.ServiceSub _ 0 _)))) -> True; _ -> False,
3746+
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False,
3747+
\case ("", "", AEvt SAENone (UP _ _)) -> True; _ -> False
3748+
]
3749+
subscribeAllConnections user False Nothing
3750+
("", "", UP _ [_]) <- nGet user
3751+
exchangeGreetingsMsgId 4 service uId user sId
3752+
-- disable service in the client
3753+
-- The test uses True for non-existing user to make sure it's removed for user 1,
3754+
-- because if no users use services, then it won't be checking them to optimize for most clients.
3755+
withAgentClientsServers2 (agentCfg, initAgentServers {useServices = M.fromList [(100, True)]}) (agentCfg, initAgentServers) $ \notService user -> do
3756+
withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
3757+
subscribeAllConnections notService False Nothing
3758+
("", "", UP _ [_]) <- nGet notService
3759+
subscribeAllConnections user False Nothing
3760+
("", "", UP _ [_]) <- nGet user
3761+
exchangeGreetingsMsgId 6 notService uId user sId
3762+
37183763
getSMPAgentClient' :: Int -> AgentConfig -> InitialAgentServers -> String -> IO AgentClient
37193764
getSMPAgentClient' clientId cfg' initServers dbPath = do
37203765
Right st <- liftIO $ createStore dbPath

0 commit comments

Comments
 (0)