diff --git a/simplexmq.cabal b/simplexmq.cabal index 25969b2ff..7cee47e0d 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -1,7 +1,7 @@ cabal-version: 1.12 name: simplexmq -version: 6.2.0.2 +version: 6.2.0.3 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 3d8fac5e4..4b78f3cb4 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -163,11 +163,11 @@ smpServer :: TMVar Bool -> ServerConfig -> Maybe AttachHTTP -> M () smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHTTP_ = do s <- asks server pa <- asks proxyAgent - msgStats <- processServerMessages + msgStats_ <- processServerMessages ntfStats <- restoreServerNtfs - liftIO $ printMessageStats "messages" msgStats + liftIO $ mapM_ (printMessageStats "messages") msgStats_ liftIO $ printMessageStats "notifications" ntfStats - restoreServerStats msgStats ntfStats + restoreServerStats msgStats_ ntfStats raceAny_ ( serverThread s "server subscribedQ" subscribedQ subscribers subClients pendingSubEvents subscriptions cancelSub : serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubClients pendingNtfSubEvents ntfSubscriptions (\_ -> pure ()) @@ -385,12 +385,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT threadDelay' interval old <- expireBeforeEpoch expCfg now <- systemSeconds <$> getSystemTime - Sum deleted <- withActiveMsgQueues ms $ expireQueueMsgs now ms old - atomicModifyIORef'_ (msgExpired stats) (+ deleted) - logInfo $ "STORE: expireMessagesThread, expired " <> tshow deleted <> " messages" + msgStats@MessageStats {storedMsgsCount = stored, expiredMsgsCount = expired} <- + withActiveMsgQueues ms $ expireQueueMsgs now ms old + atomicWriteIORef (msgCount stats) stored + atomicModifyIORef'_ (msgExpired stats) (+ expired) + printMessageStats "STORE: messages" msgStats where - expireQueueMsgs now ms old rId q = - either (const 0) Sum <$> runExceptT (idleDeleteExpiredMsgs now ms rId q old) + expireQueueMsgs now ms old rId q = fmap (fromRight newMessageStats) . runExceptT $ do + (expired_, stored) <- idleDeleteExpiredMsgs now ms rId q old + pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1} expireNtfsThread :: ServerConfig -> M () expireNtfsThread ServerConfig {notificationExpiration = expCfg} = do @@ -1731,26 +1734,26 @@ exportMessages tty ms f drainMsgs = do exitFailure encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n') -processServerMessages :: M MessageStats +processServerMessages :: M (Maybe MessageStats) processServerMessages = do old_ <- asks (messageExpiration . config) $>>= (liftIO . fmap Just . expireBeforeEpoch) expire <- asks $ expireMessagesOnStart . config asks msgStore >>= liftIO . processMessages old_ expire where - processMessages :: Maybe Int64 -> Bool -> AMsgStore -> IO MessageStats + processMessages :: Maybe Int64 -> Bool -> AMsgStore -> IO (Maybe MessageStats) processMessages old_ expire = \case AMS SMSMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of - Just f -> ifM (doesFileExist f) (importMessages False ms f old_) (pure newMessageStats) - Nothing -> pure newMessageStats + Just f -> ifM (doesFileExist f) (Just <$> importMessages False ms f old_) (pure Nothing) + Nothing -> pure Nothing AMS SMSJournal ms - | expire -> case old_ of + | expire -> Just <$> case old_ of Just old -> do logInfo "expiring journal store messages..." withAllMsgQueues False ms $ processExpireQueue old Nothing -> do logInfo "validating journal store messages..." withAllMsgQueues False ms $ processValidateQueue - | otherwise -> logWarn "skipping message expiration" $> newMessageStats + | otherwise -> logWarn "skipping message expiration" $> Nothing where processExpireQueue old rId q = runExceptT expireQueue >>= \case @@ -1887,8 +1890,8 @@ saveServerStats = B.writeFile f $ strEncode stats logInfo "server stats saved" -restoreServerStats :: MessageStats -> MessageStats -> M () -restoreServerStats msgStats ntfStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats +restoreServerStats :: Maybe MessageStats -> MessageStats -> M () +restoreServerStats msgStats_ ntfStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats where restoreStats f = whenM (doesFileExist f) $ do logInfo $ "restoring server stats from file " <> T.pack f @@ -1897,9 +1900,11 @@ restoreServerStats msgStats ntfStats = asks (serverStatsBackupFile . config) >>= s <- asks serverStats AMS _ st <- asks msgStore _qCount <- M.size <$> readTVarIO (activeMsgQueues st) - let _msgCount = storedMsgsCount msgStats + let _msgCount = maybe statsMsgCount storedMsgsCount msgStats_ _ntfCount = storedMsgsCount ntfStats - liftIO $ setServerStats s d {_qCount, _msgCount, _ntfCount, _msgExpired = _msgExpired d + expiredMsgsCount msgStats, _msgNtfExpired = _msgNtfExpired d + expiredMsgsCount ntfStats} + _msgExpired' = _msgExpired d + maybe 0 expiredMsgsCount msgStats_ + _msgNtfExpired' = _msgNtfExpired d + expiredMsgsCount ntfStats + liftIO $ setServerStats s d {_qCount, _msgCount, _ntfCount, _msgExpired = _msgExpired', _msgNtfExpired = _msgNtfExpired'} renameFile f $ f <> ".bak" logInfo "server stats restored" compareCounts "Queue" statsQCount _qCount diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 9e0b65beb..f598bdcb8 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -80,6 +80,8 @@ data ServerConfig = ServerConfig -- | time after which the messages can be removed from the queues and check interval, seconds messageExpiration :: Maybe ExpirationConfig, expireMessagesOnStart :: Bool, + -- | interval of inactivity after which journal queue is closed + idleQueueInterval :: Int64, -- | notification expiration interval (seconds) notificationExpiration :: ExpirationConfig, -- | time after which the socket with inactive client can be disconnected (without any messages or commands, incl. PING), @@ -121,9 +123,12 @@ defaultMessageExpiration :: ExpirationConfig defaultMessageExpiration = ExpirationConfig { ttl = defMsgExpirationDays * 86400, -- seconds - checkInterval = 21600 -- seconds, 6 hours + checkInterval = 14400 -- seconds, 4 hours } +defaultIdleQueueInterval :: Int64 +defaultIdleQueueInterval = 28800 -- seconds, 8 hours + defNtfExpirationHours :: Int64 defNtfExpirationHours = 24 @@ -283,15 +288,14 @@ newProhibitedSub = do return Sub {subThread = ProhibitSub, delivered} newEnv :: ServerConfig -> IO Env -newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgStoreType, storeMsgsFile, smpAgentCfg, information, messageExpiration, msgQueueQuota, maxJournalMsgCount, maxJournalStateLines} = do +newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgStoreType, storeMsgsFile, smpAgentCfg, information, messageExpiration, idleQueueInterval, msgQueueQuota, maxJournalMsgCount, maxJournalStateLines} = do serverActive <- newTVarIO True server <- newServer msgStore@(AMS _ store) <- case msgStoreType of AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota} AMSType SMSJournal -> case storeMsgsFile of Just storePath -> - let idleInterval = maybe maxBound checkInterval messageExpiration - cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval} + let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval} in AMS SMSJournal <$> newMsgStore cfg Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure ntfStore <- NtfStore <$> TM.emptyIO diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 8b189c6f7..3da2aaeb4 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -416,6 +416,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = { ttl = 86400 * readIniDefault defMsgExpirationDays "STORE_LOG" "expire_messages_days" ini }, expireMessagesOnStart = fromMaybe True $ iniOnOff "STORE_LOG" "expire_messages_on_start" ini, + idleQueueInterval = defaultIdleQueueInterval, notificationExpiration = defaultNtfExpiration { ttl = 3600 * readIniDefault defNtfExpirationHours "STORE_LOG" "expire_ntfs_hours" ini diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 3df894652..e3a70786c 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -332,20 +332,21 @@ instance MsgStoreClass JournalMsgStore where journalId <- newJournalId random mkJournalQueue queue (newMsgQueueState journalId) Nothing - withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a) + withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int) withIdleMsgQueue now ms@JournalMsgStore {config} rId q action = StoreIO $ readTVarIO (msgQueue_ q) >>= \case Nothing -> - Just <$> - E.bracket - (unStoreIO $ getMsgQueue ms rId q) - (\_ -> closeMsgQueue q) - (unStoreIO . action) + E.bracket (unStoreIO $ getMsgQueue ms rId q) (\_ -> closeMsgQueue q) $ \mq -> unStoreIO $ do + r <- action mq + sz <- getQueueSize_ mq + pure (Just r, sz) Just mq -> do ts <- readTVarIO $ activeAt q - if now - ts >= idleInterval config + r <- if now - ts >= idleInterval config then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q else pure Nothing + sz <- unStoreIO $ getQueueSize_ mq + pure (r, sz) deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec) deleteQueue ms rId q = diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 33c91374c..42c3360d7 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -110,9 +110,13 @@ instance MsgStoreClass STMMsgStore where pure q -- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue) - withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a) - withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= mapM action - {-# INLINE withIdleMsgQueue #-} + withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int) + withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= \case + Just q -> do + r <- action q + sz <- getQueueSize_ q + pure (Just r, sz) + Nothing -> pure (Nothing, 0) deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec) deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 33d376a20..0b514659e 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -15,7 +15,6 @@ import Control.Monad (foldM) import Control.Monad.Trans.Except import Data.Int (Int64) import Data.Kind -import Data.Maybe (fromMaybe) import qualified Data.Map.Strict as M import Data.Time.Clock.System (SystemTime (systemSeconds)) import Simplex.Messaging.Protocol @@ -47,7 +46,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where queueRec' :: StoreQueue s -> TVar (Maybe QueueRec) getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s) -- the journal queue will be closed after action if it was initially closed or idle longer than interval in config - withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a) + withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int) deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec) deleteQueueSize :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int)) getQueueMessages_ :: Bool -> MsgQueue s -> StoreMonad s [Message] @@ -114,10 +113,11 @@ deleteExpiredMsgs st rId q old = getMsgQueue st rId q >>= deleteExpireMsgs_ old q -- closed and idle queues will be closed after expiration -idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int +-- returns (expired count, queue size after expiration) +idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO (Maybe Int, Int) idleDeleteExpiredMsgs now st rId q old = - isolateQueue rId q "idleDeleteExpiredMsgs" $ - fromMaybe 0 <$> withIdleMsgQueue now st rId q (deleteExpireMsgs_ old q) + isolateQueue rId q "idleDeleteExpiredMsgs" $ + withIdleMsgQueue now st rId q (deleteExpireMsgs_ old q) deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue s -> StoreMonad s Int deleteExpireMsgs_ old q mq = do diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index b28f2b804..423b81074 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -133,6 +133,7 @@ cfgMS msType = controlPortAdminAuth = Nothing, messageExpiration = Just defaultMessageExpiration, expireMessagesOnStart = True, + idleQueueInterval = defaultIdleQueueInterval, notificationExpiration = defaultNtfExpiration, inactiveClientExpiration = Just defaultInactiveClientExpiration, logStatsInterval = Nothing, diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 28073d5e4..744ceb437 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -972,7 +972,7 @@ testMsgExpireOnInterval = xit' "should expire messages that are not received before messageTTL after expiry interval" $ \(ATransport (t :: TProxy c), msType) -> do g <- C.newRandom (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g - let cfg' = (cfgMS msType) {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}} + let cfg' = (cfgMS msType) {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}, idleQueueInterval = 1} withSmpServerConfigOn (ATransport t) cfg' testPort $ \_ -> testSMPClient @c $ \sh -> do (sId, rId, rKey, _) <- testSMPClient @c $ \rh -> createAndSecureQueue rh sPub