Skip to content

Commit 6db7980

Browse files
authored
smp server: use COPY to import store log to postgres db, improve concurrency and error handling (#1487)
* smp server: use COPY to import store log to postgres db * compact queues when importing to postgres * mempty * version * handle errors while expiring, mask async exceptions while getting queue * whitespace * version
1 parent f4b55bf commit 6db7980

File tree

6 files changed

+74
-35
lines changed

6 files changed

+74
-35
lines changed

simplexmq.cabal

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
cabal-version: 1.12
22

33
name: simplexmq
4-
version: 6.3.0.805
4+
version: 6.3.0.8
55
synopsis: SimpleXMQ message broker
66
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
77
<./docs/Simplex-Messaging-Client.html client> and

src/Simplex/Messaging/Server.hs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -399,15 +399,17 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
399399
expire :: forall s. MsgStoreClass s => s -> ServerStats -> Int64 -> IO ()
400400
expire ms stats interval = do
401401
threadDelay' interval
402+
logInfo "Started expiring messages..."
402403
n <- compactQueues @(StoreQueue s) $ queueStore ms
403404
when (n > 0) $ logInfo $ "Removed " <> tshow n <> " old deleted queues from the database."
404405
old <- expireBeforeEpoch expCfg
405406
now <- systemSeconds <$> getSystemTime
406-
msgStats@MessageStats {storedMsgsCount = stored, expiredMsgsCount = expired} <-
407-
withAllMsgQueues False "idleDeleteExpiredMsgs" ms $ expireQueueMsgs now ms old
408-
atomicWriteIORef (msgCount stats) stored
409-
atomicModifyIORef'_ (msgExpired stats) (+ expired)
410-
printMessageStats "STORE: messages" msgStats
407+
tryAny (withAllMsgQueues False "idleDeleteExpiredMsgs" ms $ expireQueueMsgs now ms old) >>= \case
408+
Right msgStats@MessageStats {storedMsgsCount = stored, expiredMsgsCount = expired} -> do
409+
atomicWriteIORef (msgCount stats) stored
410+
atomicModifyIORef'_ (msgExpired stats) (+ expired)
411+
printMessageStats "STORE: messages" msgStats
412+
Left e -> logError $ "STORE: withAllMsgQueues, error expiring messages, " <> tshow e
411413
expireQueueMsgs now ms old q = do
412414
(expired_, stored) <- idleDeleteExpiredMsgs now ms q old
413415
pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1}

src/Simplex/Messaging/Server/Main.hs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ import Simplex.Messaging.Server.MsgStore.Types (QSType (..))
7171
import Simplex.Messaging.Server.MsgStore.Journal (postgresQueueStore)
7272
import Simplex.Messaging.Server.QueueStore.Postgres (batchInsertQueues, foldQueueRecs)
7373
import Simplex.Messaging.Server.QueueStore.Types
74-
import Simplex.Messaging.Server.StoreLog (logCreateQueue, openWriteStoreLog)
74+
import Simplex.Messaging.Server.StoreLog (closeStoreLog, logCreateQueue, openWriteStoreLog)
7575
import System.Directory (renameFile)
7676
#endif
7777

@@ -176,21 +176,25 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
176176
| otherwise -> do
177177
storeLogFile <- getRequiredStoreLogFile ini
178178
confirmOrExit
179-
("WARNING: store log file " <> storeLogFile <> " will be imported to PostrgreSQL database: " <> B.unpack connstr <> ", schema: " <> B.unpack schema)
179+
("WARNING: store log file " <> storeLogFile <> " will be compacted and imported to PostrgreSQL database: " <> B.unpack connstr <> ", schema: " <> B.unpack schema)
180180
"Queue records not imported"
181181
ms <- newJournalMsgStore MQStoreCfg
182-
readQueueStore True (mkQueue ms) storeLogFile (queueStore ms)
182+
sl <- readWriteQueueStore True (mkQueue ms) storeLogFile (queueStore ms)
183+
closeStoreLog sl
183184
queues <- readTVarIO $ loadedQueues $ stmQueueStore ms
184185
let storeCfg = PostgresStoreCfg {dbOpts = dbOpts {createSchema = True}, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini}
185186
ps <- newJournalMsgStore $ PQStoreCfg storeCfg
186187
qCnt <- batchInsertQueues @(JournalQueue 'QSMemory) True queues $ postgresQueueStore ps
187188
renameFile storeLogFile $ storeLogFile <> ".bak"
188189
putStrLn $ "Import completed: " <> show qCnt <> " queues"
189190
putStrLn $ case readStoreType ini of
190-
Right (ASType SQSMemory SMSMemory) -> "store_messages set to `memory`.\nImport messages to journal to use PostgreSQL database for queues (`smp-server journal import`)"
191-
Right (ASType SQSMemory SMSJournal) -> "store_queues set to `memory`, update it to `database` in INI file"
191+
Right (ASType SQSMemory SMSMemory) -> setToDbStr <> "\nstore_messages set to `memory`, import messages to journal to use PostgreSQL database for queues (`smp-server journal import`)"
192+
Right (ASType SQSMemory SMSJournal) -> setToDbStr
192193
Right (ASType SQSPostgres SMSJournal) -> "store_queues set to `database`, start the server."
193194
Left e -> e <> ", configure storage correctly"
195+
where
196+
setToDbStr :: String
197+
setToDbStr = "store_queues set to `memory`, update it to `database` in INI file"
194198
SCExport
195199
| schemaExists && storeLogExists -> exitConfigureQueueStore connstr schema
196200
| not schemaExists -> do

src/Simplex/Messaging/Server/QueueStore/Postgres.hs

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,25 @@ import Control.Monad
3232
import Control.Monad.Except
3333
import Control.Monad.IO.Class
3434
import Control.Monad.Trans.Except
35+
import Data.ByteString.Builder (Builder)
36+
import qualified Data.ByteString.Builder as BB
37+
import Data.ByteString.Char8 (ByteString)
38+
import qualified Data.ByteString.Char8 as B
39+
import qualified Data.ByteString.Lazy as LB
3540
import Data.Bitraversable (bimapM)
3641
import Data.Either (fromRight)
3742
import Data.Functor (($>))
3843
import Data.Int (Int64)
44+
import Data.List (intersperse)
3945
import qualified Data.Map.Strict as M
4046
import Data.Maybe (catMaybes)
4147
import qualified Data.Text as T
4248
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
4349
import Database.PostgreSQL.Simple (Binary (..), Only (..), Query, SqlError)
4450
import qualified Database.PostgreSQL.Simple as DB
51+
import qualified Database.PostgreSQL.Simple.Copy as DB
4552
import Database.PostgreSQL.Simple.FromField (FromField (..))
46-
import Database.PostgreSQL.Simple.ToField (ToField (..))
53+
import Database.PostgreSQL.Simple.ToField (Action (..), ToField (..))
4754
import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation)
4855
import Database.PostgreSQL.Simple.SqlQQ (sql)
4956
import GHC.IO (catchAny)
@@ -160,7 +167,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
160167
loadSndQueue = loadQueue " WHERE sender_id = ?" $ \rId -> TM.insert qId rId senders
161168
loadNtfQueue = loadQueue " WHERE notifier_id = ?" $ \_ -> pure () -- do NOT cache ref - ntf subscriptions are rare
162169
loadQueue condition insertRef =
163-
runExceptT $ do
170+
E.uninterruptibleMask_ $ runExceptT $ do
164171
(rId, qRec) <-
165172
withDB "getQueue_" st $ \db -> firstRow rowToQueueRec AUTH $
166173
DB.query db (queueRecQuery <> condition <> " AND deleted_at IS NULL") (Only qId)
@@ -182,7 +189,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
182189

183190
secureQueue :: PostgresQueueStore q -> q -> SndPublicAuthKey -> IO (Either ErrorType ())
184191
secureQueue st sq sKey =
185-
withQueueDB sq "secureQueue" $ \q -> do
192+
withQueueRec sq "secureQueue" $ \q -> do
186193
verify q
187194
assertUpdated $ withDB' "secureQueue" st $ \db ->
188195
DB.execute db "UPDATE msg_queues SET sender_key = ? WHERE recipient_id = ? AND deleted_at IS NULL" (sKey, rId)
@@ -196,7 +203,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
196203

197204
addQueueNotifier :: PostgresQueueStore q -> q -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
198205
addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId, notifierKey, rcvNtfDhSecret} =
199-
withQueueDB sq "addQueueNotifier" $ \q ->
206+
withQueueRec sq "addQueueNotifier" $ \q ->
200207
ExceptT $ withLockMap (notifierLocks st) nId "addQueueNotifier" $
201208
ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $ runExceptT $ do
202209
assertUpdated $ withDB "addQueueNotifier" st $ \db ->
@@ -223,7 +230,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
223230

224231
deleteQueueNotifier :: PostgresQueueStore q -> q -> IO (Either ErrorType (Maybe NotifierId))
225232
deleteQueueNotifier st sq =
226-
withQueueDB sq "deleteQueueNotifier" $ \q ->
233+
withQueueRec sq "deleteQueueNotifier" $ \q ->
227234
ExceptT $ fmap sequence $ forM (notifier q) $ \NtfCreds {notifierId = nId} ->
228235
withLockMap (notifierLocks st) nId "deleteQueueNotifier" $ runExceptT $ do
229236
assertUpdated $ withDB' "deleteQueueNotifier" st update
@@ -260,7 +267,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
260267

261268
updateQueueTime :: PostgresQueueStore q -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
262269
updateQueueTime st sq t =
263-
withQueueDB sq "updateQueueTime" $ \q@QueueRec {updatedAt} ->
270+
withQueueRec sq "updateQueueTime" $ \q@QueueRec {updatedAt} ->
264271
if updatedAt == Just t
265272
then pure q
266273
else do
@@ -295,21 +302,19 @@ batchInsertQueues tty queues toStore = do
295302
qs <- catMaybes <$> mapM (\(rId, q) -> (rId,) <$$> readTVarIO (queueRec q)) (M.assocs queues)
296303
putStrLn $ "Importing " <> show (length qs) <> " queues..."
297304
let st = dbStore toStore
298-
(qCnt, count) <- foldM (processChunk st) (0, 0) $ toChunks 1000000 qs
305+
count <-
306+
withConnection st $ \db -> do
307+
DB.copy_ db "COPY msg_queues (recipient_id, recipient_key, rcv_dh_secret, sender_id, sender_key, snd_secure, notifier_id, notifier_key, rcv_ntf_dh_secret, status, updated_at) FROM STDIN WITH (FORMAT CSV)"
308+
mapM_ (putQueue db) (zip [1..] qs)
309+
DB.putCopyEnd db
310+
Only qCnt : _ <- withConnection st (`DB.query_` "SELECT count(*) FROM msg_queues")
299311
putStrLn $ progress count
300312
pure qCnt
301313
where
302-
processChunk st (qCnt, i) qs = do
303-
qCnt' <- withConnection st $ \db -> DB.executeMany db insertQueueQuery $ map queueRecToRow qs
304-
let i' = i + length qs
305-
when tty $ putStr (progress i' <> "\r") >> hFlush stdout
306-
pure (qCnt + qCnt', i')
314+
putQueue db (i :: Int, q) = do
315+
DB.putCopyData db $ queueRecToText q
316+
when (tty && i `mod` 100000 == 0) $ putStr (progress i <> "\r") >> hFlush stdout
307317
progress i = "Imported: " <> show i <> " queues"
308-
toChunks :: Int -> [a] -> [[a]]
309-
toChunks _ [] = []
310-
toChunks n xs =
311-
let (ys, xs') = splitAt n xs
312-
in ys : toChunks n xs'
313318

314319
insertQueueQuery :: Query
315320
insertQueueQuery =
@@ -349,21 +354,49 @@ queueRecToRow :: (RecipientId, QueueRec) -> QueueRecRow
349354
queueRecToRow (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier = n, status, updatedAt}) =
350355
(rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifierId <$> n, notifierKey <$> n, rcvNtfDhSecret <$> n, status, updatedAt)
351356

357+
queueRecToText :: (RecipientId, QueueRec) -> ByteString
358+
queueRecToText (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier = n, status, updatedAt}) =
359+
LB.toStrict $ BB.toLazyByteString $ mconcat tabFields <> BB.char7 '\n'
360+
where
361+
tabFields = BB.char7 ',' `intersperse` fields
362+
fields =
363+
[ renderField (toField rId),
364+
renderField (toField recipientKey),
365+
renderField (toField rcvDhSecret),
366+
renderField (toField senderId),
367+
nullable senderKey,
368+
renderField (toField sndSecure),
369+
nullable (notifierId <$> n),
370+
nullable (notifierKey <$> n),
371+
nullable (rcvNtfDhSecret <$> n),
372+
BB.char7 '"' <> renderField (toField status) <> BB.char7 '"',
373+
nullable updatedAt
374+
]
375+
nullable :: ToField a => Maybe a -> Builder
376+
nullable = maybe mempty (renderField . toField)
377+
renderField :: Action -> Builder
378+
renderField = \case
379+
Plain bld -> bld
380+
Escape s -> BB.byteString s
381+
EscapeByteA s -> BB.string7 "\\x" <> BB.byteStringHex s
382+
EscapeIdentifier s -> BB.byteString s -- Not used in COPY data
383+
Many as -> mconcat (map renderField as)
384+
352385
rowToQueueRec :: QueueRecRow -> (RecipientId, QueueRec)
353386
rowToQueueRec (rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifierId_, notifierKey_, rcvNtfDhSecret_, status, updatedAt) =
354387
let notifier = NtfCreds <$> notifierId_ <*> notifierKey_ <*> rcvNtfDhSecret_
355388
in (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt})
356389

357390
setStatusDB :: StoreQueueClass q => String -> PostgresQueueStore q -> q -> ServerEntityStatus -> ExceptT ErrorType IO () -> IO (Either ErrorType ())
358391
setStatusDB op st sq status writeLog =
359-
withQueueDB sq op $ \q -> do
392+
withQueueRec sq op $ \q -> do
360393
assertUpdated $ withDB' op st $ \db ->
361394
DB.execute db "UPDATE msg_queues SET status = ? WHERE recipient_id = ? AND deleted_at IS NULL" (status, recipientId sq)
362395
atomically $ writeTVar (queueRec sq) $ Just q {status}
363396
writeLog
364397

365-
withQueueDB :: StoreQueueClass q => q -> String -> (QueueRec -> ExceptT ErrorType IO a) -> IO (Either ErrorType a)
366-
withQueueDB sq op action =
398+
withQueueRec :: StoreQueueClass q => q -> String -> (QueueRec -> ExceptT ErrorType IO a) -> IO (Either ErrorType a)
399+
withQueueRec sq op action =
367400
withQueueLock sq op $ E.uninterruptibleMask_ $ runExceptT $ ExceptT (readQueueRecIO $ queueRec sq) >>= action
368401

369402
assertUpdated :: ExceptT ErrorType IO Int64 -> ExceptT ErrorType IO ()
@@ -379,7 +412,7 @@ withDB op st action =
379412
logErr :: E.SomeException -> IO (Either ErrorType a)
380413
logErr e = logError ("STORE: " <> T.pack err) $> Left (STORE err)
381414
where
382-
err = op <> ", withLog, " <> show e
415+
err = op <> ", withDB, " <> show e
383416

384417
withLog :: MonadIO m => String -> PostgresQueueStore q -> (StoreLog 'WriteMode -> IO ()) -> m ()
385418
withLog op PostgresQueueStore {dbStoreLog} action =

src/Simplex/Messaging/Server/QueueStore/STM.hs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ module Simplex.Messaging.Server.QueueStore.STM
1818
( STMQueueStore (..),
1919
setStoreLog,
2020
withLog',
21-
withQueueRec,
2221
readQueueRecIO,
2322
setStatus,
2423
)

src/Simplex/Messaging/Server/StoreLog.hs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,9 @@ removeStoreLogBackups f = do
259259
times2 = take (length times1 - minOldBackups) times1 -- keep 3 backups older than 24 hours
260260
toDelete = filter (< old) times2 -- remove all backups older than 21 day
261261
mapM_ (removeFile . backupPath) toDelete
262-
putStrLn $ "Removed " <> show (length toDelete) <> " backups:"
263-
mapM_ (putStrLn . backupPath) toDelete
262+
when (length toDelete > 0) $ do
263+
putStrLn $ "Removed " <> show (length toDelete) <> " backups:"
264+
mapM_ (putStrLn . backupPath) toDelete
264265
where
265266
backupPathTime :: FilePath -> Maybe UTCTime
266267
backupPathTime = iso8601ParseM <=< stripPrefix backupPathPfx

0 commit comments

Comments
 (0)