Skip to content

Commit e9c4db7

Browse files
committed
smp server: expire messages in idle message queues (including not opened)
1 parent 3017d14 commit e9c4db7

File tree

7 files changed

+73
-36
lines changed

7 files changed

+73
-36
lines changed

src/Simplex/Messaging/Server.hs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,6 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
296296
deliverNtfs ns stats (AClient _ Client {clientId, ntfSubscriptions, sndQ, connected}) =
297297
whenM (currentClient readTVarIO) $ do
298298
subs <- readTVarIO ntfSubscriptions
299-
logDebug $ "NOTIFICATIONS: client #" <> tshow clientId <> " is current with " <> tshow (M.size subs) <> " subs"
300299
ntfQs <- M.assocs . M.filterWithKey (\nId _ -> M.member nId subs) <$> readTVarIO ns
301300
tryAny (atomically $ flushSubscribedNtfs ntfQs) >>= \case
302301
Right len -> updateNtfStats len
@@ -321,12 +320,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
321320
writeTVar v []
322321
pure $ foldl' (\acc' ntf -> nmsg nId ntf : acc') acc ntfs -- reverses, to order by time
323322
nmsg nId MsgNtf {ntfNonce, ntfEncMeta} = (CorrId "", nId, NMSG ntfNonce ntfEncMeta)
324-
updateNtfStats 0 = logDebug $ "NOTIFICATIONS: no ntfs for client #" <> tshow clientId
323+
updateNtfStats 0 = pure ()
325324
updateNtfStats len = liftIO $ do
326325
atomicModifyIORef'_ (ntfCount stats) (subtract len)
327326
atomicModifyIORef'_ (msgNtfs stats) (+ len)
328327
atomicModifyIORef'_ (msgNtfsB stats) (+ (len `div` 80 + 1)) -- up to 80 NMSG in the batch
329-
logDebug $ "NOTIFICATIONS: delivered to client #" <> tshow clientId <> " " <> tshow len <> " ntfs"
330328

331329
sendPendingEvtsThread :: Server -> M ()
332330
sendPendingEvtsThread s = do
@@ -386,13 +384,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
386384
liftIO $ forever $ do
387385
threadDelay' interval
388386
old <- expireBeforeEpoch expCfg
389-
Sum deleted <- withActiveMsgQueues ms $ expireQueueMsgs stats old
387+
now <- systemSeconds <$> getSystemTime
388+
Sum deleted <- withActiveMsgQueues ms $ expireQueueMsgs now ms old
389+
atomicModifyIORef'_ (msgExpired stats) (+ deleted)
390390
logInfo $ "STORE: expireMessagesThread, expired " <> tshow deleted <> " messages"
391391
where
392-
expireQueueMsgs stats old rId q =
393-
runExceptT (deleteExpiredMsgs rId q True old) >>= \case
394-
Right deleted -> Sum deleted <$ atomicModifyIORef'_ (msgExpired stats) (+ deleted)
395-
Left _ -> pure 0
392+
expireQueueMsgs now ms old rId q =
393+
either (const 0) Sum <$> runExceptT (idleDeleteExpiredMsgs now ms rId q old)
396394

397395
expireNtfsThread :: ServerConfig -> M ()
398396
expireNtfsThread ServerConfig {notificationExpiration = expCfg} = do
@@ -1469,7 +1467,7 @@ client
14691467

14701468
expireMessages :: Maybe ExpirationConfig -> ServerStats -> ExceptT ErrorType IO ()
14711469
expireMessages msgExp stats = do
1472-
deleted <- maybe (pure 0) (deleteExpiredMsgs (recipientId qr) q True <=< liftIO . expireBeforeEpoch) msgExp
1470+
deleted <- maybe (pure 0) (deleteExpiredMsgs ms (recipientId qr) q <=< liftIO . expireBeforeEpoch) msgExp
14731471
liftIO $ when (deleted > 0) $ atomicModifyIORef'_ (msgExpired stats) (+ deleted)
14741472

14751473
-- The condition for delivery of the message is:
@@ -1763,9 +1761,8 @@ processServerMessages = do
17631761
exitFailure
17641762
where
17651763
expireQueue = do
1766-
expired'' <- deleteExpiredMsgs rId q False old
1764+
expired'' <- deleteExpiredMsgs ms rId q old
17671765
stored'' <- getQueueSize ms rId q
1768-
liftIO $ logQueueState q
17691766
liftIO $ closeMsgQueue q
17701767
pure (stored'', expired'')
17711768
processValidateQueue :: RecipientId -> JournalQueue -> IO MessageStats

src/Simplex/Messaging/Server/Env/STM.hs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ defaultMessageExpiration :: ExpirationConfig
121121
defaultMessageExpiration =
122122
ExpirationConfig
123123
{ ttl = defMsgExpirationDays * 86400, -- seconds
124-
checkInterval = 43200 -- seconds, 12 hours
124+
checkInterval = 21600 -- seconds, 6 hours
125125
}
126126

127127
defNtfExpirationHours :: Int64
@@ -290,7 +290,8 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt
290290
AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota}
291291
AMSType SMSJournal -> case storeMsgsFile of
292292
Just storePath ->
293-
let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize}
293+
let idleInterval = checkInterval defaultMessageExpiration
294+
cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval}
294295
in AMS SMSJournal <$> newMsgStore cfg
295296
Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure
296297
ntfStore <- NtfStore <$> TM.emptyIO

src/Simplex/Messaging/Server/Main.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
148148
doesFileExist iniFile >>= \case
149149
True -> readIniFile iniFile >>= either exitError a
150150
_ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`."
151-
newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize}
151+
newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = checkInterval defaultMessageExpiration}
152152
iniFile = combine cfgPath "smp-server.ini"
153153
serverVersion = "SMP server v" <> simplexMQVersion
154154
defaultServerPorts = "5223,443"

src/Simplex/Messaging/Server/MsgStore/Journal.hs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import Data.List (intercalate)
5353
import Data.Maybe (catMaybes, fromMaybe)
5454
import qualified Data.Text as T
5555
import Data.Time.Clock (getCurrentTime)
56+
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
5657
import Data.Time.Format.ISO8601 (iso8601Show)
5758
import GHC.IO (catchAny)
5859
import Simplex.Messaging.Agent.Client (getMapLock, withLockMap)
@@ -92,7 +93,9 @@ data JournalStoreConfig = JournalStoreConfig
9293
-- This number should be set bigger than queue quota.
9394
maxMsgCount :: Int,
9495
maxStateLines :: Int,
95-
stateTailSize :: Int
96+
stateTailSize :: Int,
97+
-- time in seconds after which the queue will be closed after message expiration
98+
idleInterval :: Int64
9699
}
97100

98101
data JournalQueue = JournalQueue
@@ -116,7 +119,9 @@ data JournalMsgQueue = JournalMsgQueue
116119
-- It prevents reading each message twice,
117120
-- and reading it after it was just written.
118121
tipMsg :: TVar (Maybe (Maybe (Message, Int64))),
119-
handles :: TVar (Maybe MsgQueueHandles)
122+
handles :: TVar (Maybe MsgQueueHandles),
123+
-- system time in seconds since epoch
124+
activeAt :: TVar Int64
120125
}
121126

122127
data MsgQueueState = MsgQueueState
@@ -295,11 +300,11 @@ instance MsgStoreClass JournalMsgStore where
295300
(Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping"))
296301

297302
logQueueStates :: JournalMsgStore -> IO ()
298-
logQueueStates ms = withActiveMsgQueues ms $ \_ -> logQueueState
303+
logQueueStates ms = withActiveMsgQueues ms $ \_ -> unStoreIO . logQueueState
299304

300-
logQueueState :: JournalQueue -> IO ()
305+
logQueueState :: JournalQueue -> StoreIO ()
301306
logQueueState q =
302-
void $
307+
StoreIO . void $
303308
readTVarIO (msgQueue_ q)
304309
$>>= \mq -> readTVarIO (handles mq)
305310
$>>= (\hs -> (readTVarIO (state mq) >>= appendState (stateHandle hs)) $> Just ())
@@ -326,9 +331,21 @@ instance MsgStoreClass JournalMsgStore where
326331
journalId <- newJournalId random
327332
mkJournalQueue queue (newMsgQueueState journalId) Nothing
328333

329-
openedMsgQueue :: JournalQueue -> StoreIO (Maybe JournalMsgQueue)
330-
openedMsgQueue = StoreIO . readTVarIO . msgQueue_
331-
{-# INLINE openedMsgQueue #-}
334+
withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a)
335+
withIdleMsgQueue now ms@JournalMsgStore {config} rId q action =
336+
StoreIO $ readTVarIO (msgQueue_ q) >>= maybe runQ idleQ
337+
where
338+
runQ =
339+
Just <$>
340+
E.bracket
341+
(unStoreIO $ getMsgQueue ms rId q)
342+
(\_ -> closeMsgQueue q)
343+
(unStoreIO . action)
344+
idleQ mq = do
345+
ts <- readTVarIO $ activeAt mq
346+
if now - ts > idleInterval config
347+
then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q
348+
else pure Nothing
332349

333350
deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec)
334351
deleteQueue ms rId q =
@@ -355,7 +372,7 @@ instance MsgStoreClass JournalMsgStore where
355372
writeMsg :: JournalMsgStore -> RecipientId -> JournalQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
356373
writeMsg ms rId q' logState msg = isolateQueue rId q' "writeMsg" $ do
357374
q <- getMsgQueue ms rId q'
358-
StoreIO $ do
375+
StoreIO $ (`E.finally` updateActiveAt q) $ do
359376
st@MsgQueueState {canWrite, size} <- readTVarIO (state q)
360377
let empty = size == 0
361378
if canWrite || empty
@@ -420,7 +437,7 @@ instance MsgStoreClass JournalMsgStore where
420437
pure $ Just msg
421438

422439
tryDeleteMsg_ :: JournalMsgQueue -> Bool -> StoreIO ()
423-
tryDeleteMsg_ q@JournalMsgQueue {tipMsg, handles} logState = StoreIO $
440+
tryDeleteMsg_ q@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` updateActiveAt q) $
424441
void $
425442
readTVarIO tipMsg -- if there is no cached tipMsg, do nothing
426443
$>>= (pure . fmap snd)
@@ -431,6 +448,9 @@ instance MsgStoreClass JournalMsgStore where
431448
isolateQueue rId JournalQueue {queueLock} op =
432449
tryStore' op rId . withLock' queueLock op . unStoreIO
433450

451+
updateActiveAt :: JournalMsgQueue -> IO ()
452+
updateActiveAt q = atomically . writeTVar (activeAt q) . systemSeconds =<< getSystemTime
453+
434454
tryStore' :: String -> RecipientId -> IO a -> ExceptT ErrorType IO a
435455
tryStore' op rId = tryStore op rId . fmap Right
436456

@@ -457,9 +477,10 @@ mkJournalQueue queue st hs_ = do
457477
state <- newTVarIO st
458478
tipMsg <- newTVarIO Nothing
459479
handles <- newTVarIO hs_
480+
activeAt <- newTVarIO . systemSeconds =<< getSystemTime
460481
-- using the same queue lock which is currently locked,
461482
-- to avoid map lookup on queue operations
462-
pure JournalMsgQueue {queue, state, tipMsg, handles}
483+
pure JournalMsgQueue {queue, state, tipMsg, handles, activeAt}
463484

464485
chooseReadJournal :: JournalMsgQueue -> Bool -> MsgQueueHandles -> IO (Maybe (JournalState 'JTRead, Handle))
465486
chooseReadJournal q log' hs = do

src/Simplex/Messaging/Server/MsgStore/STM.hs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import Control.Concurrent.STM
2121
import Control.Monad.IO.Class
2222
import Control.Monad.Trans.Except
2323
import Data.Functor (($>))
24+
import Data.Int (Int64)
2425
import Simplex.Messaging.Protocol
2526
import Simplex.Messaging.Server.MsgStore.Types
2627
import Simplex.Messaging.Server.QueueStore
@@ -108,9 +109,10 @@ instance MsgStoreClass STMMsgStore where
108109
writeTVar msgQueue_ $! Just q
109110
pure q
110111

111-
openedMsgQueue :: STMQueue -> STM (Maybe STMMsgQueue)
112-
openedMsgQueue = readTVar . msgQueue_
113-
{-# INLINE openedMsgQueue #-}
112+
-- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue)
113+
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a)
114+
withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= mapM action
115+
{-# INLINE withIdleMsgQueue #-}
114116

115117
deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec)
116118
deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q

src/Simplex/Messaging/Server/MsgStore/Types.hs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import Control.Monad (foldM)
1515
import Control.Monad.Trans.Except
1616
import Data.Int (Int64)
1717
import Data.Kind
18+
import Data.Maybe (fromMaybe)
1819
import qualified Data.Map.Strict as M
1920
import Data.Time.Clock.System (SystemTime (systemSeconds))
2021
import Simplex.Messaging.Protocol
@@ -42,10 +43,11 @@ class Monad (StoreMonad s) => MsgStoreClass s where
4243
activeMsgQueues :: s -> TMap RecipientId (StoreQueue s)
4344
withAllMsgQueues :: Monoid a => Bool -> s -> (RecipientId -> StoreQueue s -> IO a) -> IO a
4445
logQueueStates :: s -> IO ()
45-
logQueueState :: StoreQueue s -> IO ()
46+
logQueueState :: StoreQueue s -> StoreMonad s ()
4647
queueRec' :: StoreQueue s -> TVar (Maybe QueueRec)
4748
getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s)
48-
openedMsgQueue :: StoreQueue s -> StoreMonad s (Maybe (MsgQueue s))
49+
-- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
50+
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a)
4951
deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec)
5052
deleteQueueSize :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int))
5153
getQueueMessages_ :: Bool -> MsgQueue s -> StoreMonad s [Message]
@@ -106,13 +108,26 @@ withMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String ->
106108
withMsgQueue st rId q op a = isolateQueue rId q op $ getMsgQueue st rId q >>= a
107109
{-# INLINE withMsgQueue #-}
108110

109-
deleteExpiredMsgs :: MsgStoreClass s => RecipientId -> StoreQueue s -> Bool -> Int64 -> ExceptT ErrorType IO Int
110-
deleteExpiredMsgs rId q logState old =
111-
isolateQueue rId q "deleteExpiredMsgs" $ openedMsgQueue q >>= maybe (pure 0) (loop 0)
111+
deleteExpiredMsgs :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
112+
deleteExpiredMsgs st rId q old =
113+
isolateQueue rId q "deleteExpiredMsgs" $
114+
getMsgQueue st rId q >>= deleteExpireMsgs_ old q
115+
116+
-- closed and idle queues will be closed after expiration
117+
idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
118+
idleDeleteExpiredMsgs now st rId q old =
119+
isolateQueue rId q "idleDeleteExpiredMsgs" $
120+
fromMaybe 0 <$> withIdleMsgQueue now st rId q (deleteExpireMsgs_ old q)
121+
122+
deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue s -> StoreMonad s Int
123+
deleteExpireMsgs_ old q mq = do
124+
n <- loop 0
125+
logQueueState q
126+
pure n
112127
where
113-
loop dc mq =
128+
loop dc =
114129
tryPeekMsg_ mq >>= \case
115130
Just Message {msgTs}
116131
| systemSeconds msgTs < old ->
117-
tryDeleteMsg_ mq logState >> loop (dc + 1) mq
132+
tryDeleteMsg_ mq False >> loop (dc + 1)
118133
_ -> pure dc

tests/CoreTests/MsgStoreTests.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ testJournalStoreCfg =
7777
quota = 3,
7878
maxMsgCount = 4,
7979
maxStateLines = 2,
80-
stateTailSize = 256
80+
stateTailSize = 256,
81+
idleInterval = 21600
8182
}
8283

8384
mkMessage :: MonadIO m => ByteString -> m Message

0 commit comments

Comments
 (0)