Skip to content

Commit 7a3713f

Browse files
committed
Merge branch 'master' into rcv-services
2 parents d930bba + f8f172f commit 7a3713f

File tree

5 files changed

+35
-20
lines changed

5 files changed

+35
-20
lines changed

simplexmq.cabal

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
cabal-version: 1.12
22

33
name: simplexmq
4-
version: 6.5.0.11
4+
version: 6.5.0.12
55
synopsis: SimpleXMQ message broker
66
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
77
<./docs/Simplex-Messaging-Client.html client> and

src/Simplex/Messaging/Agent.hs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -365,9 +365,9 @@ setConnShortLinkAsync :: AgentClient -> ACorrId -> ConnId -> UserConnLinkData 'C
365365
setConnShortLinkAsync c = withAgentEnv c .:: setConnShortLinkAsync' c
366366
{-# INLINE setConnShortLinkAsync #-}
367367

368-
-- | Get and verify data from short link (LGET/LKEY command) asynchronously, synchronous response is new connection id
369-
getConnShortLinkAsync :: AgentClient -> UserId -> ACorrId -> ConnShortLink 'CMContact -> AE ConnId
370-
getConnShortLinkAsync c = withAgentEnv c .:. getConnShortLinkAsync' c
368+
-- | Get and verify data from short link (LGET/LKEY command) asynchronously, synchronous response is new/passed connection id
369+
getConnShortLinkAsync :: AgentClient -> UserId -> ACorrId -> Maybe ConnId -> ConnShortLink 'CMContact -> AE ConnId
370+
getConnShortLinkAsync c = withAgentEnv c .:: getConnShortLinkAsync' c
371371
{-# INLINE getConnShortLinkAsync #-}
372372

373373
-- | Join SMP agent connection (JOIN command) asynchronously, synchronous response is new connection id.
@@ -1041,14 +1041,22 @@ setConnShortLinkAsync' c corrId connId userLinkData clientData =
10411041
_ -> throwE $ CMD PROHIBITED "setConnShortLinkAsync: invalid connection or mode"
10421042
enqueueCommand c corrId connId (Just srv) $ AClientCommand $ LSET userLinkData clientData
10431043

1044-
getConnShortLinkAsync' :: AgentClient -> UserId -> ACorrId -> ConnShortLink 'CMContact -> AM ConnId
1045-
getConnShortLinkAsync' c userId corrId shortLink@(CSLContact _ _ srv _) = do
1046-
g <- asks random
1047-
connId <- withStore c $ \db -> do
1048-
-- server is created so the command is processed in server queue,
1049-
-- not blocking other "no server" commands
1050-
void $ createServer db srv
1051-
prepareNewConn db g
1044+
getConnShortLinkAsync' :: AgentClient -> UserId -> ACorrId -> Maybe ConnId -> ConnShortLink 'CMContact -> AM ConnId
1045+
getConnShortLinkAsync' c userId corrId connId_ shortLink@(CSLContact _ _ srv _) = do
1046+
connId <- case connId_ of
1047+
Just existingConnId -> do
1048+
-- connId and srv can be unrelated: connId is used as "mailbox" for LDATA delivery,
1049+
-- while srv is the short link's server for the LGET request.
1050+
-- E.g., owner's relay connection (connId, on server A) fetches relay's group link data (srv = server B).
1051+
-- This works because enqueueCommand stores (connId, srv) independently in the commands table,
1052+
-- the network request targets srv, and event delivery uses connId via corrId correlation.
1053+
withStore' c $ \db -> void $ createServer db srv
1054+
pure existingConnId
1055+
Nothing -> do
1056+
g <- asks random
1057+
withStore c $ \db -> do
1058+
void $ createServer db srv
1059+
prepareNewConn db g
10521060
enqueueCommand c corrId connId (Just srv) $ AClientCommand $ LGET shortLink
10531061
pure connId
10541062
where

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -808,13 +808,17 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do
808808
(pure Nothing) -- prevent race with cleanup and adding pending queues in another call
809809
(Just <$> getSessVar workerSeq tSess smpSubWorkers ts)
810810
newSubWorker v = do
811-
a <- async $ void (E.tryAny runSubWorker) >> atomically (cleanup v)
811+
a <- async $ void $ E.tryAny $ runSubWorker v
812812
atomically $ putTMVar (sessionVar v) a
813-
runSubWorker = do
813+
runSubWorker v = do
814814
ri <- asks $ reconnectInterval . config
815815
withRetryForeground ri isForeground (isNetworkOnline c) $ \_ loop -> do
816-
(pendingSubs, pendingSS) <- atomically $ SS.getPendingSubs tSess $ currentSubs c
817-
unless (M.null pendingSubs && isNothing pendingSS) $ do
816+
pending_ <- atomically $ do
817+
pending@(pendingSubs, pendingSS) <- SS.getPendingSubs tSess $ currentSubs c
818+
if M.null pendingSubs && isNothing pendingSS
819+
then cleanup v $> Nothing
820+
else pure $ Just pending
821+
forM_ pending_ $ \(pendingSubs, pendingSS) -> do
818822
liftIO $ waitUntilForeground c
819823
liftIO $ waitForUserNetwork c
820824
mapM_ (handleNotify . void . runExceptT . resubscribeClientService c tSess) pendingSS

src/Simplex/Messaging/Client/Agent.hs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -324,11 +324,14 @@ reconnectClient ca@SMPClientAgent {active, agentCfg, smpSubWorkers, workerSeq} s
324324
(Just <$> getSessVar workerSeq srv smpSubWorkers ts)
325325
newSubWorker :: SessionVar (Async ()) -> IO ()
326326
newSubWorker v = do
327-
a <- async $ void (E.try @E.SomeException runSubWorker) >> atomically (cleanup v)
327+
a <- async $ void $ E.try @E.SomeException $ runSubWorker v
328328
atomically $ putTMVar (sessionVar v) a
329-
runSubWorker =
329+
runSubWorker v =
330330
withRetryInterval (reconnectInterval agentCfg) $ \_ loop -> do
331-
subs <- getPending TM.lookupIO readTVarIO
331+
subs <- atomically $ do
332+
s <- getPending TM.lookup readTVar
333+
when (noPending s) $ cleanup v
334+
pure s
332335
unless (noPending subs) $ whenM (readTVarIO active) $ do
333336
void $ netTimeoutInt tcpConnectTimeout NRMBackground `timeout` runExceptT (reconnectSMPClient ca srv subs)
334337
loop

tests/AgentTests/FunctionalAPITests.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2740,7 +2740,7 @@ testGetConnShortLinkAsync ps = withAgentClients2 $ \alice bob ->
27402740
newLinkData = UserContactLinkData userCtData
27412741
(_, CCLink qInfo (Just shortLink)) <- A.createConnection alice NRMInteractive 1 True True SCMContact (Just newLinkData) Nothing IKPQOn SMSubscribe
27422742
-- get link data async - creates new connection for bob
2743-
newId <- getConnShortLinkAsync bob 1 "1" shortLink
2743+
newId <- getConnShortLinkAsync bob 1 "1" Nothing shortLink
27442744
("1", newId', LDATA FixedLinkData {linkConnReq = qInfo'} (ContactLinkData _ userCtData')) <- get bob
27452745
liftIO $ newId' `shouldBe` newId
27462746
liftIO $ qInfo' `shouldBe` qInfo

0 commit comments

Comments
 (0)