Skip to content

Commit 9596a03

Browse files
authored
servers: reduce STM transactions (#1287)
* servers: reduce STM transactions * switch stats and pending ENDs to IORef * more IORef, split pending ENDs to use in one thread
1 parent 2e7e476 commit 9596a03

File tree

8 files changed

+435
-414
lines changed

8 files changed

+435
-414
lines changed

src/Simplex/FileTransfer/Server.hs

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import Data.Time.Format.ISO8601 (iso8601Show)
3737
import Data.Word (Word32)
3838
import qualified Data.X509 as X
3939
import GHC.IO.Handle (hSetNewlineMode)
40+
import GHC.IORef (atomicSwapIORef)
4041
import GHC.Stats (getRTSStats)
4142
import qualified Network.HTTP.Types as N
4243
import qualified Network.HTTP2.Server as H
@@ -207,17 +208,17 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
207208
withFile statsFilePath AppendMode $ \h -> liftIO $ do
208209
hSetBuffering h LineBuffering
209210
ts <- getCurrentTime
210-
fromTime' <- atomically $ swapTVar fromTime ts
211-
filesCreated' <- atomically $ swapTVar filesCreated 0
212-
fileRecipients' <- atomically $ swapTVar fileRecipients 0
213-
filesUploaded' <- atomically $ swapTVar filesUploaded 0
214-
filesExpired' <- atomically $ swapTVar filesExpired 0
215-
filesDeleted' <- atomically $ swapTVar filesDeleted 0
216-
files <- atomically $ periodStatCounts filesDownloaded ts
217-
fileDownloads' <- atomically $ swapTVar fileDownloads 0
218-
fileDownloadAcks' <- atomically $ swapTVar fileDownloadAcks 0
219-
filesCount' <- readTVarIO filesCount
220-
filesSize' <- readTVarIO filesSize
211+
fromTime' <- atomicSwapIORef fromTime ts
212+
filesCreated' <- atomicSwapIORef filesCreated 0
213+
fileRecipients' <- atomicSwapIORef fileRecipients 0
214+
filesUploaded' <- atomicSwapIORef filesUploaded 0
215+
filesExpired' <- atomicSwapIORef filesExpired 0
216+
filesDeleted' <- atomicSwapIORef filesDeleted 0
217+
files <- liftIO $ periodStatCounts filesDownloaded ts
218+
fileDownloads' <- atomicSwapIORef fileDownloads 0
219+
fileDownloadAcks' <- atomicSwapIORef fileDownloadAcks 0
220+
filesCount' <- readIORef filesCount
221+
filesSize' <- readIORef filesSize
221222
hPutStrLn h $
222223
intercalate
223224
","
@@ -405,8 +406,8 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
405406
logAddFile sl sId file ts
406407
logAddRecipients sl sId rcps
407408
stats <- asks serverStats
408-
atomically $ modifyTVar' (filesCreated stats) (+ 1)
409-
atomically $ modifyTVar' (fileRecipients stats) (+ length rks)
409+
lift $ incFileStat filesCreated
410+
liftIO $ atomicModifyIORef'_ (fileRecipients stats) (+ length rks)
410411
let rIds = L.map (\(FileRecipient rId _) -> rId) rcps
411412
pure $ FRSndIds sId rIds
412413
pure $ either FRErr id r
@@ -435,7 +436,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
435436
rcps <- mapM (ExceptT . addRecipientRetry st 3 sId) rks
436437
lift $ withFileLog $ \sl -> logAddRecipients sl sId rcps
437438
stats <- asks serverStats
438-
atomically $ modifyTVar' (fileRecipients stats) (+ length rks)
439+
liftIO $ atomicModifyIORef'_ (fileRecipients stats) (+ length rks)
439440
let rIds = L.map (\(FileRecipient rId _) -> rId) rcps
440441
pure $ FRRcvIds rIds
441442
pure $ either FRErr id r
@@ -469,9 +470,9 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
469470
stats <- asks serverStats
470471
withFileLog $ \sl -> logPutFile sl senderId fPath
471472
atomically $ writeTVar filePath (Just fPath)
472-
atomically $ modifyTVar' (filesUploaded stats) (+ 1)
473-
atomically $ modifyTVar' (filesCount stats) (+ 1)
474-
atomically $ modifyTVar' (filesSize stats) (+ fromIntegral size)
473+
incFileStat filesUploaded
474+
incFileStat filesCount
475+
liftIO $ atomicModifyIORef'_ (filesSize stats) (+ fromIntegral size)
475476
pure FROk
476477
Left e -> do
477478
us <- asks $ usedStorage . store
@@ -494,8 +495,8 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
494495
case LC.cbInit dhSecret cbNonce of
495496
Right sbState -> do
496497
stats <- asks serverStats
497-
atomically $ modifyTVar' (fileDownloads stats) (+ 1)
498-
atomically $ updatePeriodStats (filesDownloaded stats) senderId
498+
incFileStat fileDownloads
499+
liftIO $ updatePeriodStats (filesDownloaded stats) senderId
499500
pure (FRFile sDhKey cbNonce, Just ServerFile {filePath = path, fileSize = size, sbState})
500501
_ -> pure (FRErr INTERNAL, Nothing)
501502
_ -> pure (FRErr NO_FILE, Nothing)
@@ -511,8 +512,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
511512
withFileLog (`logAckFile` rId)
512513
st <- asks store
513514
atomically $ deleteRecipient st rId fr
514-
stats <- asks serverStats
515-
atomically $ modifyTVar' (fileDownloadAcks stats) (+ 1)
515+
incFileStat fileDownloadAcks
516516
pure FROk
517517

518518
deleteServerFile_ :: FileRec -> M (Either XFTPErrorType ())
@@ -524,11 +524,11 @@ deleteServerFile_ FileRec {senderId, fileInfo, filePath} = do
524524
ExceptT $ first (\(_ :: SomeException) -> FILE_IO) <$> try (forM_ path $ \p -> whenM (doesFileExist p) (removeFile p >> deletedStats stats))
525525
st <- asks store
526526
void $ atomically $ deleteFile st senderId
527-
atomically $ modifyTVar' (filesDeleted stats) (+ 1)
527+
lift $ incFileStat filesDeleted
528528
where
529529
deletedStats stats = do
530-
atomically $ modifyTVar' (filesCount stats) (subtract 1)
531-
atomically $ modifyTVar' (filesSize stats) (subtract $ fromIntegral $ size fileInfo)
530+
liftIO $ atomicModifyIORef'_ (filesCount stats) (subtract 1)
531+
liftIO $ atomicModifyIORef'_ (filesSize stats) (subtract $ fromIntegral $ size fileInfo)
532532

533533
expireServerFiles :: Maybe Int -> ExpirationConfig -> M ()
534534
expireServerFiles itemDelay expCfg = do
@@ -554,8 +554,7 @@ expireServerFiles itemDelay expCfg = do
554554
delete st sId = do
555555
withFileLog (`logDeleteFile` sId)
556556
void . atomically $ deleteFile st sId -- will not update usedStorage if sId isn't in store
557-
FileServerStats {filesExpired} <- asks serverStats
558-
atomically $ modifyTVar' filesExpired (+ 1)
557+
incFileStat filesExpired
559558

560559
randomId :: Int -> M ByteString
561560
randomId n = atomically . C.randomBytes n =<< asks random
@@ -568,10 +567,10 @@ getFileId = do
568567
withFileLog :: (StoreLog 'WriteMode -> IO a) -> M ()
569568
withFileLog action = liftIO . mapM_ action =<< asks storeLog
570569

571-
incFileStat :: (FileServerStats -> TVar Int) -> M ()
570+
incFileStat :: (FileServerStats -> IORef Int) -> M ()
572571
incFileStat statSel = do
573572
stats <- asks serverStats
574-
atomically $ modifyTVar' (statSel stats) (+ 1)
573+
liftIO $ atomicModifyIORef'_ (statSel stats) (+ 1)
575574

576575
saveServerStats :: M ()
577576
saveServerStats =
@@ -594,7 +593,7 @@ restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStat
594593
FileStore {files, usedStorage} <- asks store
595594
_filesCount <- M.size <$> readTVarIO files
596595
_filesSize <- readTVarIO usedStorage
597-
atomically $ setFileServerStats s d {_filesCount, _filesSize}
596+
liftIO $ setFileServerStats s d {_filesCount, _filesSize}
598597
renameFile f $ f <> ".bak"
599598
logInfo "server stats restored"
600599
when (statsFilesCount /= _filesCount) $ logWarn $ "Files count differs: stats: " <> tshow statsFilesCount <> ", store: " <> tshow _filesCount

src/Simplex/FileTransfer/Server/Stats.hs

Lines changed: 43 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,25 @@ module Simplex.FileTransfer.Server.Stats where
77
import Control.Applicative ((<|>))
88
import qualified Data.Attoparsec.ByteString.Char8 as A
99
import qualified Data.ByteString.Char8 as B
10+
import Data.IORef
1011
import Data.Int (Int64)
1112
import Data.Time.Clock (UTCTime)
1213
import Simplex.Messaging.Encoding.String
1314
import Simplex.Messaging.Protocol (SenderId)
1415
import Simplex.Messaging.Server.Stats (PeriodStats, PeriodStatsData, getPeriodStatsData, newPeriodStats, setPeriodStats)
15-
import UnliftIO.STM
1616

1717
data FileServerStats = FileServerStats
18-
{ fromTime :: TVar UTCTime,
19-
filesCreated :: TVar Int,
20-
fileRecipients :: TVar Int,
21-
filesUploaded :: TVar Int,
22-
filesExpired :: TVar Int,
23-
filesDeleted :: TVar Int,
18+
{ fromTime :: IORef UTCTime,
19+
filesCreated :: IORef Int,
20+
fileRecipients :: IORef Int,
21+
filesUploaded :: IORef Int,
22+
filesExpired :: IORef Int,
23+
filesDeleted :: IORef Int,
2424
filesDownloaded :: PeriodStats SenderId,
25-
fileDownloads :: TVar Int,
26-
fileDownloadAcks :: TVar Int,
27-
filesCount :: TVar Int,
28-
filesSize :: TVar Int64
25+
fileDownloads :: IORef Int,
26+
fileDownloadAcks :: IORef Int,
27+
filesCount :: IORef Int,
28+
filesSize :: IORef Int64
2929
}
3030

3131
data FileServerStatsData = FileServerStatsData
@@ -45,47 +45,48 @@ data FileServerStatsData = FileServerStatsData
4545

4646
newFileServerStats :: UTCTime -> IO FileServerStats
4747
newFileServerStats ts = do
48-
fromTime <- newTVarIO ts
49-
filesCreated <- newTVarIO 0
50-
fileRecipients <- newTVarIO 0
51-
filesUploaded <- newTVarIO 0
52-
filesExpired <- newTVarIO 0
53-
filesDeleted <- newTVarIO 0
48+
fromTime <- newIORef ts
49+
filesCreated <- newIORef 0
50+
fileRecipients <- newIORef 0
51+
filesUploaded <- newIORef 0
52+
filesExpired <- newIORef 0
53+
filesDeleted <- newIORef 0
5454
filesDownloaded <- newPeriodStats
55-
fileDownloads <- newTVarIO 0
56-
fileDownloadAcks <- newTVarIO 0
57-
filesCount <- newTVarIO 0
58-
filesSize <- newTVarIO 0
55+
fileDownloads <- newIORef 0
56+
fileDownloadAcks <- newIORef 0
57+
filesCount <- newIORef 0
58+
filesSize <- newIORef 0
5959
pure FileServerStats {fromTime, filesCreated, fileRecipients, filesUploaded, filesExpired, filesDeleted, filesDownloaded, fileDownloads, fileDownloadAcks, filesCount, filesSize}
6060

6161
getFileServerStatsData :: FileServerStats -> IO FileServerStatsData
6262
getFileServerStatsData s = do
63-
_fromTime <- readTVarIO $ fromTime (s :: FileServerStats)
64-
_filesCreated <- readTVarIO $ filesCreated s
65-
_fileRecipients <- readTVarIO $ fileRecipients s
66-
_filesUploaded <- readTVarIO $ filesUploaded s
67-
_filesExpired <- readTVarIO $ filesExpired s
68-
_filesDeleted <- readTVarIO $ filesDeleted s
63+
_fromTime <- readIORef $ fromTime (s :: FileServerStats)
64+
_filesCreated <- readIORef $ filesCreated s
65+
_fileRecipients <- readIORef $ fileRecipients s
66+
_filesUploaded <- readIORef $ filesUploaded s
67+
_filesExpired <- readIORef $ filesExpired s
68+
_filesDeleted <- readIORef $ filesDeleted s
6969
_filesDownloaded <- getPeriodStatsData $ filesDownloaded s
70-
_fileDownloads <- readTVarIO $ fileDownloads s
71-
_fileDownloadAcks <- readTVarIO $ fileDownloadAcks s
72-
_filesCount <- readTVarIO $ filesCount s
73-
_filesSize <- readTVarIO $ filesSize s
70+
_fileDownloads <- readIORef $ fileDownloads s
71+
_fileDownloadAcks <- readIORef $ fileDownloadAcks s
72+
_filesCount <- readIORef $ filesCount s
73+
_filesSize <- readIORef $ filesSize s
7474
pure FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize}
7575

76-
setFileServerStats :: FileServerStats -> FileServerStatsData -> STM ()
76+
-- this function is not thread safe, it is used on server start only
77+
setFileServerStats :: FileServerStats -> FileServerStatsData -> IO ()
7778
setFileServerStats s d = do
78-
writeTVar (fromTime (s :: FileServerStats)) $! _fromTime (d :: FileServerStatsData)
79-
writeTVar (filesCreated s) $! _filesCreated d
80-
writeTVar (fileRecipients s) $! _fileRecipients d
81-
writeTVar (filesUploaded s) $! _filesUploaded d
82-
writeTVar (filesExpired s) $! _filesExpired d
83-
writeTVar (filesDeleted s) $! _filesDeleted d
79+
writeIORef (fromTime (s :: FileServerStats)) $! _fromTime (d :: FileServerStatsData)
80+
writeIORef (filesCreated s) $! _filesCreated d
81+
writeIORef (fileRecipients s) $! _fileRecipients d
82+
writeIORef (filesUploaded s) $! _filesUploaded d
83+
writeIORef (filesExpired s) $! _filesExpired d
84+
writeIORef (filesDeleted s) $! _filesDeleted d
8485
setPeriodStats (filesDownloaded s) $! _filesDownloaded d
85-
writeTVar (fileDownloads s) $! _fileDownloads d
86-
writeTVar (fileDownloadAcks s) $! _fileDownloadAcks d
87-
writeTVar (filesCount s) $! _filesCount d
88-
writeTVar (filesSize s) $! _filesSize d
86+
writeIORef (fileDownloads s) $! _fileDownloads d
87+
writeIORef (fileDownloadAcks s) $! _fileDownloadAcks d
88+
writeIORef (filesCount s) $! _filesCount d
89+
writeIORef (filesSize s) $! _filesSize d
8990

9091
instance StrEncoding FileServerStatsData where
9192
strEncode FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize} =

src/Simplex/Messaging/Notifications/Server.hs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import Control.Monad.Reader
1919
import Data.ByteString.Char8 (ByteString)
2020
import qualified Data.ByteString.Char8 as B
2121
import Data.Functor (($>))
22+
import Data.IORef
2223
import Data.Int (Int64)
2324
import Data.List (intercalate, sort)
2425
import Data.List.NonEmpty (NonEmpty (..))
@@ -30,6 +31,7 @@ import Data.Text.Encoding (decodeLatin1)
3031
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
3132
import Data.Time.Clock.System (getSystemTime)
3233
import Data.Time.Format.ISO8601 (iso8601Show)
34+
import GHC.IORef (atomicSwapIORef)
3335
import Network.Socket (ServiceName)
3436
import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError, ServerTransmission (..))
3537
import Simplex.Messaging.Client.Agent
@@ -118,16 +120,16 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg} started = do
118120
withFile statsFilePath AppendMode $ \h -> liftIO $ do
119121
hSetBuffering h LineBuffering
120122
ts <- getCurrentTime
121-
fromTime' <- atomically $ swapTVar fromTime ts
122-
tknCreated' <- atomically $ swapTVar tknCreated 0
123-
tknVerified' <- atomically $ swapTVar tknVerified 0
124-
tknDeleted' <- atomically $ swapTVar tknDeleted 0
125-
subCreated' <- atomically $ swapTVar subCreated 0
126-
subDeleted' <- atomically $ swapTVar subDeleted 0
127-
ntfReceived' <- atomically $ swapTVar ntfReceived 0
128-
ntfDelivered' <- atomically $ swapTVar ntfDelivered 0
129-
tkn <- atomically $ periodStatCounts activeTokens ts
130-
sub <- atomically $ periodStatCounts activeSubs ts
123+
fromTime' <- atomicSwapIORef fromTime ts
124+
tknCreated' <- atomicSwapIORef tknCreated 0
125+
tknVerified' <- atomicSwapIORef tknVerified 0
126+
tknDeleted' <- atomicSwapIORef tknDeleted 0
127+
subCreated' <- atomicSwapIORef subCreated 0
128+
subDeleted' <- atomicSwapIORef subDeleted 0
129+
ntfReceived' <- atomicSwapIORef ntfReceived 0
130+
ntfDelivered' <- atomicSwapIORef ntfDelivered 0
131+
tkn <- liftIO $ periodStatCounts activeTokens ts
132+
sub <- liftIO $ periodStatCounts activeSubs ts
131133
hPutStrLn h $
132134
intercalate
133135
","
@@ -215,7 +217,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
215217
st <- asks store
216218
NtfPushServer {pushQ} <- asks pushServer
217219
stats <- asks serverStats
218-
atomically $ updatePeriodStats (activeSubs stats) ntfId
220+
liftIO $ updatePeriodStats (activeSubs stats) ntfId
219221
atomically $
220222
findNtfSubscriptionToken st smpQueue
221223
>>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}))
@@ -299,7 +301,7 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do
299301
void $ deliverNotification pp tkn ntf
300302
PNMessage {} -> checkActiveTkn status $ do
301303
stats <- asks serverStats
302-
atomically $ updatePeriodStats (activeTokens stats) ntfTknId
304+
liftIO $ updatePeriodStats (activeTokens stats) ntfTknId
303305
void $ deliverNotification pp tkn ntf
304306
incNtfStat ntfDelivered
305307
where
@@ -575,14 +577,14 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
575577
withNtfLog :: (StoreLog 'WriteMode -> IO a) -> M ()
576578
withNtfLog action = liftIO . mapM_ action =<< asks storeLog
577579

578-
incNtfStatT :: DeviceToken -> (NtfServerStats -> TVar Int) -> M ()
580+
incNtfStatT :: DeviceToken -> (NtfServerStats -> IORef Int) -> M ()
579581
incNtfStatT (DeviceToken PPApnsNull _) _ = pure ()
580582
incNtfStatT _ statSel = incNtfStat statSel
581583

582-
incNtfStat :: (NtfServerStats -> TVar Int) -> M ()
584+
incNtfStat :: (NtfServerStats -> IORef Int) -> M ()
583585
incNtfStat statSel = do
584586
stats <- asks serverStats
585-
atomically $ modifyTVar' (statSel stats) (+ 1)
587+
liftIO $ atomicModifyIORef'_ (statSel stats) (+ 1)
586588

587589
saveServerStats :: M ()
588590
saveServerStats =
@@ -602,7 +604,7 @@ restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStat
602604
liftIO (strDecode <$> B.readFile f) >>= \case
603605
Right d -> do
604606
s <- asks serverStats
605-
atomically $ setNtfServerStats s d
607+
liftIO $ setNtfServerStats s d
606608
renameFile f $ f <> ".bak"
607609
logInfo "server stats restored"
608610
Left e -> do

0 commit comments

Comments
 (0)