Skip to content

Commit 45333bd

Browse files
authored
smp server: do not open/read journal message queues that are known to be empty (#1406)
* smp server: do not open/read journal message queues that are known to be empty * cleanup * version * close empty queues on first subscription * revert version
1 parent bbcb1ab commit 45333bd

File tree

5 files changed

+86
-45
lines changed

5 files changed

+86
-45
lines changed

src/Simplex/Messaging/Server.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1821,9 +1821,9 @@ importMessages tty ms f old_ = do
18211821
mergeQuotaMsgs >> writeMsg ms rId q False msg $> (stored, expired, M.insert rId q overQuota)
18221822
where
18231823
-- if the first message in queue head is "quota", remove it.
1824-
mergeQuotaMsgs = withMsgQueue ms rId q "mergeQuotaMsgs" $ \mq ->
1825-
tryPeekMsg_ mq >>= \case
1826-
Just MessageQuota {} -> tryDeleteMsg_ q mq False
1824+
mergeQuotaMsgs = withPeekMsgQueue ms rId q "mergeQuotaMsgs" $ maybe (pure ()) $ \(mq, msg) ->
1825+
case msg of
1826+
MessageQuota {} -> tryDeleteMsg_ q mq False
18271827
_ -> pure ()
18281828
msgErr :: Show e => String -> e -> String
18291829
msgErr op e = op <> " error (" <> show e <> "): " <> B.unpack (B.take 100 s)

src/Simplex/Messaging/Server/MsgStore/Journal.hs

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,15 @@
1111
{-# LANGUAGE OverloadedStrings #-}
1212
{-# LANGUAGE ScopedTypeVariables #-}
1313
{-# LANGUAGE StandaloneDeriving #-}
14-
{-# LANGUAGE TupleSections #-}
1514
{-# LANGUAGE TypeFamilies #-}
15+
{-# LANGUAGE TupleSections #-}
1616

1717
module Simplex.Messaging.Server.MsgStore.Journal
1818
( JournalMsgStore (queues, senders, notifiers, random),
1919
JournalQueue,
2020
JournalMsgQueue (queue, state),
2121
JMQueue (queueDirectory, statePath),
2222
JournalStoreConfig (..),
23-
getQueueMessages,
2423
closeMsgQueue,
2524
closeMsgQueueHandles,
2625
-- below are exported for tests
@@ -50,7 +49,7 @@ import qualified Data.ByteString.Char8 as B
5049
import Data.Functor (($>))
5150
import Data.Int (Int64)
5251
import Data.List (intercalate)
53-
import Data.Maybe (catMaybes, fromMaybe)
52+
import Data.Maybe (catMaybes, fromMaybe, isNothing)
5453
import qualified Data.Text as T
5554
import Data.Time.Clock (getCurrentTime)
5655
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
@@ -105,7 +104,9 @@ data JournalQueue = JournalQueue
105104
queueRec :: TVar (Maybe QueueRec),
106105
msgQueue_ :: TVar (Maybe JournalMsgQueue),
107106
-- system time in seconds since epoch
108-
activeAt :: TVar Int64
107+
activeAt :: TVar Int64,
108+
-- Just True - empty, Just False - non-empty, Nothing - unknown
109+
isEmpty :: TVar (Maybe Bool)
109110
}
110111

111112
data JMQueue = JMQueue
@@ -224,10 +225,11 @@ instance STMQueueStore JournalMsgStore where
224225
storeLog' = storeLog
225226
mkQueue st qr = do
226227
lock <- getMapLock (queueLocks st) $ recipientId qr
227-
q <- newTVar $! Just qr
228+
q <- newTVar $ Just qr
228229
mq <- newTVar Nothing
229230
activeAt <- newTVar 0
230-
pure $ JournalQueue lock q mq activeAt
231+
isEmpty <- newTVar Nothing
232+
pure $ JournalQueue lock q mq activeAt isEmpty
231233
msgQueue_' = msgQueue_
232234

233235
instance MsgStoreClass JournalMsgStore where
@@ -322,7 +324,7 @@ instance MsgStoreClass JournalMsgStore where
322324
statePath = msgQueueStatePath dir $ B.unpack (strEncode rId)
323325
queue = JMQueue {queueDirectory = dir, statePath}
324326
q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue) (createQ queue)
325-
atomically $ writeTVar msgQueue_ $! Just q
327+
atomically $ writeTVar msgQueue_ $ Just q
326328
pure q
327329
where
328330
createQ :: JMQueue -> IO JournalMsgQueue
@@ -332,14 +334,39 @@ instance MsgStoreClass JournalMsgStore where
332334
journalId <- newJournalId random
333335
mkJournalQueue queue (newMsgQueueState journalId) Nothing
334336

337+
getPeekMsgQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> StoreIO (Maybe (JournalMsgQueue, Message))
338+
getPeekMsgQueue ms rId q@JournalQueue {isEmpty} =
339+
StoreIO (readTVarIO isEmpty) >>= \case
340+
Just True -> pure Nothing
341+
Just False -> peek
342+
Nothing -> do
343+
-- We only close the queue if we just learnt it's empty.
344+
-- This is needed to reduce file descriptors and memory usage
345+
-- after the server just started and many clients subscribe.
346+
-- In case the queue became non-empty on write and then again empty on read
347+
-- we won't be closing it, to avoid frequent open/close on active queues.
348+
r <- peek
349+
when (isNothing r) $ StoreIO $ closeMsgQueue q
350+
pure r
351+
where
352+
peek = do
353+
mq <- getMsgQueue ms rId q
354+
(mq,) <$$> tryPeekMsg_ q mq
355+
356+
-- only runs action if queue is not empty
335357
withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int)
336358
withIdleMsgQueue now ms@JournalMsgStore {config} rId q action =
337359
StoreIO $ readTVarIO (msgQueue_ q) >>= \case
338360
Nothing ->
339-
E.bracket (unStoreIO $ getMsgQueue ms rId q) (\_ -> closeMsgQueue q) $ \mq -> unStoreIO $ do
340-
r <- action mq
341-
sz <- getQueueSize_ mq
342-
pure (Just r, sz)
361+
E.bracket
362+
(unStoreIO $ getPeekMsgQueue ms rId q)
363+
(mapM_ $ \_ -> closeMsgQueue q)
364+
(maybe (pure (Nothing, 0)) (unStoreIO . run))
365+
where
366+
run (mq, _) = do
367+
r <- action mq
368+
sz <- getQueueSize_ mq
369+
pure (Just r, sz)
343370
Just mq -> do
344371
ts <- readTVarIO $ activeAt q
345372
r <- if now - ts >= idleInterval config
@@ -378,6 +405,7 @@ instance MsgStoreClass JournalMsgStore where
378405
let empty = size == 0
379406
if canWrite || empty
380407
then do
408+
atomically $ writeTVar (isEmpty q') (Just False)
381409
let canWrt' = quota > size
382410
if canWrt'
383411
then writeToJournal q st canWrt' msg $> Just (msg, empty)
@@ -426,16 +454,19 @@ instance MsgStoreClass JournalMsgStore where
426454
getQueueSize_ :: JournalMsgQueue -> StoreIO Int
427455
getQueueSize_ JournalMsgQueue {state} = StoreIO $ size <$> readTVarIO state
428456

429-
tryPeekMsg_ :: JournalMsgQueue -> StoreIO (Maybe Message)
430-
tryPeekMsg_ q@JournalMsgQueue {tipMsg, handles} =
431-
StoreIO $ readTVarIO handles $>>= chooseReadJournal q True $>>= peekMsg
457+
tryPeekMsg_ :: JournalQueue -> JournalMsgQueue -> StoreIO (Maybe Message)
458+
tryPeekMsg_ q mq@JournalMsgQueue {tipMsg, handles} =
459+
StoreIO $ (readTVarIO handles $>>= chooseReadJournal mq True $>>= peekMsg) >>= setEmpty
432460
where
433461
peekMsg (rs, h) = readTVarIO tipMsg >>= maybe readMsg (pure . fmap fst)
434462
where
435463
readMsg = do
436464
ml@(msg, _) <- hGetMsgAt h $ bytePos rs
437465
atomically $ writeTVar tipMsg $ Just (Just ml)
438466
pure $ Just msg
467+
setEmpty msg = do
468+
atomically $ writeTVar (isEmpty q) (Just $ isNothing msg)
469+
pure msg
439470

440471
tryDeleteMsg_ :: JournalQueue -> JournalMsgQueue -> Bool -> StoreIO ()
441472
tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $

src/Simplex/Messaging/Server/MsgStore/STM.hs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
{-# LANGUAGE BangPatterns #-}
21
{-# LANGUAGE ConstraintKinds #-}
32
{-# LANGUAGE DataKinds #-}
43
{-# LANGUAGE DuplicateRecordFields #-}
@@ -8,8 +7,8 @@
87
{-# LANGUAGE LambdaCase #-}
98
{-# LANGUAGE MultiParamTypeClasses #-}
109
{-# LANGUAGE NamedFieldPuns #-}
11-
{-# LANGUAGE TupleSections #-}
1210
{-# LANGUAGE TypeFamilies #-}
11+
{-# LANGUAGE TupleSections #-}
1312

1413
module Simplex.Messaging.Server.MsgStore.STM
1514
( STMMsgStore (..),
@@ -29,7 +28,7 @@ import Simplex.Messaging.Server.QueueStore.STM
2928
import Simplex.Messaging.Server.StoreLog
3029
import Simplex.Messaging.TMap (TMap)
3130
import qualified Simplex.Messaging.TMap as TM
32-
import Simplex.Messaging.Util ((<$$>))
31+
import Simplex.Messaging.Util ((<$$>), ($>>=))
3332
import System.IO (IOMode (..))
3433

3534
data STMMsgStore = STMMsgStore
@@ -63,7 +62,7 @@ instance STMQueueStore STMMsgStore where
6362
senders' = senders
6463
notifiers' = notifiers
6564
storeLog' = storeLog
66-
mkQueue _ qr = STMQueue <$> (newTVar $! Just qr) <*> newTVar Nothing
65+
mkQueue _ qr = STMQueue <$> newTVar (Just qr) <*> newTVar Nothing
6766
msgQueue_' = msgQueue_
6867

6968
instance MsgStoreClass STMMsgStore where
@@ -106,9 +105,12 @@ instance MsgStoreClass STMMsgStore where
106105
canWrite <- newTVar True
107106
size <- newTVar 0
108107
let q = STMMsgQueue {msgQueue, canWrite, size}
109-
writeTVar msgQueue_ $! Just q
108+
writeTVar msgQueue_ (Just q)
110109
pure q
111110

111+
getPeekMsgQueue :: STMMsgStore -> RecipientId -> STMQueue -> STM (Maybe (STMMsgQueue, Message))
112+
getPeekMsgQueue _ _ q@STMQueue {msgQueue_} = readTVar msgQueue_ $>>= \mq -> (mq,) <$$> tryPeekMsg_ q mq
113+
112114
-- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue)
113115
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int)
114116
withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= \case
@@ -159,8 +161,8 @@ instance MsgStoreClass STMMsgStore where
159161
getQueueSize_ :: STMMsgQueue -> STM Int
160162
getQueueSize_ STMMsgQueue {size} = readTVar size
161163

162-
tryPeekMsg_ :: STMMsgQueue -> STM (Maybe Message)
163-
tryPeekMsg_ = tryPeekTQueue . msgQueue
164+
tryPeekMsg_ :: STMQueue -> STMMsgQueue -> STM (Maybe Message)
165+
tryPeekMsg_ _ = tryPeekTQueue . msgQueue
164166
{-# INLINE tryPeekMsg_ #-}
165167

166168
tryDeleteMsg_ :: STMQueue -> STMMsgQueue -> Bool -> STM ()

src/Simplex/Messaging/Server/MsgStore/Types.hs

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,20 @@
44
{-# LANGUAGE FlexibleContexts #-}
55
{-# LANGUAGE GADTs #-}
66
{-# LANGUAGE LambdaCase #-}
7+
{-# LANGUAGE MultiWayIf #-}
78
{-# LANGUAGE NamedFieldPuns #-}
89
{-# LANGUAGE TupleSections #-}
910
{-# LANGUAGE TypeFamilyDependencies #-}
11+
{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}
12+
13+
{-# HLINT ignore "Redundant multi-way if" #-}
1014

1115
module Simplex.Messaging.Server.MsgStore.Types where
1216

1317
import Control.Concurrent.STM
1418
import Control.Monad (foldM)
1519
import Control.Monad.Trans.Except
20+
import Data.Functor (($>))
1621
import Data.Int (Int64)
1722
import Data.Kind
1823
import qualified Data.Map.Strict as M
@@ -21,6 +26,7 @@ import Simplex.Messaging.Protocol
2126
import Simplex.Messaging.Server.QueueStore
2227
import Simplex.Messaging.Server.StoreLog.Types
2328
import Simplex.Messaging.TMap (TMap)
29+
import Simplex.Messaging.Util ((<$$>))
2430
import System.IO (IOMode (..))
2531

2632
class MsgStoreClass s => STMQueueStore s where
@@ -44,7 +50,9 @@ class Monad (StoreMonad s) => MsgStoreClass s where
4450
logQueueStates :: s -> IO ()
4551
logQueueState :: StoreQueue s -> StoreMonad s ()
4652
queueRec' :: StoreQueue s -> TVar (Maybe QueueRec)
53+
getPeekMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message))
4754
getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s)
55+
4856
-- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
4957
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
5058
deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec)
@@ -53,7 +61,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where
5361
writeMsg :: s -> RecipientId -> StoreQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
5462
setOverQuota_ :: StoreQueue s -> IO () -- can ONLY be used while restoring messages, not while server running
5563
getQueueSize_ :: MsgQueue s -> StoreMonad s Int
56-
tryPeekMsg_ :: MsgQueue s -> StoreMonad s (Maybe Message)
64+
tryPeekMsg_ :: StoreQueue s -> MsgQueue s -> StoreMonad s (Maybe Message)
5765
tryDeleteMsg_ :: StoreQueue s -> MsgQueue s -> Bool -> StoreMonad s ()
5866
isolateQueue :: RecipientId -> StoreQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a
5967

@@ -73,39 +81,39 @@ withActiveMsgQueues st f = readTVarIO (activeMsgQueues st) >>= foldM run mempty
7381
pure $! acc <> r
7482

7583
getQueueMessages :: MsgStoreClass s => Bool -> s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO [Message]
76-
getQueueMessages drainMsgs st rId q = withMsgQueue st rId q "getQueueSize" $ getQueueMessages_ drainMsgs
84+
getQueueMessages drainMsgs st rId q = withPeekMsgQueue st rId q "getQueueSize" $ maybe (pure []) (getQueueMessages_ drainMsgs . fst)
7785
{-# INLINE getQueueMessages #-}
7886

7987
getQueueSize :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO Int
80-
getQueueSize st rId q = withMsgQueue st rId q "getQueueSize" $ getQueueSize_
88+
getQueueSize st rId q = withPeekMsgQueue st rId q "getQueueSize" $ maybe (pure 0) (getQueueSize_ . fst)
8189
{-# INLINE getQueueSize #-}
8290

8391
tryPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
84-
tryPeekMsg st rId q = withMsgQueue st rId q "tryPeekMsg" $ tryPeekMsg_
92+
tryPeekMsg st rId q = snd <$$> withPeekMsgQueue st rId q "tryPeekMsg" pure
8593
{-# INLINE tryPeekMsg #-}
8694

8795
tryDelMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
8896
tryDelMsg st rId q msgId' =
89-
withMsgQueue st rId q "tryDelMsg" $ \mq ->
90-
tryPeekMsg_ mq >>= \case
91-
msg_@(Just msg)
97+
withPeekMsgQueue st rId q "tryDelMsg" $
98+
maybe (pure Nothing) $ \(mq, msg) ->
99+
if
92100
| messageId msg == msgId' ->
93-
tryDeleteMsg_ q mq True >> pure msg_
94-
_ -> pure Nothing
101+
tryDeleteMsg_ q mq True $> Just msg
102+
| otherwise -> pure Nothing
95103

96104
-- atomic delete (== read) last and peek next message if available
97105
tryDelPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message)
98106
tryDelPeekMsg st rId q msgId' =
99-
withMsgQueue st rId q "tryDelPeekMsg" $ \mq ->
100-
tryPeekMsg_ mq >>= \case
101-
msg_@(Just msg)
102-
| messageId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ mq)
103-
| otherwise -> pure (Nothing, msg_)
104-
_ -> pure (Nothing, Nothing)
107+
withPeekMsgQueue st rId q "tryDelPeekMsg" $
108+
maybe (pure (Nothing, Nothing)) $ \(mq, msg) ->
109+
if
110+
| messageId msg == msgId' -> (Just msg,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq)
111+
| otherwise -> pure (Nothing, Just msg)
105112

106-
withMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (MsgQueue s -> StoreMonad s a) -> ExceptT ErrorType IO a
107-
withMsgQueue st rId q op a = isolateQueue rId q op $ getMsgQueue st rId q >>= a
108-
{-# INLINE withMsgQueue #-}
113+
-- The action is called with Nothing when it is known that the queue is empty
114+
withPeekMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (Maybe (MsgQueue s, Message) -> StoreMonad s a) -> ExceptT ErrorType IO a
115+
withPeekMsgQueue st rId q op a = isolateQueue rId q op $ getPeekMsgQueue st rId q >>= a
116+
{-# INLINE withPeekMsgQueue #-}
109117

110118
deleteExpiredMsgs :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
111119
deleteExpiredMsgs st rId q old =
@@ -126,7 +134,7 @@ deleteExpireMsgs_ old q mq = do
126134
pure n
127135
where
128136
loop dc =
129-
tryPeekMsg_ mq >>= \case
137+
tryPeekMsg_ q mq >>= \case
130138
Just Message {msgTs}
131139
| systemSeconds msgTs < old ->
132140
tryDeleteMsg_ q mq False >> loop (dc + 1)

src/Simplex/Messaging/Server/QueueStore/STM.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ secureQueue st sq sKey =
8484
secure q@QueueRec {recipientId = rId} = case senderKey q of
8585
Just k -> pure $ if sKey == k then Right rId else Left AUTH
8686
Nothing -> do
87-
writeTVar qr $! Just q {senderKey = Just sKey}
87+
writeTVar qr $ Just q {senderKey = Just sKey}
8888
pure $ Right rId
8989

9090
addQueueNotifier :: STMQueueStore s => s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
@@ -96,7 +96,7 @@ addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId} =
9696
add q@QueueRec {recipientId = rId} = ifM (TM.member nId (notifiers' st)) (pure $ Left DUPLICATE_) $ do
9797
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId (notifiers' st) $> notifierId
9898
let !q' = q {notifier = Just ntfCreds}
99-
writeTVar qr $! Just q'
99+
writeTVar qr $ Just q'
100100
TM.insert nId rId $ notifiers' st
101101
pure $ Right (rId, nId_)
102102

0 commit comments

Comments
 (0)