Skip to content

Commit d3275ce

Browse files
authored
smp server: combine messages and queue storage to optimise performance, prevent race condition when deleting queue and to avoid "orphan" messages. (#1395)
* smp server: combine queue and message store into one class (WIP) * keep deleted queue tombstones to prevent race conditions and errors when restoring * move store log from server to store implementations * STMQueueStore type class * fix store closed when messages expired, handle store writing errors * types * version * fix recovery from missing write journal, tests * version
1 parent 0fd4aa1 commit d3275ce

File tree

17 files changed

+997
-655
lines changed

17 files changed

+997
-655
lines changed

simplexmq.cabal

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
cabal-version: 1.12
22

33
name: simplexmq
4-
version: 6.2.0.1
4+
version: 6.2.0.101
55
synopsis: SimpleXMQ message broker
66
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
77
<./docs/Simplex-Messaging-Client.html client> and
@@ -200,6 +200,7 @@ library
200200
Simplex.Messaging.Server.QueueStore.STM
201201
Simplex.Messaging.Server.Stats
202202
Simplex.Messaging.Server.StoreLog
203+
Simplex.Messaging.Server.StoreLog.Types
203204
Simplex.Messaging.Notifications.Server
204205
Simplex.Messaging.Notifications.Server.Control
205206
Simplex.Messaging.Notifications.Server.Env

src/Simplex/FileTransfer/Server.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
181181
stopServer = do
182182
withFileLog closeStoreLog
183183
saveServerStats
184+
logInfo "Server stopped"
184185

185186
expireFilesThread_ :: XFTPServerConfig -> [M ()]
186187
expireFilesThread_ XFTPServerConfig {fileExpiration = Just fileExp} = [expireFiles fileExp]

src/Simplex/Messaging/Notifications/Server.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,12 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg} started = do
115115

116116
stopServer :: M ()
117117
stopServer = do
118+
logInfo "Saving server state..."
118119
saveServer
119120
NtfSubscriber {smpSubscribers, smpAgent} <- asks subscriber
120121
liftIO $ readTVarIO smpSubscribers >>= mapM_ (\SMPSubscriber {subThreadId} -> readTVarIO subThreadId >>= mapM_ (deRefWeak >=> mapM_ killThread))
121122
liftIO $ closeSMPClientAgent smpAgent
123+
logInfo "Server stopped"
122124

123125
saveServer :: M ()
124126
saveServer = withNtfLog closeStoreLog >> saveServerLastNtfs >> saveServerStats

src/Simplex/Messaging/Protocol.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,11 +538,13 @@ messageId :: Message -> MsgId
538538
messageId = \case
539539
Message {msgId} -> msgId
540540
MessageQuota {msgId} -> msgId
541+
{-# INLINE messageId #-}
541542

542543
messageTs :: Message -> SystemTime
543544
messageTs = \case
544545
Message {msgTs} -> msgTs
545546
MessageQuota {msgTs} -> msgTs
547+
{-# INLINE messageTs #-}
546548

547549
newtype EncRcvMsgBody = EncRcvMsgBody ByteString
548550
deriving (Eq, Show)

src/Simplex/Messaging/Server.hs

Lines changed: 229 additions & 246 deletions
Large diffs are not rendered by default.

src/Simplex/Messaging/Server/Env/STM.hs

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import Simplex.Messaging.Server.MsgStore.Journal
4343
import Simplex.Messaging.Server.MsgStore.STM
4444
import Simplex.Messaging.Server.MsgStore.Types
4545
import Simplex.Messaging.Server.NtfStore
46-
import Simplex.Messaging.Server.QueueStore (QueueRec (..))
46+
import Simplex.Messaging.Server.QueueStore
4747
import Simplex.Messaging.Server.QueueStore.STM
4848
import Simplex.Messaging.Server.Stats
4949
import Simplex.Messaging.Server.StoreLog
@@ -165,27 +165,25 @@ data Env = Env
165165
serverInfo :: ServerInformation,
166166
server :: Server,
167167
serverIdentity :: KeyHash,
168-
queueStore :: QueueStore,
169168
msgStore :: AMsgStore,
170169
ntfStore :: NtfStore,
171170
random :: TVar ChaChaDRG,
172-
storeLog :: Maybe (StoreLog 'WriteMode),
173171
tlsServerCreds :: T.Credential,
174172
httpServerCreds :: Maybe T.Credential,
175173
serverStats :: ServerStats,
176174
sockets :: TVar [(ServiceName, SocketState)],
177175
clientSeq :: TVar ClientId,
178-
clients :: TVar (IntMap (Maybe Client)),
176+
clients :: TVar (IntMap (Maybe AClient)),
179177
proxyAgent :: ProxyAgent -- senders served on this proxy
180178
}
181179

182180
type family MsgStore s where
183181
MsgStore 'MSMemory = STMMsgStore
184182
MsgStore 'MSJournal = JournalMsgStore
185183

186-
data AMsgStore = forall s. MsgStoreClass (MsgStore s) => AMS (SMSType s) (MsgStore s)
184+
data AMsgStore = forall s. (STMQueueStore (MsgStore s), MsgStoreClass (MsgStore s)) => AMS (SMSType s) (MsgStore s)
187185

188-
data AMsgQueue = forall s. MsgStoreClass (MsgStore s) => AMQ (SMSType s) (MsgQueue (MsgStore s))
186+
data AStoreQueue = forall s. MsgStoreClass (MsgStore s) => ASQ (SMSType s) (StoreQueue (MsgStore s))
189187

190188
data AMsgStoreCfg = forall s. MsgStoreClass (MsgStore s) => AMSC (SMSType s) (MsgStoreConfig (MsgStore s))
191189

@@ -197,11 +195,11 @@ type Subscribed = Bool
197195

198196
data Server = Server
199197
{ subscribedQ :: TQueue (RecipientId, ClientId, Subscribed),
200-
subscribers :: TMap RecipientId (TVar Client),
198+
subscribers :: TMap RecipientId (TVar AClient),
201199
ntfSubscribedQ :: TQueue (NotifierId, ClientId, Subscribed),
202-
notifiers :: TMap NotifierId (TVar Client),
203-
subClients :: TVar (IntMap Client), -- clients with SMP subscriptions
204-
ntfSubClients :: TVar (IntMap Client), -- clients with Ntf subscriptions
200+
notifiers :: TMap NotifierId (TVar AClient),
201+
subClients :: TVar (IntMap AClient), -- clients with SMP subscriptions
202+
ntfSubClients :: TVar (IntMap AClient), -- clients with Ntf subscriptions
205203
pendingSubEvents :: TVar (IntMap (NonEmpty (RecipientId, Subscribed))),
206204
pendingNtfSubEvents :: TVar (IntMap (NonEmpty (NotifierId, Subscribed))),
207205
savingLock :: Lock
@@ -213,11 +211,16 @@ newtype ProxyAgent = ProxyAgent
213211

214212
type ClientId = Int
215213

216-
data Client = Client
214+
data AClient = forall s. MsgStoreClass (MsgStore s) => AClient (SMSType s) (Client (MsgStore s))
215+
216+
clientId' :: AClient -> ClientId
217+
clientId' (AClient _ Client {clientId}) = clientId
218+
219+
data Client s = Client
217220
{ clientId :: ClientId,
218221
subscriptions :: TMap RecipientId Sub,
219222
ntfSubscriptions :: TMap NotifierId (),
220-
rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)),
223+
rcvQ :: TBQueue (NonEmpty (Maybe (StoreQueue s, QueueRec), Transmission Cmd)),
221224
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
222225
msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
223226
procThreads :: TVar Int,
@@ -253,8 +256,8 @@ newServer = do
253256
savingLock <- createLockIO
254257
return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, subClients, ntfSubClients, pendingSubEvents, pendingNtfSubEvents, savingLock}
255258

256-
newClient :: ClientId -> Natural -> VersionSMP -> ByteString -> SystemTime -> IO Client
257-
newClient clientId qSize thVersion sessionId createdAt = do
259+
newClient :: SMSType s -> ClientId -> Natural -> VersionSMP -> ByteString -> SystemTime -> IO (Client (MsgStore s))
260+
newClient _msType clientId qSize thVersion sessionId createdAt = do
258261
subscriptions <- TM.emptyIO
259262
ntfSubscriptions <- TM.emptyIO
260263
rcvQ <- newTBQueueIO qSize
@@ -283,8 +286,7 @@ newEnv :: ServerConfig -> IO Env
283286
newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgStoreType, storeMsgsFile, smpAgentCfg, information, messageExpiration, msgQueueQuota, maxJournalMsgCount, maxJournalStateLines} = do
284287
serverActive <- newTVarIO True
285288
server <- newServer
286-
queueStore <- newQueueStore
287-
msgStore <- case msgStoreType of
289+
msgStore@(AMS _ store) <- case msgStoreType of
288290
AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota}
289291
AMSType SMSJournal -> case storeMsgsFile of
290292
Just storePath ->
@@ -293,10 +295,10 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt
293295
Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure
294296
ntfStore <- NtfStore <$> TM.emptyIO
295297
random <- C.newRandom
296-
storeLog <-
297-
forM storeLogFile $ \f -> do
298-
logInfo $ "restoring queues from file " <> T.pack f
299-
readWriteQueueStore f queueStore
298+
forM_ storeLogFile $ \f -> do
299+
logInfo $ "restoring queues from file " <> T.pack f
300+
sl <- readWriteQueueStore f store
301+
setStoreLog store sl
300302
tlsServerCreds <- getCredentials "SMP" smpCredentials
301303
httpServerCreds <- mapM (getCredentials "HTTPS") httpCredentials
302304
mapM_ checkHTTPSCredentials httpServerCreds
@@ -307,7 +309,7 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt
307309
clientSeq <- newTVarIO 0
308310
clients <- newTVarIO mempty
309311
proxyAgent <- newSMPProxyAgent smpAgentCfg random
310-
pure Env {serverActive, config, serverInfo, server, serverIdentity, queueStore, msgStore, ntfStore, random, storeLog, tlsServerCreds, httpServerCreds, serverStats, sockets, clientSeq, clients, proxyAgent}
312+
pure Env {serverActive, config, serverInfo, server, serverIdentity, msgStore, ntfStore, random, tlsServerCreds, httpServerCreds, serverStats, sockets, clientSeq, clients, proxyAgent}
311313
where
312314
getCredentials protocol creds = do
313315
files <- missingCreds
@@ -351,3 +353,6 @@ newSMPProxyAgent :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO ProxyAgent
351353
newSMPProxyAgent smpAgentCfg random = do
352354
smpAgent <- newSMPClientAgent smpAgentCfg random
353355
pure ProxyAgent {smpAgent}
356+
357+
readWriteQueueStore :: STMQueueStore s => FilePath -> s -> IO (StoreLog 'WriteMode)
358+
readWriteQueueStore = readWriteStoreLog readQueueStore writeQueueStore

src/Simplex/Messaging/Server/Main.hs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,11 @@ import Simplex.Messaging.Server.Expiration
4646
import Simplex.Messaging.Server.Information
4747
import Simplex.Messaging.Server.MsgStore.Journal (JournalStoreConfig (..))
4848
import Simplex.Messaging.Server.MsgStore.Types (AMSType (..), SMSType (..), newMsgStore)
49+
import Simplex.Messaging.Server.QueueStore.STM (readQueueStore)
4950
import Simplex.Messaging.Transport (batchCmdsSMPVersion, sendingProxySMPVersion, simplexMQVersion, supportedServerSMPRelayVRange)
5051
import Simplex.Messaging.Transport.Client (SocksProxy, TransportHost (..), defaultSocksProxy)
5152
import Simplex.Messaging.Transport.Server (ServerCredentials (..), TransportServerConfig (..), defaultTransportServerConfig)
52-
import Simplex.Messaging.Util (eitherToMaybe, safeDecodeUtf8, tshow)
53+
import Simplex.Messaging.Util (eitherToMaybe, ifM, safeDecodeUtf8, tshow)
5354
import Simplex.Messaging.Version (mkVersionRange)
5455
import System.Directory (createDirectoryIfMissing, doesDirectoryExist, doesFileExist)
5556
import System.Exit (exitFailure)
@@ -85,6 +86,14 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
8586
Journal cmd -> withIniFile $ \ini -> do
8687
msgsDirExists <- doesDirectoryExist storeMsgsJournalDir
8788
msgsFileExists <- doesFileExist storeMsgsFilePath
89+
let enableStoreLog = settingIsOn "STORE_LOG" "enable" ini
90+
storeLogFile <- case enableStoreLog $> storeLogFilePath of
91+
Just storeLogFile -> do
92+
ifM
93+
(doesFileExist storeLogFile)
94+
(pure storeLogFile)
95+
(putStrLn ("Store log file " <> storeLogFile <> " not found") >> exitFailure)
96+
Nothing -> putStrLn "Store log disabled, see `[STORE_LOG] enable`" >> exitFailure
8897
case cmd of
8998
JCImport
9099
| msgsFileExists && msgsDirExists -> exitConfigureMsgStorage
@@ -99,6 +108,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
99108
("WARNING: message log file " <> storeMsgsFilePath <> " will be imported to journal directory " <> storeMsgsJournalDir)
100109
"Messages not imported"
101110
ms <- newJournalMsgStore
111+
readQueueStore storeLogFile ms
102112
msgStats <- importMessages True ms storeMsgsFilePath Nothing -- no expiration
103113
putStrLn "Import completed"
104114
printMessageStats "Messages" msgStats
@@ -116,6 +126,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
116126
("WARNING: journal directory " <> storeMsgsJournalDir <> " will be exported to message log file " <> storeMsgsFilePath)
117127
"Journal not exported"
118128
ms <- newJournalMsgStore
129+
readQueueStore storeLogFile ms
119130
exportMessages True ms storeMsgsFilePath False
120131
putStrLn "Export completed"
121132
putStrLn $ case readMsgStoreType ini of

0 commit comments

Comments
 (0)