Skip to content

Commit 655e7ad

Browse files
authored
smp server: get message queue faster, avoiding STM contention if queue exists, split transaction for notification delivery (#1289)
* put DRG state to IORef, split STM transaction of sending notification (#1288) * put DRG state to IORef, split STM transaction of sending notification * remove comment * remove comment * add comment * revert version * smp server: get message queue faster, avoiding STM contention if queue exists * IORef for counter * Revert "put DRG state to IORef, split STM transaction of sending notification (#1288)" This reverts commit 517933d. * version * remove IORef * split notification delivery transations * revert version
1 parent 9596a03 commit 655e7ad

File tree

5 files changed

+61
-51
lines changed

5 files changed

+61
-51
lines changed

package.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: simplexmq
2-
version: 6.0.2.0
2+
version: 6.0.2
33
synopsis: SimpleXMQ message broker
44
description: |
55
This package includes <./docs/Simplex-Messaging-Server.html server>,

simplexmq.cabal

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ cabal-version: 1.12
55
-- see: https://github.com/sol/hpack
66

77
name: simplexmq
8-
version: 6.0.2.0
8+
version: 6.0.2
99
synopsis: SimpleXMQ message broker
1010
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
1111
<./docs/Simplex-Messaging-Client.html client> and

src/Simplex/Messaging/Server.hs

Lines changed: 46 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ import Control.Monad.Except
4444
import Control.Monad.IO.Unlift
4545
import Control.Monad.Reader
4646
import Control.Monad.Trans.Except
47-
import Crypto.Random
4847
import Control.Monad.STM (retry)
4948
import Data.Bifunctor (first)
5049
import Data.ByteString.Base64 (encode)
@@ -247,7 +246,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
247246
old <- liftIO $ expireBeforeEpoch expCfg
248247
rIds <- M.keysSet <$> readTVarIO ms
249248
forM_ rIds $ \rId -> do
250-
q <- atomically (getMsgQueue ms rId quota)
249+
q <- liftIO $ getMsgQueue ms rId quota
251250
deleted <- atomically $ deleteExpiredMsgs q old
252251
liftIO $ atomicModifyIORef'_ (msgExpired stats) (+ deleted)
253252

@@ -1255,15 +1254,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
12551254
Just (msg, wasEmpty) -> time "SEND ok" $ do
12561255
when wasEmpty $ tryDeliverMessage msg
12571256
when (notification msgFlags) $ do
1258-
forM_ (notifier qr) $ \ntf -> do
1259-
asks random >>= atomically . trySendNotification ntf msg >>= \case
1260-
Nothing -> do
1261-
incStat $ msgNtfNoSub stats
1262-
logWarn "No notification subscription"
1263-
Just False -> do
1264-
incStat $ msgNtfLost stats
1265-
logWarn "Dropped message notification"
1266-
Just True -> incStat $ msgNtfs stats
1257+
mapM_ (`trySendNotification` msg) (notifier qr)
12671258
incStat $ msgSentNtf stats
12681259
liftIO $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr)
12691260
incStat $ msgSent stats
@@ -1335,23 +1326,35 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
13351326
deliver q s
13361327
writeTVar st NoSub
13371328

1338-
trySendNotification :: NtfCreds -> Message -> TVar ChaChaDRG -> STM (Maybe Bool)
1339-
trySendNotification NtfCreds {notifierId, rcvNtfDhSecret} msg ntfNonceDrg =
1340-
mapM (writeNtf notifierId msg rcvNtfDhSecret ntfNonceDrg) =<< TM.lookup notifierId notifiers
1341-
1342-
writeNtf :: NotifierId -> Message -> RcvNtfDhSecret -> TVar ChaChaDRG -> Client -> STM Bool
1343-
writeNtf nId msg rcvNtfDhSecret ntfNonceDrg Client {sndQ = q} =
1344-
ifM (isFullTBQueue q) (pure False) (sendNtf $> True)
1345-
where
1346-
sendNtf = case msg of
1347-
Message {msgId, msgTs} -> do
1348-
(nmsgNonce, encNMsgMeta) <- mkMessageNotification msgId msgTs rcvNtfDhSecret ntfNonceDrg
1349-
writeTBQueue q [(CorrId "", nId, NMSG nmsgNonce encNMsgMeta)]
1350-
_ -> pure ()
1351-
1352-
mkMessageNotification :: ByteString -> SystemTime -> RcvNtfDhSecret -> TVar ChaChaDRG -> STM (C.CbNonce, EncNMsgMeta)
1353-
mkMessageNotification msgId msgTs rcvNtfDhSecret ntfNonceDrg = do
1354-
cbNonce <- C.randomCbNonce ntfNonceDrg
1329+
trySendNotification :: NtfCreds -> Message -> M ()
1330+
trySendNotification NtfCreds {notifierId, rcvNtfDhSecret} msg = do
1331+
stats <- asks serverStats
1332+
liftIO (TM.lookupIO notifierId notifiers) >>= \case
1333+
Nothing -> do
1334+
incStat $ msgNtfNoSub stats
1335+
logWarn "No notification subscription"
1336+
Just ntfClnt -> do
1337+
let updateStats True = incStat $ msgNtfs stats
1338+
updateStats _ = do
1339+
incStat $ msgNtfLost stats
1340+
logWarn "Dropped message notification"
1341+
writeNtf notifierId msg rcvNtfDhSecret ntfClnt >>= mapM_ updateStats
1342+
1343+
writeNtf :: NotifierId -> Message -> RcvNtfDhSecret -> Client -> M (Maybe Bool)
1344+
writeNtf nId msg rcvNtfDhSecret Client {sndQ = q} = case msg of
1345+
Message {msgId, msgTs} -> Just <$> do
1346+
(nmsgNonce, encNMsgMeta) <- mkMessageNotification msgId msgTs rcvNtfDhSecret
1347+
-- must be in one STM transaction to avoid the queue becoming full between the check and writing
1348+
atomically $
1349+
ifM
1350+
(isFullTBQueue q)
1351+
(pure $ False)
1352+
(True <$ writeTBQueue q [(CorrId "", nId, NMSG nmsgNonce encNMsgMeta)])
1353+
_ -> pure Nothing
1354+
1355+
mkMessageNotification :: ByteString -> SystemTime -> RcvNtfDhSecret -> M (C.CbNonce, EncNMsgMeta)
1356+
mkMessageNotification msgId msgTs rcvNtfDhSecret = do
1357+
cbNonce <- atomically . C.randomCbNonce =<< asks random
13551358
let msgMeta = NMsgMeta {msgId, msgTs}
13561359
encNMsgMeta = C.cbEncrypt rcvNtfDhSecret cbNonce (smpEncode msgMeta) 128
13571360
pure . (cbNonce,) $ fromRight "" encNMsgMeta
@@ -1441,7 +1444,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
14411444
getStoreMsgQueue name rId = time (name <> " getMsgQueue") $ do
14421445
ms <- asks msgStore
14431446
quota <- asks $ msgQueueQuota . config
1444-
atomically $ getMsgQueue ms rId quota
1447+
liftIO $ getMsgQueue ms rId quota
14451448

14461449
delQueueAndMsgs :: QueueStore -> M (Transmission BrokerMsg)
14471450
delQueueAndMsgs st = do
@@ -1459,24 +1462,23 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
14591462

14601463
getQueueInfo :: QueueRec -> M (Transmission BrokerMsg)
14611464
getQueueInfo QueueRec {senderKey, notifier} = do
1462-
q@MsgQueue {size} <- getStoreMsgQueue "getQueueInfo" entId
1463-
info <- atomically $ do
1464-
qiSub <- TM.lookup entId subscriptions >>= mapM mkQSub
1465-
qiSize <- readTVar size
1466-
qiMsg <- toMsgInfo <$$> tryPeekMsg q
1467-
pure QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg}
1465+
q <- getStoreMsgQueue "getQueueInfo" entId
1466+
qiSub <- liftIO $ TM.lookupIO entId subscriptions >>= mapM mkQSub
1467+
qiSize <- liftIO $ getQueueSize q
1468+
qiMsg <- atomically $ toMsgInfo <$$> tryPeekMsg q
1469+
let info = QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg}
14681470
pure (corrId, entId, INFO info)
14691471
where
14701472
mkQSub Sub {subThread, delivered} = do
14711473
qSubThread <- case subThread of
14721474
ServerSub t -> do
1473-
st <- readTVar t
1475+
st <- readTVarIO t
14741476
pure $ case st of
14751477
NoSub -> QNoSub
14761478
SubPending -> QSubPending
14771479
SubThread _ -> QSubThread
14781480
ProhibitSub -> pure QProhibitSub
1479-
qDelivered <- decodeLatin1 . encode <$$> tryReadTMVar delivered
1481+
qDelivered <- atomically $ decodeLatin1 . encode <$$> tryReadTMVar delivered
14801482
pure QSub {qSubThread, qDelivered}
14811483

14821484
ok :: Transmission BrokerMsg
@@ -1564,13 +1566,12 @@ restoreServerMessages =
15641566
where
15651567
s = LB.toStrict s'
15661568
addToMsgQueue rId msg = do
1567-
(isExpired, logFull) <- atomically $ do
1568-
q <- getMsgQueue ms rId quota
1569-
case msg of
1570-
Message {msgTs}
1571-
| maybe True (systemSeconds msgTs >=) old_ -> (False,) . isNothing <$> writeMsg q msg
1572-
| otherwise -> pure (True, False)
1573-
MessageQuota {} -> writeMsg q msg $> (False, False)
1569+
q <- liftIO $ getMsgQueue ms rId quota
1570+
(isExpired, logFull) <- atomically $ case msg of
1571+
Message {msgTs}
1572+
| maybe True (systemSeconds msgTs >=) old_ -> (False,) . isNothing <$> writeMsg q msg
1573+
| otherwise -> pure (True, False)
1574+
MessageQuota {} -> writeMsg q msg $> (False, False)
15741575
when logFull . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (messageId msg)
15751576
pure $ if isExpired then expired + 1 else expired
15761577
msgErr :: Show e => String -> e -> String
@@ -1595,7 +1596,7 @@ restoreServerStats expiredWhileRestoring = asks (serverStatsBackupFile . config)
15951596
Right d@ServerStatsData {_qCount = statsQCount} -> do
15961597
s <- asks serverStats
15971598
_qCount <- fmap M.size . readTVarIO . queues =<< asks queueStore
1598-
_msgCount <- foldM (\(!n) q -> (n +) <$> readTVarIO (size q)) 0 =<< readTVarIO =<< asks msgStore
1599+
_msgCount <- liftIO . foldM (\(!n) q -> (n +) <$> getQueueSize q) 0 =<< readTVarIO =<< asks msgStore
15991600
liftIO $ setServerStats s d {_qCount, _msgCount, _msgExpired = _msgExpired d + expiredWhileRestoring}
16001601
renameFile f $ f <> ".bak"
16011602
logInfo "server stats restored"

src/Simplex/Messaging/Server/MsgStore/STM.hs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
module Simplex.Messaging.Server.MsgStore.STM
1111
( STMMsgStore,
12-
MsgQueue (..),
12+
MsgQueue (msgQueue),
1313
newMsgStore,
1414
getMsgQueue,
1515
delMsgQueue,
@@ -20,6 +20,7 @@ module Simplex.Messaging.Server.MsgStore.STM
2020
tryDelMsg,
2121
tryDelPeekMsg,
2222
deleteExpiredMsgs,
23+
getQueueSize,
2324
)
2425
where
2526

@@ -44,9 +45,14 @@ type STMMsgStore = TMap RecipientId MsgQueue
4445
newMsgStore :: IO STMMsgStore
4546
newMsgStore = TM.emptyIO
4647

47-
getMsgQueue :: STMMsgStore -> RecipientId -> Int -> STM MsgQueue
48-
getMsgQueue st rId quota = maybe newQ pure =<< TM.lookup rId st
48+
-- The reason for double lookup is that majority of messaging queues exist,
49+
-- because multiple messages are sent to the same queue,
50+
-- so the first lookup without STM transaction will return the queue faster.
51+
-- In case the queue does not exist, it needs to be looked-up again inside transaction.
52+
getMsgQueue :: STMMsgStore -> RecipientId -> Int -> IO MsgQueue
53+
getMsgQueue st rId quota = TM.lookupIO rId st >>= maybe (atomically maybeNewQ) pure
4954
where
55+
maybeNewQ = TM.lookup rId st >>= maybe newQ pure
5056
newQ = do
5157
msgQueue <- newTQueue
5258
canWrite <- newTVar True
@@ -117,3 +123,6 @@ tryDeleteMsg MsgQueue {msgQueue = q, size} =
117123
tryReadTQueue q >>= \case
118124
Just _ -> modifyTVar' size (subtract 1)
119125
_ -> pure ()
126+
127+
getQueueSize :: MsgQueue -> IO Int
128+
getQueueSize MsgQueue {size} = readTVarIO size

tests/ServerTests.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -771,7 +771,7 @@ testTiming (ATransport t) =
771771
(C.AuthAlg C.SX25519, C.AuthAlg C.SX25519, 200) -- correct key type
772772
]
773773
timeRepeat n = fmap fst . timeItT . forM_ (replicate n ()) . const
774-
similarTime t1 t2 = abs (t2 / t1 - 1) < 0.2 -- normally the difference between "no queue" and "wrong key" is less than 5%
774+
similarTime t1 t2 = abs (t2 / t1 - 1) < 0.25 -- normally the difference between "no queue" and "wrong key" is less than 5%
775775
testSameTiming :: forall c. Transport c => THandleSMP c 'TClient -> THandleSMP c 'TClient -> (C.AuthAlg, C.AuthAlg, Int) -> Expectation
776776
testSameTiming rh sh (C.AuthAlg goodKeyAlg, C.AuthAlg badKeyAlg, n) = do
777777
g <- C.newRandom

0 commit comments

Comments
 (0)