Skip to content

Commit b3c8358

Browse files
authored
agent: combine connection deletion events (#1442)
1 parent 817f5e1 commit b3c8358

File tree

3 files changed

+44
-54
lines changed

3 files changed

+44
-54
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1805,8 +1805,8 @@ prepareDeleteConnections_ getConnections c waitDelivery connIds = do
18051805
-- ! if it was used to notify about the result, it might be necessary to differentiate
18061806
-- ! between completed deletions of connections, and deletions delayed due to wait for delivery (see deleteConn)
18071807
deliveryTimeout <- if waitDelivery then asks (Just . connDeleteDeliveryTimeout . config) else pure Nothing
1808-
rs' <- lift $ catMaybes . rights <$> withStoreBatch' c (\db -> map (deleteConn db deliveryTimeout) (M.keys delRs))
1809-
forM_ rs' $ \cId -> notify ("", cId, AEvt SAEConn DEL_CONN)
1808+
cIds_ <- lift $ L.nonEmpty . catMaybes . rights <$> withStoreBatch' c (\db -> map (deleteConn db deliveryTimeout) (M.keys delRs))
1809+
forM_ cIds_ $ \cIds -> notify ("", "", AEvt SAEConn $ DEL_CONNS cIds)
18101810
pure (errs' <> delRs, rqs, connIds')
18111811
where
18121812
rcvQueues :: SomeConn -> Either (Either AgentErrorType ()) [RcvQueue]
@@ -1826,32 +1826,33 @@ deleteConnQueues c waitDelivery ntf rqs = do
18261826
rs <- connResults <$> (deleteQueueRecs =<< deleteQueues c rqs)
18271827
let connIds = M.keys $ M.filter isRight rs
18281828
deliveryTimeout <- if waitDelivery then asks (Just . connDeleteDeliveryTimeout . config) else pure Nothing
1829-
rs' <- catMaybes . rights <$> withStoreBatch' c (\db -> map (deleteConn db deliveryTimeout) connIds)
1830-
forM_ rs' $ \cId -> notify ("", cId, AEvt SAEConn DEL_CONN)
1829+
cIds_ <- L.nonEmpty . catMaybes . rights <$> withStoreBatch' c (\db -> map (deleteConn db deliveryTimeout) connIds)
1830+
forM_ cIds_ $ \cIds -> notify ("", "", AEvt SAEConn $ DEL_CONNS cIds)
18311831
pure rs
18321832
where
18331833
deleteQueueRecs :: [(RcvQueue, Either AgentErrorType ())] -> AM' [(RcvQueue, Either AgentErrorType ())]
18341834
deleteQueueRecs rs = do
18351835
maxErrs <- asks $ deleteErrorCount . config
1836-
(rs', notifyActions) <- unzip . rights <$> withStoreBatch' c (\db -> map (deleteQueueRec db maxErrs) rs)
1837-
mapM_ sequence_ notifyActions
1838-
pure rs'
1836+
rs' <- rights <$> withStoreBatch' c (\db -> map (deleteQueueRec db maxErrs) rs)
1837+
let delQ ((rq, _), err_) = (qConnId rq,qServer rq,queueId rq,) <$> err_
1838+
delQs_ = L.nonEmpty $ mapMaybe delQ rs'
1839+
forM_ delQs_ $ \delQs -> notify ("", "", AEvt SAEConn $ DEL_RCVQS delQs)
1840+
pure $ map fst rs'
18391841
where
18401842
deleteQueueRec ::
18411843
DB.Connection ->
18421844
Int ->
18431845
(RcvQueue, Either AgentErrorType ()) ->
1844-
IO ((RcvQueue, Either AgentErrorType ()), Maybe (AM' ()))
1846+
IO ((RcvQueue, Either AgentErrorType ()), Maybe (Maybe AgentErrorType)) -- Nothing - no event, Just Nothing - no error
18451847
deleteQueueRec db maxErrs (rq@RcvQueue {userId, server}, r) = case r of
1846-
Right _ -> deleteConnRcvQueue db rq $> ((rq, r), Just (notifyRQ rq Nothing))
1848+
Right _ -> deleteConnRcvQueue db rq $> ((rq, r), Just Nothing)
18471849
Left e
18481850
| temporaryOrHostError e && deleteErrors rq + 1 < maxErrs -> incRcvDeleteErrors db rq $> ((rq, r), Nothing)
18491851
| otherwise -> do
18501852
deleteConnRcvQueue db rq
18511853
-- attempts and successes are counted in deleteQueues function
18521854
atomically $ incSMPServerStat c userId server connDeleted
1853-
pure ((rq, Right ()), Just (notifyRQ rq (Just e)))
1854-
notifyRQ rq e_ = notify ("", qConnId rq, AEvt SAEConn $ DEL_RCVQ (qServer rq) (queueId rq) e_)
1855+
pure ((rq, Right ()), Just (Just e))
18551856
notify = when ntf . atomically . writeTBQueue (subQ c)
18561857
connResults :: [(RcvQueue, Either AgentErrorType ())] -> Map ConnId (Either AgentErrorType ())
18571858
connResults = M.map snd . foldl' addResult M.empty

src/Simplex/Messaging/Agent/Protocol.hs

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
{-# LANGUAGE CPP #-}
21
{-# LANGUAGE DataKinds #-}
32
{-# LANGUAGE DeriveAnyClass #-}
43
{-# LANGUAGE DuplicateRecordFields #-}
@@ -168,12 +167,13 @@ import Data.Time.Clock.System (SystemTime)
168167
import Data.Type.Equality
169168
import Data.Typeable ()
170169
import Data.Word (Word16, Word32)
170+
import Database.SQLite.Simple.FromField
171+
import Database.SQLite.Simple.ToField
171172
import Simplex.FileTransfer.Description
172173
import Simplex.FileTransfer.Protocol (FileParty (..))
173174
import Simplex.FileTransfer.Transport (XFTPErrorType)
174175
import Simplex.FileTransfer.Types (FileErrorType)
175176
import Simplex.Messaging.Agent.QueryString
176-
import Simplex.Messaging.Agent.Store.DB (Binary (..))
177177
import Simplex.Messaging.Client (ProxyClientError)
178178
import qualified Simplex.Messaging.Crypto as C
179179
import Simplex.Messaging.Crypto.Ratchet
@@ -224,13 +224,6 @@ import Simplex.Messaging.Version
224224
import Simplex.Messaging.Version.Internal
225225
import Simplex.RemoteControl.Types
226226
import UnliftIO.Exception (Exception)
227-
#if defined(dbPostgres)
228-
import Database.PostgreSQL.Simple.FromField (FromField (..))
229-
import Database.PostgreSQL.Simple.ToField (ToField (..))
230-
#else
231-
import Database.SQLite.Simple.FromField (FromField (..))
232-
import Database.SQLite.Simple.ToField (ToField (..))
233-
#endif
234227

235228
-- SMP agent protocol version history:
236229
-- 1 - binary protocol encoding (1/1/2022)
@@ -366,8 +359,8 @@ data AEvent (e :: AEntity) where
366359
MSGNTF :: MsgId -> Maybe UTCTime -> AEvent AEConn
367360
RCVD :: MsgMeta -> NonEmpty MsgReceipt -> AEvent AEConn
368361
QCONT :: AEvent AEConn
369-
DEL_RCVQ :: SMPServer -> SMP.RecipientId -> Maybe AgentErrorType -> AEvent AEConn
370-
DEL_CONN :: AEvent AEConn
362+
DEL_RCVQS :: NonEmpty (ConnId, SMPServer, SMP.RecipientId, Maybe AgentErrorType) -> AEvent AEConn
363+
DEL_CONNS :: NonEmpty ConnId -> AEvent AEConn
371364
DEL_USER :: Int64 -> AEvent AENone
372365
STAT :: ConnectionStats -> AEvent AEConn
373366
OK :: AEvent AEConn
@@ -437,8 +430,8 @@ data AEventTag (e :: AEntity) where
437430
MSGNTF_ :: AEventTag AEConn
438431
RCVD_ :: AEventTag AEConn
439432
QCONT_ :: AEventTag AEConn
440-
DEL_RCVQ_ :: AEventTag AEConn
441-
DEL_CONN_ :: AEventTag AEConn
433+
DEL_RCVQS_ :: AEventTag AEConn
434+
DEL_CONNS_ :: AEventTag AEConn
442435
DEL_USER_ :: AEventTag AENone
443436
STAT_ :: AEventTag AEConn
444437
OK_ :: AEventTag AEConn
@@ -492,8 +485,8 @@ aEventTag = \case
492485
MSGNTF {} -> MSGNTF_
493486
RCVD {} -> RCVD_
494487
QCONT -> QCONT_
495-
DEL_RCVQ {} -> DEL_RCVQ_
496-
DEL_CONN -> DEL_CONN_
488+
DEL_RCVQS _ -> DEL_RCVQS_
489+
DEL_CONNS _ -> DEL_CONNS_
497490
DEL_USER _ -> DEL_USER_
498491
STAT _ -> STAT_
499492
OK -> OK_
@@ -651,7 +644,7 @@ instance ToJSON NotificationsMode where
651644
instance FromJSON NotificationsMode where
652645
parseJSON = strParseJSON "NotificationsMode"
653646

654-
instance ToField NotificationsMode where toField = toField . Binary . strEncode
647+
instance ToField NotificationsMode where toField = toField . strEncode
655648

656649
instance FromField NotificationsMode where fromField = blobFieldDecoder $ parseAll strP
657650

tests/AgentTests/FunctionalAPITests.hs

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2055,8 +2055,8 @@ testAsyncCommands sqSecured alice bob baseId =
20552055
ackMessageAsync alice "7" bobId (baseId + 4) Nothing
20562056
get alice =##> \case ("7", _, OK) -> True; _ -> False
20572057
deleteConnectionAsync alice False bobId
2058-
get alice =##> \case ("", c, DEL_RCVQ _ _ Nothing) -> c == bobId; _ -> False
2059-
get alice =##> \case ("", c, DEL_CONN) -> c == bobId; _ -> False
2058+
get alice =##> \case ("", "", DEL_RCVQS [(c, _, _, Nothing)]) -> c == bobId; _ -> False
2059+
get alice =##> \case ("", "", DEL_CONNS [c]) -> c == bobId; _ -> False
20602060
liftIO $ noMessages alice "nothing else should be delivered to alice"
20612061
where
20622062
msgId = subtract baseId
@@ -2123,12 +2123,9 @@ testDeleteConnectionAsync t =
21232123
runRight_ $ do
21242124
deleteConnectionsAsync a False connIds
21252125
nGet a =##> \case ("", "", DOWN {}) -> True; _ -> False
2126-
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False
2127-
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False
2128-
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False
2129-
get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False
2130-
get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False
2131-
get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False
2126+
let delOk = \case (c, _, _, Just (BROKER _ e)) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False
2127+
get a =##> \case ("", "", DEL_RCVQS rs) -> length rs == 3 && all delOk rs; _ -> False
2128+
get a =##> \case ("", "", DEL_CONNS cs) -> length cs == 3 && all (`elem` connIds) cs; _ -> False
21322129
liftIO $ noMessages a "nothing else should be delivered to alice"
21332130

21342131
testWaitDeliveryNoPending :: ATransport -> IO ()
@@ -2147,8 +2144,8 @@ testWaitDeliveryNoPending t = withAgentClients2 $ \alice bob ->
21472144
ackMessage alice bobId (baseId + 2) Nothing
21482145

21492146
deleteConnectionsAsync alice True [bobId]
2150-
get alice =##> \case ("", cId, DEL_RCVQ _ _ Nothing) -> cId == bobId; _ -> False
2151-
get alice =##> \case ("", cId, DEL_CONN) -> cId == bobId; _ -> False
2147+
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Nothing)]) -> cId == bobId; _ -> False
2148+
get alice =##> \case ("", "", DEL_CONNS [cId]) -> cId == bobId; _ -> False
21522149

21532150
3 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 2"
21542151
get bob =##> \case ("", cId, MERR mId (SMP _ AUTH)) -> cId == aliceId && mId == (baseId + 3); _ -> False
@@ -2184,14 +2181,14 @@ testWaitDelivery t =
21842181
3 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
21852182
4 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "message 1"
21862183
deleteConnectionsAsync alice True [bobId]
2187-
get alice =##> \case ("", cId, DEL_RCVQ _ _ (Just (BROKER _ e))) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
2184+
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
21882185
liftIO $ noMessages alice "nothing else should be delivered to alice"
21892186
liftIO $ noMessages bob "nothing else should be delivered to bob"
21902187

21912188
withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do
21922189
get alice ##> ("", bobId, SENT $ baseId + 3)
21932190
get alice ##> ("", bobId, SENT $ baseId + 4)
2194-
get alice =##> \case ("", cId, DEL_CONN) -> cId == bobId; _ -> False
2191+
get alice =##> \case ("", "", DEL_CONNS [cId]) -> cId == bobId; _ -> False
21952192

21962193
liftIO $
21972194
getInAnyOrder
@@ -2231,8 +2228,8 @@ testWaitDeliveryAUTHErr t =
22312228
ackMessage alice bobId (baseId + 2) Nothing
22322229

22332230
deleteConnectionsAsync bob False [aliceId]
2234-
get bob =##> \case ("", cId, DEL_RCVQ _ _ Nothing) -> cId == aliceId; _ -> False
2235-
get bob =##> \case ("", cId, DEL_CONN) -> cId == aliceId; _ -> False
2231+
get bob =##> \case ("", "", DEL_RCVQS [(cId, _, _, Nothing)]) -> cId == aliceId; _ -> False
2232+
get bob =##> \case ("", "", DEL_CONNS [cId]) -> cId == aliceId; _ -> False
22362233

22372234
pure (aliceId, bobId)
22382235

@@ -2241,14 +2238,14 @@ testWaitDeliveryAUTHErr t =
22412238
3 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
22422239
4 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "message 1"
22432240
deleteConnectionsAsync alice True [bobId]
2244-
get alice =##> \case ("", cId, DEL_RCVQ _ _ (Just (BROKER _ e))) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
2241+
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
22452242
liftIO $ noMessages alice "nothing else should be delivered to alice"
22462243
liftIO $ noMessages bob "nothing else should be delivered to bob"
22472244

22482245
withSmpServerStoreLogOn t testPort $ \_ -> do
22492246
get alice =##> \case ("", cId, MERR mId (SMP _ AUTH)) -> cId == bobId && mId == (baseId + 3); _ -> False
22502247
get alice =##> \case ("", cId, MERR mId (SMP _ AUTH)) -> cId == bobId && mId == (baseId + 4); _ -> False
2251-
get alice =##> \case ("", cId, DEL_CONN) -> cId == bobId; _ -> False
2248+
get alice =##> \case ("", "", DEL_CONNS [cId]) -> cId == bobId; _ -> False
22522249

22532250
liftIO $ noMessages alice "nothing else should be delivered to alice"
22542251
liftIO $ noMessages bob "nothing else should be delivered to bob"
@@ -2281,8 +2278,8 @@ testWaitDeliveryTimeout t =
22812278
3 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
22822279
4 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "message 1"
22832280
deleteConnectionsAsync alice True [bobId]
2284-
get alice =##> \case ("", cId, DEL_RCVQ _ _ (Just (BROKER _ e))) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
2285-
get alice =##> \case ("", cId, DEL_CONN) -> cId == bobId; _ -> False
2281+
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
2282+
get alice =##> \case ("", "", DEL_CONNS [cId]) -> cId == bobId; _ -> False
22862283
liftIO $ noMessages alice "nothing else should be delivered to alice"
22872284
liftIO $ noMessages bob "nothing else should be delivered to bob"
22882285

@@ -2321,8 +2318,8 @@ testWaitDeliveryTimeout2 t =
23212318
3 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
23222319
4 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "message 1"
23232320
deleteConnectionsAsync alice True [bobId]
2324-
get alice =##> \case ("", cId, DEL_RCVQ _ _ (Just (BROKER _ e))) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
2325-
get alice =##> \case ("", cId, DEL_CONN) -> cId == bobId; _ -> False
2321+
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
2322+
get alice =##> \case ("", "", DEL_CONNS [cId]) -> cId == bobId; _ -> False
23262323
liftIO $ noMessages alice "nothing else should be delivered to alice"
23272324
liftIO $ noMessages bob "nothing else should be delivered to bob"
23282325

@@ -2430,8 +2427,8 @@ testUsers =
24302427
(aId', bId') <- makeConnectionForUsers a auId b 1
24312428
exchangeGreetings a bId' b aId'
24322429
deleteUser a auId True
2433-
get a =##> \case ("", c, DEL_RCVQ _ _ Nothing) -> c == bId'; _ -> False
2434-
get a =##> \case ("", c, DEL_CONN) -> c == bId'; _ -> False
2430+
get a =##> \case ("", "", DEL_RCVQS [(c, _, _, Nothing)]) -> c == bId'; _ -> False
2431+
get a =##> \case ("", "", DEL_CONNS [c]) -> c == bId'; _ -> False
24352432
nGet a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False
24362433
exchangeGreetingsMsgId 4 a bId b aId
24372434
liftIO $ noMessages a "nothing else should be delivered to alice"
@@ -2462,8 +2459,8 @@ testUsersNoServer t = withAgentClientsCfg2 aCfg agentCfg $ \a b -> do
24622459
nGet b =##> \case ("", "", DOWN _ cs) -> length cs == 2; _ -> False
24632460
runRight_ $ do
24642461
deleteUser a auId True
2465-
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c == bId' && (e == TIMEOUT || e == NETWORK); _ -> False
2466-
get a =##> \case ("", c, DEL_CONN) -> c == bId'; _ -> False
2462+
get a =##> \case ("", "", DEL_RCVQS [(c, _, _, Just (BROKER _ e))]) -> c == bId' && (e == TIMEOUT || e == NETWORK); _ -> False
2463+
get a =##> \case ("", "", DEL_CONNS [c]) -> c == bId'; _ -> False
24672464
nGet a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False
24682465
liftIO $ noMessages a "nothing else should be delivered to alice"
24692466
withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do
@@ -2581,9 +2578,8 @@ testSwitchDelete servers =
25812578
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Just RSSwitchStarted]
25822579
phaseRcv a bId SPStarted [Just RSSendingQADD, Nothing]
25832580
deleteConnectionAsync a False bId
2584-
get a =##> \case ("", c, DEL_RCVQ _ _ Nothing) -> c == bId; _ -> False
2585-
get a =##> \case ("", c, DEL_RCVQ _ _ Nothing) -> c == bId; _ -> False
2586-
get a =##> \case ("", c, DEL_CONN) -> c == bId; _ -> False
2581+
get a =##> \case ("", "", DEL_RCVQS [(c, _, _, Nothing), (c', _, _, Nothing)]) -> c == bId && c' == bId; _ -> False
2582+
get a =##> \case ("", "", DEL_CONNS [c]) -> c == bId; _ -> False
25872583
liftIO $ noMessages a "nothing else should be delivered to alice"
25882584

25892585
testAbortSwitchStarted :: HasCallStack => InitialAgentServers -> IO ()

0 commit comments

Comments
 (0)