Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,7 @@

-- These dummy keys are used with `dummyVerify` function to mitigate timing attacks
-- by having the same time of the response whether a queue exists or nor, for all valid key/signature sizes
dummySignKey :: C.SignatureAlgorithm a => C.SAlgorithm a -> C.PublicKey a

Check warning on line 1048 in src/Simplex/Messaging/Server.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Redundant constraint: C.SignatureAlgorithm a

Check warning on line 1048 in src/Simplex/Messaging/Server.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Redundant constraint: C.SignatureAlgorithm a
dummySignKey = \case
C.SEd25519 -> dummyKeyEd25519
C.SEd448 -> dummyKeyEd448
Expand Down Expand Up @@ -1821,9 +1821,9 @@
mergeQuotaMsgs >> writeMsg ms rId q False msg $> (stored, expired, M.insert rId q overQuota)
where
-- if the first message in queue head is "quota", remove it.
mergeQuotaMsgs = withMsgQueue ms rId q "mergeQuotaMsgs" $ \mq ->
tryPeekMsg_ mq >>= \case
Just MessageQuota {} -> tryDeleteMsg_ q mq False
mergeQuotaMsgs = withPeekMsgQueue ms rId q "mergeQuotaMsgs" $ maybe (pure ()) $ \(mq, msg) ->

Check warning on line 1824 in src/Simplex/Messaging/Server.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

This binding for ‘msg’ shadows the existing binding

Check warning on line 1824 in src/Simplex/Messaging/Server.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

This binding for ‘msg’ shadows the existing binding
case msg of
MessageQuota {} -> tryDeleteMsg_ q mq False
_ -> pure ()
msgErr :: Show e => String -> e -> String
msgErr op e = op <> " error (" <> show e <> "): " <> B.unpack (B.take 100 s)
Expand Down
59 changes: 45 additions & 14 deletions src/Simplex/Messaging/Server/MsgStore/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TupleSections #-}

module Simplex.Messaging.Server.MsgStore.Journal
( JournalMsgStore (queues, senders, notifiers, random),
JournalQueue,
JournalMsgQueue (queue, state),
JMQueue (queueDirectory, statePath),
JournalStoreConfig (..),
getQueueMessages,
closeMsgQueue,
closeMsgQueueHandles,
-- below are exported for tests
Expand Down Expand Up @@ -50,7 +49,7 @@ import qualified Data.ByteString.Char8 as B
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (intercalate)
import Data.Maybe (catMaybes, fromMaybe)
import Data.Maybe (catMaybes, fromMaybe, isNothing)
import qualified Data.Text as T
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
Expand Down Expand Up @@ -105,7 +104,9 @@ data JournalQueue = JournalQueue
queueRec :: TVar (Maybe QueueRec),
msgQueue_ :: TVar (Maybe JournalMsgQueue),
-- system time in seconds since epoch
activeAt :: TVar Int64
activeAt :: TVar Int64,
-- Just True - empty, Just False - non-empty, Nothing - unknown
isEmpty :: TVar (Maybe Bool)
}

data JMQueue = JMQueue
Expand Down Expand Up @@ -224,10 +225,11 @@ instance STMQueueStore JournalMsgStore where
storeLog' = storeLog
mkQueue st qr = do
lock <- getMapLock (queueLocks st) $ recipientId qr
q <- newTVar $! Just qr
q <- newTVar $ Just qr
mq <- newTVar Nothing
activeAt <- newTVar 0
pure $ JournalQueue lock q mq activeAt
isEmpty <- newTVar Nothing
pure $ JournalQueue lock q mq activeAt isEmpty
msgQueue_' = msgQueue_

instance MsgStoreClass JournalMsgStore where
Expand Down Expand Up @@ -322,7 +324,7 @@ instance MsgStoreClass JournalMsgStore where
statePath = msgQueueStatePath dir $ B.unpack (strEncode rId)
queue = JMQueue {queueDirectory = dir, statePath}
q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue) (createQ queue)
atomically $ writeTVar msgQueue_ $! Just q
atomically $ writeTVar msgQueue_ $ Just q
pure q
where
createQ :: JMQueue -> IO JournalMsgQueue
Expand All @@ -332,14 +334,39 @@ instance MsgStoreClass JournalMsgStore where
journalId <- newJournalId random
mkJournalQueue queue (newMsgQueueState journalId) Nothing

getPeekMsgQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> StoreIO (Maybe (JournalMsgQueue, Message))
getPeekMsgQueue ms rId q@JournalQueue {isEmpty} =
StoreIO (readTVarIO isEmpty) >>= \case
Just True -> pure Nothing
Just False -> peek
Nothing -> do
-- We only close the queue if we just learnt it's empty.
-- This is needed to reduce file descriptors and memory usage
-- after the server just started and many clients subscribe.
-- In case the queue became non-empty on write and then again empty on read
-- we won't be closing it, to avoid frequent open/close on active queues.
r <- peek
when (isNothing r) $ StoreIO $ closeMsgQueue q
pure r
where
peek = do
mq <- getMsgQueue ms rId q
(mq,) <$$> tryPeekMsg_ q mq

-- only runs action if queue is not empty
withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int)
withIdleMsgQueue now ms@JournalMsgStore {config} rId q action =
StoreIO $ readTVarIO (msgQueue_ q) >>= \case
Nothing ->
E.bracket (unStoreIO $ getMsgQueue ms rId q) (\_ -> closeMsgQueue q) $ \mq -> unStoreIO $ do
r <- action mq
sz <- getQueueSize_ mq
pure (Just r, sz)
E.bracket
(unStoreIO $ getPeekMsgQueue ms rId q)
(mapM_ $ \_ -> closeMsgQueue q)
(maybe (pure (Nothing, 0)) (unStoreIO . run))
where
run (mq, _) = do
r <- action mq
sz <- getQueueSize_ mq
pure (Just r, sz)
Just mq -> do
ts <- readTVarIO $ activeAt q
r <- if now - ts >= idleInterval config
Expand Down Expand Up @@ -378,6 +405,7 @@ instance MsgStoreClass JournalMsgStore where
let empty = size == 0
if canWrite || empty
then do
atomically $ writeTVar (isEmpty q') (Just False)
let canWrt' = quota > size
if canWrt'
then writeToJournal q st canWrt' msg $> Just (msg, empty)
Expand Down Expand Up @@ -426,16 +454,19 @@ instance MsgStoreClass JournalMsgStore where
getQueueSize_ :: JournalMsgQueue -> StoreIO Int
getQueueSize_ JournalMsgQueue {state} = StoreIO $ size <$> readTVarIO state

tryPeekMsg_ :: JournalMsgQueue -> StoreIO (Maybe Message)
tryPeekMsg_ q@JournalMsgQueue {tipMsg, handles} =
StoreIO $ readTVarIO handles $>>= chooseReadJournal q True $>>= peekMsg
tryPeekMsg_ :: JournalQueue -> JournalMsgQueue -> StoreIO (Maybe Message)
tryPeekMsg_ q mq@JournalMsgQueue {tipMsg, handles} =
StoreIO $ (readTVarIO handles $>>= chooseReadJournal mq True $>>= peekMsg) >>= setEmpty
where
peekMsg (rs, h) = readTVarIO tipMsg >>= maybe readMsg (pure . fmap fst)
where
readMsg = do
ml@(msg, _) <- hGetMsgAt h $ bytePos rs
atomically $ writeTVar tipMsg $ Just (Just ml)
pure $ Just msg
setEmpty msg = do
atomically $ writeTVar (isEmpty q) (Just $ isNothing msg)
pure msg

tryDeleteMsg_ :: JournalQueue -> JournalMsgQueue -> Bool -> StoreIO ()
tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $
Expand Down
16 changes: 9 additions & 7 deletions src/Simplex/Messaging/Server/MsgStore/STM.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
Expand All @@ -8,8 +7,8 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TupleSections #-}

module Simplex.Messaging.Server.MsgStore.STM
( STMMsgStore (..),
Expand All @@ -29,7 +28,7 @@ import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util ((<$$>))
import Simplex.Messaging.Util ((<$$>), ($>>=))
import System.IO (IOMode (..))

data STMMsgStore = STMMsgStore
Expand Down Expand Up @@ -63,7 +62,7 @@ instance STMQueueStore STMMsgStore where
senders' = senders
notifiers' = notifiers
storeLog' = storeLog
mkQueue _ qr = STMQueue <$> (newTVar $! Just qr) <*> newTVar Nothing
mkQueue _ qr = STMQueue <$> newTVar (Just qr) <*> newTVar Nothing
msgQueue_' = msgQueue_

instance MsgStoreClass STMMsgStore where
Expand Down Expand Up @@ -106,9 +105,12 @@ instance MsgStoreClass STMMsgStore where
canWrite <- newTVar True
size <- newTVar 0
let q = STMMsgQueue {msgQueue, canWrite, size}
writeTVar msgQueue_ $! Just q
writeTVar msgQueue_ (Just q)
pure q

getPeekMsgQueue :: STMMsgStore -> RecipientId -> STMQueue -> STM (Maybe (STMMsgQueue, Message))
getPeekMsgQueue _ _ q@STMQueue {msgQueue_} = readTVar msgQueue_ $>>= \mq -> (mq,) <$$> tryPeekMsg_ q mq

-- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue)
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int)
withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= \case
Expand Down Expand Up @@ -159,8 +161,8 @@ instance MsgStoreClass STMMsgStore where
getQueueSize_ :: STMMsgQueue -> STM Int
getQueueSize_ STMMsgQueue {size} = readTVar size

tryPeekMsg_ :: STMMsgQueue -> STM (Maybe Message)
tryPeekMsg_ = tryPeekTQueue . msgQueue
tryPeekMsg_ :: STMQueue -> STMMsgQueue -> STM (Maybe Message)
tryPeekMsg_ _ = tryPeekTQueue . msgQueue
{-# INLINE tryPeekMsg_ #-}

tryDeleteMsg_ :: STMQueue -> STMMsgQueue -> Bool -> STM ()
Expand Down
46 changes: 27 additions & 19 deletions src/Simplex/Messaging/Server/MsgStore/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilyDependencies #-}
{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}

{-# HLINT ignore "Redundant multi-way if" #-}

module Simplex.Messaging.Server.MsgStore.Types where

import Control.Concurrent.STM
import Control.Monad (foldM)
import Control.Monad.Trans.Except
import Data.Functor (($>))
import Data.Int (Int64)
import Data.Kind
import qualified Data.Map.Strict as M
Expand All @@ -21,6 +26,7 @@ import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.StoreLog.Types
import Simplex.Messaging.TMap (TMap)
import Simplex.Messaging.Util ((<$$>))
import System.IO (IOMode (..))

class MsgStoreClass s => STMQueueStore s where
Expand All @@ -44,7 +50,9 @@ class Monad (StoreMonad s) => MsgStoreClass s where
logQueueStates :: s -> IO ()
logQueueState :: StoreQueue s -> StoreMonad s ()
queueRec' :: StoreQueue s -> TVar (Maybe QueueRec)
getPeekMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message))
getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s)

-- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec)
Expand All @@ -53,7 +61,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where
writeMsg :: s -> RecipientId -> StoreQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
setOverQuota_ :: StoreQueue s -> IO () -- can ONLY be used while restoring messages, not while server running
getQueueSize_ :: MsgQueue s -> StoreMonad s Int
tryPeekMsg_ :: MsgQueue s -> StoreMonad s (Maybe Message)
tryPeekMsg_ :: StoreQueue s -> MsgQueue s -> StoreMonad s (Maybe Message)
tryDeleteMsg_ :: StoreQueue s -> MsgQueue s -> Bool -> StoreMonad s ()
isolateQueue :: RecipientId -> StoreQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a

Expand All @@ -73,39 +81,39 @@ withActiveMsgQueues st f = readTVarIO (activeMsgQueues st) >>= foldM run mempty
pure $! acc <> r

getQueueMessages :: MsgStoreClass s => Bool -> s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO [Message]
getQueueMessages drainMsgs st rId q = withMsgQueue st rId q "getQueueSize" $ getQueueMessages_ drainMsgs
getQueueMessages drainMsgs st rId q = withPeekMsgQueue st rId q "getQueueSize" $ maybe (pure []) (getQueueMessages_ drainMsgs . fst)
{-# INLINE getQueueMessages #-}

getQueueSize :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO Int
getQueueSize st rId q = withMsgQueue st rId q "getQueueSize" $ getQueueSize_
getQueueSize st rId q = withPeekMsgQueue st rId q "getQueueSize" $ maybe (pure 0) (getQueueSize_ . fst)
{-# INLINE getQueueSize #-}

tryPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
tryPeekMsg st rId q = withMsgQueue st rId q "tryPeekMsg" $ tryPeekMsg_
tryPeekMsg st rId q = snd <$$> withPeekMsgQueue st rId q "tryPeekMsg" pure
{-# INLINE tryPeekMsg #-}

tryDelMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
tryDelMsg st rId q msgId' =
withMsgQueue st rId q "tryDelMsg" $ \mq ->
tryPeekMsg_ mq >>= \case
msg_@(Just msg)
withPeekMsgQueue st rId q "tryDelMsg" $
maybe (pure Nothing) $ \(mq, msg) ->
if
| messageId msg == msgId' ->
tryDeleteMsg_ q mq True >> pure msg_
_ -> pure Nothing
tryDeleteMsg_ q mq True $> Just msg
| otherwise -> pure Nothing

-- atomic delete (== read) last and peek next message if available
tryDelPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message)
tryDelPeekMsg st rId q msgId' =
withMsgQueue st rId q "tryDelPeekMsg" $ \mq ->
tryPeekMsg_ mq >>= \case
msg_@(Just msg)
| messageId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ mq)
| otherwise -> pure (Nothing, msg_)
_ -> pure (Nothing, Nothing)
withPeekMsgQueue st rId q "tryDelPeekMsg" $
maybe (pure (Nothing, Nothing)) $ \(mq, msg) ->
if
| messageId msg == msgId' -> (Just msg,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq)
| otherwise -> pure (Nothing, Just msg)

withMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (MsgQueue s -> StoreMonad s a) -> ExceptT ErrorType IO a
withMsgQueue st rId q op a = isolateQueue rId q op $ getMsgQueue st rId q >>= a
{-# INLINE withMsgQueue #-}
-- The action is called with Nothing when it is known that the queue is empty
withPeekMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (Maybe (MsgQueue s, Message) -> StoreMonad s a) -> ExceptT ErrorType IO a
withPeekMsgQueue st rId q op a = isolateQueue rId q op $ getPeekMsgQueue st rId q >>= a
{-# INLINE withPeekMsgQueue #-}

deleteExpiredMsgs :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
deleteExpiredMsgs st rId q old =
Expand All @@ -126,7 +134,7 @@ deleteExpireMsgs_ old q mq = do
pure n
where
loop dc =
tryPeekMsg_ mq >>= \case
tryPeekMsg_ q mq >>= \case
Just Message {msgTs}
| systemSeconds msgTs < old ->
tryDeleteMsg_ q mq False >> loop (dc + 1)
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Server/QueueStore/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
hasId = or <$> sequence [TM.member rId $ queues' st, TM.member sId $ senders' st, hasNotifier]
hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId (notifiers' st)) notifier

getQueue :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s))

Check warning on line 66 in src/Simplex/Messaging/Server/QueueStore/STM.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Redundant constraint: DirectParty p

Check warning on line 66 in src/Simplex/Messaging/Server/QueueStore/STM.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Redundant constraint: DirectParty p
getQueue st party qId =
maybe (Left AUTH) Right <$> case party of
SRecipient -> TM.lookupIO qId $ queues' st
Expand All @@ -84,7 +84,7 @@
secure q@QueueRec {recipientId = rId} = case senderKey q of
Just k -> pure $ if sKey == k then Right rId else Left AUTH
Nothing -> do
writeTVar qr $! Just q {senderKey = Just sKey}
writeTVar qr $ Just q {senderKey = Just sKey}
pure $ Right rId

addQueueNotifier :: STMQueueStore s => s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
Expand All @@ -96,7 +96,7 @@
add q@QueueRec {recipientId = rId} = ifM (TM.member nId (notifiers' st)) (pure $ Left DUPLICATE_) $ do
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId (notifiers' st) $> notifierId
let !q' = q {notifier = Just ntfCreds}
writeTVar qr $! Just q'
writeTVar qr $ Just q'
TM.insert nId rId $ notifiers' st
pure $ Right (rId, nId_)

Expand Down
Loading