Skip to content

Commit 197e3c8

Browse files
committed
agent: test migrating to/from service subscriptions, fixes
1 parent 2000b11 commit 197e3c8

File tree

4 files changed

+183
-16
lines changed

4 files changed

+183
-16
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1484,6 +1484,9 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
14841484
Just True -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case
14851485
Right (ServiceSubResult e _) -> case e of
14861486
Just SSErrorServiceId {} -> unassocQueues
1487+
-- Below would resubscribe all queues after service was disabled and re-enabled
1488+
-- Possibly, we should always resubscribe all with expected is greated than subscribed
1489+
Just SSErrorQueueCount {expectedQueueCount = n, subscribedQueueCount = n'} | n > 0 && n' == 0 -> unassocQueues
14871490
_ -> pure True
14881491
Left e -> do
14891492
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e)

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1526,23 +1526,23 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
15261526

15271527
processSubResults :: AgentClient -> SMPTransportSession -> SessionId -> Maybe ServiceId -> NonEmpty (RcvQueueSub, Either SMPClientError (Maybe ServiceId)) -> STM ([RcvQueueSub], [(RcvQueueSub, Maybe ClientNotice)])
15281528
processSubResults c tSess@(userId, srv, _) sessId serviceId_ rs = do
1529-
pending <- SS.getPendingSubs tSess $ currentSubs c
1530-
let (failed, subscribed@(qs, sQs), notices, ignored) = foldr (partitionResults pending) (M.empty, ([], []), [], 0) rs
1529+
pendingSubs <- SS.getPendingQueueSubs tSess $ currentSubs c
1530+
let (failed, subscribed@(qs, sQs), notices, ignored) = foldr (partitionResults pendingSubs) (M.empty, ([], []), [], 0) rs
15311531
unless (M.null failed) $ do
15321532
incSMPServerStat' c userId srv connSubErrs $ M.size failed
15331533
failSubscriptions c tSess failed
15341534
unless (null qs && null sQs) $ do
15351535
incSMPServerStat' c userId srv connSubscribed $ length qs + length sQs
1536-
SS.batchAddActiveSubs tSess sessId subscribed $ currentSubs c
1536+
SS.batchAddActiveSubs tSess sessId serviceId_ subscribed $ currentSubs c
15371537
unless (ignored == 0) $ incSMPServerStat' c userId srv connSubIgnored ignored
15381538
pure (sQs, notices)
15391539
where
15401540
partitionResults ::
1541-
(Map SMP.RecipientId RcvQueueSub, Maybe ServiceSub) ->
1541+
Map SMP.RecipientId RcvQueueSub ->
15421542
(RcvQueueSub, Either SMPClientError (Maybe ServiceId)) ->
15431543
(Map SMP.RecipientId SMPClientError, ([RcvQueueSub], [RcvQueueSub]), [(RcvQueueSub, Maybe ClientNotice)], Int) ->
15441544
(Map SMP.RecipientId SMPClientError, ([RcvQueueSub], [RcvQueueSub]), [(RcvQueueSub, Maybe ClientNotice)], Int)
1545-
partitionResults (pendingSubs, pendingSS) (rq@RcvQueueSub {rcvId, clientNoticeId}, r) acc@(failed, subscribed@(qs, sQs), notices, ignored) = case r of
1545+
partitionResults pendingSubs (rq@RcvQueueSub {rcvId, clientNoticeId}, r) acc@(failed, subscribed@(qs, sQs), notices, ignored) = case r of
15461546
Left e -> case smpErrorClientNotice e of
15471547
Just notice_ -> (failed', subscribed, (rq, notice_) : notices, ignored)
15481548
where
@@ -1554,8 +1554,8 @@ processSubResults c tSess@(userId, srv, _) sessId serviceId_ rs = do
15541554
failed' = M.insert rcvId e failed
15551555
Right serviceId_'
15561556
| rcvId `M.member` pendingSubs ->
1557-
let subscribed' = case (serviceId_, serviceId_', pendingSS) of
1558-
(Just sId, Just sId', Just ServiceSub {smpServiceId}) | sId == sId' && sId == smpServiceId -> (qs, rq : sQs)
1557+
let subscribed' = case (serviceId_, serviceId_') of
1558+
(Just sId, Just sId') | sId == sId' -> (qs, rq : sQs)
15591559
_ -> (rq : qs, sQs)
15601560
in (failed, subscribed', notices', ignored)
15611561
| otherwise -> (failed, subscribed, notices', ignored + 1)

src/Simplex/Messaging/Agent/TSessionSubs.hs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ module Simplex.Messaging.Agent.TSessionSubs
2626
deleteServiceSub,
2727
hasPendingSubs,
2828
getPendingSubs,
29+
getPendingQueueSubs,
2930
getActiveSubs,
3031
setSubsPending,
3132
updateClientNotices,
@@ -40,12 +41,12 @@ import Data.Int (Int64)
4041
import Data.List (foldl')
4142
import Data.Map.Strict (Map)
4243
import qualified Data.Map.Strict as M
43-
import Data.Maybe (isJust)
44+
import Data.Maybe (fromMaybe, isJust)
4445
import qualified Data.Set as S
4546
import Simplex.Messaging.Agent.Protocol (SMPQueue (..))
4647
import Simplex.Messaging.Agent.Store (RcvQueue, RcvQueueSub (..), SomeRcvQueue, StoredRcvQueue (rcvServiceAssoc), rcvQueueSub)
4748
import Simplex.Messaging.Client (SMPTransportSession, TransportSessionMode (..))
48-
import Simplex.Messaging.Protocol (RecipientId, ServiceSub (..), queueIdHash)
49+
import Simplex.Messaging.Protocol (RecipientId, ServiceSub (..), noIdsHash, queueIdHash)
4950
import Simplex.Messaging.TMap (TMap)
5051
import qualified Simplex.Messaging.TMap as TM
5152
import Simplex.Messaging.Transport
@@ -138,22 +139,27 @@ addActiveSub' tSess sessId rq serviceAssoc ss = do
138139
in modifyTVar' (activeServiceSub s) (updateServiceSub <$>)
139140
else TM.insert rId rq $ pendingSubs s
140141

141-
batchAddActiveSubs :: SMPTransportSession -> SessionId -> ([RcvQueueSub], [RcvQueueSub]) -> TSessionSubs -> STM ()
142-
batchAddActiveSubs tSess sessId (rqs, serviceRQs) ss = do
142+
batchAddActiveSubs :: SMPTransportSession -> SessionId -> Maybe ServiceId -> ([RcvQueueSub], [RcvQueueSub]) -> TSessionSubs -> STM ()
143+
batchAddActiveSubs tSess sessId serviceId_ (rqs, serviceRQs) ss = do
143144
s <- getSessSubs tSess ss
144145
sessId' <- readTVar $ subsSessId s
145146
let qs = M.fromList $ map (\rq -> (rcvId rq, rq)) rqs
147+
serviceQs = M.fromList $ map (\rq -> (rcvId rq, rq)) serviceRQs
146148
if Just sessId == sessId'
147149
then do
148150
TM.union qs $ activeSubs s
149151
modifyTVar' (pendingSubs s) (`M.difference` qs)
150-
serviceSub_ <- readTVar $ activeServiceSub s
151-
forM_ serviceSub_ $ \(ServiceSub serviceId n idsHash) -> do
152-
unless (null serviceRQs) $ do
152+
forM_ serviceId_ $ \serviceId -> unless (null serviceRQs) $ do
153+
modifyTVar' (pendingSubs s) (`M.difference` serviceQs)
154+
ServiceSub serviceId' n idsHash <-
155+
fromMaybe (ServiceSub serviceId 0 noIdsHash) <$> readTVar (activeServiceSub s)
156+
when (serviceId == serviceId') $ do
153157
let idsHash' = idsHash <> mconcat (map (queueIdHash . rcvId) serviceRQs)
154158
n' = n + fromIntegral (length serviceRQs)
155159
writeTVar (activeServiceSub s) $ Just $ ServiceSub serviceId n' idsHash'
156-
else TM.union qs $ pendingSubs s
160+
else do
161+
TM.union qs $ pendingSubs s
162+
when (isJust serviceId_ && not (null serviceRQs)) $ TM.union serviceQs $ pendingSubs s
157163

158164
batchAddPendingSubs :: SMPTransportSession -> [RcvQueueSub] -> TSessionSubs -> STM ()
159165
batchAddPendingSubs tSess rqs ss = do
@@ -191,6 +197,10 @@ getPendingSubs tSess = lookupSubs tSess >=> maybe (pure (M.empty, Nothing)) get
191197
where
192198
get s = liftM2 (,) (readTVar $ pendingSubs s) (readTVar $ pendingServiceSub s)
193199

200+
getPendingQueueSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
201+
getPendingQueueSubs = getSubs_ pendingSubs
202+
{-# INLINE getPendingQueueSubs #-}
203+
194204
getActiveSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
195205
getActiveSubs = getSubs_ activeSubs
196206
{-# INLINE getActiveSubs #-}

tests/AgentTests/FunctionalAPITests.hs

Lines changed: 155 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,7 @@ functionalAPITests ps = do
480480
describe "Client service certificates" $ do
481481
it "should connect, subscribe and reconnect as a service" $ testClientServiceConnection ps
482482
it "should re-subscribe when service ID changed" $ testClientServiceIDChange ps
483+
it "migrate connections to and from service" $ testMigrateConnectionsToService ps
483484
describe "Connection switch" $ do
484485
describe "should switch delivery to the new queue" $
485486
testServerMatrix2 ps testSwitchConnection
@@ -3743,7 +3744,7 @@ testClientServiceIDChange ps@(_, ASType qs _) = do
37433744
liftIO $ getInAnyOrder service
37443745
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult (Just (SMP.SSErrorQueueCount 1 0)) (SMP.ServiceSub _ 0 _)))) -> True; _ -> False,
37453746
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False,
3746-
\case ("", "", AEvt SAENone (UP _ _)) -> True; _ -> False
3747+
\case ("", "", AEvt SAENone (UP _ [_])) -> True; _ -> False
37473748
]
37483749
subscribeAllConnections user False Nothing
37493750
("", "", UP _ [_]) <- nGet user
@@ -3759,6 +3760,159 @@ testClientServiceIDChange ps@(_, ASType qs _) = do
37593760
("", "", UP _ [_]) <- nGet user
37603761
exchangeGreetingsMsgId 6 notService uId user sId
37613762

3763+
testMigrateConnectionsToService :: HasCallStack => (ASrvTransport, AStoreType) -> IO ()
3764+
testMigrateConnectionsToService ps = do
3765+
(((sId1, uId1), (uId2, sId2)), ((sId3, uId3), (uId4, sId4)), ((sId5, uId5), (uId6, sId6))) <-
3766+
withSmpServerStoreLogOn ps testPort $ \_ -> do
3767+
-- starting without service
3768+
cs12@((sId1, uId1), (uId2, sId2)) <-
3769+
withAgentClientsServers2 (agentCfg, initAgentServers) (agentCfg, initAgentServers) $ \notService user ->
3770+
runRight $ (,) <$> makeConnection notService user <*> makeConnection user notService
3771+
-- migrating to service
3772+
cs34@((sId3, uId3), (uId4, sId4)) <-
3773+
withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> runRight $ do
3774+
subscribeAllConnections service False Nothing
3775+
service `up` 2
3776+
subscribeAllConnections user False Nothing
3777+
user `up` 2
3778+
exchangeGreetingsMsgId 2 service uId1 user sId1
3779+
exchangeGreetingsMsgId 2 service uId2 user sId2
3780+
(,) <$> makeConnection service user <*> makeConnection user service
3781+
-- starting as service
3782+
cs56 <-
3783+
withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> runRight $ do
3784+
subscribeAllConnections service False Nothing
3785+
liftIO $ getInAnyOrder service
3786+
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 4 _)))) -> True; _ -> False,
3787+
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
3788+
]
3789+
subscribeAllConnections user False Nothing
3790+
user `up` 4
3791+
exchangeGreetingsMsgId 4 service uId1 user sId1
3792+
exchangeGreetingsMsgId 4 service uId2 user sId2
3793+
exchangeGreetingsMsgId 2 service uId3 user sId3
3794+
exchangeGreetingsMsgId 2 service uId4 user sId4
3795+
(,) <$> makeConnection service user <*> makeConnection user service
3796+
pure (cs12, cs34, cs56)
3797+
-- server reconnecting resubscribes service
3798+
let testSendMessages6 s u n = do
3799+
exchangeGreetingsMsgId (n + 4) s uId1 u sId1
3800+
exchangeGreetingsMsgId (n + 4) s uId2 u sId2
3801+
exchangeGreetingsMsgId (n + 2) s uId3 u sId3
3802+
exchangeGreetingsMsgId (n + 2) s uId4 u sId4
3803+
exchangeGreetingsMsgId n s uId5 u sId5
3804+
exchangeGreetingsMsgId n s uId6 u sId6
3805+
withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do
3806+
withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do
3807+
subscribeAllConnections service False Nothing
3808+
liftIO $ getInAnyOrder service
3809+
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 6 _)))) -> True; _ -> False,
3810+
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
3811+
]
3812+
subscribeAllConnections user False Nothing
3813+
user `up` 6
3814+
testSendMessages6 service user 2
3815+
("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 6 _)) <- nGet service
3816+
user `down` 6
3817+
withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do
3818+
liftIO $ getInAnyOrder service
3819+
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 6 _)))) -> True; _ -> False,
3820+
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
3821+
]
3822+
user `up` 6
3823+
testSendMessages6 service user 4
3824+
("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 6 _)) <- nGet service
3825+
user `down` 6
3826+
-- disabling service and adding connections
3827+
((sId7, uId7), (uId8, sId8)) <-
3828+
withAgentClientsServers2 (agentCfg, initAgentServers) (agentCfg, initAgentServers) $ \notService user -> do
3829+
cs78@((sId7, uId7), (uId8, sId8)) <-
3830+
withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
3831+
subscribeAllConnections notService False Nothing
3832+
notService `up` 6
3833+
subscribeAllConnections user False Nothing
3834+
user `up` 6
3835+
testSendMessages6 notService user 6
3836+
(,) <$> makeConnection notService user <*> makeConnection user notService
3837+
notService `down` 8
3838+
user `down` 8
3839+
withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
3840+
notService `up` 8
3841+
user `up` 8
3842+
testSendMessages6 notService user 8
3843+
exchangeGreetingsMsgId 2 notService uId7 user sId7
3844+
exchangeGreetingsMsgId 2 notService uId8 user sId8
3845+
notService `down` 8
3846+
user `down` 8
3847+
pure cs78
3848+
let testSendMessages8 s u n = do
3849+
testSendMessages6 s u (n + 8)
3850+
exchangeGreetingsMsgId (n + 2) s uId7 u sId7
3851+
exchangeGreetingsMsgId (n + 2) s uId8 u sId8
3852+
-- re-enabling service and adding connections
3853+
withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do
3854+
withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do
3855+
subscribeAllConnections service False Nothing
3856+
-- the "error" in SERVICE_UP event is expected, because when service was disabled for the user,
3857+
-- the service and associations were not removed, to optimize non-service clients.
3858+
liftIO $ getInAnyOrder service
3859+
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult (Just (SMP.SSErrorQueueCount 6 0)) (SMP.ServiceSub _ 0 _)))) -> True; _ -> False,
3860+
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
3861+
]
3862+
service `up` 8
3863+
subscribeAllConnections user False Nothing
3864+
user `up` 8
3865+
testSendMessages8 service user 2
3866+
("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 8 _)) <- nGet service
3867+
user `down` 8
3868+
-- re-connect to server
3869+
withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do
3870+
liftIO $ getInAnyOrder service
3871+
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 8 _)))) -> True; _ -> False,
3872+
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
3873+
]
3874+
user `up` 8
3875+
testSendMessages8 service user 4
3876+
("", "", SERVICE_DOWN _ (SMP.ServiceSub _ _ _)) <- nGet service -- should be 8 here
3877+
user `down` 8
3878+
-- restart agents
3879+
withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do
3880+
withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do
3881+
subscribeAllConnections service False Nothing
3882+
liftIO $ getInAnyOrder service
3883+
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 8 _)))) -> True; _ -> False,
3884+
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
3885+
]
3886+
subscribeAllConnections user False Nothing
3887+
user `up` 8
3888+
testSendMessages8 service user 6
3889+
("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 8 _)) <- nGet service
3890+
user `down` 8
3891+
runRight_ $ do
3892+
void $ sendMessage user sId7 SMP.noMsgFlags "hello 1"
3893+
void $ sendMessage user sId8 SMP.noMsgFlags "hello 2"
3894+
-- re-connect to server
3895+
withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do
3896+
liftIO $ getInAnyOrder service
3897+
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 8 _)))) -> True; _ -> False,
3898+
\case ("", c, AEvt SAEConn (Msg "hello 1")) -> c == uId7; _ -> False,
3899+
\case ("", c, AEvt SAEConn (Msg "hello 2")) -> c == uId8; _ -> False,
3900+
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
3901+
]
3902+
liftIO $ getInAnyOrder user
3903+
[ \case ("", "", AEvt SAENone (UP _ [_, _, _, _, _, _, _, _])) -> True; _ -> False,
3904+
\case ("", c, AEvt SAEConn (SENT 10)) -> c == sId7; _ -> False,
3905+
\case ("", c, AEvt SAEConn (SENT 10)) -> c == sId8; _ -> False
3906+
]
3907+
testSendMessages6 service user 16
3908+
where
3909+
up c n = do
3910+
("", "", UP _ conns) <- nGet c
3911+
liftIO $ length conns `shouldBe` n
3912+
down c n = do
3913+
("", "", DOWN _ conns) <- nGet c
3914+
liftIO $ length conns `shouldBe` n
3915+
37623916
getSMPAgentClient' :: Int -> AgentConfig -> InitialAgentServers -> String -> IO AgentClient
37633917
getSMPAgentClient' clientId cfg' initServers dbPath = do
37643918
Right st <- liftIO $ createStore dbPath

0 commit comments

Comments
 (0)