diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index b0e499d3f..3d8fac5e4 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -296,7 +296,6 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT deliverNtfs ns stats (AClient _ Client {clientId, ntfSubscriptions, sndQ, connected}) = whenM (currentClient readTVarIO) $ do subs <- readTVarIO ntfSubscriptions - logDebug $ "NOTIFICATIONS: client #" <> tshow clientId <> " is current with " <> tshow (M.size subs) <> " subs" ntfQs <- M.assocs . M.filterWithKey (\nId _ -> M.member nId subs) <$> readTVarIO ns tryAny (atomically $ flushSubscribedNtfs ntfQs) >>= \case Right len -> updateNtfStats len @@ -321,12 +320,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT writeTVar v [] pure $ foldl' (\acc' ntf -> nmsg nId ntf : acc') acc ntfs -- reverses, to order by time nmsg nId MsgNtf {ntfNonce, ntfEncMeta} = (CorrId "", nId, NMSG ntfNonce ntfEncMeta) - updateNtfStats 0 = logDebug $ "NOTIFICATIONS: no ntfs for client #" <> tshow clientId + updateNtfStats 0 = pure () updateNtfStats len = liftIO $ do atomicModifyIORef'_ (ntfCount stats) (subtract len) atomicModifyIORef'_ (msgNtfs stats) (+ len) atomicModifyIORef'_ (msgNtfsB stats) (+ (len `div` 80 + 1)) -- up to 80 NMSG in the batch - logDebug $ "NOTIFICATIONS: delivered to client #" <> tshow clientId <> " " <> tshow len <> " ntfs" sendPendingEvtsThread :: Server -> M () sendPendingEvtsThread s = do @@ -386,13 +384,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT liftIO $ forever $ do threadDelay' interval old <- expireBeforeEpoch expCfg - Sum deleted <- withActiveMsgQueues ms $ expireQueueMsgs stats old + now <- systemSeconds <$> getSystemTime + Sum deleted <- withActiveMsgQueues ms $ expireQueueMsgs now ms old + atomicModifyIORef'_ (msgExpired stats) (+ deleted) logInfo $ "STORE: expireMessagesThread, expired " <> tshow deleted <> " messages" where - expireQueueMsgs stats old rId q = - runExceptT (deleteExpiredMsgs rId q True old) >>= \case - Right deleted -> Sum deleted <$ atomicModifyIORef'_ (msgExpired stats) (+ deleted) - Left _ -> pure 0 + expireQueueMsgs now ms old rId q = + either (const 0) Sum <$> runExceptT (idleDeleteExpiredMsgs now ms rId q old) expireNtfsThread :: ServerConfig -> M () expireNtfsThread ServerConfig {notificationExpiration = expCfg} = do @@ -1469,7 +1467,7 @@ client expireMessages :: Maybe ExpirationConfig -> ServerStats -> ExceptT ErrorType IO () expireMessages msgExp stats = do - deleted <- maybe (pure 0) (deleteExpiredMsgs (recipientId qr) q True <=< liftIO . expireBeforeEpoch) msgExp + deleted <- maybe (pure 0) (deleteExpiredMsgs ms (recipientId qr) q <=< liftIO . expireBeforeEpoch) msgExp liftIO $ when (deleted > 0) $ atomicModifyIORef'_ (msgExpired stats) (+ deleted) -- The condition for delivery of the message is: @@ -1763,9 +1761,8 @@ processServerMessages = do exitFailure where expireQueue = do - expired'' <- deleteExpiredMsgs rId q False old + expired'' <- deleteExpiredMsgs ms rId q old stored'' <- getQueueSize ms rId q - liftIO $ logQueueState q liftIO $ closeMsgQueue q pure (stored'', expired'') processValidateQueue :: RecipientId -> JournalQueue -> IO MessageStats @@ -1823,7 +1820,7 @@ importMessages tty ms f old_ = do -- if the first message in queue head is "quota", remove it. mergeQuotaMsgs = withMsgQueue ms rId q "mergeQuotaMsgs" $ \mq -> tryPeekMsg_ mq >>= \case - Just MessageQuota {} -> tryDeleteMsg_ mq False + Just MessageQuota {} -> tryDeleteMsg_ q mq False _ -> pure () msgErr :: Show e => String -> e -> String msgErr op e = op <> " error (" <> show e <> "): " <> B.unpack (B.take 100 s) diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 5e60757c1..9e0b65beb 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -121,7 +121,7 @@ defaultMessageExpiration :: ExpirationConfig defaultMessageExpiration = ExpirationConfig { ttl = defMsgExpirationDays * 86400, -- seconds - checkInterval = 43200 -- seconds, 12 hours + checkInterval = 21600 -- seconds, 6 hours } defNtfExpirationHours :: Int64 @@ -290,7 +290,8 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota} AMSType SMSJournal -> case storeMsgsFile of Just storePath -> - let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize} + let idleInterval = maybe maxBound checkInterval messageExpiration + cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval} in AMS SMSJournal <$> newMsgStore cfg Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure ntfStore <- NtfStore <$> TM.emptyIO diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 79709d3eb..8b189c6f7 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -148,7 +148,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = doesFileExist iniFile >>= \case True -> readIniFile iniFile >>= either exitError a _ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`." - newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize} + newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = checkInterval defaultMessageExpiration} iniFile = combine cfgPath "smp-server.ini" serverVersion = "SMP server v" <> simplexMQVersion defaultServerPorts = "5223,443" diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 0310a5991..3df894652 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -53,6 +53,7 @@ import Data.List (intercalate) import Data.Maybe (catMaybes, fromMaybe) import qualified Data.Text as T import Data.Time.Clock (getCurrentTime) +import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Data.Time.Format.ISO8601 (iso8601Show) import GHC.IO (catchAny) import Simplex.Messaging.Agent.Client (getMapLock, withLockMap) @@ -92,7 +93,9 @@ data JournalStoreConfig = JournalStoreConfig -- This number should be set bigger than queue quota. maxMsgCount :: Int, maxStateLines :: Int, - stateTailSize :: Int + stateTailSize :: Int, + -- time in seconds after which the queue will be closed after message expiration + idleInterval :: Int64 } data JournalQueue = JournalQueue @@ -100,7 +103,9 @@ data JournalQueue = JournalQueue -- To avoid race conditions and errors when restoring queues, -- Nothing is written to TVar when queue is deleted. queueRec :: TVar (Maybe QueueRec), - msgQueue_ :: TVar (Maybe JournalMsgQueue) + msgQueue_ :: TVar (Maybe JournalMsgQueue), + -- system time in seconds since epoch + activeAt :: TVar Int64 } data JMQueue = JMQueue @@ -221,7 +226,8 @@ instance STMQueueStore JournalMsgStore where lock <- getMapLock (queueLocks st) $ recipientId qr q <- newTVar $! Just qr mq <- newTVar Nothing - pure $ JournalQueue lock q mq + activeAt <- newTVar 0 + pure $ JournalQueue lock q mq activeAt msgQueue_' = msgQueue_ instance MsgStoreClass JournalMsgStore where @@ -295,11 +301,11 @@ instance MsgStoreClass JournalMsgStore where (Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping")) logQueueStates :: JournalMsgStore -> IO () - logQueueStates ms = withActiveMsgQueues ms $ \_ -> logQueueState + logQueueStates ms = withActiveMsgQueues ms $ \_ -> unStoreIO . logQueueState - logQueueState :: JournalQueue -> IO () + logQueueState :: JournalQueue -> StoreIO () logQueueState q = - void $ + StoreIO . void $ readTVarIO (msgQueue_ q) $>>= \mq -> readTVarIO (handles mq) $>>= (\hs -> (readTVarIO (state mq) >>= appendState (stateHandle hs)) $> Just ()) @@ -326,9 +332,20 @@ instance MsgStoreClass JournalMsgStore where journalId <- newJournalId random mkJournalQueue queue (newMsgQueueState journalId) Nothing - openedMsgQueue :: JournalQueue -> StoreIO (Maybe JournalMsgQueue) - openedMsgQueue = StoreIO . readTVarIO . msgQueue_ - {-# INLINE openedMsgQueue #-} + withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a) + withIdleMsgQueue now ms@JournalMsgStore {config} rId q action = + StoreIO $ readTVarIO (msgQueue_ q) >>= \case + Nothing -> + Just <$> + E.bracket + (unStoreIO $ getMsgQueue ms rId q) + (\_ -> closeMsgQueue q) + (unStoreIO . action) + Just mq -> do + ts <- readTVarIO $ activeAt q + if now - ts >= idleInterval config + then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q + else pure Nothing deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec) deleteQueue ms rId q = @@ -355,7 +372,7 @@ instance MsgStoreClass JournalMsgStore where writeMsg :: JournalMsgStore -> RecipientId -> JournalQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) writeMsg ms rId q' logState msg = isolateQueue rId q' "writeMsg" $ do q <- getMsgQueue ms rId q' - StoreIO $ do + StoreIO $ (`E.finally` updateActiveAt q') $ do st@MsgQueueState {canWrite, size} <- readTVarIO (state q) let empty = size == 0 if canWrite || empty @@ -419,18 +436,21 @@ instance MsgStoreClass JournalMsgStore where atomically $ writeTVar tipMsg $ Just (Just ml) pure $ Just msg - tryDeleteMsg_ :: JournalMsgQueue -> Bool -> StoreIO () - tryDeleteMsg_ q@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ + tryDeleteMsg_ :: JournalQueue -> JournalMsgQueue -> Bool -> StoreIO () + tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $ void $ readTVarIO tipMsg -- if there is no cached tipMsg, do nothing $>>= (pure . fmap snd) $>>= \len -> readTVarIO handles - $>>= \hs -> updateReadPos q logState len hs $> Just () + $>>= \hs -> updateReadPos mq logState len hs $> Just () isolateQueue :: RecipientId -> JournalQueue -> String -> StoreIO a -> ExceptT ErrorType IO a isolateQueue rId JournalQueue {queueLock} op = tryStore' op rId . withLock' queueLock op . unStoreIO +updateActiveAt :: JournalQueue -> IO () +updateActiveAt q = atomically . writeTVar (activeAt q) . systemSeconds =<< getSystemTime + tryStore' :: String -> RecipientId -> IO a -> ExceptT ErrorType IO a tryStore' op rId = tryStore op rId . fmap Right diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index b1412421c..33c91374c 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -21,6 +21,7 @@ import Control.Concurrent.STM import Control.Monad.IO.Class import Control.Monad.Trans.Except import Data.Functor (($>)) +import Data.Int (Int64) import Simplex.Messaging.Protocol import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore @@ -108,9 +109,10 @@ instance MsgStoreClass STMMsgStore where writeTVar msgQueue_ $! Just q pure q - openedMsgQueue :: STMQueue -> STM (Maybe STMMsgQueue) - openedMsgQueue = readTVar . msgQueue_ - {-# INLINE openedMsgQueue #-} + -- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue) + withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a) + withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= mapM action + {-# INLINE withIdleMsgQueue #-} deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec) deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q @@ -157,8 +159,8 @@ instance MsgStoreClass STMMsgStore where tryPeekMsg_ = tryPeekTQueue . msgQueue {-# INLINE tryPeekMsg_ #-} - tryDeleteMsg_ :: STMMsgQueue -> Bool -> STM () - tryDeleteMsg_ STMMsgQueue {msgQueue = q, size} _logState = + tryDeleteMsg_ :: STMQueue -> STMMsgQueue -> Bool -> STM () + tryDeleteMsg_ _ STMMsgQueue {msgQueue = q, size} _logState = tryReadTQueue q >>= \case Just _ -> modifyTVar' size (subtract 1) _ -> pure () diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 28f250b70..33d376a20 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -15,6 +15,7 @@ import Control.Monad (foldM) import Control.Monad.Trans.Except import Data.Int (Int64) import Data.Kind +import Data.Maybe (fromMaybe) import qualified Data.Map.Strict as M import Data.Time.Clock.System (SystemTime (systemSeconds)) import Simplex.Messaging.Protocol @@ -42,10 +43,11 @@ class Monad (StoreMonad s) => MsgStoreClass s where activeMsgQueues :: s -> TMap RecipientId (StoreQueue s) withAllMsgQueues :: Monoid a => Bool -> s -> (RecipientId -> StoreQueue s -> IO a) -> IO a logQueueStates :: s -> IO () - logQueueState :: StoreQueue s -> IO () + logQueueState :: StoreQueue s -> StoreMonad s () queueRec' :: StoreQueue s -> TVar (Maybe QueueRec) getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s) - openedMsgQueue :: StoreQueue s -> StoreMonad s (Maybe (MsgQueue s)) + -- the journal queue will be closed after action if it was initially closed or idle longer than interval in config + withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a) deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec) deleteQueueSize :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int)) getQueueMessages_ :: Bool -> MsgQueue s -> StoreMonad s [Message] @@ -53,7 +55,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where setOverQuota_ :: StoreQueue s -> IO () -- can ONLY be used while restoring messages, not while server running getQueueSize_ :: MsgQueue s -> StoreMonad s Int tryPeekMsg_ :: MsgQueue s -> StoreMonad s (Maybe Message) - tryDeleteMsg_ :: MsgQueue s -> Bool -> StoreMonad s () + tryDeleteMsg_ :: StoreQueue s -> MsgQueue s -> Bool -> StoreMonad s () isolateQueue :: RecipientId -> StoreQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a data MSType = MSMemory | MSJournal @@ -89,7 +91,7 @@ tryDelMsg st rId q msgId' = tryPeekMsg_ mq >>= \case msg_@(Just msg) | messageId msg == msgId' -> - tryDeleteMsg_ mq True >> pure msg_ + tryDeleteMsg_ q mq True >> pure msg_ _ -> pure Nothing -- atomic delete (== read) last and peek next message if available @@ -98,7 +100,7 @@ tryDelPeekMsg st rId q msgId' = withMsgQueue st rId q "tryDelPeekMsg" $ \mq -> tryPeekMsg_ mq >>= \case msg_@(Just msg) - | messageId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ mq True >> tryPeekMsg_ mq) + | messageId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ mq) | otherwise -> pure (Nothing, msg_) _ -> pure (Nothing, Nothing) @@ -106,13 +108,26 @@ withMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> withMsgQueue st rId q op a = isolateQueue rId q op $ getMsgQueue st rId q >>= a {-# INLINE withMsgQueue #-} -deleteExpiredMsgs :: MsgStoreClass s => RecipientId -> StoreQueue s -> Bool -> Int64 -> ExceptT ErrorType IO Int -deleteExpiredMsgs rId q logState old = - isolateQueue rId q "deleteExpiredMsgs" $ openedMsgQueue q >>= maybe (pure 0) (loop 0) +deleteExpiredMsgs :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int +deleteExpiredMsgs st rId q old = + isolateQueue rId q "deleteExpiredMsgs" $ + getMsgQueue st rId q >>= deleteExpireMsgs_ old q + +-- closed and idle queues will be closed after expiration +idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int +idleDeleteExpiredMsgs now st rId q old = + isolateQueue rId q "idleDeleteExpiredMsgs" $ + fromMaybe 0 <$> withIdleMsgQueue now st rId q (deleteExpireMsgs_ old q) + +deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue s -> StoreMonad s Int +deleteExpireMsgs_ old q mq = do + n <- loop 0 + logQueueState q + pure n where - loop dc mq = + loop dc = tryPeekMsg_ mq >>= \case Just Message {msgTs} | systemSeconds msgTs < old -> - tryDeleteMsg_ mq logState >> loop (dc + 1) mq + tryDeleteMsg_ q mq False >> loop (dc + 1) _ -> pure dc diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 55fc8070f..b281d8001 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -3076,8 +3076,9 @@ testTwoUsers = withAgentClients2 $ \a b -> do ("", "", DOWN _ _) <- nGet a ("", "", DOWN _ _) <- nGet a ("", "", DOWN _ _) <- nGet a - ("", "", DOWN _ _) <- nGet a - ("", "", UP _ _) <- nGet a + -- to avoice race condition + nGet a =##> \case ("", "", DOWN _ _) -> True; ("", "", UP _ _) -> True; _ -> False + nGet a =##> \case ("", "", UP _ _) -> True; ("", "", DOWN _ _) -> True; _ -> False ("", "", UP _ _) <- nGet a ("", "", UP _ _) <- nGet a ("", "", UP _ _) <- nGet a diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index 4fc48d6c7..35c27c22e 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -77,7 +77,8 @@ testJournalStoreCfg = quota = 3, maxMsgCount = 4, maxStateLines = 2, - stateTailSize = 256 + stateTailSize = 256, + idleInterval = 21600 } mkMessage :: MonadIO m => ByteString -> m Message diff --git a/tests/CoreTests/RetryIntervalTests.hs b/tests/CoreTests/RetryIntervalTests.hs index da96d0208..d4eee9ed6 100644 --- a/tests/CoreTests/RetryIntervalTests.hs +++ b/tests/CoreTests/RetryIntervalTests.hs @@ -66,7 +66,7 @@ testRetryIntervalSameMode = testRetryIntervalSwitchMode :: Spec testRetryIntervalSwitchMode = - it "should increase elapased time and interval when the mode stays the same" $ do + it "should increase elapased time and interval when the mode switches" $ do lock <- newEmptyTMVarIO intervals <- newTVarIO [] reportedIntervals <- newTVarIO []