Skip to content

Commit 6f4d0f5

Browse files
authored
Merge pull request #1630 from simplex-chat/db-messages
Feature branch: PostgreSQL message store
2 parents 8372124 + 9cfdae3 commit 6f4d0f5

File tree

22 files changed

+1533
-204
lines changed

22 files changed

+1533
-204
lines changed

simplexmq.cabal

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
cabal-version: 1.12
22

33
name: simplexmq
4-
version: 6.4.5.2
4+
version: 6.5.0.0
55
synopsis: SimpleXMQ message broker
66
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
77
<./docs/Simplex-Messaging-Client.html client> and
@@ -266,6 +266,7 @@ library
266266
Simplex.Messaging.Notifications.Server.Store.Postgres
267267
Simplex.Messaging.Notifications.Server.Store.Types
268268
Simplex.Messaging.Notifications.Server.StoreLog
269+
Simplex.Messaging.Server.MsgStore.Postgres
269270
Simplex.Messaging.Server.QueueStore.Postgres
270271
Simplex.Messaging.Server.QueueStore.Postgres.Migrations
271272
other-modules:

src/Simplex/Messaging/Agent/Store/AgentStore.hs

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ import Simplex.Messaging.Protocol
287287
import qualified Simplex.Messaging.Protocol as SMP
288288
import Simplex.Messaging.Agent.Store.Entity
289289
import Simplex.Messaging.Transport.Client (TransportHost)
290-
import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, firstRow, firstRow', ifM, maybeFirstRow, tshow, ($>>=), (<$$>))
290+
import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, firstRow, firstRow', ifM, maybeFirstRow, maybeFirstRow', tshow, ($>>=), (<$$>))
291291
import Simplex.Messaging.Version.Internal
292292
import qualified UnliftIO.Exception as E
293293
import UnliftIO.STM
@@ -426,15 +426,12 @@ deleteConnRecord :: DB.Connection -> ConnId -> IO ()
426426
deleteConnRecord db connId = DB.execute db "DELETE FROM connections WHERE conn_id = ?" (Only connId)
427427

428428
checkConfirmedSndQueueExists_ :: DB.Connection -> NewSndQueue -> IO Bool
429-
checkConfirmedSndQueueExists_ db SndQueue {server, sndId} = do
430-
fromMaybe False
431-
<$> maybeFirstRow
432-
fromOnly
433-
( DB.query
434-
db
435-
"SELECT 1 FROM snd_queues WHERE host = ? AND port = ? AND snd_id = ? AND status != ? LIMIT 1"
436-
(host server, port server, sndId, New)
437-
)
429+
checkConfirmedSndQueueExists_ db SndQueue {server, sndId} =
430+
maybeFirstRow' False fromOnly $
431+
DB.query
432+
db
433+
"SELECT 1 FROM snd_queues WHERE host = ? AND port = ? AND snd_id = ? AND status != ? LIMIT 1"
434+
(host server, port server, sndId, New)
438435

439436
getRcvConn :: DB.Connection -> SMPServer -> SMP.RecipientId -> IO (Either StoreError (RcvQueue, SomeConn))
440437
getRcvConn db ProtocolServer {host, port} rcvId = runExceptT $ do
@@ -1072,15 +1069,12 @@ toRcvMsg ((agentMsgId, internalTs, brokerId, brokerTs) :. (sndMsgId, integrity,
10721069
in RcvMsg {internalId = InternalId agentMsgId, msgMeta, msgType, msgBody, internalHash, msgReceipt, userAck}
10731070

10741071
checkRcvMsgHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool
1075-
checkRcvMsgHashExists db connId hash = do
1076-
fromMaybe False
1077-
<$> maybeFirstRow
1078-
fromOnly
1079-
( DB.query
1080-
db
1081-
"SELECT 1 FROM encrypted_rcv_message_hashes WHERE conn_id = ? AND hash = ? LIMIT 1"
1082-
(connId, Binary hash)
1083-
)
1072+
checkRcvMsgHashExists db connId hash =
1073+
maybeFirstRow' False fromOnly $
1074+
DB.query
1075+
db
1076+
"SELECT 1 FROM encrypted_rcv_message_hashes WHERE conn_id = ? AND hash = ? LIMIT 1"
1077+
(connId, Binary hash)
10841078

10851079
getRcvMsgBrokerTs :: DB.Connection -> ConnId -> SMP.MsgId -> IO (Either StoreError BrokerTs)
10861080
getRcvMsgBrokerTs db connId msgId =
@@ -2119,15 +2113,12 @@ addProcessedRatchetKeyHash db connId hash =
21192113
DB.execute db "INSERT INTO processed_ratchet_key_hashes (conn_id, hash) VALUES (?,?)" (connId, Binary hash)
21202114

21212115
checkRatchetKeyHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool
2122-
checkRatchetKeyHashExists db connId hash = do
2123-
fromMaybe False
2124-
<$> maybeFirstRow
2125-
fromOnly
2126-
( DB.query
2127-
db
2128-
"SELECT 1 FROM processed_ratchet_key_hashes WHERE conn_id = ? AND hash = ? LIMIT 1"
2129-
(connId, Binary hash)
2130-
)
2116+
checkRatchetKeyHashExists db connId hash =
2117+
maybeFirstRow' False fromOnly $
2118+
DB.query
2119+
db
2120+
"SELECT 1 FROM processed_ratchet_key_hashes WHERE conn_id = ? AND hash = ? LIMIT 1"
2121+
(connId, Binary hash)
21312122

21322123
deleteRatchetKeyHashesExpired :: DB.Connection -> NominalDiffTime -> IO ()
21332124
deleteRatchetKeyHashesExpired db ttl = do
@@ -2905,8 +2896,8 @@ deleteSndFile' db sndFileId =
29052896

29062897
getSndFileDeleted :: DB.Connection -> DBSndFileId -> IO Bool
29072898
getSndFileDeleted db sndFileId =
2908-
fromMaybe True
2909-
<$> maybeFirstRow fromOnlyBI (DB.query db "SELECT deleted FROM snd_files WHERE snd_file_id = ?" (Only sndFileId))
2899+
maybeFirstRow' True fromOnlyBI $
2900+
DB.query db "SELECT deleted FROM snd_files WHERE snd_file_id = ?" (Only sndFileId)
29102901

29112902
createSndFileReplica :: DB.Connection -> SndFileChunk -> NewSndChunkReplica -> IO ()
29122903
createSndFileReplica db SndFileChunk {sndChunkId} = createSndFileReplica_ db sndChunkId

src/Simplex/Messaging/Server.hs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ import Simplex.Messaging.Server.Control
105105
import Simplex.Messaging.Server.Env.STM as Env
106106
import Simplex.Messaging.Server.Expiration
107107
import Simplex.Messaging.Server.MsgStore
108-
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue, getJournalQueueMessages)
108+
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue (..), getJournalQueueMessages)
109109
import Simplex.Messaging.Server.MsgStore.STM
110110
import Simplex.Messaging.Server.MsgStore.Types
111111
import Simplex.Messaging.Server.NtfStore
@@ -132,12 +132,17 @@ import UnliftIO.Directory (doesFileExist, renameFile)
132132
import UnliftIO.Exception
133133
import UnliftIO.IO
134134
import UnliftIO.STM
135+
135136
#if MIN_VERSION_base(4,18,0)
136137
import Data.List (sort)
137138
import GHC.Conc (listThreads, threadStatus)
138139
import GHC.Conc.Sync (threadLabel)
139140
#endif
140141

142+
#if defined(dbServerPostgres)
143+
import Simplex.Messaging.Server.MsgStore.Postgres (exportDbMessages, getDbMessageStats)
144+
#endif
145+
141146
-- | Runs an SMP server using passed configuration.
142147
--
143148
-- See a full server here: https://github.com/simplex-chat/simplexmq/blob/master/apps/smp-server/Main.hs
@@ -477,7 +482,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
477482
atomicWriteIORef (msgCount stats) stored
478483
atomicModifyIORef'_ (msgExpired stats) (+ expired)
479484
printMessageStats "STORE: messages" msgStats
480-
Left e -> logError $ "STORE: withAllMsgQueues, error expiring messages, " <> tshow e
485+
Left e -> logError $ "STORE: expireOldMessages, error expiring messages, " <> tshow e
481486

482487
expireNtfsThread :: ServerConfig s -> M s ()
483488
expireNtfsThread ServerConfig {notificationExpiration = expCfg} = do
@@ -2109,13 +2114,19 @@ saveServerMessages drainMsgs ms = case ms of
21092114
Just f -> exportMessages False ms f drainMsgs
21102115
Nothing -> logNote "undelivered messages are not saved"
21112116
StoreJournal _ -> logNote "closed journal message storage"
2117+
#if defined(dbServerPostgres)
2118+
StoreDatabase _ -> logNote "closed postgres message storage"
2119+
#endif
21122120

21132121
exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath -> Bool -> IO ()
21142122
exportMessages tty st f drainMsgs = do
21152123
logNote $ "saving messages to file " <> T.pack f
21162124
run $ case st of
21172125
StoreMemory ms -> exportMessages_ ms $ getMsgs ms
21182126
StoreJournal ms -> exportMessages_ ms $ getJournalMsgs ms
2127+
#if defined(dbServerPostgres)
2128+
StoreDatabase ms -> exportDbMessages tty ms
2129+
#endif
21192130
where
21202131
exportMessages_ ms get = fmap (\(Sum n) -> n) . unsafeWithAllMsgQueues tty ms . saveQueueMsgs get
21212132
run :: (Handle -> IO Int) -> IO ()
@@ -2125,7 +2136,7 @@ exportMessages tty st f drainMsgs = do
21252136
logError $ "error exporting messages: " <> tshow e
21262137
exitFailure
21272138
getJournalMsgs ms q =
2128-
readTVarIO (msgQueue q) >>= \case
2139+
readTVarIO (msgQueue' q) >>= \case
21292140
Just _ -> getMsgs ms q
21302141
Nothing -> getJournalQueueMessages ms q
21312142
getMsgs :: MsgStoreClass s' => s' -> StoreQueue s' -> IO [Message]
@@ -2149,6 +2160,9 @@ processServerMessages StartOptions {skipWarnings} = do
21492160
Just f -> ifM (doesFileExist f) (Just <$> importMessages False ms f old_ skipWarnings) (pure Nothing)
21502161
Nothing -> pure Nothing
21512162
StoreJournal ms -> processJournalMessages old_ expire ms
2163+
#if defined(dbServerPostgres)
2164+
StoreDatabase ms -> processDbMessages old_ expire ms
2165+
#endif
21522166
processJournalMessages :: forall s. Maybe Int64 -> Bool -> JournalMsgStore s -> IO (Maybe MessageStats)
21532167
processJournalMessages old_ expire ms
21542168
| expire = Just <$> case old_ of
@@ -2171,6 +2185,17 @@ processServerMessages StartOptions {skipWarnings} = do
21712185
processValidateQueue q = unsafeRunStore q "processValidateQueue" $ do
21722186
storedMsgsCount <- getQueueSize_ =<< getMsgQueue ms q False
21732187
pure newMessageStats {storedMsgsCount, storedQueues = 1}
2188+
#if defined(dbServerPostgres)
2189+
processDbMessages old_ expire ms
2190+
| expire = Just <$> case old_ of
2191+
Just old -> do
2192+
-- TODO [messages] expire messages from all queues, not only recent
2193+
logNote "expiring database store messages..."
2194+
now <- systemSeconds <$> getSystemTime
2195+
expireOldMessages False ms now (now - old)
2196+
Nothing -> getDbMessageStats ms
2197+
| otherwise = logWarn "skipping message expiration" $> Nothing
2198+
#endif
21742199

21752200
importMessages :: forall s. MsgStoreClass s => Bool -> s -> FilePath -> Maybe Int64 -> Bool -> IO MessageStats
21762201
importMessages tty ms f old_ skipWarnings = do

src/Simplex/Messaging/Server/CLI.hs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import Simplex.Messaging.Agent.Store.Postgres.Options (DBOpts (..))
3333
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..))
3434
import Simplex.Messaging.Encoding.String
3535
import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), ProtocolServer (..), ProtocolTypeI)
36-
import Simplex.Messaging.Server.Env.STM (ServerStoreCfg (..), StartOptions (..), StorePaths (..))
36+
import Simplex.Messaging.Server.Env.STM (ServerStoreCfg (..), StartOptions (..), dbStoreCfg, storeLogFile')
3737
import Simplex.Messaging.Server.Main.GitCommit
3838
import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..))
3939
import Simplex.Messaging.Transport (ASrvTransport, ATransport (..), TLS, Transport (..), simplexMQVersion)
@@ -414,12 +414,13 @@ printServerTransports protocol ts = do
414414
\Set `port` in smp-server.ini section [TRANSPORT] to `5223,443`\n"
415415

416416
printSMPServerConfig :: [(ServiceName, ASrvTransport, AddHTTP)] -> ServerStoreCfg s -> IO ()
417-
printSMPServerConfig transports = \case
418-
SSCMemory sp_ -> printServerConfig "SMP" transports $ (\StorePaths {storeLogFile} -> storeLogFile) <$> sp_
419-
SSCMemoryJournal {storeLogFile} -> printServerConfig "SMP" transports $ Just storeLogFile
420-
SSCDatabaseJournal {storeCfg = PostgresStoreCfg {dbOpts = DBOpts {connstr, schema}}} -> do
421-
B.putStrLn $ "PostgreSQL database: " <> connstr <> ", schema: " <> schema
422-
printServerTransports "SMP" transports
417+
printSMPServerConfig transports st = case dbStoreCfg st of
418+
Just cfg -> printDBConfig cfg
419+
Nothing -> printServerConfig "SMP" transports $ storeLogFile' st
420+
where
421+
printDBConfig PostgresStoreCfg {dbOpts = DBOpts {connstr, schema}} = do
422+
B.putStrLn $ "PostgreSQL database: " <> connstr <> ", schema: " <> schema
423+
printServerTransports "SMP" transports
423424

424425
deleteDirIfExists :: FilePath -> IO ()
425426
deleteDirIfExists path = whenM (doesDirectoryExist path) $ removeDirectoryRecursive path

src/Simplex/Messaging/Server/Env/STM.hs

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ module Simplex.Messaging.Server.Env.STM
7272
defaultIdleQueueInterval,
7373
journalMsgStoreDepth,
7474
readWriteQueueStore,
75+
noPostgresExitStr,
7576
noPostgresExit,
77+
dbStoreCfg,
78+
storeLogFile',
7679
)
7780
where
7881

@@ -131,6 +134,10 @@ import System.IO (IOMode (..))
131134
import System.Mem.Weak (Weak)
132135
import UnliftIO.STM
133136

137+
#if defined(dbServerPostgres)
138+
import Simplex.Messaging.Server.MsgStore.Postgres
139+
#endif
140+
134141
data ServerConfig s = ServerConfig
135142
{ transports :: [(ServiceName, ASrvTransport, AddHTTP)],
136143
smpHandshakeTimeout :: Int,
@@ -275,14 +282,25 @@ fromMsgStore :: MsgStore s -> s
275282
fromMsgStore = \case
276283
StoreMemory s -> s
277284
StoreJournal s -> s
285+
#if defined(dbServerPostgres)
286+
StoreDatabase s -> s
287+
#endif
278288
{-# INLINE fromMsgStore #-}
279289

280290
type family SupportedStore (qs :: QSType) (ms :: MSType) :: Constraint where
281291
SupportedStore 'QSMemory 'MSMemory = ()
282292
SupportedStore 'QSMemory 'MSJournal = ()
283-
SupportedStore 'QSPostgres 'MSJournal = ()
293+
SupportedStore 'QSMemory 'MSPostgres =
294+
(Int ~ Bool, TypeError ('TE.Text "Storing messages in Postgres DB with queues in memory is not supported"))
284295
SupportedStore 'QSPostgres 'MSMemory =
285-
(Int ~ Bool, TypeError ('TE.Text "Storing messages in memory with Postgres DB is not supported"))
296+
(Int ~ Bool, TypeError ('TE.Text "Storing messages in memory with queues in Postgres DB is not supported"))
297+
SupportedStore 'QSPostgres 'MSJournal = ()
298+
#if defined(dbServerPostgres)
299+
SupportedStore 'QSPostgres 'MSPostgres = ()
300+
#else
301+
SupportedStore 'QSPostgres 'MSPostgres =
302+
(Int ~ Bool, TypeError ('TE.Text "Server compiled without server_postgres flag"))
303+
#endif
286304

287305
data AStoreType =
288306
forall qs ms. (SupportedStore qs ms, MsgStoreClass (MsgStoreType qs ms)) =>
@@ -292,16 +310,43 @@ data ServerStoreCfg s where
292310
SSCMemory :: Maybe StorePaths -> ServerStoreCfg STMMsgStore
293311
SSCMemoryJournal :: {storeLogFile :: FilePath, storeMsgsPath :: FilePath} -> ServerStoreCfg (JournalMsgStore 'QSMemory)
294312
SSCDatabaseJournal :: {storeCfg :: PostgresStoreCfg, storeMsgsPath' :: FilePath} -> ServerStoreCfg (JournalMsgStore 'QSPostgres)
313+
#if defined(dbServerPostgres)
314+
SSCDatabase :: PostgresStoreCfg -> ServerStoreCfg PostgresMsgStore
315+
#endif
316+
317+
dbStoreCfg :: ServerStoreCfg s -> Maybe PostgresStoreCfg
318+
dbStoreCfg = \case
319+
SSCMemory _ -> Nothing
320+
SSCMemoryJournal {} -> Nothing
321+
SSCDatabaseJournal {storeCfg} -> Just storeCfg
322+
#if defined(dbServerPostgres)
323+
SSCDatabase cfg -> Just cfg
324+
#endif
325+
326+
storeLogFile' :: ServerStoreCfg s -> Maybe FilePath
327+
storeLogFile' = \case
328+
SSCMemory sp_ -> (\StorePaths {storeLogFile} -> storeLogFile) <$> sp_
329+
SSCMemoryJournal {storeLogFile} -> Just storeLogFile
330+
SSCDatabaseJournal {storeCfg = PostgresStoreCfg {dbStoreLogPath}} -> dbStoreLogPath
331+
#if defined(dbServerPostgres)
332+
SSCDatabase (PostgresStoreCfg {dbStoreLogPath}) -> dbStoreLogPath
333+
#endif
295334

296335
data StorePaths = StorePaths {storeLogFile :: FilePath, storeMsgsFile :: Maybe FilePath}
297336

298337
type family MsgStoreType (qs :: QSType) (ms :: MSType) where
299338
MsgStoreType 'QSMemory 'MSMemory = STMMsgStore
300339
MsgStoreType qs 'MSJournal = JournalMsgStore qs
340+
#if defined(dbServerPostgres)
341+
MsgStoreType 'QSPostgres 'MSPostgres = PostgresMsgStore
342+
#endif
301343

302344
data MsgStore s where
303345
StoreMemory :: STMMsgStore -> MsgStore STMMsgStore
304346
StoreJournal :: JournalMsgStore qs -> MsgStore (JournalMsgStore qs)
347+
#if defined(dbServerPostgres)
348+
StoreDatabase :: PostgresMsgStore -> MsgStore PostgresMsgStore
349+
#endif
305350

306351
data Server s = Server
307352
{ clients :: ServerClients s,
@@ -533,8 +578,12 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp
533578
qsCfg = PQStoreCfg (storeCfg {confirmMigrations} :: PostgresStoreCfg)
534579
cfg = mkJournalStoreConfig qsCfg storeMsgsPath' msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval
535580
when compactLog $ compactDbStoreLog $ dbStoreLogPath storeCfg
536-
ms <- newMsgStore cfg
537-
pure $ StoreJournal ms
581+
StoreJournal <$> newMsgStore cfg
582+
SSCDatabase storeCfg -> do
583+
let StartOptions {compactLog, confirmMigrations} = startOptions config
584+
cfg = PostgresMsgStoreCfg storeCfg {confirmMigrations} msgQueueQuota
585+
when compactLog $ compactDbStoreLog $ dbStoreLogPath storeCfg
586+
StoreDatabase <$> newMsgStore cfg
538587
#else
539588
SSCDatabaseJournal {} -> noPostgresExit
540589
#endif
@@ -628,10 +677,12 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp
628677
_ -> SPMMessages
629678

630679
noPostgresExit :: IO a
631-
noPostgresExit = do
632-
putStrLn "Error: server binary is compiled without support for PostgreSQL database."
633-
putStrLn "Please download `smp-server-postgres` or re-compile with `cabal build -fserver_postgres`."
634-
exitFailure
680+
noPostgresExit = putStrLn noPostgresExitStr >> exitFailure
681+
682+
noPostgresExitStr :: String
683+
noPostgresExitStr =
684+
"Error: server binary is compiled without support for PostgreSQL database.\n"
685+
<> "Please download `smp-server-postgres` or re-compile with `cabal build -fserver_postgres`."
635686

636687
mkJournalStoreConfig :: QStoreCfg s -> FilePath -> Int -> Int -> Int -> Int64 -> JournalStoreConfig s
637688
mkJournalStoreConfig queueStoreCfg storePath msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval =

0 commit comments

Comments
 (0)