Skip to content

Commit e929ec5

Browse files
committed
close empty queues on first subscription
1 parent 679b2c8 commit e929ec5

File tree

5 files changed

+58
-39
lines changed

5 files changed

+58
-39
lines changed

simplexmq.cabal

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
cabal-version: 1.12
22

33
name: simplexmq
4-
version: 6.2.0.400
4+
version: 6.2.0.401
55
synopsis: SimpleXMQ message broker
66
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
77
<./docs/Simplex-Messaging-Client.html client> and

src/Simplex/Messaging/Server.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1821,9 +1821,9 @@ importMessages tty ms f old_ = do
18211821
mergeQuotaMsgs >> writeMsg ms rId q False msg $> (stored, expired, M.insert rId q overQuota)
18221822
where
18231823
-- if the first message in queue head is "quota", remove it.
1824-
mergeQuotaMsgs = withMsgQueue ms rId q "mergeQuotaMsgs" $ maybe (pure ()) $ \mq ->
1825-
tryPeekMsg_ q mq >>= \case
1826-
Just MessageQuota {} -> tryDeleteMsg_ q mq False
1824+
mergeQuotaMsgs = withPeekMsgQueue ms rId q "mergeQuotaMsgs" $ maybe (pure ()) $ \(mq, msg) ->
1825+
case msg of
1826+
MessageQuota {} -> tryDeleteMsg_ q mq False
18271827
_ -> pure ()
18281828
msgErr :: Show e => String -> e -> String
18291829
msgErr op e = op <> " error (" <> show e <> "): " <> B.unpack (B.take 100 s)

src/Simplex/Messaging/Server/MsgStore/Journal.hs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
{-# LANGUAGE ScopedTypeVariables #-}
1313
{-# LANGUAGE StandaloneDeriving #-}
1414
{-# LANGUAGE TypeFamilies #-}
15+
{-# LANGUAGE TupleSections #-}
1516

1617
module Simplex.Messaging.Server.MsgStore.Journal
1718
( JournalMsgStore (queues, senders, notifiers, random),
@@ -104,8 +105,8 @@ data JournalQueue = JournalQueue
104105
msgQueue_ :: TVar (Maybe JournalMsgQueue),
105106
-- system time in seconds since epoch
106107
activeAt :: TVar Int64,
107-
-- True - empty, False - non-empty or unknown
108-
isEmpty :: TVar Bool
108+
-- Just True - empty, Just False - non-empty, Nothing - unknown
109+
isEmpty :: TVar (Maybe Bool)
109110
}
110111

111112
data JMQueue = JMQueue
@@ -227,7 +228,7 @@ instance STMQueueStore JournalMsgStore where
227228
q <- newTVar $ Just qr
228229
mq <- newTVar Nothing
229230
activeAt <- newTVar 0
230-
isEmpty <- newTVar False
231+
isEmpty <- newTVar Nothing
231232
pure $ JournalQueue lock q mq activeAt isEmpty
232233
msgQueue_' = msgQueue_
233234

@@ -333,24 +334,36 @@ instance MsgStoreClass JournalMsgStore where
333334
journalId <- newJournalId random
334335
mkJournalQueue queue (newMsgQueueState journalId) Nothing
335336

336-
getNonEmptyMsgQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> StoreIO (Maybe JournalMsgQueue)
337-
getNonEmptyMsgQueue ms rId q@JournalQueue {isEmpty} =
338-
ifM
339-
(StoreIO $ readTVarIO isEmpty)
340-
(pure Nothing)
341-
(Just <$> getMsgQueue ms rId q)
337+
getPeekMsgQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> StoreIO (Maybe (JournalMsgQueue, Message))
338+
getPeekMsgQueue ms rId q@JournalQueue {isEmpty} =
339+
StoreIO (readTVarIO isEmpty) >>= \case
340+
Just True -> pure Nothing
341+
Just False -> peek
342+
Nothing -> do
343+
-- We only close the queue if we just learnt it's empty.
344+
-- This is needed to reduce file descriptors and memory usage
345+
-- after the server just started and many clients subscribe.
346+
-- In case the queue became non-empty on write and then again empty on read
347+
-- we won't be closing it, to avoid frequent open/close on active queues.
348+
r <- peek
349+
when (isNothing r) $ StoreIO $ closeMsgQueue q
350+
pure r
351+
where
352+
peek = do
353+
mq <- getMsgQueue ms rId q
354+
(mq,) <$$> tryPeekMsg_ q mq
342355

343356
-- only runs action if queue is not empty
344357
withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int)
345358
withIdleMsgQueue now ms@JournalMsgStore {config} rId q action =
346359
StoreIO $ readTVarIO (msgQueue_ q) >>= \case
347360
Nothing ->
348361
E.bracket
349-
(unStoreIO $ getNonEmptyMsgQueue ms rId q)
362+
(unStoreIO $ getPeekMsgQueue ms rId q)
350363
(mapM_ $ \_ -> closeMsgQueue q)
351364
(maybe (pure (Nothing, 0)) (unStoreIO . run))
352365
where
353-
run mq = do
366+
run (mq, _) = do
354367
r <- action mq
355368
sz <- getQueueSize_ mq
356369
pure (Just r, sz)
@@ -392,7 +405,7 @@ instance MsgStoreClass JournalMsgStore where
392405
let empty = size == 0
393406
if canWrite || empty
394407
then do
395-
atomically $ writeTVar (isEmpty q') False
408+
atomically $ writeTVar (isEmpty q') (Just False)
396409
let canWrt' = quota > size
397410
if canWrt'
398411
then writeToJournal q st canWrt' msg $> Just (msg, empty)
@@ -452,7 +465,7 @@ instance MsgStoreClass JournalMsgStore where
452465
atomically $ writeTVar tipMsg $ Just (Just ml)
453466
pure $ Just msg
454467
setEmpty msg = do
455-
atomically $ writeTVar (isEmpty q) (isNothing msg)
468+
atomically $ writeTVar (isEmpty q) (Just $ isNothing msg)
456469
pure msg
457470

458471
tryDeleteMsg_ :: JournalQueue -> JournalMsgQueue -> Bool -> StoreIO ()

src/Simplex/Messaging/Server/MsgStore/STM.hs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
{-# LANGUAGE MultiParamTypeClasses #-}
99
{-# LANGUAGE NamedFieldPuns #-}
1010
{-# LANGUAGE TypeFamilies #-}
11+
{-# LANGUAGE TupleSections #-}
1112

1213
module Simplex.Messaging.Server.MsgStore.STM
1314
( STMMsgStore (..),
@@ -27,7 +28,7 @@ import Simplex.Messaging.Server.QueueStore.STM
2728
import Simplex.Messaging.Server.StoreLog
2829
import Simplex.Messaging.TMap (TMap)
2930
import qualified Simplex.Messaging.TMap as TM
30-
import Simplex.Messaging.Util ((<$$>))
31+
import Simplex.Messaging.Util ((<$$>), ($>>=))
3132
import System.IO (IOMode (..))
3233

3334
data STMMsgStore = STMMsgStore
@@ -107,9 +108,8 @@ instance MsgStoreClass STMMsgStore where
107108
writeTVar msgQueue_ (Just q)
108109
pure q
109110

110-
getNonEmptyMsgQueue :: STMMsgStore -> RecipientId -> STMQueue -> STM (Maybe STMMsgQueue)
111-
getNonEmptyMsgQueue _ _ STMQueue {msgQueue_} = readTVar msgQueue_
112-
{-# INLINE getNonEmptyMsgQueue #-}
111+
getPeekMsgQueue :: STMMsgStore -> RecipientId -> STMQueue -> STM (Maybe (STMMsgQueue, Message))
112+
getPeekMsgQueue _ _ q@STMQueue {msgQueue_} = readTVar msgQueue_ $>>= \mq -> (mq,) <$$> tryPeekMsg_ q mq
113113

114114
-- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue)
115115
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int)

src/Simplex/Messaging/Server/MsgStore/Types.hs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,13 @@
44
{-# LANGUAGE FlexibleContexts #-}
55
{-# LANGUAGE GADTs #-}
66
{-# LANGUAGE LambdaCase #-}
7+
{-# LANGUAGE MultiWayIf #-}
78
{-# LANGUAGE NamedFieldPuns #-}
89
{-# LANGUAGE TupleSections #-}
910
{-# LANGUAGE TypeFamilyDependencies #-}
11+
{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}
12+
13+
{-# HLINT ignore "Redundant multi-way if" #-}
1014

1115
module Simplex.Messaging.Server.MsgStore.Types where
1216

@@ -21,6 +25,7 @@ import Simplex.Messaging.Protocol
2125
import Simplex.Messaging.Server.QueueStore
2226
import Simplex.Messaging.Server.StoreLog.Types
2327
import Simplex.Messaging.TMap (TMap)
28+
import Simplex.Messaging.Util ((<$$>))
2429
import System.IO (IOMode (..))
2530

2631
class MsgStoreClass s => STMQueueStore s where
@@ -44,8 +49,9 @@ class Monad (StoreMonad s) => MsgStoreClass s where
4449
logQueueStates :: s -> IO ()
4550
logQueueState :: StoreQueue s -> StoreMonad s ()
4651
queueRec' :: StoreQueue s -> TVar (Maybe QueueRec)
47-
getNonEmptyMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s))
52+
getPeekMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message))
4853
getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s)
54+
4955
-- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
5056
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
5157
deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec)
@@ -74,39 +80,39 @@ withActiveMsgQueues st f = readTVarIO (activeMsgQueues st) >>= foldM run mempty
7480
pure $! acc <> r
7581

7682
getQueueMessages :: MsgStoreClass s => Bool -> s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO [Message]
77-
getQueueMessages drainMsgs st rId q = withMsgQueue st rId q "getQueueSize" $ maybe (pure []) $ getQueueMessages_ drainMsgs
83+
getQueueMessages drainMsgs st rId q = withPeekMsgQueue st rId q "getQueueSize" $ maybe (pure []) (getQueueMessages_ drainMsgs . fst)
7884
{-# INLINE getQueueMessages #-}
7985

8086
getQueueSize :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO Int
81-
getQueueSize st rId q = withMsgQueue st rId q "getQueueSize" $ maybe (pure 0) getQueueSize_
87+
getQueueSize st rId q = withPeekMsgQueue st rId q "getQueueSize" $ maybe (pure 0) (getQueueSize_ . fst)
8288
{-# INLINE getQueueSize #-}
8389

8490
tryPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
85-
tryPeekMsg st rId q = withMsgQueue st rId q "tryPeekMsg" $ maybe (pure Nothing) (tryPeekMsg_ q)
91+
tryPeekMsg st rId q = snd <$$> withPeekMsgQueue st rId q "tryPeekMsg" pure
8692
{-# INLINE tryPeekMsg #-}
8793

8894
tryDelMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
89-
tryDelMsg st rId q msgId' = withMsgQueue st rId q "tryDelMsg" $ maybe (pure Nothing) $ \mq ->
90-
tryPeekMsg_ q mq >>= \case
91-
msg_@(Just msg)
95+
tryDelMsg st rId q msgId' =
96+
withPeekMsgQueue st rId q "tryDelMsg" $
97+
maybe (pure Nothing) $ \(mq, msg) ->
98+
if
9299
| messageId msg == msgId' ->
93-
tryDeleteMsg_ q mq True >> pure msg_
94-
_ -> pure Nothing
100+
tryDeleteMsg_ q mq True >> pure (Just msg)
101+
| otherwise -> pure Nothing
95102

96103
-- atomic delete (== read) last and peek next message if available
97104
tryDelPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message)
98105
tryDelPeekMsg st rId q msgId' =
99-
withMsgQueue st rId q "tryDelPeekMsg" $ maybe (pure (Nothing, Nothing)) $ \mq ->
100-
tryPeekMsg_ q mq >>= \case
101-
msg_@(Just msg)
102-
| messageId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq)
103-
| otherwise -> pure (Nothing, msg_)
104-
_ -> pure (Nothing, Nothing)
106+
withPeekMsgQueue st rId q "tryDelPeekMsg" $
107+
maybe (pure (Nothing, Nothing)) $ \(mq, msg) ->
108+
if
109+
| messageId msg == msgId' -> (Just msg,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq)
110+
| otherwise -> pure (Nothing, Just msg)
105111

106112
-- The action is called with Nothing when it is known that the queue is empty
107-
withMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (Maybe (MsgQueue s) -> StoreMonad s a) -> ExceptT ErrorType IO a
108-
withMsgQueue st rId q op a = isolateQueue rId q op $ getNonEmptyMsgQueue st rId q >>= a
109-
{-# INLINE withMsgQueue #-}
113+
withPeekMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (Maybe (MsgQueue s, Message) -> StoreMonad s a) -> ExceptT ErrorType IO a
114+
withPeekMsgQueue st rId q op a = isolateQueue rId q op $ getPeekMsgQueue st rId q >>= a
115+
{-# INLINE withPeekMsgQueue #-}
110116

111117
deleteExpiredMsgs :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
112118
deleteExpiredMsgs st rId q old =

0 commit comments

Comments
 (0)