@@ -53,6 +53,7 @@ import Data.List (intercalate)
5353import Data.Maybe (catMaybes , fromMaybe )
5454import qualified Data.Text as T
5555import Data.Time.Clock (getCurrentTime )
56+ import Data.Time.Clock.System (SystemTime (.. ), getSystemTime )
5657import Data.Time.Format.ISO8601 (iso8601Show )
5758import GHC.IO (catchAny )
5859import Simplex.Messaging.Agent.Client (getMapLock , withLockMap )
@@ -92,15 +93,19 @@ data JournalStoreConfig = JournalStoreConfig
9293 -- This number should be set bigger than queue quota.
9394 maxMsgCount :: Int ,
9495 maxStateLines :: Int ,
95- stateTailSize :: Int
96+ stateTailSize :: Int ,
97+ -- time in seconds after which the queue will be closed after message expiration
98+ idleInterval :: Int64
9699 }
97100
98101data JournalQueue = JournalQueue
99102 { queueLock :: Lock ,
100103 -- To avoid race conditions and errors when restoring queues,
101104 -- Nothing is written to TVar when queue is deleted.
102105 queueRec :: TVar (Maybe QueueRec ),
103- msgQueue_ :: TVar (Maybe JournalMsgQueue )
106+ msgQueue_ :: TVar (Maybe JournalMsgQueue ),
107+ -- system time in seconds since epoch
108+ activeAt :: TVar Int64
104109 }
105110
106111data JMQueue = JMQueue
@@ -221,7 +226,8 @@ instance STMQueueStore JournalMsgStore where
221226 lock <- getMapLock (queueLocks st) $ recipientId qr
222227 q <- newTVar $! Just qr
223228 mq <- newTVar Nothing
224- pure $ JournalQueue lock q mq
229+ activeAt <- newTVar 0
230+ pure $ JournalQueue lock q mq activeAt
225231 msgQueue_' = msgQueue_
226232
227233instance MsgStoreClass JournalMsgStore where
@@ -295,11 +301,11 @@ instance MsgStoreClass JournalMsgStore where
295301 (Nothing <$ putStrLn (" Error: path " <> path' <> " is not a directory, skipping" ))
296302
297303 logQueueStates :: JournalMsgStore -> IO ()
298- logQueueStates ms = withActiveMsgQueues ms $ \ _ -> logQueueState
304+ logQueueStates ms = withActiveMsgQueues ms $ \ _ -> unStoreIO . logQueueState
299305
300- logQueueState :: JournalQueue -> IO ()
306+ logQueueState :: JournalQueue -> StoreIO ()
301307 logQueueState q =
302- void $
308+ StoreIO . void $
303309 readTVarIO (msgQueue_ q)
304310 $>>= \ mq -> readTVarIO (handles mq)
305311 $>>= (\ hs -> (readTVarIO (state mq) >>= appendState (stateHandle hs)) $> Just () )
@@ -326,9 +332,20 @@ instance MsgStoreClass JournalMsgStore where
326332 journalId <- newJournalId random
327333 mkJournalQueue queue (newMsgQueueState journalId) Nothing
328334
329- openedMsgQueue :: JournalQueue -> StoreIO (Maybe JournalMsgQueue )
330- openedMsgQueue = StoreIO . readTVarIO . msgQueue_
331- {-# INLINE openedMsgQueue #-}
335+ withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a ) -> StoreIO (Maybe a )
336+ withIdleMsgQueue now ms@ JournalMsgStore {config} rId q action =
337+ StoreIO $ readTVarIO (msgQueue_ q) >>= \ case
338+ Nothing ->
339+ Just <$>
340+ E. bracket
341+ (unStoreIO $ getMsgQueue ms rId q)
342+ (\ _ -> closeMsgQueue q)
343+ (unStoreIO . action)
344+ Just mq -> do
345+ ts <- readTVarIO $ activeAt q
346+ if now - ts >= idleInterval config
347+ then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q
348+ else pure Nothing
332349
333350 deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec )
334351 deleteQueue ms rId q =
@@ -355,7 +372,7 @@ instance MsgStoreClass JournalMsgStore where
355372 writeMsg :: JournalMsgStore -> RecipientId -> JournalQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message , Bool ))
356373 writeMsg ms rId q' logState msg = isolateQueue rId q' " writeMsg" $ do
357374 q <- getMsgQueue ms rId q'
358- StoreIO $ do
375+ StoreIO $ ( `E.finally` updateActiveAt q') $ do
359376 st@ MsgQueueState {canWrite, size} <- readTVarIO (state q)
360377 let empty = size == 0
361378 if canWrite || empty
@@ -419,18 +436,21 @@ instance MsgStoreClass JournalMsgStore where
419436 atomically $ writeTVar tipMsg $ Just (Just ml)
420437 pure $ Just msg
421438
422- tryDeleteMsg_ :: JournalMsgQueue -> Bool -> StoreIO ()
423- tryDeleteMsg_ q@ JournalMsgQueue {tipMsg, handles} logState = StoreIO $
439+ tryDeleteMsg_ :: JournalQueue -> JournalMsgQueue -> Bool -> StoreIO ()
440+ tryDeleteMsg_ q mq @ JournalMsgQueue {tipMsg, handles} logState = StoreIO $ ( `E.finally` when logState (updateActiveAt q)) $
424441 void $
425442 readTVarIO tipMsg -- if there is no cached tipMsg, do nothing
426443 $>>= (pure . fmap snd )
427444 $>>= \ len -> readTVarIO handles
428- $>>= \ hs -> updateReadPos q logState len hs $> Just ()
445+ $>>= \ hs -> updateReadPos mq logState len hs $> Just ()
429446
430447 isolateQueue :: RecipientId -> JournalQueue -> String -> StoreIO a -> ExceptT ErrorType IO a
431448 isolateQueue rId JournalQueue {queueLock} op =
432449 tryStore' op rId . withLock' queueLock op . unStoreIO
433450
451+ updateActiveAt :: JournalQueue -> IO ()
452+ updateActiveAt q = atomically . writeTVar (activeAt q) . systemSeconds =<< getSystemTime
453+
434454tryStore' :: String -> RecipientId -> IO a -> ExceptT ErrorType IO a
435455tryStore' op rId = tryStore op rId . fmap Right
436456
0 commit comments