Skip to content

Commit 0c1030c

Browse files
authored
smp server: faster export (#1626)
* smp server: faster export
* flush
* export messages with PostgreSQL database
* remove flush
* batch message writes
* Revert "batch message writes" (this reverts commit 61fb3c3)
* remove $!
* fast journal export
* another approach
* Revert "smp server: remove dependency of message size on the version (#1627)" (this reverts commit 8fea152)
* style
* faster?
* cleanup
* cleanup
* refactor
* refactor
* concurrent read messages
* Revert "concurrent read messages" (this reverts commit 05a32e6)
* concurrent read/write
* parameter to export to/import from another message log file
* Revert "parameter to export to/import from another message log file" (this reverts commit 4e88b03)
* Revert "concurrent read/write" (this reverts commit a8eab1f)
1 parent 23aff6b commit 0c1030c

File tree

8 files changed

+138
-57
lines changed

8 files changed

+138
-57
lines changed

src/Simplex/Messaging/Server.hs

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ import Simplex.Messaging.Server.Control
105105
import Simplex.Messaging.Server.Env.STM as Env
106106
import Simplex.Messaging.Server.Expiration
107107
import Simplex.Messaging.Server.MsgStore
108-
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue)
108+
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue, getJournalQueueMessages)
109109
import Simplex.Messaging.Server.MsgStore.STM
110110
import Simplex.Messaging.Server.MsgStore.Types
111111
import Simplex.Messaging.Server.NtfStore
@@ -2104,27 +2104,36 @@ randomId = fmap EntityId . randomId'
21042104
{-# INLINE randomId #-}
21052105

21062106
saveServerMessages :: Bool -> MsgStore s -> IO ()
2107-
saveServerMessages drainMsgs = \case
2108-
StoreMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of
2107+
saveServerMessages drainMsgs ms = case ms of
2108+
StoreMemory STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of
21092109
Just f -> exportMessages False ms f drainMsgs
21102110
Nothing -> logNote "undelivered messages are not saved"
21112111
StoreJournal _ -> logNote "closed journal message storage"
21122112

2113-
exportMessages :: MsgStoreClass s => Bool -> s -> FilePath -> Bool -> IO ()
2114-
exportMessages tty ms f drainMsgs = do
2113+
exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath -> Bool -> IO ()
2114+
exportMessages tty st f drainMsgs = do
21152115
logNote $ "saving messages to file " <> T.pack f
2116-
liftIO $ withFile f WriteMode $ \h ->
2117-
tryAny (unsafeWithAllMsgQueues tty True ms $ saveQueueMsgs h) >>= \case
2118-
Right (Sum total) -> logNote $ "messages saved: " <> tshow total
2116+
run $ case st of
2117+
StoreMemory ms -> exportMessages_ ms $ getMsgs ms
2118+
StoreJournal ms -> exportMessages_ ms $ getJournalMsgs ms
2119+
where
2120+
exportMessages_ ms get = fmap (\(Sum n) -> n) . unsafeWithAllMsgQueues tty ms . saveQueueMsgs get
2121+
run :: (Handle -> IO Int) -> IO ()
2122+
run a = liftIO $ withFile f WriteMode $ tryAny . a >=> \case
2123+
Right n -> logNote $ "messages saved: " <> tshow n
21192124
Left e -> do
21202125
logError $ "error exporting messages: " <> tshow e
21212126
exitFailure
2122-
where
2123-
saveQueueMsgs h q = do
2124-
msgs <-
2125-
unsafeRunStore q "saveQueueMsgs" $
2126-
getQueueMessages_ drainMsgs q =<< getMsgQueue ms q False
2127-
BLD.hPutBuilder h $ encodeMessages (recipientId q) msgs
2127+
getJournalMsgs ms q =
2128+
readTVarIO (msgQueue q) >>= \case
2129+
Just _ -> getMsgs ms q
2130+
Nothing -> getJournalQueueMessages ms q
2131+
getMsgs :: MsgStoreClass s' => s' -> StoreQueue s' -> IO [Message]
2132+
getMsgs ms q = unsafeRunStore q "saveQueueMsgs" $ getQueueMessages_ drainMsgs q =<< getMsgQueue ms q False
2133+
saveQueueMsgs :: (StoreQueue s -> IO [Message]) -> Handle -> StoreQueue s -> IO (Sum Int)
2134+
saveQueueMsgs get h q = do
2135+
msgs <- get q
2136+
unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages (recipientId q) msgs
21282137
pure $ Sum $ length msgs
21292138
encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n')
21302139

@@ -2151,7 +2160,7 @@ processServerMessages StartOptions {skipWarnings} = do
21512160
run processValidateQueue
21522161
| otherwise = logWarn "skipping message expiration" $> Nothing
21532162
where
2154-
run a = unsafeWithAllMsgQueues False False ms a `catchAny` \_ -> exitFailure
2163+
run a = unsafeWithAllMsgQueues False ms a `catchAny` \_ -> exitFailure
21552164
processExpireQueue :: Int64 -> JournalQueue s -> IO MessageStats
21562165
processExpireQueue old q = unsafeRunStore q "processExpireQueue" $ do
21572166
mq <- getMsgQueue ms q False

src/Simplex/Messaging/Server/Main.hs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ import Simplex.Messaging.Transport (supportedProxyClientSMPRelayVRange, alpnSupp
6060
import Simplex.Messaging.Transport.Client (TransportHost (..), defaultSocksProxy)
6161
import Simplex.Messaging.Transport.HTTP2 (httpALPN)
6262
import Simplex.Messaging.Transport.Server (ServerCredentials (..), mkTransportServerConfig)
63-
import Simplex.Messaging.Util (eitherToMaybe, ifM)
63+
import Simplex.Messaging.Util (eitherToMaybe, ifM, unlessM)
6464
import System.Directory (createDirectoryIfMissing, doesDirectoryExist, doesFileExist)
6565
import System.Exit (exitFailure)
6666
import System.FilePath (combine)
@@ -140,16 +140,25 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
140140
confirmOrExit
141141
("WARNING: journal directory " <> storeMsgsJournalDir <> " will be exported to message log file " <> storeMsgsFilePath)
142142
"Journal not exported"
143-
ms <- newJournalMsgStore logPath MQStoreCfg
144-
-- TODO [postgres] in case postgres configured, queues must be read from database
145-
readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms
146-
exportMessages True ms storeMsgsFilePath False
147-
putStrLn "Export completed"
148143
case readStoreType ini of
149-
Right (ASType SQSMemory SMSMemory) -> putStrLn "store_messages set to `memory`, start the server."
150-
Right (ASType SQSMemory SMSJournal) -> putStrLn "store_messages set to `journal`, update it to `memory` in INI file"
151-
Right (ASType SQSPostgres SMSJournal) ->
144+
Right (ASType SQSMemory msType) -> do
145+
ms <- newJournalMsgStore logPath MQStoreCfg
146+
readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms
147+
exportMessages True (StoreJournal ms) storeMsgsFilePath False
148+
putStrLn "Export completed"
149+
putStrLn $ case msType of
150+
SMSMemory -> "store_messages set to `memory`, start the server."
151+
SMSJournal -> "store_messages set to `journal`, update it to `memory` in INI file"
152+
Right (ASType SQSPostgres SMSJournal) -> do
152153
#if defined(dbServerPostgres)
154+
let dbStoreLogPath = enableDbStoreLog' ini $> storeLogFilePath
155+
dbOpts@DBOpts {connstr, schema} = iniDBOptions ini defaultDBOpts
156+
unlessM (checkSchemaExists connstr schema) $ do
157+
putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr
158+
exitFailure
159+
ms <- newJournalMsgStore logPath $ PQStoreCfg PostgresStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = iniDeletedTTL ini}
160+
exportMessages True (StoreJournal ms) storeMsgsFilePath False
161+
putStrLn "Export completed"
153162
putStrLn "store_messages set to `journal`, store_queues is set to `database`.\nExport queues to store log to use memory storage for messages (`smp-server database export`)."
154163
#else
155164
noPostgresExit
@@ -582,7 +591,7 @@ exportDatabaseToStoreLog logPath dbOpts storeLogFilePath = do
582591
ps <- newJournalMsgStore logPath $ PQStoreCfg storeCfg
583592
sl <- openWriteStoreLog False storeLogFilePath
584593
Sum sCnt <- foldServiceRecs (postgresQueueStore ps) $ \sr -> logNewService sl sr $> Sum (1 :: Int)
585-
Sum qCnt <- foldQueueRecs True True (postgresQueueStore ps) Nothing $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int)
594+
Sum qCnt <- foldQueueRecs True True (postgresQueueStore ps) $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int)
586595
closeStoreLog sl
587596
pure (sCnt, qCnt)
588597
#endif

src/Simplex/Messaging/Server/MsgStore/Journal.hs

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ module Simplex.Messaging.Server.MsgStore.Journal
3838
msgQueueStatePath,
3939
readQueueState,
4040
newMsgQueueState,
41+
getJournalQueueMessages,
4142
newJournalId,
4243
appendState,
4344
queueLogFileName,
@@ -58,7 +59,7 @@ import Control.Monad.Trans.Except
5859
import qualified Data.Attoparsec.ByteString.Char8 as A
5960
import Data.ByteString.Char8 (ByteString)
6061
import qualified Data.ByteString.Char8 as B
61-
import Data.Either (fromRight)
62+
import Data.Either (fromRight, partitionEithers)
6263
import Data.Functor (($>))
6364
import Data.Int (Int64)
6465
import Data.List (sort)
@@ -405,11 +406,11 @@ instance MsgStoreClass (JournalMsgStore s) where
405406

406407
-- This function can only be used in server CLI commands or before server is started.
407408
-- It does not cache queues and is NOT concurrency safe.
408-
unsafeWithAllMsgQueues :: Monoid a => Bool -> Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
409-
unsafeWithAllMsgQueues tty withData ms action = case queueStore_ ms of
409+
unsafeWithAllMsgQueues :: Monoid a => Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
410+
unsafeWithAllMsgQueues tty ms action = case queueStore_ ms of
410411
MQStore st -> withLoadedQueues st run
411412
#if defined(dbServerPostgres)
412-
PQStore st -> foldQueueRecs tty withData st Nothing $ uncurry (mkQueue ms False) >=> run
413+
PQStore st -> foldQueueRecs False tty st $ uncurry (mkQueue ms False) >=> run
413414
#endif
414415
where
415416
run q = do
@@ -429,7 +430,7 @@ instance MsgStoreClass (JournalMsgStore s) where
429430
#if defined(dbServerPostgres)
430431
PQStore st -> do
431432
let JournalMsgStore {queueLocks, sharedLock} = ms
432-
foldQueueRecs tty False st (Just veryOld) $ \(rId, qr) -> do
433+
foldRecentQueueRecs veryOld tty st $ \(rId, qr) -> do
433434
q <- mkQueue ms False rId qr
434435
withSharedWaitLock rId queueLocks sharedLock $ run $ tryStore' "deleteExpiredMsgs" rId $
435436
getLoadedQueue q >>= unStoreIO . expireQueueMsgs ms now old
@@ -485,7 +486,7 @@ instance MsgStoreClass (JournalMsgStore s) where
485486
where
486487
newQ = do
487488
let dir = msgQueueDirectory ms rId
488-
statePath = msgQueueStatePath dir $ B.unpack (strEncode rId)
489+
statePath = msgQueueStatePath dir rId
489490
queue = JMQueue {queueDirectory = dir, statePath}
490491
q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue forWrite) (createQ queue)
491492
atomically $ writeTVar msgQueue' $ Just q
@@ -563,8 +564,9 @@ instance MsgStoreClass (JournalMsgStore s) where
563564
where
564565
getSize = maybe (pure (-1)) (fmap size . readTVarIO . state)
565566

567+
-- drainMsgs is never True with Journal storage
566568
getQueueMessages_ :: Bool -> JournalQueue s -> JournalMsgQueue s -> StoreIO s [Message]
567-
getQueueMessages_ drainMsgs q' q = StoreIO (run [])
569+
getQueueMessages_ drainMsgs q' q = StoreIO $ if drainMsgs then run [] else readTVarIO (state q) >>= runFast
568570
where
569571
run msgs = readTVarIO (handles q) >>= maybe (pure []) (getMsg msgs)
570572
getMsg msgs hs = chooseReadJournal q' q drainMsgs hs >>= maybe (pure msgs) readMsg
@@ -573,6 +575,16 @@ instance MsgStoreClass (JournalMsgStore s) where
573575
(msg, len) <- hGetMsgAt h $ bytePos rs
574576
updateReadPos q' q drainMsgs len hs
575577
(msg :) <$> run msgs
578+
runFast MsgQueueState {writeState = ws, readState = rs, size}
579+
| size > 0 =
580+
readTVarIO (handles q) >>= \case
581+
Just (MsgQueueHandles _ rh wh_) -> do
582+
msgs <- getJournalRange rh (bytePos rs) (byteCount rs)
583+
case wh_ of
584+
Just wh -> (msgs ++) <$> getJournalRange wh 0 (bytePos ws)
585+
Nothing -> pure msgs
586+
Nothing -> pure []
587+
| otherwise = pure []
576588

577589
writeMsg :: JournalMsgStore s -> JournalQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
578590
writeMsg ms q' logState msg = isolateQueue q' "writeMsg" $ do
@@ -795,8 +807,8 @@ msgQueueDirectory JournalMsgStore {config = JournalStoreConfig {storePath, pathP
795807
let (seg, s') = B.splitAt 2 s
796808
in seg : splitSegments (n - 1) s'
797809

798-
msgQueueStatePath :: FilePath -> String -> FilePath
799-
msgQueueStatePath dir queueId = dir </> (queueLogFileName <> "." <> queueId <> logFileExt)
810+
-- | Path of the queue state file inside the queue directory @dir@.
-- The file name is built from 'queueLogFileName', a dot, the string-encoded
-- recipient ID and 'logFileExt'.
msgQueueStatePath :: FilePath -> RecipientId -> FilePath
msgQueueStatePath dir rId = dir </> stateFileName
  where
    stateFileName = concat [queueLogFileName, ".", B.unpack (strEncode rId), logFileExt]
800812

801813
createNewJournal :: FilePath -> ByteString -> IO Handle
802814
createNewJournal dir journalId = do
@@ -1019,3 +1031,33 @@ hClose h =
10191031

10201032
closeOnException :: Handle -> IO a -> IO a
10211033
closeOnException h a = a `E.onException` hClose h
1034+
1035+
-- | Read the messages of a queue straight from its journal files, using the
-- queue state loaded from the state file in the queue directory.
--
-- When the loaded state has a positive @size@, messages are read from the read
-- journal between the read state's @bytePos@ and @byteCount@; if the write
-- journal is a different journal, messages from its start up to the write
-- state's @bytePos@ are appended. In every other case the result is empty.
getJournalQueueMessages :: JournalMsgStore s -> JournalQueue s -> IO [Message]
getJournalQueueMessages ms q = do
  (state_, _) <- readQueueState ms $ msgQueueStatePath queueDir qId
  case state_ of
    Just MsgQueueState {readState = rd, writeState = wr, size}
      | size > 0 -> do
          unread <- journalRange (journalId rd) (bytePos rd) (byteCount rd)
          written <-
            if journalId rd == journalId wr
              then pure []
              else journalRange (journalId wr) 0 (bytePos wr)
          pure $ unread ++ written
    _ -> pure []
  where
    qId = recipientId' q
    queueDir = msgQueueDirectory ms qId
    -- NOTE(review): the journal is opened in ReadWriteMode although it is only
    -- read here - confirm this is intentional before changing the mode.
    journalRange jId from to =
      IO.withFile (journalFilePath queueDir jId) ReadWriteMode $ \jh ->
        getJournalRange jh from to
1050+
1051+
-- | Read and decode the messages stored in the byte range @[from, to)@ of an
-- open journal handle.
--
-- The range is split into lines and each line is decoded with 'strDecode'.
-- Lines that fail to decode are dropped; their count is printed to stdout
-- together with the handle description. An empty or inverted range yields
-- no messages and does not touch the handle.
getJournalRange :: Handle -> Int64 -> Int64 -> IO [Message]
getJournalRange h from to
  | to <= from = pure []
  | otherwise = do
      IO.hSeek h AbsoluteSeek (fromIntegral from)
      chunk <- B.hGet h $ fromIntegral (to - from)
      let (failed, decoded) = partitionEithers [strDecode ln | ln <- B.lines chunk]
      unless (null failed) $ do
        name <- IO.hShow h
        putStrLn $ "Error reading " <> show (length failed) <> " messages from " <> name
      pure decoded

src/Simplex/Messaging/Server/MsgStore/STM.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ instance MsgStoreClass STMMsgStore where
8282
{-# INLINE closeMsgStore #-}
8383
withActiveMsgQueues = withLoadedQueues . queueStore_
8484
{-# INLINE withActiveMsgQueues #-}
85-
unsafeWithAllMsgQueues _ _ = withLoadedQueues . queueStore_
85+
unsafeWithAllMsgQueues _ = withLoadedQueues . queueStore_
8686
{-# INLINE unsafeWithAllMsgQueues #-}
8787

8888
expireOldMessages :: Bool -> STMMsgStore -> Int64 -> Int64 -> IO MessageStats

src/Simplex/Messaging/Server/MsgStore/Types.hs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M
4040
closeMsgStore :: s -> IO ()
4141
withActiveMsgQueues :: Monoid a => s -> (StoreQueue s -> IO a) -> IO a
4242
-- This function can only be used in server CLI commands or before server is started.
43-
-- tty, withData, store
44-
unsafeWithAllMsgQueues :: Monoid a => Bool -> Bool -> s -> (StoreQueue s -> IO a) -> IO a
43+
-- tty, store
44+
unsafeWithAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a
4545
-- tty, store, now, ttl
4646
expireOldMessages :: Bool -> s -> Int64 -> Int64 -> IO MessageStats
4747
logQueueStates :: s -> IO ()
@@ -84,6 +84,7 @@ data MessageStats = MessageStats
8484
expiredMsgsCount :: Int,
8585
storedQueues :: Int
8686
}
87+
deriving (Show)
8788

8889
instance Monoid MessageStats where
8990
mempty = MessageStats 0 0 0

src/Simplex/Messaging/Server/QueueStore/Postgres.hs

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ module Simplex.Messaging.Server.QueueStore.Postgres
2525
batchInsertQueues,
2626
foldServiceRecs,
2727
foldQueueRecs,
28+
foldRecentQueueRecs,
2829
handleDuplicate,
2930
withLog_,
3031
withDB',
@@ -545,8 +546,28 @@ foldServiceRecs st f =
545546
DB.fold_ db "SELECT service_id, service_role, service_cert, service_cert_hash, created_at FROM services" mempty $
546547
\ !acc -> fmap (acc <>) . f . rowToServiceRec
547548

548-
foldQueueRecs :: forall a q. Monoid a => Bool -> Bool -> PostgresQueueStore q -> Maybe Int64 -> ((RecipientId, QueueRec) -> IO a) -> IO a
549-
foldQueueRecs tty withData st skipOld_ f = do
549+
-- | Fold over all queue records that are not marked as deleted, ordered by
-- recipient ID.
--
-- The first argument (@withData@) selects the query and row parser:
-- 'queueRecQueryWithData' with 'rowToQueueRecWithData' when 'True',
-- 'queueRecQuery' with 'rowToQueueRec' otherwise. The remaining arguments
-- are passed on to 'foldQueueRecs_'.
foldQueueRecs :: Monoid a => Bool -> Bool -> PostgresQueueStore q -> ((RecipientId, QueueRec) -> IO a) -> IO a
foldQueueRecs withData = foldQueueRecs_ $ \db acc step ->
  if withData
    then DB.fold_ db (queueRecQueryWithData <> notDeletedOrdered) acc $ \acc' row -> step acc' (rowToQueueRecWithData row)
    else DB.fold_ db (queueRecQuery <> notDeletedOrdered) acc $ \acc' row -> step acc' (rowToQueueRec row)
  where
    notDeletedOrdered :: Query
    notDeletedOrdered = " WHERE deleted_at IS NULL ORDER BY recipient_id ASC"
556+
557+
-- | Fold over queue records that are not marked as deleted and whose
-- @updated_at@ is greater than @old@, ordered by recipient ID.
--
-- Rows are read with 'queueRecQuery' and parsed with 'rowToQueueRec'; the
-- remaining arguments are passed on to 'foldQueueRecs_'.
foldRecentQueueRecs :: Monoid a => Int64 -> Bool -> PostgresQueueStore q -> ((RecipientId, QueueRec) -> IO a) -> IO a
foldRecentQueueRecs old = foldQueueRecs_ $ \db acc step ->
  DB.fold db (queueRecQuery <> recentOrdered) (Only old) acc $ \acc' row -> step acc' (rowToQueueRec row)
  where
    recentOrdered :: Query
    recentOrdered = " WHERE deleted_at IS NULL AND updated_at > ? ORDER BY recipient_id ASC"
562+
563+
foldQueueRecs_ ::
564+
Monoid a =>
565+
(DB.Connection -> (Int, a) -> ((Int, a) -> (RecipientId, QueueRec) -> IO (Int, a)) -> IO (Int, a)) ->
566+
Bool ->
567+
PostgresQueueStore q ->
568+
((RecipientId, QueueRec) -> IO a) ->
569+
IO a
570+
foldQueueRecs_ foldRecs tty st f = do
550571
(n, r) <- withTransaction (dbStore st) $ \db ->
551572
foldRecs db (0 :: Int, mempty) $ \(i, acc) qr -> do
552573
r <- f qr
@@ -557,13 +578,6 @@ foldQueueRecs tty withData st skipOld_ f = do
557578
when tty $ putStrLn $ progress n
558579
pure r
559580
where
560-
foldRecs db acc f' = case skipOld_ of
561-
Nothing
562-
| withData -> DB.fold_ db (queueRecQueryWithData <> " WHERE deleted_at IS NULL") acc $ \acc' -> f' acc' . rowToQueueRecWithData
563-
| otherwise -> DB.fold_ db (queueRecQuery <> " WHERE deleted_at IS NULL") acc $ \acc' -> f' acc' . rowToQueueRec
564-
Just old
565-
| withData -> DB.fold db (queueRecQueryWithData <> " WHERE deleted_at IS NULL AND updated_at > ?") (Only old) acc $ \acc' -> f' acc' . rowToQueueRecWithData
566-
| otherwise -> DB.fold db (queueRecQuery <> " WHERE deleted_at IS NULL AND updated_at > ?") (Only old) acc $ \acc' -> f' acc' . rowToQueueRec
567581
progress i = "Processed: " <> show i <> " records"
568582

569583
queueRecQuery :: Query

0 commit comments

Comments
 (0)