Skip to content

Commit 3d10c9b

Browse files
authored
agent: handle cases when last message ts is not set for notifications; set last ts for "stale" notifications when messages expired and queue is empty, to prevent repeated processing (#1531)
* agent: handle cases when last message ts is not set for notifications; set last ts for "stale" notifications when messages expired and queue is empty, to prevent repeated processing * only log errors if they exist * only set last ts for queue that delivered notification
1 parent 08b84de commit 3d10c9b

File tree

5 files changed

+47
-27
lines changed

5 files changed

+47
-27
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ import Simplex.RemoteControl.Client
224224
import Simplex.RemoteControl.Invitation
225225
import Simplex.RemoteControl.Types
226226
import System.Mem.Weak (deRefWeak)
227+
import UnliftIO.Async (mapConcurrently)
227228
import UnliftIO.Concurrent (forkFinally, forkIO, killThread, mkWeakThreadId, threadDelay)
228229
import qualified UnliftIO.Exception as E
229230
import UnliftIO.STM
@@ -439,7 +440,7 @@ subscribeConnections c = withAgentEnv c . subscribeConnections' c
439440
{-# INLINE subscribeConnections #-}
440441

441442
-- | Get messages for connections (GET commands)
442-
getConnectionMessages :: AgentClient -> NonEmpty ConnId -> IO (NonEmpty (Maybe SMPMsgMeta))
443+
getConnectionMessages :: AgentClient -> NonEmpty ConnMsgReq -> IO (NonEmpty (Maybe SMPMsgMeta))
443444
getConnectionMessages c = withAgentEnv' c . getConnectionMessages' c
444445
{-# INLINE getConnectionMessages #-}
445446

@@ -1276,24 +1277,26 @@ resubscribeConnections' c connIds = do
12761277
-- union is left-biased, so results returned by subscribeConnections' take precedence
12771278
(`M.union` r) <$> subscribeConnections' c connIds'
12781279

1279-
getConnectionMessages' :: AgentClient -> NonEmpty ConnId -> AM' (NonEmpty (Maybe SMPMsgMeta))
1280-
getConnectionMessages' c = mapM getMsg
1280+
getConnectionMessages' :: AgentClient -> NonEmpty ConnMsgReq -> AM' (NonEmpty (Maybe SMPMsgMeta))
1281+
getConnectionMessages' c =
1282+
mapConcurrently $ \cmr ->
1283+
getConnectionMessage cmr `catchAgentError'` \e -> do
1284+
logError $ "Error loading message: " <> tshow e
1285+
pure Nothing
12811286
where
1282-
getMsg :: ConnId -> AM' (Maybe SMPMsgMeta)
1283-
getMsg connId =
1284-
getConnectionMessage connId `catchAgentError'` \e -> do
1285-
logError $ "Error loading message: " <> tshow e
1286-
pure Nothing
1287-
getConnectionMessage :: ConnId -> AM (Maybe SMPMsgMeta)
1288-
getConnectionMessage connId = do
1287+
getConnectionMessage :: ConnMsgReq -> AM (Maybe SMPMsgMeta)
1288+
getConnectionMessage (ConnMsgReq connId dbQueueId msgTs_) = do
12891289
whenM (atomically $ hasActiveSubscription c connId) . throwE $ CMD PROHIBITED "getConnectionMessage: subscribed"
12901290
SomeConn _ conn <- withStore c (`getConn` connId)
1291-
case conn of
1291+
msg_ <- case conn of
12921292
DuplexConnection _ (rq :| _) _ -> getQueueMessage c rq
12931293
RcvConnection _ rq -> getQueueMessage c rq
12941294
ContactConnection _ rq -> getQueueMessage c rq
12951295
SndConnection _ _ -> throwE $ CONN SIMPLEX
12961296
NewConnection _ -> throwE $ CMD PROHIBITED "getConnectionMessage: NewConnection"
1297+
when (isNothing msg_) $
1298+
forM_ msgTs_ $ \msgTs -> withStore' c $ \db -> setLastBrokerTs db connId (DBQueueId dbQueueId) msgTs
1299+
pure msg_
12971300

12981301
getNotificationConns' :: AgentClient -> C.CbNonce -> ByteString -> AM (NonEmpty NotificationInfo)
12991302
getNotificationConns' c nonce encNtfInfo =
@@ -1308,24 +1311,24 @@ getNotificationConns' c nonce encNtfInfo =
13081311
lastNtfInfo = Just . fst <$$> getNtfInfo db lastNtf
13091312
in initNtfInfos <> [lastNtfInfo]
13101313
let (errs, ntfInfos_) = partitionEithers rs
1311-
logError $ "Error(s) loading notifications: " <> tshow errs
1314+
unless (null errs) $ logError $ "Error(s) loading notifications: " <> tshow errs
13121315
case L.nonEmpty $ catMaybes ntfInfos_ of
13131316
Just r -> pure r
13141317
Nothing -> throwE $ INTERNAL "getNotificationConns: couldn't get conn info"
13151318
_ -> throwE $ CMD PROHIBITED "getNotificationConns"
13161319
where
13171320
getNtfInfo :: DB.Connection -> PNMessageData -> IO (Either AgentErrorType (NotificationInfo, Maybe UTCTime))
13181321
getNtfInfo db PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} = runExceptT $ do
1319-
(ntfConnId, rcvNtfDhSecret, lastBrokerTs_) <- liftError' storeError $ getNtfRcvQueue db smpQueue
1322+
(ntfConnId, ntfDbQueueId, rcvNtfDhSecret, lastBrokerTs_) <- liftError' storeError $ getNtfRcvQueue db smpQueue
13201323
let ntfMsgMeta = eitherToMaybe $ smpDecode =<< first show (C.cbDecrypt rcvNtfDhSecret nmsgNonce encNMsgMeta)
1321-
ntfInfo = NotificationInfo {ntfConnId, ntfTs, ntfMsgMeta}
1324+
ntfInfo = NotificationInfo {ntfConnId, ntfDbQueueId, ntfTs, ntfMsgMeta}
13221325
pure (ntfInfo, lastBrokerTs_)
13231326
getInitNtfInfo :: DB.Connection -> PNMessageData -> IO (Either AgentErrorType (Maybe NotificationInfo))
13241327
getInitNtfInfo db msgData = runExceptT $ do
1325-
(nftInfo, lastBrokerTs_) <- ExceptT $ getNtfInfo db msgData
1326-
pure $ case (ntfMsgMeta nftInfo, lastBrokerTs_) of
1327-
(Just SMP.NMsgMeta {msgTs}, Just lastBrokerTs)
1328-
| systemToUTCTime msgTs > lastBrokerTs -> Just nftInfo
1328+
(ntfInfo, lastBrokerTs_) <- ExceptT $ getNtfInfo db msgData
1329+
pure $ case ntfMsgMeta ntfInfo of
1330+
Just SMP.NMsgMeta {msgTs}
1331+
| maybe True (systemToUTCTime msgTs >) lastBrokerTs_ -> Just ntfInfo
13291332
_ -> Nothing
13301333

13311334
-- | Send message to the connection (SEND command) in Reader monad

src/Simplex/Messaging/Agent/Protocol.hs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ module Simplex.Messaging.Agent.Protocol
147147
AgentMsgId,
148148
NotificationsMode (..),
149149
NotificationInfo (..),
150+
ConnMsgReq (..),
150151

151152
-- * Encode/decode
152153
serializeCommand,
@@ -678,11 +679,21 @@ instance FromField NotificationsMode where fromField = blobFieldDecoder $ parseA
678679

679680
data NotificationInfo = NotificationInfo
680681
{ ntfConnId :: ConnId,
682+
ntfDbQueueId :: Int64,
681683
ntfTs :: SystemTime,
684+
-- Nothing means that the message failed to decrypt or to decode,
685+
-- we can still show event notification
682686
ntfMsgMeta :: Maybe NMsgMeta
683687
}
684688
deriving (Show)
685689

690+
data ConnMsgReq = ConnMsgReq
691+
{ msgConnId :: ConnId,
692+
msgDbQueueId :: Int64,
693+
msgTs :: Maybe UTCTime
694+
}
695+
deriving (Show)
696+
686697
data ConnectionMode = CMInvitation | CMContact
687698
deriving (Eq, Show)
688699

src/Simplex/Messaging/Agent/Store/AgentStore.hs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
9898
-- Messages
9999
updateRcvIds,
100100
createRcvMsg,
101+
setLastBrokerTs,
101102
updateRcvMsgHash,
102103
createSndMsgBody,
103104
updateSndIds,
@@ -855,7 +856,11 @@ createRcvMsg db connId rq@RcvQueue {dbQueueId} rcvMsgData@RcvMsgData {msgMeta =
855856
insertRcvMsgBase_ db connId rcvMsgData
856857
insertRcvMsgDetails_ db connId rq rcvMsgData
857858
updateRcvMsgHash db connId sndMsgId internalRcvId internalHash
858-
DB.execute db "UPDATE rcv_queues SET last_broker_ts = ? WHERE conn_id = ? AND rcv_queue_id = ?" (brokerTs, connId, dbQueueId)
859+
setLastBrokerTs db connId dbQueueId brokerTs
860+
861+
setLastBrokerTs :: DB.Connection -> ConnId -> DBQueueId 'QSStored -> UTCTime -> IO ()
862+
setLastBrokerTs db connId dbQueueId brokerTs =
863+
DB.execute db "UPDATE rcv_queues SET last_broker_ts = ? WHERE conn_id = ? AND rcv_queue_id = ? AND last_broker_ts < ?" (brokerTs, connId, dbQueueId, brokerTs)
859864

860865
createSndMsgBody :: DB.Connection -> AMessage -> IO Int64
861866
createSndMsgBody db aMessage =
@@ -1781,19 +1786,19 @@ getActiveNtfToken db =
17811786
ntfMode = fromMaybe NMPeriodic ntfMode_
17821787
in NtfToken {deviceToken = DeviceToken provider dt, ntfServer, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhKeys, ntfDhSecret, ntfTknStatus, ntfTknAction, ntfMode}
17831788

1784-
getNtfRcvQueue :: DB.Connection -> SMPQueueNtf -> IO (Either StoreError (ConnId, RcvNtfDhSecret, Maybe UTCTime))
1789+
getNtfRcvQueue :: DB.Connection -> SMPQueueNtf -> IO (Either StoreError (ConnId, Int64, RcvNtfDhSecret, Maybe UTCTime))
17851790
getNtfRcvQueue db SMPQueueNtf {smpServer = (SMPServer host port _), notifierId} =
17861791
firstRow' res SEConnNotFound $
17871792
DB.query
17881793
db
17891794
[sql|
1790-
SELECT conn_id, rcv_ntf_dh_secret, last_broker_ts
1795+
SELECT conn_id, rcv_queue_id, rcv_ntf_dh_secret, last_broker_ts
17911796
FROM rcv_queues
17921797
WHERE host = ? AND port = ? AND ntf_id = ? AND deleted = 0
17931798
|]
17941799
(host, port, notifierId)
17951800
where
1796-
res (connId, Just rcvNtfDhSecret, lastBrokerTs_) = Right (connId, rcvNtfDhSecret, lastBrokerTs_)
1801+
res (connId, dbQueueId, Just rcvNtfDhSecret, lastBrokerTs_) = Right (connId, dbQueueId, rcvNtfDhSecret, lastBrokerTs_)
17971802
res _ = Left SEConnNotFound
17981803

17991804
setConnectionNtfs :: DB.Connection -> ConnId -> Bool -> IO ()

tests/AgentTests/FunctionalAPITests.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1946,7 +1946,7 @@ testOnlyCreatePullSlowHandshake = withAgentClientsCfg2 agentProxyCfgV8 agentProx
19461946
getMsg :: AgentClient -> ConnId -> ExceptT AgentErrorType IO a -> ExceptT AgentErrorType IO a
19471947
getMsg c cId action = do
19481948
liftIO $ noMessages c "nothing should be delivered before GET"
1949-
[Just _] <- lift $ getConnectionMessages c [cId]
1949+
[Just _] <- lift $ getConnectionMessages c [ConnMsgReq cId 1 Nothing]
19501950
action
19511951

19521952
getMSGNTF :: AgentClient -> ConnId -> ExceptT AgentErrorType IO ()

tests/AgentTests/NotificationTests.hs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ import Data.Text (Text)
5757
import qualified Data.Text as T
5858
import Data.Text.Encoding (encodeUtf8)
5959
import qualified Data.Text.IO as TIO
60+
import Data.Time.Clock.System (systemToUTCTime)
6061
import NtfClient
6162
import SMPAgentClient (agentCfg, initAgentServers, initAgentServers2, testDB, testDB2, testNtfServer, testNtfServer2)
6263
import SMPClient (cfgMS, cfgJ2QS, cfgVPrev, serverStoreConfig, testPort, testPort2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn, xit'')
@@ -75,7 +76,7 @@ import Simplex.Messaging.Notifications.Server.Env (NtfServerConfig (..))
7576
import Simplex.Messaging.Notifications.Server.Push.APNS
7677
import Simplex.Messaging.Notifications.Types (NtfTknAction (..), NtfToken (..))
7778
import Simplex.Messaging.Parsers (parseAll)
78-
import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags), NtfServer, ProtocolServer (..), SMPMsgMeta (..), SubscriptionMode (..))
79+
import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags), NMsgMeta (..), NtfServer, ProtocolServer (..), SMPMsgMeta (..), SubscriptionMode (..))
7980
import qualified Simplex.Messaging.Protocol as SMP
8081
import Simplex.Messaging.Server.Env.STM (AStoreType (..), ServerConfig (..))
8182
import Simplex.Messaging.Transport (ATransport)
@@ -558,11 +559,11 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag
558559
(nonce, message) <- messageNotification apns tkn
559560
pure (bobId, aliceId, nonce, message)
560561

561-
Right [NotificationInfo {ntfConnId = cId}] <- runExceptT $ getNotificationConns alice nonce message
562+
Right [NotificationInfo {ntfConnId = cId, ntfMsgMeta = Just NMsgMeta {msgTs}}] <- runExceptT $ getNotificationConns alice nonce message
562563
cId `shouldBe` bobId
563564
-- alice client already has subscription for the connection,
564565
-- so get fails with CMD PROHIBITED (transformed into Nothing in catch)
565-
[Nothing] <- getConnectionMessages alice [cId]
566+
[Nothing] <- getConnectionMessages alice [ConnMsgReq cId 1 $ Just $ systemToUTCTime msgTs]
566567

567568
threadDelay 500000
568569
suspendAgent alice 0
@@ -572,7 +573,7 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag
572573

573574
-- aliceNtf client doesn't have subscription and is allowed to get notification message
574575
withAgent 3 aliceCfg initAgentServers testDB $ \aliceNtf -> do
575-
(Just SMPMsgMeta {msgFlags = MsgFlags True}) :| _ <- getConnectionMessages aliceNtf [cId]
576+
(Just SMPMsgMeta {msgFlags = MsgFlags True}) :| _ <- getConnectionMessages aliceNtf [ConnMsgReq cId 1 $ Just $ systemToUTCTime msgTs]
576577
pure ()
577578

578579
threadDelay 1000000

0 commit comments

Comments
 (0)