Skip to content

Commit 0739f7b

Browse files
authored
smp server: persist notifications to avoid losing them when ntf server is offline (#1336)
* ntf server: types for storing notifications * fix tests * remove comments * batch NMSGs test * fix test * thread to expire notifications * persist notifications on restart * optimize * refactor * remove reverse * control port stats
1 parent e12710f commit 0739f7b

File tree

15 files changed

+612
-337
lines changed

15 files changed

+612
-337
lines changed

simplexmq.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ library
173173
Simplex.Messaging.Server.Main
174174
Simplex.Messaging.Server.MsgStore
175175
Simplex.Messaging.Server.MsgStore.STM
176+
Simplex.Messaging.Server.NtfStore
176177
Simplex.Messaging.Server.QueueStore
177178
Simplex.Messaging.Server.QueueStore.QueueInfo
178179
Simplex.Messaging.Server.QueueStore.STM

src/Simplex/Messaging/Notifications/Server.hs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,9 +219,8 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
219219
NtfPushServer {pushQ} <- asks pushServer
220220
stats <- asks serverStats
221221
liftIO $ updatePeriodStats (activeSubs stats) ntfId
222-
atomically $
223-
findNtfSubscriptionToken st smpQueue
224-
>>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage (PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} :| [])))
222+
atomically (findNtfSubscriptionToken st smpQueue)
223+
>>= mapM_ (\tkn -> atomically (writeTBQueue pushQ (tkn, PNMessage (PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} :| []))))
225224
incNtfStat ntfReceived
226225
Right SMP.END ->
227226
whenM (atomically $ activeClientSession' ca sessionId srv) $

src/Simplex/Messaging/Server.hs

Lines changed: 128 additions & 45 deletions
Large diffs are not rendered by default.

src/Simplex/Messaging/Server/Env/STM.hs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import Simplex.Messaging.Protocol
3838
import Simplex.Messaging.Server.Expiration
3939
import Simplex.Messaging.Server.Information
4040
import Simplex.Messaging.Server.MsgStore.STM
41+
import Simplex.Messaging.Server.NtfStore
4142
import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..))
4243
import Simplex.Messaging.Server.QueueStore.STM
4344
import Simplex.Messaging.Server.Stats
@@ -61,6 +62,7 @@ data ServerConfig = ServerConfig
6162
msgIdBytes :: Int,
6263
storeLogFile :: Maybe FilePath,
6364
storeMsgsFile :: Maybe FilePath,
65+
storeNtfsFile :: Maybe FilePath,
6466
-- | set to False to prohibit creating new queues
6567
allowNewQueues :: Bool,
6668
-- | simple password that the clients need to pass in handshake to be able to create new queues
@@ -70,6 +72,8 @@ data ServerConfig = ServerConfig
7072
controlPortAdminAuth :: Maybe BasicAuth,
7173
-- | time after which the messages can be removed from the queues and check interval, seconds
7274
messageExpiration :: Maybe ExpirationConfig,
75+
-- | notification expiration interval (seconds)
76+
notificationExpiration :: ExpirationConfig,
7377
-- | time after which the socket with inactive client can be disconnected (without any messages or commands, incl. PING),
7478
-- and check interval, seconds
7579
inactiveClientExpiration :: Maybe ExpirationConfig,
@@ -82,6 +86,8 @@ data ServerConfig = ServerConfig
8286
serverStatsLogFile :: FilePath,
8387
-- | file to save and restore stats
8488
serverStatsBackupFile :: Maybe FilePath,
89+
-- | notification delivery interval
90+
ntfDeliveryInterval :: Int,
8591
-- | interval between sending pending END events to unsubscribed clients, seconds
8692
pendingENDInterval :: Int,
8793
smpCredentials :: ServerCredentials,
@@ -110,6 +116,16 @@ defaultMessageExpiration =
110116
checkInterval = 43200 -- seconds, 12 hours
111117
}
112118

119+
-- | Default notification time-to-live, in hours.
-- 'defaultNtfExpiration' multiplies it by 3600 to obtain the TTL in seconds.
defNtfExpirationHours :: Int64
120+
defNtfExpirationHours = 24
121+
122+
-- | Default expiration settings for stored notifications:
-- the TTL is 'defNtfExpirationHours' converted to seconds,
-- and the check interval is one hour.
defaultNtfExpiration :: ExpirationConfig
123+
defaultNtfExpiration =
124+
ExpirationConfig
125+
{ ttl = defNtfExpirationHours * 3600, -- seconds
126+
checkInterval = 3600 -- seconds, 1 hour
127+
}
128+
113129
defaultInactiveClientExpiration :: ExpirationConfig
114130
defaultInactiveClientExpiration =
115131
ExpirationConfig
@@ -127,6 +143,7 @@ data Env = Env
127143
serverIdentity :: KeyHash,
128144
queueStore :: QueueStore,
129145
msgStore :: STMMsgStore,
146+
ntfStore :: NtfStore,
130147
random :: TVar ChaChaDRG,
131148
storeLog :: Maybe (StoreLog 'WriteMode),
132149
tlsServerCreds :: T.Credential,
@@ -229,6 +246,7 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, smpAg
229246
server <- newServer
230247
queueStore <- newQueueStore
231248
msgStore <- newMsgStore
249+
ntfStore <- NtfStore <$> TM.emptyIO
232250
random <- C.newRandom
233251
storeLog <-
234252
forM storeLogFile $ \f -> do
@@ -244,7 +262,7 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, smpAg
244262
clientSeq <- newTVarIO 0
245263
clients <- newTVarIO mempty
246264
proxyAgent <- newSMPProxyAgent smpAgentCfg random
247-
pure Env {config, serverInfo, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerCreds, httpServerCreds, serverStats, sockets, clientSeq, clients, proxyAgent}
265+
pure Env {config, serverInfo, server, serverIdentity, queueStore, msgStore, ntfStore, random, storeLog, tlsServerCreds, httpServerCreds, serverStats, sockets, clientSeq, clients, proxyAgent}
248266
where
249267
getCredentials protocol creds = do
250268
files <- missingCreds

src/Simplex/Messaging/Server/Main.hs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import Simplex.Messaging.Parsers (parseAll)
3838
import Simplex.Messaging.Protocol (BasicAuth (..), ProtoServerWithAuth (ProtoServerWithAuth), pattern SMPServer)
3939
import Simplex.Messaging.Server (AttachHTTP, runSMPServer)
4040
import Simplex.Messaging.Server.CLI
41-
import Simplex.Messaging.Server.Env.STM (ServerConfig (..), defMsgExpirationDays, defaultInactiveClientExpiration, defaultMessageExpiration, defaultProxyClientConcurrency)
41+
import Simplex.Messaging.Server.Env.STM
4242
import Simplex.Messaging.Server.Expiration
4343
import Simplex.Messaging.Server.Information
4444
import Simplex.Messaging.Transport (batchCmdsSMPVersion, sendingProxySMPVersion, simplexMQVersion, supportedServerSMPRelayVRange)
@@ -154,7 +154,8 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
154154
<> "# Undelivered messages are optionally saved and restored when the server restarts,\n\
155155
\# they are preserved in the .bak file until the next restart.\n"
156156
<> ("restore_messages: " <> onOff enableStoreLog <> "\n")
157-
<> ("expire_messages_days: " <> tshow defMsgExpirationDays <> "\n\n")
157+
<> ("expire_messages_days: " <> tshow defMsgExpirationDays <> "\n")
158+
<> ("expire_ntfs_hours: " <> tshow defNtfExpirationHours <> "\n\n")
158159
<> "# Log daily server statistics to CSV file\n"
159160
<> ("log_stats: " <> onOff logStats <> "\n\n")
160161
<> "[AUTH]\n\
@@ -268,6 +269,11 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
268269
enableStoreLog = settingIsOn "STORE_LOG" "enable" ini
269270
logStats = settingIsOn "STORE_LOG" "log_stats" ini
270271
c = combine cfgPath . ($ defaultX509Config)
272+
restoreMessagesFile path = case iniOnOff "STORE_LOG" "restore_messages" ini of
273+
Just True -> Just path
274+
Just False -> Nothing
275+
-- if the setting is not set, it is enabled when store log is enabled
276+
_ -> enableStoreLog $> path
271277
transports = iniTransports ini
272278
sharedHTTP = any (\(_, _, addHTTP) -> addHTTP) transports
273279
serverConfig =
@@ -286,13 +292,8 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
286292
},
287293
httpCredentials = (\WebHttpsParams {key, cert} -> ServerCredentials {caCertificateFile = Nothing, privateKeyFile = key, certificateFile = cert}) <$> webHttpsParams',
288294
storeLogFile = enableStoreLog $> storeLogFilePath,
289-
storeMsgsFile =
290-
let messagesPath = combine logPath "smp-server-messages.log"
291-
in case iniOnOff "STORE_LOG" "restore_messages" ini of
292-
Just True -> Just messagesPath
293-
Just False -> Nothing
294-
-- if the setting is not set, it is enabled when store log is enabled
295-
_ -> enableStoreLog $> messagesPath,
295+
storeMsgsFile = restoreMessagesFile $ combine logPath "smp-server-messages.log",
296+
storeNtfsFile = restoreMessagesFile $ combine logPath "smp-server-ntfs.log",
296297
-- allow creating new queues by default
297298
allowNewQueues = fromMaybe True $ iniOnOff "AUTH" "new_queues" ini,
298299
newQueueBasicAuth = either error id <$!> strDecodeIni "AUTH" "create_password" ini,
@@ -303,6 +304,10 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
303304
defaultMessageExpiration
304305
{ ttl = 86400 * readIniDefault defMsgExpirationDays "STORE_LOG" "expire_messages_days" ini
305306
},
307+
notificationExpiration =
308+
defaultNtfExpiration
309+
{ ttl = 3600 * readIniDefault defNtfExpirationHours "STORE_LOG" "expire_ntfs_hours" ini
310+
},
306311
inactiveClientExpiration =
307312
settingIsOn "INACTIVE_CLIENTS" "disconnect" ini
308313
$> ExpirationConfig
@@ -314,6 +319,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
314319
serverStatsLogFile = combine logPath "smp-server-stats.daily.log",
315320
serverStatsBackupFile = logStats $> combine logPath "smp-server-stats.log",
316321
pendingENDInterval = 15000000, -- 15 seconds
322+
ntfDeliveryInterval = 3000000, -- 3 seconds
317323
smpServerVRange = supportedServerSMPRelayVRange,
318324
transportConfig =
319325
defaultTransportServerConfig
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
{-# LANGUAGE BangPatterns #-}
2+
{-# LANGUAGE LambdaCase #-}
3+
{-# LANGUAGE NamedFieldPuns #-}
4+
{-# LANGUAGE OverloadedStrings #-}
5+
6+
module Simplex.Messaging.Server.NtfStore where
7+
8+
import Control.Concurrent.STM
9+
import Control.Monad (foldM)
10+
import Data.Int (Int64)
11+
import qualified Data.Map.Strict as M
12+
import Data.Time.Clock.System (SystemTime (..))
13+
import qualified Simplex.Messaging.Crypto as C
14+
import Simplex.Messaging.Encoding.String
15+
import Simplex.Messaging.Protocol (EncNMsgMeta, MsgId, NotifierId)
16+
import Simplex.Messaging.TMap (TMap)
17+
import qualified Simplex.Messaging.TMap as TM
18+
19+
-- | Notifications kept per notifier ID, each notifier having its own TVar with a list.
-- 'storeNtf' prepends to the list, so the newest notification is at the head.
newtype NtfStore = NtfStore (TMap NotifierId (TVar [MsgNtf]))
20+
21+
-- | One stored message notification: message ID, timestamp, nonce and
-- encrypted notification metadata.
-- 'ntfTs' is the field that 'deleteExpiredNtfs' compares with its cut-off.
data MsgNtf = MsgNtf
22+
{ ntfMsgId :: MsgId,
23+
ntfTs :: SystemTime,
24+
ntfNonce :: C.CbNonce,
25+
ntfEncMeta :: EncNMsgMeta
26+
}
27+
28+
-- | Add a notification to the front of the list stored for the given notifier.
--
-- The map is first read in IO, before the transaction starts. When the notifier
-- already has a list, the transaction only prepends to that list's TVar.
-- Otherwise 'newNtfs' repeats the lookup inside the transaction and creates
-- a new single-element list only if the key is still absent.
storeNtf :: NtfStore -> NotifierId -> MsgNtf -> IO ()
29+
storeNtf (NtfStore ns) nId ntf = do
30+
TM.lookupIO nId ns >>= atomically . maybe newNtfs (`modifyTVar'` (ntf :))
31+
-- TODO coalesce messages here once the client is updated to process multiple messages
32+
-- for single notification.
33+
-- when (isJust prevNtf) $ incStat $ msgNtfReplaced stats
34+
where
35+
-- transactional fallback: look the key up again, insert a new list or prepend to the existing one
newNtfs = TM.lookup nId ns >>= maybe (TM.insertM nId (newTVar [ntf]) ns) (`modifyTVar'` (ntf :))
36+
37+
-- | Remove the notifier's entry from the store, together with the
-- notification list kept under it.
deleteNtfs :: NtfStore -> NotifierId -> IO ()
deleteNtfs (NtfStore store) notifierId =
  atomically (TM.delete notifierId store)
39+
40+
-- | Take all notifications currently stored for the notifier, leaving an
-- empty list in their place. Returns an empty list when the notifier has
-- no entry or nothing is stored for it.
flushNtfs :: NtfStore -> NotifierId -> IO [MsgNtf]
flushNtfs (NtfStore store) notifierId =
  TM.lookupIO notifierId store >>= \case
    Nothing -> pure []
    Just pending -> do
      current <- readTVarIO pending
      -- only start a transaction when there is something to take
      if null current
        then pure []
        else atomically (swapTVar pending [])
49+
50+
-- | Remove notifications older than the cut-off from every notifier's list
-- and return the total number removed.
--
-- @old@ is compared with 'systemSeconds' of each notification's 'ntfTs':
-- notifications with a smaller value are dropped, the rest are kept.
-- The set of keys is read once up front, and each notifier's list is then
-- processed in its own transaction.
deleteExpiredNtfs :: NtfStore -> Int64 -> IO Int
51+
deleteExpiredNtfs (NtfStore ns) old =
52+
foldM (\expired -> fmap (expired +) . expireQueue) 0 . M.keys =<< readTVarIO ns
53+
where
54+
-- the key may no longer be present by the time it is processed, then nothing is counted
expireQueue nId = TM.lookupIO nId ns >>= maybe (pure 0) expire
55+
-- read in IO first so that empty lists do not need a transaction
expire v = readTVarIO v >>= \case
56+
[] -> pure 0
57+
_ ->
58+
-- the list is read again inside the transaction, as it may have changed since the first read
atomically $ readTVar v >>= \case
59+
[] -> pure 0
60+
-- check the last message first, it is the earliest
61+
ntfs | systemSeconds (ntfTs $ last $ ntfs) < old -> do
62+
let !ntfs' = filter (\MsgNtf {ntfTs = ts} -> systemSeconds ts >= old) ntfs
63+
writeTVar v ntfs'
64+
pure $! length ntfs - length ntfs'
65+
_ -> pure 0
66+
67+
-- | Log record pairing a notifier ID with one notification.
-- The only constructor is encoded with a @v1@ tag (see the 'StrEncoding' instance).
data NtfLogRecord = NLRv1 NotifierId MsgNtf
68+
69+
-- | A notification is encoded and parsed as the 4-tuple of its fields,
-- in declaration order: message ID, timestamp, nonce, encrypted metadata.
instance StrEncoding MsgNtf where
  strEncode (MsgNtf msgId ts nonce encMeta) = strEncode (msgId, ts, nonce, encMeta)
  strP = fromTuple <$> strP
    where
      fromTuple (msgId, ts, nonce, encMeta) = MsgNtf msgId ts nonce encMeta
74+
75+
-- | A record is encoded as the tuple of the tag @v1@, the notifier ID and the notification.
-- The parser expects the literal prefix @"v1 "@ and then reads the notifier ID
-- followed by the notification.
instance StrEncoding NtfLogRecord where
76+
strEncode (NLRv1 nId ntf) = strEncode (Str "v1", nId, ntf)
77+
strP = "v1 " *> (NLRv1 <$> strP_ <*> strP)

src/Simplex/Messaging/Server/Stats.hs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@ data ServerStats = ServerStats
6767
msgRecvNtf :: IORef Int, -- received messages with NTF flag
6868
activeQueuesNtf :: PeriodStats,
6969
msgNtfs :: IORef Int, -- message notifications delivered to NTF server (<= msgSentNtf)
70+
msgNtfsB :: IORef Int, -- message notification batches delivered to NTF server
7071
msgNtfNoSub :: IORef Int, -- no subscriber to notifications (e.g., NTF server not connected)
7172
msgNtfLost :: IORef Int, -- notification is lost because NTF delivery queue is full
73+
msgNtfExpired :: IORef Int, -- expired
7274
pRelays :: ProxyStats,
7375
pRelaysOwn :: ProxyStats,
7476
pMsgFwds :: ProxyStats,
@@ -117,8 +119,10 @@ data ServerStatsData = ServerStatsData
117119
_msgRecvNtf :: Int,
118120
_activeQueuesNtf :: PeriodStatsData,
119121
_msgNtfs :: Int,
122+
_msgNtfsB :: Int,
120123
_msgNtfNoSub :: Int,
121124
_msgNtfLost :: Int,
125+
_msgNtfExpired :: Int,
122126
_pRelays :: ProxyStatsData,
123127
_pRelaysOwn :: ProxyStatsData,
124128
_pMsgFwds :: ProxyStatsData,
@@ -169,8 +173,10 @@ newServerStats ts = do
169173
msgRecvNtf <- newIORef 0
170174
activeQueuesNtf <- newPeriodStats
171175
msgNtfs <- newIORef 0
176+
msgNtfsB <- newIORef 0
172177
msgNtfNoSub <- newIORef 0
173178
msgNtfLost <- newIORef 0
179+
msgNtfExpired <- newIORef 0
174180
pRelays <- newProxyStats
175181
pRelaysOwn <- newProxyStats
176182
pMsgFwds <- newProxyStats
@@ -218,8 +224,10 @@ newServerStats ts = do
218224
msgRecvNtf,
219225
activeQueuesNtf,
220226
msgNtfs,
227+
msgNtfsB,
221228
msgNtfNoSub,
222229
msgNtfLost,
230+
msgNtfExpired,
223231
pRelays,
224232
pRelaysOwn,
225233
pMsgFwds,
@@ -269,8 +277,10 @@ getServerStatsData s = do
269277
_msgRecvNtf <- readIORef $ msgRecvNtf s
270278
_activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s
271279
_msgNtfs <- readIORef $ msgNtfs s
280+
_msgNtfsB <- readIORef $ msgNtfsB s
272281
_msgNtfNoSub <- readIORef $ msgNtfNoSub s
273282
_msgNtfLost <- readIORef $ msgNtfLost s
283+
_msgNtfExpired <- readIORef $ msgNtfExpired s
274284
_pRelays <- getProxyStatsData $ pRelays s
275285
_pRelaysOwn <- getProxyStatsData $ pRelaysOwn s
276286
_pMsgFwds <- getProxyStatsData $ pMsgFwds s
@@ -318,8 +328,10 @@ getServerStatsData s = do
318328
_msgRecvNtf,
319329
_activeQueuesNtf,
320330
_msgNtfs,
331+
_msgNtfsB,
321332
_msgNtfNoSub,
322333
_msgNtfLost,
334+
_msgNtfExpired,
323335
_pRelays,
324336
_pRelaysOwn,
325337
_pMsgFwds,
@@ -370,8 +382,10 @@ setServerStats s d = do
370382
writeIORef (msgRecvNtf s) $! _msgRecvNtf d
371383
setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d)
372384
writeIORef (msgNtfs s) $! _msgNtfs d
385+
writeIORef (msgNtfsB s) $! _msgNtfsB d
373386
writeIORef (msgNtfNoSub s) $! _msgNtfNoSub d
374387
writeIORef (msgNtfLost s) $! _msgNtfLost d
388+
writeIORef (msgNtfExpired s) $! _msgNtfExpired d
375389
setProxyStats (pRelays s) $! _pRelays d
376390
setProxyStats (pRelaysOwn s) $! _pRelaysOwn d
377391
setProxyStats (pMsgFwds s) $! _pMsgFwds d
@@ -420,8 +434,10 @@ instance StrEncoding ServerStatsData where
420434
"msgSentNtf=" <> strEncode (_msgSentNtf d),
421435
"msgRecvNtf=" <> strEncode (_msgRecvNtf d),
422436
"msgNtfs=" <> strEncode (_msgNtfs d),
437+
"msgNtfsB=" <> strEncode (_msgNtfsB d),
423438
"msgNtfNoSub=" <> strEncode (_msgNtfNoSub d),
424439
"msgNtfLost=" <> strEncode (_msgNtfLost d),
440+
"msgNtfExpired=" <> strEncode (_msgNtfExpired d),
425441
"activeQueues:",
426442
strEncode (_activeQueues d),
427443
"activeQueuesNtf:",
@@ -475,8 +491,10 @@ instance StrEncoding ServerStatsData where
475491
_msgSentNtf <- opt "msgSentNtf="
476492
_msgRecvNtf <- opt "msgRecvNtf="
477493
_msgNtfs <- opt "msgNtfs="
494+
_msgNtfsB <- opt "msgNtfsB="
478495
_msgNtfNoSub <- opt "msgNtfNoSub="
479496
_msgNtfLost <- opt "msgNtfLost="
497+
_msgNtfExpired <- opt "msgNtfExpired="
480498
_activeQueues <-
481499
optional ("activeQueues:" <* A.endOfLine) >>= \case
482500
Just _ -> strP <* optional A.endOfLine
@@ -536,8 +554,10 @@ instance StrEncoding ServerStatsData where
536554
_msgSentNtf,
537555
_msgRecvNtf,
538556
_msgNtfs,
557+
_msgNtfsB,
539558
_msgNtfNoSub,
540559
_msgNtfLost,
560+
_msgNtfExpired,
541561
_activeQueues,
542562
_activeQueuesNtf,
543563
_pRelays,

src/Simplex/Messaging/TMap.hs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ module Simplex.Messaging.TMap
99
member,
1010
memberIO,
1111
insert,
12+
insertM,
1213
delete,
1314
lookupInsert,
1415
lookupDelete,
@@ -62,6 +63,10 @@ insert :: Ord k => k -> a -> TMap k a -> STM ()
6263
insert k v m = modifyTVar' m $ M.insert k v
6364
{-# INLINE insert #-}
6465

66+
-- | Run the given STM action to produce a value, then store that value
-- in the map under the key (replacing any existing entry).
insertM :: Ord k => k -> STM a -> TMap k a -> STM ()
insertM key mkValue tmap = do
  value <- mkValue
  modifyTVar' tmap (M.insert key value)
{-# INLINE insertM #-}
69+
6570
delete :: Ord k => k -> TMap k a -> STM ()
6671
delete k m = modifyTVar' m $ M.delete k
6772
{-# INLINE delete #-}

0 commit comments

Comments
 (0)