@@ -62,7 +62,8 @@ import Data.Either (fromRight)
6262import Data.Functor (($>) )
6363import Data.Int (Int64 )
6464import Data.List (intercalate , sort )
65- import Data.Maybe (fromMaybe , isNothing , mapMaybe )
65+ import qualified Data.Map.Strict as M
66+ import Data.Maybe (fromMaybe , isJust , isNothing , mapMaybe )
6667import Data.Text (Text )
6768import qualified Data.Text as T
6869import Data.Time.Clock (NominalDiffTime , UTCTime , addUTCTime , getCurrentTime )
@@ -96,6 +97,7 @@ data JournalMsgStore s = JournalMsgStore
9697 queueLocks :: TMap RecipientId Lock ,
9798 sharedLock :: TMVar RecipientId ,
9899 queueStore_ :: QStore s ,
100+ openedQueueCount :: TVar Int ,
99101 expireBackupsBefore :: UTCTime
100102 }
101103
@@ -338,12 +340,6 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where
338340 deleteStoreQueue = withQS deleteStoreQueue
339341 {-# INLINE deleteStoreQueue #-}
340342
341- #if defined(dbServerPostgres)
342- mkTempQueue :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (JournalQueue s )
343- mkTempQueue ms rId qr = createLockIO >>= makeQueue_ ms rId qr
344- {-# INLINE mkTempQueue #-}
345- #endif
346-
347343makeQueue_ :: JournalMsgStore s -> RecipientId -> QueueRec -> Lock -> IO (JournalQueue s )
348344makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do
349345 queueRec' <- newTVarIO $ Just qr
@@ -373,16 +369,17 @@ instance MsgStoreClass (JournalMsgStore s) where
373369 queueLocks <- TM. emptyIO
374370 sharedLock <- newEmptyTMVarIO
375371 queueStore_ <- newQueueStore @ (JournalQueue s ) queueStoreCfg
372+ openedQueueCount <- newTVarIO 0
376373 expireBackupsBefore <- addUTCTime (- expireBackupsAfter config) <$> getCurrentTime
377- pure JournalMsgStore {config, random, queueLocks, sharedLock, queueStore_, expireBackupsBefore}
374+ pure JournalMsgStore {config, random, queueLocks, sharedLock, queueStore_, openedQueueCount, expireBackupsBefore}
378375
379376 closeMsgStore :: JournalMsgStore s -> IO ()
380377 closeMsgStore ms = do
381378 let st = queueStore_ ms
382379 closeQueues $ loadedQueues @ (JournalQueue s ) st
383380 closeQueueStore @ (JournalQueue s ) st
384381 where
385- closeQueues qs = readTVarIO qs >>= mapM_ closeMsgQueue
382+ closeQueues qs = readTVarIO qs >>= mapM_ ( closeMsgQueue ms)
386383
387384 withActiveMsgQueues :: Monoid a => JournalMsgStore s -> (JournalQueue s -> IO a ) -> IO a
388385 withActiveMsgQueues = withQS withLoadedQueues . queueStore_
@@ -393,12 +390,12 @@ instance MsgStoreClass (JournalMsgStore s) where
393390 unsafeWithAllMsgQueues tty ms action = case queueStore_ ms of
394391 MQStore st -> withLoadedQueues st run
395392#if defined(dbServerPostgres)
396- PQStore st -> foldQueueRecs tty st Nothing $ uncurry (mkTempQueue ms) >=> run
393+ PQStore st -> foldQueueRecs tty st Nothing $ uncurry (mkQueue ms False ) >=> run
397394#endif
398395 where
399396 run q = do
400397 r <- action q
401- closeMsgQueue q
398+ closeMsgQueue ms q
402399 pure r
403400
404401 -- This function is concurrency safe
@@ -414,7 +411,7 @@ instance MsgStoreClass (JournalMsgStore s) where
414411 PQStore st -> do
415412 let JournalMsgStore {queueLocks, sharedLock} = ms
416413 foldQueueRecs tty st (Just veryOld) $ \ (rId, qr) -> do
417- q <- mkTempQueue ms rId qr
414+ q <- mkQueue ms False rId qr
418415 withSharedWaitLock rId queueLocks sharedLock $ run $ tryStore' " deleteExpiredMsgs" rId $
419416 getLoadedQueue q >>= unStoreIO . expireQueueMsgs ms now old
420417#endif
@@ -441,9 +438,26 @@ instance MsgStoreClass (JournalMsgStore s) where
441438 queueStore = queueStore_
442439 {-# INLINE queueStore #-}
443440
444- mkQueue :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (JournalQueue s )
445- mkQueue ms rId qr = do
446- lock <- atomically $ getMapLock (queueLocks ms) rId
441+ loadedQueueCounts :: JournalMsgStore s -> IO LoadedQueueCounts
442+ loadedQueueCounts ms = do
443+ let (qs, ns, nLocks_) = loaded
444+ loadedQueueCount <- M. size <$> readTVarIO qs
445+ loadedNotifierCount <- M. size <$> readTVarIO ns
446+ openJournalCount <- readTVarIO (openedQueueCount ms)
447+ queueLockCount <- M. size <$> readTVarIO (queueLocks ms)
448+ notifierLockCount <- maybe (pure 0 ) (fmap M. size . readTVarIO) nLocks_
449+ pure LoadedQueueCounts {loadedQueueCount, loadedNotifierCount, openJournalCount, queueLockCount, notifierLockCount}
450+ where
451+ loaded :: (TMap RecipientId (JournalQueue s ), TMap NotifierId RecipientId , Maybe (TMap NotifierId Lock ))
452+ loaded = case queueStore_ ms of
453+ MQStore STMQueueStore {queues, notifiers} -> (queues, notifiers, Nothing )
454+ #if defined(dbServerPostgres)
455+ PQStore PostgresQueueStore {queues, notifiers, notifierLocks} -> (queues, notifiers, Just notifierLocks)
456+ #endif
457+
458+ mkQueue :: JournalMsgStore s -> Bool -> RecipientId -> QueueRec -> IO (JournalQueue s )
459+ mkQueue ms keepLock rId qr = do
460+ lock <- if keepLock then atomically $ getMapLock (queueLocks ms) rId else createLockIO
447461 makeQueue_ ms rId qr lock
448462
449463 getMsgQueue :: JournalMsgStore s -> JournalQueue s -> Bool -> StoreIO s (JournalMsgQueue s )
@@ -478,7 +492,7 @@ instance MsgStoreClass (JournalMsgStore s) where
478492 -- In case the queue became non-empty on write and then again empty on read
479493 -- we won't be closing it, to avoid frequent open/close on active queues.
480494 r <- peek
481- when (isNothing r) $ StoreIO $ closeMsgQueue q
495+ when (isNothing r) $ StoreIO $ closeMsgQueue ms q
482496 pure r
483497 where
484498 peek = do
@@ -492,7 +506,7 @@ instance MsgStoreClass (JournalMsgStore s) where
492506 Nothing ->
493507 E. bracket
494508 getNonEmptyMsgQueue
495- (mapM_ $ \ _ -> closeMsgQueue q)
509+ (mapM_ $ \ _ -> closeMsgQueue ms q)
496510 (maybe (pure (Nothing , 0 )) (unStoreIO . run))
497511 where
498512 run mq = do
@@ -502,7 +516,7 @@ instance MsgStoreClass (JournalMsgStore s) where
502516 Just mq -> do
503517 ts <- readTVarIO $ activeAt q
504518 r <- if now - ts >= idleInterval config
505- then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q
519+ then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue ms q
506520 else pure Nothing
507521 sz <- unStoreIO $ getQueueSize_ mq
508522 pure (r, sz)
@@ -517,7 +531,7 @@ instance MsgStoreClass (JournalMsgStore s) where
517531 mq <- unStoreIO $ getMsgQueue ms q False
518532 -- queueState was updated in getMsgQueue
519533 readTVarIO queueState >>= \ case
520- Just QState {hasStored} | not hasStored -> closeMsgQueue q $> Nothing
534+ Just QState {hasStored} | not hasStored -> closeMsgQueue ms q $> Nothing
521535 _ -> pure $ Just mq
522536
523537 deleteQueue :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType QueueRec )
@@ -580,6 +594,7 @@ instance MsgStoreClass (JournalMsgStore s) where
580594 rh <- createNewJournal queueDirectory $ journalId rs
581595 let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing }
582596 atomically $ writeTVar handles $ Just hs
597+ atomically $ modifyTVar' (openedQueueCount ms) (+ 1 )
583598 pure hs
584599 switchWriteJournal hs = do
585600 journalId <- newJournalId $ random ms
@@ -651,13 +666,16 @@ openMsgQueue ms@JournalMsgStore {config} q@JMQueue {queueDirectory = dir, stateP
651666 Just st
652667 | size st == 0 -> do
653668 (st', hs_) <- removeJournals st shouldBackup
669+ when (isJust hs_) incOpenedCount
654670 mkJournalQueue q st' hs_
655671 | otherwise -> do
656672 sh <- openBackupQueueState st shouldBackup
657673 (st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh
658674 let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_}
675+ incOpenedCount
659676 mkJournalQueue q st' (Just hs)
660677 where
678+ incOpenedCount = atomically $ modifyTVar' (openedQueueCount ms) (+ 1 )
661679 -- If the queue is empty, journals are deleted.
662680 -- New journal is created if queue is written to.
663681 -- canWrite is set to True.
@@ -920,28 +938,30 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size}
920938 && msgPos ws == msgCount ws
921939 && bytePos ws == byteCount ws
922940
923- -- TODO [postgres] possibly, we need to remove the lock from map
924941deleteQueue_ :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (QueueRec , Maybe (JournalMsgQueue s )))
925942deleteQueue_ ms q =
926- runExceptT $ isolateQueueId " deleteQueue_" ms rId $
927- deleteStoreQueue (queueStore_ ms) q >>= mapM remove
943+ runExceptT $ isolateQueueId " deleteQueue_" ms rId $ do
944+ r <- deleteStoreQueue (queueStore_ ms) q >>= mapM remove
945+ atomically $ TM. delete rId (queueLocks ms)
946+ pure r
928947 where
929948 rId = recipientId q
930949 remove r@ (_, mq_) = do
931- mapM_ closeMsgQueueHandles mq_
950+ mapM_ ( closeMsgQueueHandles ms) mq_
932951 removeQueueDirectory ms rId
933952 pure r
934953
935- closeMsgQueue :: JournalQueue s -> IO ()
936- closeMsgQueue JournalQueue {msgQueue'} = atomically (swapTVar msgQueue' Nothing ) >>= mapM_ closeMsgQueueHandles
954+ closeMsgQueue :: JournalMsgStore s -> JournalQueue s -> IO ()
955+ closeMsgQueue ms JournalQueue {msgQueue'} = atomically (swapTVar msgQueue' Nothing ) >>= mapM_ ( closeMsgQueueHandles ms)
937956
938- closeMsgQueueHandles :: JournalMsgQueue s -> IO ()
939- closeMsgQueueHandles q = readTVarIO (handles q) >>= mapM_ closeHandles
957+ closeMsgQueueHandles :: JournalMsgStore s -> JournalMsgQueue s -> IO ()
958+ closeMsgQueueHandles ms q = readTVarIO (handles q) >>= mapM_ closeHandles
940959 where
941960 closeHandles (MsgQueueHandles sh rh wh_) = do
942961 hClose sh
943962 hClose rh
944963 mapM_ hClose wh_
964+ atomically $ modifyTVar' (openedQueueCount ms) (subtract 1 )
945965
946966removeQueueDirectory :: JournalMsgStore s -> RecipientId -> IO ()
947967removeQueueDirectory st = removeQueueDirectory_ . msgQueueDirectory st
0 commit comments