Skip to content

Commit cb59a44

Browse files
authored
agent: return error and message absence differently when getting notification messages (#1535)
* agent: return error and message absence differently when getting notification messages * fix test * mapM * inline nse functions, release lock on error or no message
1 parent a632eea commit cb59a44

File tree

5 files changed

+20
-17
lines changed

5 files changed

+20
-17
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,6 @@ import Simplex.RemoteControl.Client
224224
import Simplex.RemoteControl.Invitation
225225
import Simplex.RemoteControl.Types
226226
import System.Mem.Weak (deRefWeak)
227-
import UnliftIO.Async (mapConcurrently)
228227
import UnliftIO.Concurrent (forkFinally, forkIO, killThread, mkWeakThreadId, threadDelay)
229228
import qualified UnliftIO.Exception as E
230229
import UnliftIO.STM
@@ -440,7 +439,7 @@ subscribeConnections c = withAgentEnv c . subscribeConnections' c
440439
{-# INLINE subscribeConnections #-}
441440

442441
-- | Get messages for connections (GET commands)
443-
getConnectionMessages :: AgentClient -> NonEmpty ConnMsgReq -> IO (NonEmpty (Maybe SMPMsgMeta))
442+
getConnectionMessages :: AgentClient -> NonEmpty ConnMsgReq -> IO (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta)))
444443
getConnectionMessages c = withAgentEnv' c . getConnectionMessages' c
445444
{-# INLINE getConnectionMessages #-}
446445

@@ -1277,26 +1276,26 @@ resubscribeConnections' c connIds = do
12771276
-- union is left-biased, so results returned by subscribeConnections' take precedence
12781277
(`M.union` r) <$> subscribeConnections' c connIds'
12791278

1280-
getConnectionMessages' :: AgentClient -> NonEmpty ConnMsgReq -> AM' (NonEmpty (Maybe SMPMsgMeta))
1281-
getConnectionMessages' c =
1282-
mapConcurrently $ \cmr ->
1283-
getConnectionMessage cmr `catchAgentError'` \e -> do
1284-
logError $ "Error loading message: " <> tshow e
1285-
pure Nothing
1279+
-- requesting messages sequentially, to reduce memory usage
1280+
getConnectionMessages' :: AgentClient -> NonEmpty ConnMsgReq -> AM' (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta)))
1281+
getConnectionMessages' c = mapM $ tryAgentError' . getConnectionMessage
12861282
where
12871283
getConnectionMessage :: ConnMsgReq -> AM (Maybe SMPMsgMeta)
12881284
getConnectionMessage (ConnMsgReq connId dbQueueId msgTs_) = do
12891285
whenM (atomically $ hasActiveSubscription c connId) . throwE $ CMD PROHIBITED "getConnectionMessage: subscribed"
12901286
SomeConn _ conn <- withStore c (`getConn` connId)
1291-
msg_ <- case conn of
1292-
DuplexConnection _ (rq :| _) _ -> getQueueMessage c rq
1293-
RcvConnection _ rq -> getQueueMessage c rq
1294-
ContactConnection _ rq -> getQueueMessage c rq
1287+
rq <- case conn of
1288+
DuplexConnection _ (rq :| _) _ -> pure rq
1289+
RcvConnection _ rq -> pure rq
1290+
ContactConnection _ rq -> pure rq
12951291
SndConnection _ _ -> throwE $ CONN SIMPLEX
12961292
NewConnection _ -> throwE $ CMD PROHIBITED "getConnectionMessage: NewConnection"
1297-
when (isNothing msg_) $
1293+
msg_ <- getQueueMessage c rq `catchAgentError` \e -> atomically (releaseGetLock c rq) >> throwError e
1294+
when (isNothing msg_) $ do
1295+
atomically $ releaseGetLock c rq
12981296
forM_ msgTs_ $ \msgTs -> withStore' c $ \db -> setLastBrokerTs db connId (DBQueueId dbQueueId) msgTs
12991297
pure msg_
1298+
{-# INLINE getConnectionMessages' #-}
13001299

13011300
getNotificationConns' :: AgentClient -> C.CbNonce -> ByteString -> AM (NonEmpty NotificationInfo)
13021301
getNotificationConns' c nonce encNtfInfo =
@@ -1330,6 +1329,7 @@ getNotificationConns' c nonce encNtfInfo =
13301329
Just SMP.NMsgMeta {msgTs}
13311330
| maybe True (systemToUTCTime msgTs >) lastBrokerTs_ -> Just ntfInfo
13321331
_ -> Nothing
1332+
{-# INLINE getNotificationConns' #-}
13331333

13341334
-- | Send message to the connection (SEND command) in Reader monad
13351335
sendMessage' :: AgentClient -> ConnId -> PQEncryption -> MsgFlags -> MsgBody -> AM (AgentMsgId, PQEncryption)

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1654,6 +1654,7 @@ getQueueMessage c rq@RcvQueue {server, rcvId, rcvPrivateKey} = do
16541654
l <- maybe (newTMVar ()) pure l_
16551655
takeTMVar l
16561656
pure $ Just l
1657+
{-# INLINE getQueueMessage #-}
16571658

16581659
decryptSMPMessage :: RcvQueue -> SMP.RcvMessage -> AM SMP.ClientRcvMsgBody
16591660
decryptSMPMessage rq SMP.RcvMessage {msgId, msgBody = SMP.EncRcvMsgBody body} =
@@ -1743,10 +1744,12 @@ sendAck c rq@RcvQueue {rcvId, rcvPrivateKey} msgId =
17431744
hasGetLock :: AgentClient -> RcvQueue -> IO Bool
17441745
hasGetLock c RcvQueue {server, rcvId} =
17451746
TM.memberIO (server, rcvId) $ getMsgLocks c
1747+
{-# INLINE hasGetLock #-}
17461748

17471749
releaseGetLock :: AgentClient -> RcvQueue -> STM ()
17481750
releaseGetLock c RcvQueue {server, rcvId} =
17491751
TM.lookup (server, rcvId) (getMsgLocks c) >>= mapM_ (`tryPutTMVar` ())
1752+
{-# INLINE releaseGetLock #-}
17501753

17511754
suspendQueue :: AgentClient -> RcvQueue -> AM ()
17521755
suspendQueue c rq@RcvQueue {rcvId, rcvPrivateKey} =

src/Simplex/Messaging/Client.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,7 @@ getSMPMessage c rpKey rId =
801801
OK -> pure Nothing
802802
cmd@(MSG msg) -> liftIO (writeSMPMessage c rId cmd) $> Just msg
803803
r -> throwE $ unexpectedResponse r
804+
{-# INLINE getSMPMessage #-}
804805

805806
-- | Subscribe to the SMP queue notifications.
806807
--

tests/AgentTests/FunctionalAPITests.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1946,7 +1946,7 @@ testOnlyCreatePullSlowHandshake = withAgentClientsCfg2 agentProxyCfgV8 agentProx
19461946
getMsg :: AgentClient -> ConnId -> ExceptT AgentErrorType IO a -> ExceptT AgentErrorType IO a
19471947
getMsg c cId action = do
19481948
liftIO $ noMessages c "nothing should be delivered before GET"
1949-
[Just _] <- lift $ getConnectionMessages c [ConnMsgReq cId 1 Nothing]
1949+
[Right (Just _)] <- lift $ getConnectionMessages c [ConnMsgReq cId 1 Nothing]
19501950
action
19511951

19521952
getMSGNTF :: AgentClient -> ConnId -> ExceptT AgentErrorType IO ()

tests/AgentTests/NotificationTests.hs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -562,8 +562,7 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag
562562
Right [NotificationInfo {ntfConnId = cId, ntfMsgMeta = Just NMsgMeta {msgTs}}] <- runExceptT $ getNotificationConns alice nonce message
563563
cId `shouldBe` bobId
564564
-- alice client already has subscription for the connection,
565-
-- so get fails with CMD PROHIBITED (transformed into Nothing in catch)
566-
[Nothing] <- getConnectionMessages alice [ConnMsgReq cId 1 $ Just $ systemToUTCTime msgTs]
565+
[Left (CMD PROHIBITED _)] <- getConnectionMessages alice [ConnMsgReq cId 1 $ Just $ systemToUTCTime msgTs]
567566

568567
threadDelay 500000
569568
suspendAgent alice 0
@@ -573,7 +572,7 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag
573572

574573
-- aliceNtf client doesn't have subscription and is allowed to get notification message
575574
withAgent 3 aliceCfg initAgentServers testDB $ \aliceNtf -> do
576-
(Just SMPMsgMeta {msgFlags = MsgFlags True}) :| _ <- getConnectionMessages aliceNtf [ConnMsgReq cId 1 $ Just $ systemToUTCTime msgTs]
575+
(Right (Just SMPMsgMeta {msgFlags = MsgFlags True})) :| _ <- getConnectionMessages aliceNtf [ConnMsgReq cId 1 $ Just $ systemToUTCTime msgTs]
577576
pure ()
578577

579578
threadDelay 1000000

0 commit comments

Comments
 (0)