Skip to content

Commit beafac1

Browse files
authored
agent: make agent workers usable from other contexts (#1614)
1 parent a2d777b commit beafac1

File tree

12 files changed

+119
-160
lines changed

12 files changed

+119
-160
lines changed

src/Simplex/FileTransfer/Agent.hs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ import Simplex.Messaging.Encoding
7575
import Simplex.Messaging.Encoding.String (strDecode, strEncode)
7676
import Simplex.Messaging.Protocol (ProtocolServer, ProtocolType (..), XFTPServer)
7777
import qualified Simplex.Messaging.TMap as TM
78-
import Simplex.Messaging.Util (catchAll_, liftError, tshow, unlessM, whenM)
78+
import Simplex.Messaging.Util (allFinally, catchAll_, catchAllErrors, liftError, tshow, unlessM, whenM)
7979
import System.FilePath (takeFileName, (</>))
8080
import UnliftIO
8181
import UnliftIO.Directory
@@ -198,10 +198,10 @@ runXFTPRcvWorker c srv Worker {doWork} = do
198198
liftIO $ waitForUserNetwork c
199199
atomically $ incXFTPServerStat c userId srv downloadAttempts
200200
downloadFileChunk fc replica approvedRelays
201-
`catchAgentError` \e -> retryOnError "XFTP rcv worker" (retryLoop loop e delay') (retryDone e) e
201+
`catchAllErrors` \e -> retryOnError "XFTP rcv worker" (retryLoop loop e delay') (retryDone e) e
202202
where
203203
retryLoop loop e replicaDelay = do
204-
flip catchAgentError (\_ -> pure ()) $ do
204+
flip catchAllErrors (\_ -> pure ()) $ do
205205
when (serverHostError e) $ notify c (fromMaybe rcvFileEntityId redirectEntityId_) (RFWARN e)
206206
liftIO $ closeXFTPServerClient c userId server digest
207207
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
@@ -280,7 +280,7 @@ runXFTPRcvLocalWorker c Worker {doWork} = do
280280
runXFTPOperation AgentConfig {rcvFilesTTL} =
281281
withWork c doWork (`getNextRcvFileToDecrypt` rcvFilesTTL) $
282282
\f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath, redirect} ->
283-
decryptFile f `catchAgentError` rcvWorkerInternalError c rcvFileId rcvFileEntityId (redirectEntityId <$> redirect) tmpPath
283+
decryptFile f `catchAllErrors` rcvWorkerInternalError c rcvFileId rcvFileEntityId (redirectEntityId <$> redirect) tmpPath
284284
decryptFile :: RcvFile -> AM ()
285285
decryptFile RcvFile {rcvFileId, rcvFileEntityId, size, digest, key, nonce, tmpPath, saveFile, status, chunks, redirect} = do
286286
let CryptoFile savePath cfArgs = saveFile
@@ -307,7 +307,7 @@ runXFTPRcvLocalWorker c Worker {doWork} = do
307307
liftIO $ waitUntilForeground c
308308
withStore' c (`updateRcvFileComplete` rcvFileId)
309309
-- proceed with redirect
310-
yaml <- liftError (FILE . FILE_IO . show) (CF.readFile $ CryptoFile fsSavePath cfArgs) `agentFinally` (lift $ toFSFilePath fsSavePath >>= removePath)
310+
yaml <- liftError (FILE . FILE_IO . show) (CF.readFile $ CryptoFile fsSavePath cfArgs) `allFinally` (lift $ toFSFilePath fsSavePath >>= removePath)
311311
next@FileDescription {chunks = nextChunks} <- case strDecode (LB.toStrict yaml) of
312312
-- TODO switch to another error constructor
313313
Left _ -> throwE . FILE $ REDIRECT "decode error"
@@ -399,7 +399,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
399399
runXFTPOperation cfg@AgentConfig {sndFilesTTL} =
400400
withWork c doWork (`getNextSndFileToPrepare` sndFilesTTL) $
401401
\f@SndFile {sndFileId, sndFileEntityId, prefixPath} ->
402-
prepareFile cfg f `catchAgentError` sndWorkerInternalError c sndFileId sndFileEntityId prefixPath
402+
prepareFile cfg f `catchAllErrors` sndWorkerInternalError c sndFileId sndFileEntityId prefixPath
403403
prepareFile :: AgentConfig -> SndFile -> AM ()
404404
prepareFile _ SndFile {prefixPath = Nothing} =
405405
throwE $ INTERNAL "no prefix path"
@@ -468,11 +468,11 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
468468
liftIO $ waitForUserNetwork c
469469
let triedAllSrvs = n > userSrvCount
470470
createWithNextSrv triedHosts
471-
`catchAgentError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop triedAllSrvs e) (throwE e) e
471+
`catchAllErrors` \e -> retryOnError "XFTP prepare worker" (retryLoop loop triedAllSrvs e) (throwE e) e
472472
where
473473
-- we don't do closeXFTPServerClient here to not risk closing connection for concurrent chunk upload
474474
retryLoop loop triedAllSrvs e = do
475-
flip catchAgentError (\_ -> pure ()) $ do
475+
flip catchAllErrors (\_ -> pure ()) $ do
476476
when (triedAllSrvs && serverHostError e) $ notify c sndFileEntityId $ SFWARN e
477477
liftIO $ assertAgentForeground c
478478
loop
@@ -508,10 +508,10 @@ runXFTPSndWorker c srv Worker {doWork} = do
508508
liftIO $ waitForUserNetwork c
509509
atomically $ incXFTPServerStat c userId srv uploadAttempts
510510
uploadFileChunk cfg fc replica
511-
`catchAgentError` \e -> retryOnError "XFTP snd worker" (retryLoop loop e delay') (retryDone e) e
511+
`catchAllErrors` \e -> retryOnError "XFTP snd worker" (retryLoop loop e delay') (retryDone e) e
512512
where
513513
retryLoop loop e replicaDelay = do
514-
flip catchAgentError (\_ -> pure ()) $ do
514+
flip catchAllErrors (\_ -> pure ()) $ do
515515
when (serverHostError e) $ notify c sndFileEntityId $ SFWARN e
516516
liftIO $ closeXFTPServerClient c userId server digest
517517
withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay
@@ -681,10 +681,10 @@ runXFTPDelWorker c srv Worker {doWork} = do
681681
liftIO $ waitForUserNetwork c
682682
atomically $ incXFTPServerStat c userId srv deleteAttempts
683683
deleteChunkReplica
684-
`catchAgentError` \e -> retryOnError "XFTP del worker" (retryLoop loop e delay') (retryDone e) e
684+
`catchAllErrors` \e -> retryOnError "XFTP del worker" (retryLoop loop e delay') (retryDone e) e
685685
where
686686
retryLoop loop e replicaDelay = do
687-
flip catchAgentError (\_ -> pure ()) $ do
687+
flip catchAllErrors (\_ -> pure ()) $ do
688688
when (serverHostError e) $ notify c "" $ SFWARN e
689689
liftIO $ closeXFTPServerClient c userId server chunkDigest
690690
withStore' c $ \db -> updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId replicaDelay

src/Simplex/Messaging/Agent.hs

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -284,13 +284,13 @@ saveServersStats c@AgentClient {subQ, smpServersStats, xftpServersStats, ntfServ
284284
xss <- mapM (liftIO . getAgentXFTPServerStats) =<< readTVarIO xftpServersStats
285285
nss <- mapM (liftIO . getAgentNtfServerStats) =<< readTVarIO ntfServersStats
286286
let stats = AgentPersistedServerStats {smpServersStats = sss, xftpServersStats = xss, ntfServersStats = OptionalMap nss}
287-
tryAgentError' (withStore' c (`updateServersStats` stats)) >>= \case
287+
tryAllErrors' (withStore' c (`updateServersStats` stats)) >>= \case
288288
Left e -> atomically $ writeTBQueue subQ ("", "", AEvt SAEConn $ ERR $ INTERNAL $ show e)
289289
Right () -> pure ()
290290

291291
restoreServersStats :: AgentClient -> AM' ()
292292
restoreServersStats c@AgentClient {smpServersStats, xftpServersStats, ntfServersStats, srvStatsStartedAt} = do
293-
tryAgentError' (withStore c getServersStats) >>= \case
293+
tryAllErrors' (withStore c getServersStats) >>= \case
294294
Left e -> atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ INTERNAL $ show e)
295295
Right (startedAt, Nothing) -> atomically $ writeTVar srvStatsStartedAt startedAt
296296
Right (startedAt, Just AgentPersistedServerStats {smpServersStats = sss, xftpServersStats = xss, ntfServersStats = OptionalMap nss}) -> do
@@ -774,7 +774,7 @@ acceptContactAsync' :: AgentClient -> UserId -> ACorrId -> Bool -> InvitationId
774774
acceptContactAsync' c userId corrId enableNtfs invId ownConnInfo pqSupport subMode = do
775775
Invitation {connReq} <- withStore c $ \db -> getInvitation db "acceptContactAsync'" invId
776776
withStore' c $ \db -> acceptInvitation db invId ownConnInfo
777-
joinConnAsync c userId corrId enableNtfs connReq ownConnInfo pqSupport subMode `catchAgentError` \err -> do
777+
joinConnAsync c userId corrId enableNtfs connReq ownConnInfo pqSupport subMode `catchAllErrors` \err -> do
778778
withStore' c (`unacceptInvitation` invId)
779779
throwE err
780780

@@ -961,7 +961,7 @@ newRcvConnSrv c nm userId connId enableNtfs cMode userData_ clientData pqInitKey
961961
createRcvQueue nonce_ qd e2eKeys = do
962962
AgentConfig {smpClientVRange = vr} <- asks config
963963
ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing
964-
(rq, qUri, tSess, sessId) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAgentError` \e -> liftIO (print e) >> throwE e
964+
(rq, qUri, tSess, sessId) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAllErrors` \e -> liftIO (print e) >> throwE e
965965
atomically $ incSMPServerStat c userId srv connCreated
966966
rq' <- withStore c $ \db -> updateNewConnRcv db connId rq
967967
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId
@@ -1351,7 +1351,7 @@ subscribeClientService' = undefined
13511351

13521352
-- requesting messages sequentially, to reduce memory usage
13531353
getConnectionMessages' :: AgentClient -> NonEmpty ConnMsgReq -> AM' (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta)))
1354-
getConnectionMessages' c = mapM $ tryAgentError' . getConnectionMessage
1354+
getConnectionMessages' c = mapM $ tryAllErrors' . getConnectionMessage
13551355
where
13561356
getConnectionMessage :: ConnMsgReq -> AM (Maybe SMPMsgMeta)
13571357
getConnectionMessage (ConnMsgReq connId dbQueueId msgTs_) = do
@@ -1363,7 +1363,7 @@ getConnectionMessages' c = mapM $ tryAgentError' . getConnectionMessage
13631363
ContactConnection _ rq -> pure rq
13641364
SndConnection _ _ -> throwE $ CONN SIMPLEX "getConnectionMessage"
13651365
NewConnection _ -> throwE $ CMD PROHIBITED "getConnectionMessage: NewConnection"
1366-
msg_ <- getQueueMessage c rq `catchAgentError` \e -> atomically (releaseGetLock c rq) >> throwError e
1366+
msg_ <- getQueueMessage c rq `catchAllErrors` \e -> atomically (releaseGetLock c rq) >> throwError e
13671367
when (isNothing msg_) $ do
13681368
atomically $ releaseGetLock c rq
13691369
forM_ msgTs_ $ \msgTs -> withStore' c $ \db -> setLastBrokerTs db connId (DBEntityId dbQueueId) msgTs
@@ -1534,7 +1534,7 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
15341534
pure CCCompleted
15351535
-- duplex connection is matched to handle SKEY retries
15361536
DuplexConnection cData _ (sq :| _) -> do
1537-
tryAgentError (mapM_ (connectReplyQueues c cData ownConnInfo (Just sq)) (L.nonEmpty $ smpReplyQueues senderConf)) >>= \case
1537+
tryAllErrors (mapM_ (connectReplyQueues c cData ownConnInfo (Just sq)) (L.nonEmpty $ smpReplyQueues senderConf)) >>= \case
15381538
Right () -> pure CCCompleted
15391539
Left e
15401540
| temporaryOrHostError e && Just server /= server_ -> do
@@ -1621,7 +1621,7 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
16211621
tryMoveableCommand action = withRetryInterval ri $ \_ loop -> do
16221622
liftIO $ waitWhileSuspended c
16231623
liftIO $ waitForUserNetwork c
1624-
tryAgentError action >>= \case
1624+
tryAllErrors action >>= \case
16251625
Left e
16261626
| temporaryOrHostError e -> retrySndOp c loop
16271627
| otherwise -> cmdError e
@@ -2065,7 +2065,7 @@ synchronizeRatchet' c connId pqSupport' force = withConnLock c connId "synchroni
20652065
ackQueueMessage :: AgentClient -> RcvQueue -> SMP.MsgId -> AM ()
20662066
ackQueueMessage c rq@RcvQueue {userId, connId, server} srvMsgId = do
20672067
atomically $ incSMPServerStat c userId server ackAttempts
2068-
tryAgentError (sendAck c rq srvMsgId) >>= \case
2068+
tryAllErrors (sendAck c rq srvMsgId) >>= \case
20692069
Right _ -> sendMsgNtf ackMsgs
20702070
Left (SMP _ SMP.NO_MSG) -> sendMsgNtf ackNoMsgErrs
20712071
Left e -> do
@@ -2076,7 +2076,7 @@ ackQueueMessage c rq@RcvQueue {userId, connId, server} srvMsgId = do
20762076
atomically $ incSMPServerStat c userId server stat
20772077
whenM (liftIO $ hasGetLock c rq) $ do
20782078
atomically $ releaseGetLock c rq
2079-
brokerTs_ <- eitherToMaybe <$> tryAgentError (withStore c $ \db -> getRcvMsgBrokerTs db connId srvMsgId)
2079+
brokerTs_ <- eitherToMaybe <$> tryAllErrors (withStore c $ \db -> getRcvMsgBrokerTs db connId srvMsgId)
20802080
atomically $ writeTBQueue (subQ c) ("", connId, AEvt SAEConn $ MSGNTF srvMsgId brokerTs_)
20812081

20822082
-- | Suspend SMP agent connection (OFF command) in Reader monad
@@ -2307,7 +2307,7 @@ registerNtfToken' c nm suppliedDeviceToken suppliedNtfMode =
23072307
replaceToken :: NtfTokenId -> AM NtfTknStatus
23082308
replaceToken tknId = do
23092309
ns <- asks ntfSupervisor
2310-
tryReplace ns `catchAgentError` \e ->
2310+
tryReplace ns `catchAllErrors` \e ->
23112311
if temporaryOrHostError e
23122312
then throwE e
23132313
else do
@@ -2564,7 +2564,7 @@ cleanupManager c@AgentClient {subQ} = do
25642564
where
25652565
run :: forall e. AEntityI e => (AgentErrorType -> AEvent e) -> AM () -> AM' ()
25662566
run err a = do
2567-
waitActive . runExceptT $ a `catchAgentError` (notify "" . err)
2567+
waitActive . runExceptT $ a `catchAllErrors` (notify "" . err)
25682568
step <- asks $ cleanupStepInterval . config
25692569
liftIO $ threadDelay step
25702570
-- we are catching it to avoid CRITICAL errors in tests when this is the only remaining handle to active
@@ -2578,33 +2578,33 @@ cleanupManager c@AgentClient {subQ} = do
25782578
deleteRcvFilesExpired = do
25792579
rcvFilesTTL <- asks $ rcvFilesTTL . config
25802580
rcvExpired <- withStore' c (`getRcvFilesExpired` rcvFilesTTL)
2581-
forM_ rcvExpired $ \(dbId, entId, p) -> flip catchAgentError (notify entId . RFERR) $ do
2581+
forM_ rcvExpired $ \(dbId, entId, p) -> flip catchAllErrors (notify entId . RFERR) $ do
25822582
lift $ removePath =<< toFSFilePath p
25832583
withStore' c (`deleteRcvFile'` dbId)
25842584
deleteRcvFilesDeleted = do
25852585
rcvDeleted <- withStore' c getCleanupRcvFilesDeleted
2586-
forM_ rcvDeleted $ \(dbId, entId, p) -> flip catchAgentError (notify entId . RFERR) $ do
2586+
forM_ rcvDeleted $ \(dbId, entId, p) -> flip catchAllErrors (notify entId . RFERR) $ do
25872587
lift $ removePath =<< toFSFilePath p
25882588
withStore' c (`deleteRcvFile'` dbId)
25892589
deleteRcvFilesTmpPaths = do
25902590
rcvTmpPaths <- withStore' c getCleanupRcvFilesTmpPaths
2591-
forM_ rcvTmpPaths $ \(dbId, entId, p) -> flip catchAgentError (notify entId . RFERR) $ do
2591+
forM_ rcvTmpPaths $ \(dbId, entId, p) -> flip catchAllErrors (notify entId . RFERR) $ do
25922592
lift $ removePath =<< toFSFilePath p
25932593
withStore' c (`updateRcvFileNoTmpPath` dbId)
25942594
deleteSndFilesExpired = do
25952595
sndFilesTTL <- asks $ sndFilesTTL . config
25962596
sndExpired <- withStore' c (`getSndFilesExpired` sndFilesTTL)
2597-
forM_ sndExpired $ \(dbId, entId, p) -> flip catchAgentError (notify entId . SFERR) $ do
2597+
forM_ sndExpired $ \(dbId, entId, p) -> flip catchAllErrors (notify entId . SFERR) $ do
25982598
lift . forM_ p $ removePath <=< toFSFilePath
25992599
withStore' c (`deleteSndFile'` dbId)
26002600
deleteSndFilesDeleted = do
26012601
sndDeleted <- withStore' c getCleanupSndFilesDeleted
2602-
forM_ sndDeleted $ \(dbId, entId, p) -> flip catchAgentError (notify entId . SFERR) $ do
2602+
forM_ sndDeleted $ \(dbId, entId, p) -> flip catchAllErrors (notify entId . SFERR) $ do
26032603
lift . forM_ p $ removePath <=< toFSFilePath
26042604
withStore' c (`deleteSndFile'` dbId)
26052605
deleteSndFilesPrefixPaths = do
26062606
sndPrefixPaths <- withStore' c getCleanupSndFilesPrefixPaths
2607-
forM_ sndPrefixPaths $ \(dbId, entId, p) -> flip catchAgentError (notify entId . SFERR) $ do
2607+
forM_ sndPrefixPaths $ \(dbId, entId, p) -> flip catchAllErrors (notify entId . SFERR) $ do
26082608
lift $ removePath =<< toFSFilePath p
26092609
withStore' c (`updateSndFileNoPrefixPath` dbId)
26102610
deleteExpiredReplicasForDeletion = do
@@ -2652,10 +2652,10 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
26522652
where
26532653
withRcvConn :: SMP.RecipientId -> (forall c. RcvQueue -> Connection c -> AM ()) -> AM' ()
26542654
withRcvConn rId a = do
2655-
tryAgentError' (withStore c $ \db -> getRcvConn db srv rId) >>= \case
2655+
tryAllErrors' (withStore c $ \db -> getRcvConn db srv rId) >>= \case
26562656
Left e -> notify' "" (ERR e)
26572657
Right (rq@RcvQueue {connId}, SomeConn _ conn) ->
2658-
tryAgentError' (a rq conn) >>= \case
2658+
tryAllErrors' (a rq conn) >>= \case
26592659
Left e -> notify' connId (ERR e)
26602660
Right () -> pure ()
26612661
processSubOk :: RcvQueue -> TVar [ConnId] -> AM ()
@@ -2739,7 +2739,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
27392739
_ -> pure ()
27402740
let encryptedMsgHash = C.sha256Hash encAgentMessage
27412741
g <- asks random
2742-
tryAgentError (agentClientMsg g encryptedMsgHash) >>= \case
2742+
tryAllErrors (agentClientMsg g encryptedMsgHash) >>= \case
27432743
Right (Just (msgId, msgMeta, aMessage, rcPrev)) -> do
27442744
conn'' <- resetRatchetSync
27452745
case aMessage of
@@ -2848,7 +2848,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
28482848
ackDel :: InternalId -> AM ACKd
28492849
ackDel aId = enqueueCmd (ICAckDel rId srvMsgId aId) $> ACKd
28502850
handleNotifyAck :: AM ACKd -> AM ACKd
2851-
handleNotifyAck m = m `catchAgentError` \e -> notify (ERR e) >> ack
2851+
handleNotifyAck m = m `catchAllErrors` \e -> notify (ERR e) >> ack
28522852
SMP.END ->
28532853
atomically (ifM (activeClientSession c tSess sessId) (removeSubscription c connId $> True) (pure False))
28542854
>>= notifyEnd
@@ -3006,7 +3006,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
30063006
messagesRcvd :: NonEmpty AMessageReceipt -> MsgMeta -> Connection 'CDuplex -> AM ACKd
30073007
messagesRcvd rcpts msgMeta@MsgMeta {broker = (srvMsgId, _)} _ = do
30083008
logServer "<--" c srv rId $ "MSG <RCPT>:" <> logSecret' srvMsgId
3009-
rs <- forM rcpts $ \rcpt -> clientReceipt rcpt `catchAgentError` \e -> notify (ERR e) $> Nothing
3009+
rs <- forM rcpts $ \rcpt -> clientReceipt rcpt `catchAllErrors` \e -> notify (ERR e) $> Nothing
30103010
case L.nonEmpty . catMaybes $ L.toList rs of
30113011
Just rs' -> notify (RCVD msgMeta rs') $> ACKPending
30123012
Nothing -> ack

0 commit comments

Comments
 (0)