Skip to content

Commit 1b5a9f3

Browse files
authored
smp server: do not cache all queues from database while processing expirations (#1483)
* smp server: expire only active queues * version * do not cache all queues while processing expirations * refactor * foldWithOptions_ * version * use shared lock when expiring all queues * use TMVar * comment * rename * remove fold options * do not create locks in the Map for temporarily loaded queues * fix * revert version
1 parent fdf8bd7 commit 1b5a9f3

File tree

9 files changed

+186
-97
lines changed

9 files changed

+186
-97
lines changed

simplexmq.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ library
228228
Simplex.Messaging.Server.Main
229229
Simplex.Messaging.Server.MsgStore
230230
Simplex.Messaging.Server.MsgStore.Journal
231+
Simplex.Messaging.Server.MsgStore.Journal.SharedLock
231232
Simplex.Messaging.Server.MsgStore.STM
232233
Simplex.Messaging.Server.MsgStore.Types
233234
Simplex.Messaging.Server.NtfStore

src/Simplex/Messaging/Agent/Lock.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ module Simplex.Messaging.Agent.Lock
66
withLock',
77
withGetLock,
88
withGetLocks,
9+
getPutLock,
910
)
1011
where
1112

src/Simplex/Messaging/Server.hs

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ import Simplex.Messaging.Server.Control
9797
import Simplex.Messaging.Server.Env.STM as Env
9898
import Simplex.Messaging.Server.Expiration
9999
import Simplex.Messaging.Server.MsgStore
100-
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue, closeMsgQueue)
100+
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue)
101101
import Simplex.Messaging.Server.MsgStore.STM
102102
import Simplex.Messaging.Server.MsgStore.Types
103103
import Simplex.Messaging.Server.NtfStore
@@ -404,11 +404,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
404404
old <- expireBeforeEpoch expCfg
405405
now <- systemSeconds <$> getSystemTime
406406
msgStats@MessageStats {storedMsgsCount = stored, expiredMsgsCount = expired} <-
407-
withAllMsgQueues False ms $ expireQueueMsgs now ms old
407+
withAllMsgQueues False "idleDeleteExpiredMsgs" ms $ expireQueueMsgs now ms old
408408
atomicWriteIORef (msgCount stats) stored
409409
atomicModifyIORef'_ (msgExpired stats) (+ expired)
410410
printMessageStats "STORE: messages" msgStats
411-
expireQueueMsgs now ms old q = fmap (fromRight newMessageStats) . runExceptT $ do
411+
expireQueueMsgs now ms old q = do
412412
(expired_, stored) <- idleDeleteExpiredMsgs now ms q old
413413
pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1}
414414

@@ -1806,19 +1806,18 @@ exportMessages :: MsgStoreClass s => Bool -> s -> FilePath -> Bool -> IO ()
18061806
exportMessages tty ms f drainMsgs = do
18071807
logInfo $ "saving messages to file " <> T.pack f
18081808
liftIO $ withFile f WriteMode $ \h ->
1809-
tryAny (withAllMsgQueues tty ms $ saveQueueMsgs h) >>= \case
1809+
tryAny (unsafeWithAllMsgQueues tty ms $ saveQueueMsgs h) >>= \case
18101810
Right (Sum total) -> logInfo $ "messages saved: " <> tshow total
18111811
Left e -> do
18121812
logError $ "error exporting messages: " <> tshow e
18131813
exitFailure
18141814
where
18151815
saveQueueMsgs h q = do
1816-
let rId = recipientId q
1817-
runExceptT (getQueueMessages drainMsgs ms q) >>= \case
1818-
Right msgs -> Sum (length msgs) <$ BLD.hPutBuilder h (encodeMessages rId msgs)
1819-
Left e -> do
1820-
logError $ "STORE: saveQueueMsgs, error exporting messages from queue " <> decodeLatin1 (strEncode rId) <> ", " <> tshow e
1821-
exitFailure
1816+
msgs <-
1817+
unsafeRunStore q "saveQueueMsgs" $
1818+
getQueueMessages_ drainMsgs q =<< getMsgQueue ms q False
1819+
BLD.hPutBuilder h $ encodeMessages (recipientId q) msgs
1820+
pure $ Sum $ length msgs
18221821
encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n')
18231822

18241823
processServerMessages :: StartOptions -> M (Maybe MessageStats)
@@ -1838,33 +1837,23 @@ processServerMessages StartOptions {skipWarnings} = do
18381837
| expire = Just <$> case old_ of
18391838
Just old -> do
18401839
logInfo "expiring journal store messages..."
1841-
withAllMsgQueues False ms $ processExpireQueue old
1840+
run $ processExpireQueue old
18421841
Nothing -> do
18431842
logInfo "validating journal store messages..."
1844-
withAllMsgQueues False ms $ processValidateQueue
1843+
run processValidateQueue
18451844
| otherwise = logWarn "skipping message expiration" $> Nothing
18461845
where
1846+
run a = unsafeWithAllMsgQueues False ms a `catchAny` \_ -> exitFailure
18471847
processExpireQueue :: Int64 -> JournalQueue s -> IO MessageStats
1848-
processExpireQueue old q =
1849-
runExceptT expireQueue >>= \case
1850-
Right (storedMsgsCount, expiredMsgsCount) ->
1851-
pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues = 1}
1852-
Left e -> do
1853-
logError $ "STORE: processExpireQueue, failed expiring messages in queue, " <> tshow e
1854-
exitFailure
1855-
where
1856-
expireQueue = do
1857-
expired'' <- deleteExpiredMsgs ms q old
1858-
stored'' <- getQueueSize ms q
1859-
liftIO $ closeMsgQueue q
1860-
pure (stored'', expired'')
1848+
processExpireQueue old q = unsafeRunStore q "processExpireQueue" $ do
1849+
mq <- getMsgQueue ms q False
1850+
expiredMsgsCount <- deleteExpireMsgs_ old q mq
1851+
storedMsgsCount <- getQueueSize_ mq
1852+
pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues = 1}
18611853
processValidateQueue :: JournalQueue s -> IO MessageStats
1862-
processValidateQueue q =
1863-
runExceptT (getQueueSize ms q) >>= \case
1864-
Right storedMsgsCount -> pure newMessageStats {storedMsgsCount, storedQueues = 1}
1865-
Left e -> do
1866-
logError $ "STORE: processValidateQueue, failed opening message queue, " <> tshow e
1867-
exitFailure
1854+
processValidateQueue q = unsafeRunStore q "processValidateQueue" $ do
1855+
storedMsgsCount <- getQueueSize_ =<< getMsgQueue ms q False
1856+
pure newMessageStats {storedMsgsCount, storedQueues = 1}
18681857

18691858
importMessages :: forall s. MsgStoreClass s => Bool -> s -> FilePath -> Maybe Int64 -> Bool -> IO MessageStats
18701859
importMessages tty ms f old_ skipWarnings = do

src/Simplex/Messaging/Server/MsgStore/Journal.hs

Lines changed: 74 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ import Control.Monad.Trans.Except
5555
import qualified Data.Attoparsec.ByteString.Char8 as A
5656
import Data.ByteString.Char8 (ByteString)
5757
import qualified Data.ByteString.Char8 as B
58+
import Data.Either (fromRight)
5859
import Data.Functor (($>))
5960
import Data.Int (Int64)
6061
import Data.List (intercalate, sort)
@@ -65,10 +66,11 @@ import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
6566
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
6667
import Data.Time.Format.ISO8601 (iso8601Show, iso8601ParseM)
6768
import GHC.IO (catchAny)
68-
import Simplex.Messaging.Agent.Client (getMapLock, withLockMap)
69+
import Simplex.Messaging.Agent.Client (getMapLock)
6970
import Simplex.Messaging.Agent.Lock
7071
import Simplex.Messaging.Encoding.String
7172
import Simplex.Messaging.Protocol
73+
import Simplex.Messaging.Server.MsgStore.Journal.SharedLock
7274
import Simplex.Messaging.Server.MsgStore.Types
7375
import Simplex.Messaging.Server.QueueStore
7476
import Simplex.Messaging.Server.QueueStore.Postgres
@@ -87,6 +89,7 @@ data JournalMsgStore s = JournalMsgStore
8789
{ config :: JournalStoreConfig s,
8890
random :: TVar StdGen,
8991
queueLocks :: TMap RecipientId Lock,
92+
sharedLock :: TMVar RecipientId,
9093
queueStore_ :: QStore s,
9194
expireBackupsBefore :: UTCTime
9295
}
@@ -138,6 +141,7 @@ data QStoreCfg s where
138141
data JournalQueue (s :: QSType) = JournalQueue
139142
{ recipientId' :: RecipientId,
140143
queueLock :: Lock,
144+
sharedLock :: TMVar RecipientId,
141145
-- To avoid race conditions and errors when restoring queues,
142146
-- Nothing is written to TVar when queue is deleted.
143147
queueRec' :: TVar (Maybe QueueRec),
@@ -276,7 +280,8 @@ instance StoreQueueClass (JournalQueue s) where
276280
msgQueue = msgQueue'
277281
{-# INLINE msgQueue #-}
278282
withQueueLock :: JournalQueue s -> String -> IO a -> IO a
279-
withQueueLock = withLock' . queueLock
283+
withQueueLock JournalQueue {recipientId', queueLock, sharedLock} =
284+
withLockWaitShared recipientId' queueLock sharedLock
280285
{-# INLINE withQueueLock #-}
281286

282287
instance QueueStoreClass (JournalQueue s) (QStore s) where
@@ -316,6 +321,27 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where
316321
deleteStoreQueue = withQS deleteStoreQueue
317322
{-# INLINE deleteStoreQueue #-}
318323

324+
mkTempQueue :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (JournalQueue s)
325+
mkTempQueue ms rId qr = createLockIO >>= makeQueue_ ms rId qr
326+
{-# INLINE mkTempQueue #-}
327+
328+
makeQueue_ :: JournalMsgStore s -> RecipientId -> QueueRec -> Lock -> IO (JournalQueue s)
329+
makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do
330+
queueRec' <- newTVarIO $ Just qr
331+
msgQueue' <- newTVarIO Nothing
332+
activeAt <- newTVarIO 0
333+
queueState <- newTVarIO Nothing
334+
pure $
335+
JournalQueue
336+
{ recipientId' = rId,
337+
queueLock,
338+
sharedLock,
339+
queueRec',
340+
msgQueue',
341+
activeAt,
342+
queueState
343+
}
344+
319345
instance MsgStoreClass (JournalMsgStore s) where
320346
type StoreMonad (JournalMsgStore s) = StoreIO s
321347
type QueueStore (JournalMsgStore s) = QStore s
@@ -326,9 +352,10 @@ instance MsgStoreClass (JournalMsgStore s) where
326352
newMsgStore config@JournalStoreConfig {queueStoreCfg} = do
327353
random <- newTVarIO =<< newStdGen
328354
queueLocks <- TM.emptyIO
355+
sharedLock <- newEmptyTMVarIO
329356
queueStore_ <- newQueueStore @(JournalQueue s) queueStoreCfg
330357
expireBackupsBefore <- addUTCTime (- expireBackupsAfter config) <$> getCurrentTime
331-
pure JournalMsgStore {config, random, queueLocks, queueStore_, expireBackupsBefore}
358+
pure JournalMsgStore {config, random, queueLocks, sharedLock, queueStore_, expireBackupsBefore}
332359

333360
closeMsgStore :: JournalMsgStore s -> IO ()
334361
closeMsgStore ms = do
@@ -341,10 +368,32 @@ instance MsgStoreClass (JournalMsgStore s) where
341368
withActiveMsgQueues :: Monoid a => JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
342369
withActiveMsgQueues = withQS withLoadedQueues . queueStore_
343370

344-
withAllMsgQueues :: Monoid a => Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
345-
withAllMsgQueues tty ms action = case queueStore_ ms of
346-
MQStore st -> withLoadedQueues st action
347-
PQStore st -> foldQueues tty st (mkQueue ms) action
371+
-- This function can only be used in server CLI commands or before server is started.
372+
-- It does not cache queues and is NOT concurrency safe.
373+
unsafeWithAllMsgQueues :: Monoid a => Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
374+
unsafeWithAllMsgQueues tty ms action = case queueStore_ ms of
375+
MQStore st -> withLoadedQueues st run
376+
PQStore st -> foldQueueRecs tty st $ uncurry (mkTempQueue ms) >=> run
377+
where
378+
run q = do
379+
r <- action q
380+
closeMsgQueue q
381+
pure r
382+
383+
-- This function is concurrency safe, it is used to expire queues.
384+
withAllMsgQueues :: forall a. Monoid a => Bool -> String -> JournalMsgStore s -> (JournalQueue s -> StoreIO s a) -> IO a
385+
withAllMsgQueues tty op ms@JournalMsgStore {queueLocks, sharedLock} action = case queueStore_ ms of
386+
MQStore st ->
387+
withLoadedQueues st $ \q ->
388+
run $ isolateQueue q op $ action q
389+
PQStore st ->
390+
foldQueueRecs tty st $ \(rId, qr) -> do
391+
q <- mkTempQueue ms rId qr
392+
withSharedWaitLock rId queueLocks sharedLock $
393+
run $ tryStore' op rId $ unStoreIO $ action q
394+
where
395+
run :: ExceptT ErrorType IO a -> IO a
396+
run = fmap (fromRight mempty) . runExceptT
348397

349398
logQueueStates :: JournalMsgStore s -> IO ()
350399
logQueueStates ms = withActiveMsgQueues ms $ unStoreIO . logQueueState
@@ -361,20 +410,11 @@ instance MsgStoreClass (JournalMsgStore s) where
361410

362411
mkQueue :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (JournalQueue s)
363412
mkQueue ms rId qr = do
364-
queueLock <- atomically $ getMapLock (queueLocks ms) rId
365-
queueRec' <- newTVarIO $ Just qr
366-
msgQueue' <- newTVarIO Nothing
367-
activeAt <- newTVarIO 0
368-
queueState <- newTVarIO Nothing
369-
pure $
370-
JournalQueue
371-
{ recipientId' = rId,
372-
queueLock,
373-
queueRec',
374-
msgQueue',
375-
activeAt,
376-
queueState
377-
}
413+
lock <- atomically $ getMapLock (queueLocks ms) rId
414+
makeQueue_ ms rId qr lock
415+
416+
getLoadedQueue :: JournalMsgStore s -> JournalQueue s -> StoreIO s (JournalQueue s)
417+
getLoadedQueue ms sq = StoreIO $ fromMaybe sq <$> TM.lookupIO (recipientId sq) (loadedQueues $ queueStore_ ms)
378418

379419
getMsgQueue :: JournalMsgStore s -> JournalQueue s -> Bool -> StoreIO s (JournalMsgQueue s)
380420
getMsgQueue ms@JournalMsgStore {random} q'@JournalQueue {recipientId' = rId, msgQueue'} forWrite =
@@ -546,8 +586,11 @@ instance MsgStoreClass (JournalMsgStore s) where
546586
$>>= \hs -> updateReadPos q mq logState len hs $> Just ()
547587

548588
isolateQueue :: JournalQueue s -> String -> StoreIO s a -> ExceptT ErrorType IO a
549-
isolateQueue JournalQueue {recipientId' = rId, queueLock} op a =
550-
tryStore' op rId $ withLock' queueLock op $ unStoreIO a
589+
isolateQueue sq op = tryStore' op (recipientId' sq) . withQueueLock sq op . unStoreIO
590+
591+
unsafeRunStore :: JournalQueue s -> String -> StoreIO s a -> IO a
592+
unsafeRunStore sq op a =
593+
unStoreIO a `E.catch` \e -> storeError op (recipientId' sq) e >> E.throwIO e
551594

552595
updateActiveAt :: JournalQueue s -> IO ()
553596
updateActiveAt q = atomically . writeTVar (activeAt q) . systemSeconds =<< getSystemTime
@@ -556,15 +599,16 @@ tryStore' :: String -> RecipientId -> IO a -> ExceptT ErrorType IO a
556599
tryStore' op rId = tryStore op rId . fmap Right
557600

558601
tryStore :: forall a. String -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a
559-
tryStore op rId a = ExceptT $ E.mask_ $ E.try a >>= either storeErr pure
560-
where
561-
storeErr :: E.SomeException -> IO (Either ErrorType a)
562-
storeErr e =
563-
let e' = intercalate ", " [op, B.unpack $ strEncode rId, show e]
564-
in logError ("STORE: " <> T.pack e') $> Left (STORE e')
602+
tryStore op rId a = ExceptT $ E.mask_ $ a `E.catch` storeError op rId
603+
604+
storeError :: String -> RecipientId -> E.SomeException -> IO (Either ErrorType a)
605+
storeError op rId e =
606+
let e' = intercalate ", " [op, B.unpack $ strEncode rId, show e]
607+
in logError ("STORE: " <> T.pack e') $> Left (STORE e')
565608

566609
isolateQueueId :: String -> JournalMsgStore s -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a
567-
isolateQueueId op ms rId = tryStore op rId . withLockMap (queueLocks ms) rId op
610+
isolateQueueId op JournalMsgStore {queueLocks, sharedLock} rId =
611+
tryStore op rId . withLockMapWaitShared rId queueLocks sharedLock op
568612

569613
openMsgQueue :: JournalMsgStore s -> JMQueue -> Bool -> IO (JournalMsgQueue s)
570614
openMsgQueue ms@JournalMsgStore {config} q@JMQueue {queueDirectory = dir, statePath} forWrite = do
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
module Simplex.Messaging.Server.MsgStore.Journal.SharedLock
2+
( withLockWaitShared,
3+
withLockMapWaitShared,
4+
withSharedWaitLock,
5+
)
6+
where
7+
8+
import Control.Concurrent.STM
9+
import qualified Control.Exception as E
10+
import Control.Monad
11+
import Simplex.Messaging.Agent.Lock
12+
import Simplex.Messaging.Agent.Client (getMapLock)
13+
import Simplex.Messaging.Protocol (RecipientId)
14+
import Simplex.Messaging.TMap (TMap)
15+
import qualified Simplex.Messaging.TMap as TM
16+
import Simplex.Messaging.Util (($>>), ($>>=))
17+
18+
-- wait until shared lock with passed ID is released and take lock
19+
withLockWaitShared :: RecipientId -> Lock -> TMVar RecipientId -> String -> IO a -> IO a
20+
withLockWaitShared rId lock shared name =
21+
E.bracket_
22+
(atomically $ waitShared rId shared >> putTMVar lock name)
23+
(void $ atomically $ takeTMVar lock)
24+
25+
-- wait until shared lock with passed ID is released and take lock from Map for this ID
26+
withLockMapWaitShared :: RecipientId -> TMap RecipientId Lock -> TMVar RecipientId -> String -> IO a -> IO a
27+
withLockMapWaitShared rId locks shared name a =
28+
E.bracket
29+
(atomically $ waitShared rId shared >> getPutLock (getMapLock locks) rId name)
30+
(atomically . takeTMVar)
31+
(const a)
32+
33+
waitShared :: RecipientId -> TMVar RecipientId -> STM ()
34+
waitShared rId shared = tryReadTMVar shared >>= mapM_ (\rId' -> when (rId == rId') retry)
35+
36+
-- wait until lock with passed ID in Map is released and take shared lock for this ID
37+
withSharedWaitLock :: RecipientId -> TMap RecipientId Lock -> TMVar RecipientId -> IO a -> IO a
38+
withSharedWaitLock rId locks shared =
39+
E.bracket_
40+
(atomically $ waitLock >> putTMVar shared rId)
41+
(atomically $ takeTMVar shared)
42+
where
43+
waitLock = TM.lookup rId locks $>>= tryReadTMVar $>> retry

src/Simplex/Messaging/Server/MsgStore/STM.hs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ instance MsgStoreClass STMMsgStore where
8080
{-# INLINE closeMsgStore #-}
8181
withActiveMsgQueues = withLoadedQueues . queueStore_
8282
{-# INLINE withActiveMsgQueues #-}
83-
withAllMsgQueues _ = withLoadedQueues . queueStore_
83+
unsafeWithAllMsgQueues _ = withLoadedQueues . queueStore_
84+
{-# INLINE unsafeWithAllMsgQueues #-}
85+
withAllMsgQueues _tty _op ms action = withLoadedQueues (queueStore_ ms) $ atomically . action
8486
{-# INLINE withAllMsgQueues #-}
8587
logQueueStates _ = pure ()
8688
{-# INLINE logQueueStates #-}
@@ -92,6 +94,10 @@ instance MsgStoreClass STMMsgStore where
9294
mkQueue _ rId qr = STMQueue rId <$> newTVarIO (Just qr) <*> newTVarIO Nothing
9395
{-# INLINE mkQueue #-}
9496

97+
getLoadedQueue :: STMMsgStore -> STMQueue -> STM STMQueue
98+
getLoadedQueue _ = pure
99+
{-# INLINE getLoadedQueue #-}
100+
95101
getMsgQueue :: STMMsgStore -> STMQueue -> Bool -> STM STMMsgQueue
96102
getMsgQueue _ STMQueue {msgQueue'} _ = readTVar msgQueue' >>= maybe newQ pure
97103
where
@@ -168,3 +174,8 @@ instance MsgStoreClass STMMsgStore where
168174

169175
isolateQueue :: STMQueue -> String -> STM a -> ExceptT ErrorType IO a
170176
isolateQueue _ _ = liftIO . atomically
177+
{-# INLINE isolateQueue #-}
178+
179+
unsafeRunStore :: STMQueue -> String -> STM a -> IO a
180+
unsafeRunStore _ _ = atomically
181+
{-# INLINE unsafeRunStore #-}

0 commit comments

Comments
 (0)