Skip to content

Commit 0a3d014

Browse files
authored
ntf server: priority db pool for notifications and client commands, concurrent resubscriptions, CLI options (#1529)
* ntf server: option to skip specific tokens when importing store log * logs * batch later * subscribe in parallel, bigger queues * ntf server: test schema and migrations * optimize subscriptions * log level * resubscribe before starting * better subscribing * pooled concurrent resubscriptions * 5 * async resubscribe * typo * priority pool * db batch size
1 parent 7d0115d commit 0a3d014

File tree

26 files changed

+532
-254
lines changed

26 files changed

+532
-254
lines changed

apps/ntf-server/Main.hs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
1515

1616
main :: IO ()
1717
main = do
18-
setLogLevel LogInfo
1918
cfgPath <- getEnvPath "NTF_SERVER_CFG_PATH" defaultCfgPath
2019
logPath <- getEnvPath "NTF_SERVER_LOG_PATH" defaultLogPath
2120
withGlobalLogging logCfg $ ntfServerCLI cfgPath logPath

simplexmq.cabal

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ test-suite simplexmq-test
494494
AgentTests.NotificationTests
495495
NtfClient
496496
NtfServerTests
497-
ServerTests.SchemaDump
497+
PostgresSchemaDump
498498
hs-source-dirs:
499499
tests
500500
apps/smp-server/web

src/Simplex/FileTransfer/Client/Main.hs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
280280
let chunkSpecs = prepareChunkSpecs encPath chunkSizes
281281
fdRcv = FileDescription {party = SFRecipient, size = FileSize encSize, digest = FileDigest digest, key, nonce, chunkSize = FileSize defChunkSize, chunks = [], redirect = Nothing}
282282
fdSnd = FileDescription {party = SFSender, size = FileSize encSize, digest = FileDigest digest, key, nonce, chunkSize = FileSize defChunkSize, chunks = [], redirect = Nothing}
283-
logInfo $ "encrypted file to " <> tshow encPath
283+
logDebug $ "encrypted file to " <> tshow encPath
284284
pure (encPath, fdRcv, fdSnd, chunkSpecs, encSize)
285285
uploadFile :: TVar ChaChaDRG -> [XFTPChunkSpec] -> TVar [Int64] -> Int64 -> ExceptT CLIError IO [SentFileChunk]
286286
uploadFile g chunks uploadedChunks encSize = do
@@ -293,22 +293,22 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
293293
-- TODO shuffle/unshuffle chunks
294294
-- the reason we don't do pooled downloads here within one server is that http2 library doesn't handle client concurrency, even though
295295
-- upload doesn't allow other requests within the same client until complete (but download does allow).
296-
logInfo $ "uploading " <> tshow (length chunks) <> " chunks..."
296+
logDebug $ "uploading " <> tshow (length chunks) <> " chunks..."
297297
(errs, rs) <- partitionEithers . concat <$> liftIO (pooledForConcurrentlyN 16 chunks' . mapM $ runExceptT . uploadFileChunk a)
298298
mapM_ throwE errs
299299
pure $ map snd (sortOn fst rs)
300300
where
301301
uploadFileChunk :: XFTPClientAgent -> (Int, XFTPChunkSpec, XFTPServerWithAuth) -> ExceptT CLIError IO (Int, SentFileChunk)
302302
uploadFileChunk a (chunkNo, chunkSpec@XFTPChunkSpec {chunkSize}, ProtoServerWithAuth xftpServer auth) = do
303-
logInfo $ "uploading chunk " <> tshow chunkNo <> " to " <> showServer xftpServer <> "..."
303+
logDebug $ "uploading chunk " <> tshow chunkNo <> " to " <> showServer xftpServer <> "..."
304304
(sndKey, spKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
305305
rKeys <- atomically $ L.fromList <$> replicateM numRecipients (C.generateAuthKeyPair C.SEd25519 g)
306306
digest <- liftIO $ getChunkDigest chunkSpec
307307
let ch = FileInfo {sndKey, size = chunkSize, digest}
308308
c <- withRetry retryCount $ getXFTPServerClient a xftpServer
309309
(sndId, rIds) <- withRetry retryCount $ createXFTPChunk c spKey ch (L.map fst rKeys) auth
310310
withReconnect a xftpServer retryCount $ \c' -> uploadXFTPChunk c' spKey sndId chunkSpec
311-
logInfo $ "uploaded chunk " <> tshow chunkNo
311+
logDebug $ "uploaded chunk " <> tshow chunkNo
312312
uploaded <- atomically . stateTVar uploadedChunks $ \cs ->
313313
let cs' = fromIntegral chunkSize : cs in (sum cs', cs')
314314
liftIO $ do
@@ -418,11 +418,11 @@ cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath,
418418
downloadFileChunk :: TVar ChaChaDRG -> XFTPClientAgent -> FilePath -> FileSize Int64 -> TVar [Int64] -> FileChunk -> ExceptT CLIError IO (Int, FilePath)
419419
downloadFileChunk g a encPath (FileSize encSize) downloadedChunks FileChunk {chunkNo, chunkSize, digest, replicas = replica : _} = do
420420
let FileChunkReplica {server, replicaId, replicaKey} = replica
421-
logInfo $ "downloading chunk " <> tshow chunkNo <> " from " <> showServer server <> "..."
421+
logDebug $ "downloading chunk " <> tshow chunkNo <> " from " <> showServer server <> "..."
422422
chunkPath <- uniqueCombine encPath $ show chunkNo
423423
let chunkSpec = XFTPRcvChunkSpec chunkPath (unFileSize chunkSize) (unFileDigest digest)
424424
withReconnect a server retryCount $ \c -> downloadXFTPChunk g c replicaKey (unChunkReplicaId replicaId) chunkSpec
425-
logInfo $ "downloaded chunk " <> tshow chunkNo <> " to " <> T.pack chunkPath
425+
logDebug $ "downloaded chunk " <> tshow chunkNo <> " to " <> T.pack chunkPath
426426
downloaded <- atomically . stateTVar downloadedChunks $ \cs ->
427427
let cs' = fromIntegral (unFileSize chunkSize) : cs in (sum cs', cs')
428428
liftIO $ do
@@ -467,7 +467,7 @@ cliDeleteFile DeleteOptions {fileDescription, retryCount, yes} = do
467467
deleteFileChunk a FileChunk {chunkNo, replicas = replica : _} = do
468468
let FileChunkReplica {server, replicaId, replicaKey} = replica
469469
withReconnect a server retryCount $ \c -> deleteXFTPChunk c replicaKey (unChunkReplicaId replicaId)
470-
logInfo $ "deleted chunk " <> tshow chunkNo <> " from " <> showServer server
470+
logDebug $ "deleted chunk " <> tshow chunkNo <> " from " <> showServer server
471471
deleteFileChunk _ _ = throwE $ CLIError "chunk has no replicas"
472472

473473
cliFileDescrInfo :: InfoOptions -> ExceptT CLIError IO ()

src/Simplex/FileTransfer/Server.hs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
181181
stopServer = do
182182
withFileLog closeStoreLog
183183
saveServerStats
184-
logInfo "Server stopped"
184+
logNote "Server stopped"
185185

186186
expireFilesThread_ :: XFTPServerConfig -> [M ()]
187187
expireFilesThread_ XFTPServerConfig {fileExpiration = Just fileExp} = [expireFiles fileExp]
@@ -560,13 +560,13 @@ expireServerFiles itemDelay expCfg = do
560560
usedStart <- readTVarIO $ usedStorage st
561561
old <- liftIO $ expireBeforeEpoch expCfg
562562
files' <- readTVarIO (files st)
563-
logInfo $ "Expiration check: " <> tshow (M.size files') <> " files"
563+
logNote $ "Expiration check: " <> tshow (M.size files') <> " files"
564564
forM_ (M.keys files') $ \sId -> do
565565
mapM_ threadDelay itemDelay
566566
atomically (expiredFilePath st sId old)
567567
>>= mapM_ (maybeRemove $ delete st sId)
568568
usedEnd <- readTVarIO $ usedStorage st
569-
logInfo $ "Used " <> mbs usedStart <> " -> " <> mbs usedEnd <> ", " <> mbs (usedStart - usedEnd) <> " reclaimed."
569+
logNote $ "Used " <> mbs usedStart <> " -> " <> mbs usedEnd <> ", " <> mbs (usedStart - usedEnd) <> " reclaimed."
570570
where
571571
mbs bs = tshow (bs `div` 1048576) <> "mb"
572572
maybeRemove del = maybe del (remove del)
@@ -600,15 +600,15 @@ saveServerStats =
600600
>>= mapM_ (\f -> asks serverStats >>= liftIO . getFileServerStatsData >>= liftIO . saveStats f)
601601
where
602602
saveStats f stats = do
603-
logInfo $ "saving server stats to file " <> T.pack f
603+
logNote $ "saving server stats to file " <> T.pack f
604604
B.writeFile f $ strEncode stats
605-
logInfo "server stats saved"
605+
logNote "server stats saved"
606606

607607
restoreServerStats :: M ()
608608
restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats
609609
where
610610
restoreStats f = whenM (doesFileExist f) $ do
611-
logInfo $ "restoring server stats from file " <> T.pack f
611+
logNote $ "restoring server stats from file " <> T.pack f
612612
liftIO (strDecode <$> B.readFile f) >>= \case
613613
Right d@FileServerStatsData {_filesCount = statsFilesCount, _filesSize = statsFilesSize} -> do
614614
s <- asks serverStats
@@ -617,10 +617,10 @@ restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStat
617617
_filesSize <- readTVarIO usedStorage
618618
liftIO $ setFileServerStats s d {_filesCount, _filesSize}
619619
renameFile f $ f <> ".bak"
620-
logInfo "server stats restored"
620+
logNote "server stats restored"
621621
when (statsFilesCount /= _filesCount) $ logWarn $ "Files count differs: stats: " <> tshow statsFilesCount <> ", store: " <> tshow _filesCount
622622
when (statsFilesSize /= _filesSize) $ logWarn $ "Files size differs: stats: " <> tshow statsFilesSize <> ", store: " <> tshow _filesSize
623-
logInfo $ "Restored " <> tshow (_filesSize `div` 1048576) <> " MBs in " <> tshow _filesCount <> " files"
623+
logNote $ "Restored " <> tshow (_filesSize `div` 1048576) <> " MBs in " <> tshow _filesCount <> " files"
624624
Left e -> do
625-
logInfo $ "error restoring server stats: " <> T.pack e
625+
logNote $ "error restoring server stats: " <> T.pack e
626626
liftIO exitFailure

src/Simplex/FileTransfer/Server/Env.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ newXFTPServerEnv config@XFTPServerConfig {storeLogFile, fileSizeQuota, xftpCrede
103103
used <- countUsedStorage <$> readTVarIO (files store)
104104
atomically $ writeTVar (usedStorage store) used
105105
forM_ fileSizeQuota $ \quota -> do
106-
logInfo $ "Total / available storage: " <> tshow quota <> " / " <> tshow (quota - used)
107-
when (quota < used) $ logInfo "WARNING: storage quota is less than used storage, no files can be uploaded!"
106+
logNote $ "Total / available storage: " <> tshow quota <> " / " <> tshow (quota - used)
107+
when (quota < used) $ logWarn "WARNING: storage quota is less than used storage, no files can be uploaded!"
108108
tlsServerCreds <- loadServerCredential xftpCredentials
109109
Fingerprint fp <- loadFingerprint xftpCredentials
110110
serverStats <- newFileServerStats =<< getCurrentTime

src/Simplex/Messaging/Agent/Store/Postgres.hs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import qualified Simplex.Messaging.Agent.Store.Postgres.DB as DB
3333
import Simplex.Messaging.Agent.Store.Shared (Migration (..), MigrationConfirmation (..), MigrationError (..))
3434
import Simplex.Messaging.Util (ifM, safeDecodeUtf8)
3535
import System.Exit (exitFailure)
36-
import UnliftIO.MVar
3736

3837
-- | Create a new Postgres DBStore with the given connection string, schema name and migrations.
3938
-- If passed schema does not exist in connectInfo database, it will be created.
@@ -54,23 +53,26 @@ createDBStore opts migrations confirmMigrations = do
5453

5554
connectPostgresStore :: DBOpts -> IO DBStore
5655
connectPostgresStore DBOpts {connstr, schema, poolSize, createSchema} = do
57-
dbSem <- newMVar ()
58-
dbPool <- newTBQueueIO poolSize
56+
dbPriorityPool <- newDBStorePool poolSize
57+
dbPool <- newDBStorePool poolSize
5958
dbClosed <- newTVarIO True
60-
let st = DBStore {dbConnstr = connstr, dbSchema = schema, dbPoolSize = fromIntegral poolSize, dbPool, dbSem, dbNew = False, dbClosed}
61-
dbNew <- connectPool st createSchema
59+
let st = DBStore {dbConnstr = connstr, dbSchema = schema, dbPoolSize = fromIntegral poolSize, dbPriorityPool, dbPool, dbNew = False, dbClosed}
60+
dbNew <- connectStore st createSchema
6261
pure st {dbNew}
6362

6463
-- uninterruptibleMask_ is used here and below so that the operation is not interrupted half-way,
6564
-- it relies on the assumption that when dbClosed = True, the queue is empty,
6665
-- and when it is False, the queue is full (or will have connections returned to it by the threads that use them).
67-
connectPool :: DBStore -> Bool -> IO Bool
68-
connectPool DBStore {dbConnstr, dbSchema, dbPoolSize, dbPool, dbClosed} createSchema = uninterruptibleMask_ $ do
66+
connectStore :: DBStore -> Bool -> IO Bool
67+
connectStore DBStore {dbConnstr, dbSchema, dbPoolSize, dbPriorityPool, dbPool, dbClosed} createSchema = uninterruptibleMask_ $ do
6968
(conn, dbNew) <- connectDB dbConnstr dbSchema createSchema -- TODO [postgres] analogue for dbBusyLoop?
70-
conns <- replicateM (dbPoolSize - 1) $ fst <$> connectDB dbConnstr dbSchema False
71-
mapM_ (atomically . writeTBQueue dbPool) (conn : conns)
69+
writeConns dbPriorityPool . (conn :) =<< mkConns (dbPoolSize - 1)
70+
writeConns dbPool =<< mkConns dbPoolSize
7271
atomically $ writeTVar dbClosed False
7372
pure dbNew
73+
where
74+
writeConns pool conns = mapM_ (atomically . writeTBQueue (dbPoolConns pool)) conns
75+
mkConns n = replicateM n $ fst <$> connectDB dbConnstr dbSchema False
7476

7577
connectDB :: ByteString -> ByteString -> Bool -> IO (DB.Connection, Bool)
7678
connectDB connstr schema createSchema = do
@@ -111,16 +113,19 @@ doesSchemaExist db schema = do
111113
pure schemaExists
112114

113115
closeDBStore :: DBStore -> IO ()
114-
closeDBStore DBStore {dbPool, dbPoolSize, dbClosed} =
116+
closeDBStore DBStore {dbPoolSize, dbPriorityPool, dbPool, dbClosed} =
115117
ifM (readTVarIO dbClosed) (putStrLn "closeDBStore: already closed") $ uninterruptibleMask_ $ do
116-
replicateM_ dbPoolSize $ atomically (readTBQueue dbPool) >>= DB.close
118+
closePool dbPriorityPool
119+
closePool dbPool
117120
atomically $ writeTVar dbClosed True
121+
where
122+
closePool pool = replicateM_ dbPoolSize $ atomically (readTBQueue $ dbPoolConns pool) >>= DB.close
118123

119124
reopenDBStore :: DBStore -> IO ()
120125
reopenDBStore st =
121126
ifM
122127
(readTVarIO $ dbClosed st)
123-
(void $ connectPool st False)
128+
(void $ connectStore st False)
124129
(putStrLn "reopenDBStore: already opened")
125130

126131
-- not used with postgres client (used for ExecAgentStoreSQL, ExecChatStoreSQL)

src/Simplex/Messaging/Agent/Store/Postgres/Common.hs

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
module Simplex.Messaging.Agent.Store.Postgres.Common
88
( DBStore (..),
9+
DBStorePool (..),
910
DBOpts (..),
11+
newDBStorePool,
1012
withConnection,
1113
withConnection',
1214
withTransaction,
@@ -20,26 +22,48 @@ import Control.Concurrent.STM
2022
import Control.Exception (bracket)
2123
import Data.ByteString (ByteString)
2224
import qualified Database.PostgreSQL.Simple as PSQL
25+
import Numeric.Natural (Natural)
2326
import Simplex.Messaging.Agent.Store.Postgres.Options
2427

2528
-- TODO [postgres] use log_min_duration_statement instead of custom slow queries (SQLite's Connection type)
2629
data DBStore = DBStore
2730
{ dbConnstr :: ByteString,
2831
dbSchema :: ByteString,
2932
dbPoolSize :: Int,
30-
dbPool :: TBQueue PSQL.Connection,
31-
-- MVar is needed for fair pool distribution, without STM retry contention.
32-
-- Only one thread can be blocked on STM read.
33-
dbSem :: MVar (),
33+
dbPriorityPool :: DBStorePool,
34+
dbPool :: DBStorePool,
35+
-- dbPoolSize :: Int,
36+
-- dbPool :: TBQueue PSQL.Connection,
37+
-- -- MVar is needed for fair pool distribution, without STM retry contention.
38+
-- -- Only one thread can be blocked on STM read.
39+
-- dbSem :: MVar (),
3440
dbClosed :: TVar Bool,
3541
dbNew :: Bool
3642
}
3743

44+
newDBStorePool :: Natural -> IO DBStorePool
45+
newDBStorePool poolSize = do
46+
dbSem <- newMVar ()
47+
dbPoolConns <- newTBQueueIO poolSize
48+
pure DBStorePool {dbSem, dbPoolConns}
49+
50+
data DBStorePool = DBStorePool
51+
{ dbPoolConns :: TBQueue PSQL.Connection,
52+
-- MVar is needed for fair pool distribution, without STM retry contention.
53+
-- Only one thread can be blocked on STM read.
54+
dbSem :: MVar ()
55+
}
56+
3857
withConnectionPriority :: DBStore -> Bool -> (PSQL.Connection -> IO a) -> IO a
39-
withConnectionPriority DBStore {dbPool, dbSem} _priority =
58+
withConnectionPriority DBStore {dbPriorityPool, dbPool} priority =
59+
withConnectionPool $ if priority then dbPriorityPool else dbPool
60+
{-# INLINE withConnectionPriority #-}
61+
62+
withConnectionPool :: DBStorePool -> (PSQL.Connection -> IO a) -> IO a
63+
withConnectionPool DBStorePool {dbPoolConns, dbSem} =
4064
bracket
41-
(withMVar dbSem $ \_ -> atomically $ readTBQueue dbPool)
42-
(atomically . writeTBQueue dbPool)
65+
(withMVar dbSem $ \_ -> atomically $ readTBQueue dbPoolConns)
66+
(atomically . writeTBQueue dbPoolConns)
4367

4468
withConnection :: DBStore -> (PSQL.Connection -> IO a) -> IO a
4569
withConnection st = withConnectionPriority st False

src/Simplex/Messaging/Client/Agent.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import Simplex.Messaging.Session
4242
import Simplex.Messaging.TMap (TMap)
4343
import qualified Simplex.Messaging.TMap as TM
4444
import Simplex.Messaging.Transport
45-
import Simplex.Messaging.Util (catchAll_, ifM, toChunks, whenM, ($>>=), (<$$>))
45+
import Simplex.Messaging.Util (catchAll_, ifM, safeDecodeUtf8, toChunks, tshow, whenM, ($>>=), (<$$>))
4646
import System.Timeout (timeout)
4747
import UnliftIO (async)
4848
import qualified UnliftIO.Exception as E
@@ -321,7 +321,7 @@ withSMP ca srv action = (getSMPServerClient' ca srv >>= action) `catchE` logSMPE
321321
where
322322
logSMPError :: SMPClientError -> ExceptT SMPClientError IO a
323323
logSMPError e = do
324-
liftIO $ putStrLn $ "SMP error (" <> show srv <> "): " <> show e
324+
logInfo $ "SMP error (" <> safeDecodeUtf8 (strEncode $ host srv) <> "): " <> tshow e
325325
throwE e
326326

327327
subscribeQueuesSMP :: SMPClientAgent -> SMPServer -> NonEmpty (RecipientId, RcvPrivateAuthKey) -> IO ()

0 commit comments

Comments
 (0)