44{-# LANGUAGE FlexibleContexts #-}
55{-# LANGUAGE GADTs #-}
66{-# LANGUAGE LambdaCase #-}
7+ {-# LANGUAGE MultiWayIf #-}
78{-# LANGUAGE NamedFieldPuns #-}
89{-# LANGUAGE TupleSections #-}
910{-# LANGUAGE TypeFamilyDependencies #-}
11+ {-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}
12+
13+ {-# HLINT ignore "Redundant multi-way if" #-}
1014
1115module Simplex.Messaging.Server.MsgStore.Types where
1216
1317import Control.Concurrent.STM
1418import Control.Monad (foldM )
1519import Control.Monad.Trans.Except
20+ import Data.Functor (($>) )
1621import Data.Int (Int64 )
1722import Data.Kind
1823import qualified Data.Map.Strict as M
@@ -21,6 +26,7 @@ import Simplex.Messaging.Protocol
2126import Simplex.Messaging.Server.QueueStore
2227import Simplex.Messaging.Server.StoreLog.Types
2328import Simplex.Messaging.TMap (TMap )
29+ import Simplex.Messaging.Util ((<$$>) )
2430import System.IO (IOMode (.. ))
2531
2632class MsgStoreClass s => STMQueueStore s where
@@ -44,8 +50,9 @@ class Monad (StoreMonad s) => MsgStoreClass s where
4450 logQueueStates :: s -> IO ()
4551 logQueueState :: StoreQueue s -> StoreMonad s ()
4652 queueRec' :: StoreQueue s -> TVar (Maybe QueueRec )
47- getNonEmptyMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s ))
53+ getPeekMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s , Message ))
4854 getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s )
55+
4956 -- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
5057 withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a ) -> StoreMonad s (Maybe a , Int )
5158 deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec )
@@ -74,39 +81,39 @@ withActiveMsgQueues st f = readTVarIO (activeMsgQueues st) >>= foldM run mempty
7481 pure $! acc <> r
7582
7683getQueueMessages :: MsgStoreClass s => Bool -> s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO [Message ]
77- getQueueMessages drainMsgs st rId q = withMsgQueue st rId q " getQueueSize" $ maybe (pure [] ) $ getQueueMessages_ drainMsgs
84+ getQueueMessages drainMsgs st rId q = withPeekMsgQueue st rId q " getQueueSize" $ maybe (pure [] ) ( getQueueMessages_ drainMsgs . fst )
7885{-# INLINE getQueueMessages #-}
7986
8087getQueueSize :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO Int
81- getQueueSize st rId q = withMsgQueue st rId q " getQueueSize" $ maybe (pure 0 ) getQueueSize_
88+ getQueueSize st rId q = withPeekMsgQueue st rId q " getQueueSize" $ maybe (pure 0 ) ( getQueueSize_ . fst )
8289{-# INLINE getQueueSize #-}
8390
8491tryPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message )
85- tryPeekMsg st rId q = withMsgQueue st rId q " tryPeekMsg" $ maybe ( pure Nothing ) (tryPeekMsg_ q)
92+ tryPeekMsg st rId q = snd <$$> withPeekMsgQueue st rId q " tryPeekMsg" pure
8693{-# INLINE tryPeekMsg #-}
8794
8895tryDelMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message )
89- tryDelMsg st rId q msgId' = withMsgQueue st rId q " tryDelMsg" $ maybe (pure Nothing ) $ \ mq ->
90- tryPeekMsg_ q mq >>= \ case
91- msg_@ (Just msg)
96+ tryDelMsg st rId q msgId' =
97+ withPeekMsgQueue st rId q " tryDelMsg" $
98+ maybe (pure Nothing ) $ \ (mq, msg) ->
99+ if
92100 | messageId msg == msgId' ->
93- tryDeleteMsg_ q mq True >> pure msg_
94- _ -> pure Nothing
101+ tryDeleteMsg_ q mq True $> Just msg
102+ | otherwise -> pure Nothing
95103
96104-- atomic delete (== read) last and peek next message if available
97105tryDelPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message , Maybe Message )
98106tryDelPeekMsg st rId q msgId' =
99- withMsgQueue st rId q " tryDelPeekMsg" $ maybe (pure (Nothing , Nothing )) $ \ mq ->
100- tryPeekMsg_ q mq >>= \ case
101- msg_@ (Just msg)
102- | messageId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq)
103- | otherwise -> pure (Nothing , msg_)
104- _ -> pure (Nothing , Nothing )
107+ withPeekMsgQueue st rId q " tryDelPeekMsg" $
108+ maybe (pure (Nothing , Nothing )) $ \ (mq, msg) ->
109+ if
110+ | messageId msg == msgId' -> (Just msg,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq)
111+ | otherwise -> pure (Nothing , Just msg)
105112
106113-- The action is called with Nothing when it is known that the queue is empty
107- withMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (Maybe (MsgQueue s ) -> StoreMonad s a ) -> ExceptT ErrorType IO a
108- withMsgQueue st rId q op a = isolateQueue rId q op $ getNonEmptyMsgQueue st rId q >>= a
109- {-# INLINE withMsgQueue #-}
114+ withPeekMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (Maybe (MsgQueue s , Message ) -> StoreMonad s a ) -> ExceptT ErrorType IO a
115+ withPeekMsgQueue st rId q op a = isolateQueue rId q op $ getPeekMsgQueue st rId q >>= a
116+ {-# INLINE withPeekMsgQueue #-}
110117
111118deleteExpiredMsgs :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
112119deleteExpiredMsgs st rId q old =
0 commit comments