Skip to content

Commit f871f20

Browse files
authored
smp server: fix notification delivery (#1350)
* .401 * stats for undelivered notifications * logs, stats * control port show ntf client IDs * check that Ntf client is still current and that queue is not full, drop notifications otherwise * prevent losing notifications when client is not current or queue full * add log when no notifications, remove some logs * reduce STM transaction * revert version change
1 parent 80d3518 commit f871f20

File tree

4 files changed

+91
-50
lines changed

4 files changed

+91
-50
lines changed

src/Simplex/Messaging/Server.hs

Lines changed: 75 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ module Simplex.Messaging.Server
3838
)
3939
where
4040

41+
import Control.Concurrent.STM (throwSTM)
4142
import Control.Concurrent.STM.TQueue (flushTQueue)
4243
import Control.Logger.Simple
4344
import Control.Monad
@@ -237,27 +238,46 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
237238
deliverNtfsThread :: Server -> M ()
238239
deliverNtfsThread Server {ntfSubClients} = do
239240
ntfInt <- asks $ ntfDeliveryInterval . config
240-
ns <- asks ntfStore
241+
NtfStore ns <- asks ntfStore
241242
stats <- asks serverStats
242243
liftIO $ forever $ do
243244
threadDelay ntfInt
244245
readTVarIO ntfSubClients >>= mapM_ (deliverNtfs ns stats)
245246
where
246-
deliverNtfs ns stats Client {clientId, ntfSubscriptions, sndQ, connected} = whenM currentClient $
247-
readTVarIO ntfSubscriptions >>= \subs -> do
248-
ts_ <- foldM addNtfs [] (M.keys subs)
249-
mapM_ (atomically . writeTBQueue sndQ) $ L.nonEmpty ts_
250-
updateNtfStats $ length ts_
247+
deliverNtfs ns stats Client {clientId, ntfSubscriptions, sndQ, connected} =
248+
whenM (currentClient readTVarIO) $ do
249+
subs <- readTVarIO ntfSubscriptions
250+
logDebug $ "NOTIFICATIONS: client #" <> tshow clientId <> " is current with " <> tshow (M.size subs) <> " subs"
251+
ntfQs <- M.assocs . M.filterWithKey (\nId _ -> M.member nId subs) <$> readTVarIO ns
252+
tryAny (atomically $ flushSubscribedNtfs ntfQs) >>= \case
253+
Right len -> updateNtfStats len
254+
Left e -> logDebug $ "NOTIFICATIONS: cancelled for client #" <> tshow clientId <> ", reason: " <> tshow e
251255
where
252-
currentClient = (&&) <$> readTVarIO connected <*> (IM.member clientId <$> readTVarIO ntfSubClients)
253-
addNtfs :: [Transmission BrokerMsg] -> NotifierId -> IO [Transmission BrokerMsg]
254-
addNtfs acc nId =
255-
(foldl' (\acc' ntf -> nmsg nId ntf : acc') acc) -- reverses, to order by time
256-
<$> flushNtfs ns nId
256+
flushSubscribedNtfs :: [(NotifierId, TVar [MsgNtf])] -> STM Int
257+
flushSubscribedNtfs ntfQs = do
258+
ts_ <- foldM addNtfs [] ntfQs
259+
forM_ (L.nonEmpty ts_) $ \ts -> do
260+
let cancelNtfs s = throwSTM $ userError $ s <> ", " <> show (length ts_) <> " ntfs kept"
261+
unlessM (currentClient readTVar) $ cancelNtfs "not current client"
262+
whenM (isFullTBQueue sndQ) $ cancelNtfs "sending queue full"
263+
writeTBQueue sndQ ts
264+
pure $ length ts_
265+
currentClient :: Monad m => (forall a. TVar a -> m a) -> m Bool
266+
currentClient rd = (&&) <$> rd connected <*> (IM.member clientId <$> rd ntfSubClients)
267+
addNtfs :: [Transmission BrokerMsg] -> (NotifierId, TVar [MsgNtf]) -> STM [Transmission BrokerMsg]
268+
addNtfs acc (nId, v) =
269+
readTVar v >>= \case
270+
[] -> pure acc
271+
ntfs -> do
272+
writeTVar v []
273+
pure $ foldl' (\acc' ntf -> nmsg nId ntf : acc') acc ntfs -- reverses, to order by time
257274
nmsg nId MsgNtf {ntfNonce, ntfEncMeta} = (CorrId "", nId, NMSG ntfNonce ntfEncMeta)
258-
updateNtfStats len = when (len > 0) $ liftIO $ do
275+
updateNtfStats 0 = logDebug $ "NOTIFICATIONS: no ntfs for client #" <> tshow clientId
276+
updateNtfStats len = liftIO $ do
277+
atomicModifyIORef'_ (ntfCount stats) (subtract len)
259278
atomicModifyIORef'_ (msgNtfs stats) (+ len)
260279
atomicModifyIORef'_ (msgNtfsB stats) (+ (len `div` 80 + 1)) -- up to 80 NMSG in the batch
280+
logDebug $ "NOTIFICATIONS: delivered to client #" <> tshow clientId <> " " <> tshow len <> " ntfs"
261281

262282
sendPendingEvtsThread :: Server -> M ()
263283
sendPendingEvtsThread s = do
@@ -334,7 +354,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
334354
threadDelay' interval
335355
old <- expireBeforeEpoch expCfg
336356
expired <- deleteExpiredNtfs ns old
337-
when (expired > 0) $ atomicModifyIORef'_ (msgNtfExpired stats) (+ expired)
357+
when (expired > 0) $ do
358+
atomicModifyIORef'_ (msgNtfExpired stats) (+ expired)
359+
atomicModifyIORef'_ (ntfCount stats) (subtract expired)
338360

339361
serverStatsThread_ :: ServerConfig -> [M ()]
340362
serverStatsThread_ ServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} =
@@ -347,7 +369,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
347369
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
348370
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
349371
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
350-
ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv}
372+
ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, ntfCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv}
351373
<- asks serverStats
352374
QueueStore {queues, notifiers} <- asks queueStore
353375
let interval = 1000000 * logInterval
@@ -404,8 +426,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
404426
pMsgFwdsRecv' <- atomicSwapIORef pMsgFwdsRecv 0
405427
qCount' <- readIORef qCount
406428
qCount'' <- M.size <$> readTVarIO queues
407-
ntfCount' <- M.size <$> readTVarIO notifiers
429+
notifierCount' <- M.size <$> readTVarIO notifiers
408430
msgCount' <- readIORef msgCount
431+
ntfCount' <- readIORef ntfCount
409432
hPutStrLn h $
410433
intercalate
411434
","
@@ -462,15 +485,16 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
462485
show ntfSub',
463486
show ntfSubAuth',
464487
show ntfSubDuplicate',
465-
show ntfCount',
488+
show notifierCount',
466489
show qDeletedAllB',
467490
show qSubAllB',
468491
show qSubEnd',
469492
show qSubEndB',
470493
show ntfDeletedB',
471494
show ntfSubB',
472495
show msgNtfsB',
473-
show msgNtfExpired'
496+
show msgNtfExpired',
497+
show ntfCount'
474498
]
475499
)
476500
liftIO $ threadDelay' interval
@@ -547,6 +571,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
547571
hPutStrLn h . B.unpack $ B.intercalate "," [bshow cid, encode sessionId, connected', strEncode createdAt, rcvActiveAt', sndActiveAt', bshow age, subscriptions']
548572
CPStats -> withUserRole $ do
549573
ss <- unliftIO u $ asks serverStats
574+
QueueStore {queues, notifiers} <- unliftIO u $ asks queueStore
550575
let getStat :: (ServerStats -> IORef a) -> IO a
551576
getStat var = readIORef (var ss)
552577
putStat :: Show a => String -> (ServerStats -> IORef a) -> IO ()
@@ -584,7 +609,18 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
584609
putStat "msgNtfsB" msgNtfsB
585610
putStat "msgNtfExpired" msgNtfExpired
586611
putStat "qCount" qCount
612+
qCount2 <- M.size <$> readTVarIO queues
613+
hPutStrLn h $ "qCount 2: " <> show qCount2
614+
notifierCount <- M.size <$> readTVarIO notifiers
615+
hPutStrLn h $ "notifiers: " <> show notifierCount
587616
putStat "msgCount" msgCount
617+
putStat "ntfCount" ntfCount
618+
readTVarIO role >>= \case
619+
CPRAdmin -> do
620+
NtfStore ns <- unliftIO u $ asks ntfStore
621+
ntfCount2 <- liftIO . foldM (\(!n) q -> (n +) . length <$> readTVarIO q) 0 =<< readTVarIO ns
622+
hPutStrLn h $ "ntfCount 2: " <> show ntfCount2
623+
_ -> pure ()
588624
putProxyStat "pRelays" pRelays
589625
putProxyStat "pRelaysOwn" pRelaysOwn
590626
putProxyStat "pMsgFwds" pMsgFwds
@@ -650,24 +686,24 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
650686
hPutStrLn h $ "Ntf subscriptions (via clients): " <> show ntfSubCnt
651687
hPutStrLn h $ "Ntf subscribed clients (via clients): " <> show ntfClCnt
652688
hPutStrLn h $ "Ntf subscribed clients queues (via clients, rcvQ, sndQ, msgQ): " <> show ntfClQs
653-
putActiveClientsInfo "SMP" subscribers
654-
putActiveClientsInfo "Ntf" notifiers
655-
putSubscribedClients "SMP" subClients
656-
putSubscribedClients "Ntf" ntfSubClients
689+
putActiveClientsInfo "SMP" subscribers False
690+
putActiveClientsInfo "Ntf" notifiers True
691+
putSubscribedClients "SMP" subClients False
692+
putSubscribedClients "Ntf" ntfSubClients True
657693
where
658-
putActiveClientsInfo :: String -> TMap QueueId (TVar Client) -> IO ()
659-
putActiveClientsInfo protoName clients = do
694+
putActiveClientsInfo :: String -> TMap QueueId (TVar Client) -> Bool -> IO ()
695+
putActiveClientsInfo protoName clients showIds = do
660696
activeSubs <- readTVarIO clients
661697
hPutStrLn h $ protoName <> " subscriptions: " <> show (M.size activeSubs)
662-
clCnt <- IS.size <$> countSubClients activeSubs
663-
hPutStrLn h $ protoName <> " subscribed clients: " <> show clCnt
698+
clnts <- countSubClients activeSubs
699+
hPutStrLn h $ protoName <> " subscribed clients: " <> show (IS.size clnts) <> (if showIds then " " <> show (IS.toList clnts) else "")
664700
where
665701
countSubClients :: M.Map QueueId (TVar Client) -> IO IS.IntSet
666702
countSubClients = foldM (\ !s c -> (`IS.insert` s) . clientId <$> readTVarIO c) IS.empty
667-
putSubscribedClients :: String -> TVar (IM.IntMap Client) -> IO ()
668-
putSubscribedClients protoName subClnts = do
703+
putSubscribedClients :: String -> TVar (IM.IntMap Client) -> Bool -> IO ()
704+
putSubscribedClients protoName subClnts showIds = do
669705
clnts <- readTVarIO subClnts
670-
hPutStrLn h $ protoName <> " subscribed clients count:" <> show (IM.size clnts)
706+
hPutStrLn h $ protoName <> " subscribed clients count 2: " <> show (IM.size clnts) <> (if showIds then " " <> show (IM.keys clnts) else "")
671707
countClientSubs :: (Client -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap (Maybe Client) -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
672708
countClientSubs subSel countSubs_ = foldM addSubs (0, (0, 0, 0, 0), 0, (0, 0, 0))
673709
where
@@ -1184,9 +1220,11 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
11841220
liftIO (deleteQueueNotifier st entId) >>= \case
11851221
Right (Just nId) -> do
11861222
-- Possibly, the same should be done if the queue is suspended, but currently we do not use it
1187-
asks ntfStore >>= liftIO . (`deleteNtfs` nId)
1223+
stats <- asks serverStats
1224+
deleted <- asks ntfStore >>= liftIO . (`deleteNtfs` nId)
1225+
when (deleted > 0) $ liftIO $ atomicModifyIORef'_ (ntfCount stats) (subtract deleted)
11881226
atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False)
1189-
incStat . ntfDeleted =<< asks serverStats
1227+
incStat $ ntfDeleted stats
11901228
pure ok
11911229
Right Nothing -> pure ok
11921230
Left e -> pure $ err e
@@ -1459,6 +1497,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
14591497
ns <- asks ntfStore
14601498
ntf <- mkMessageNotification msgId msgTs rcvNtfDhSecret
14611499
liftIO $ storeNtf ns nId ntf
1500+
incStat . ntfCount =<< asks serverStats
14621501

14631502
mkMessageNotification :: ByteString -> SystemTime -> RcvNtfDhSecret -> M MsgNtf
14641503
mkMessageNotification msgId msgTs rcvNtfDhSecret = do
@@ -1569,7 +1608,9 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
15691608
forM_ (notifierId <$> notifier q) $ \nId -> do
15701609
-- queue is deleted by a different client from the one subscribed to notifications,
15711610
-- so we don't need to remove subscription from the current client.
1572-
asks ntfStore >>= liftIO . (`deleteNtfs` nId)
1611+
stats <- asks serverStats
1612+
deleted <- asks ntfStore >>= liftIO . (`deleteNtfs` nId)
1613+
when (deleted > 0) $ liftIO $ atomicModifyIORef'_ (ntfCount stats) (subtract deleted)
15731614
atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False)
15741615
updateDeletedStats q
15751616
pure ok
@@ -1760,7 +1801,9 @@ restoreServerStats expiredMsgs expiredNtfs = asks (serverStatsBackupFile . confi
17601801
s <- asks serverStats
17611802
_qCount <- fmap M.size . readTVarIO . queues =<< asks queueStore
17621803
_msgCount <- liftIO . foldM (\(!n) q -> (n +) <$> getQueueSize q) 0 =<< readTVarIO =<< asks msgStore
1763-
liftIO $ setServerStats s d {_qCount, _msgCount, _msgExpired = _msgExpired d + expiredMsgs, _msgNtfExpired = _msgNtfExpired d + expiredNtfs}
1804+
NtfStore ns <- asks ntfStore
1805+
_ntfCount <- liftIO . foldM (\(!n) q -> (n +) . length <$> readTVarIO q) 0 =<< readTVarIO ns
1806+
liftIO $ setServerStats s d {_qCount, _msgCount, _ntfCount, _msgExpired = _msgExpired d + expiredMsgs, _msgNtfExpired = _msgNtfExpired d + expiredNtfs}
17641807
renameFile f $ f <> ".bak"
17651808
logInfo "server stats restored"
17661809
when (_qCount /= statsQCount) $ logWarn $ "Queue count differs: stats: " <> tshow statsQCount <> ", store: " <> tshow _qCount

src/Simplex/Messaging/Server/NtfStore.hs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,8 @@ storeNtf (NtfStore ns) nId ntf = do
3434
where
3535
newNtfs = TM.lookup nId ns >>= maybe (TM.insertM nId (newTVar [ntf]) ns) (`modifyTVar'` (ntf :))
3636

37-
deleteNtfs :: NtfStore -> NotifierId -> IO ()
38-
deleteNtfs (NtfStore ns) nId = atomically $ TM.delete nId ns
39-
40-
flushNtfs :: NtfStore -> NotifierId -> IO [MsgNtf]
41-
flushNtfs (NtfStore ns) nId = do
42-
TM.lookupIO nId ns >>= maybe (pure []) swapNtfs
43-
where
44-
swapNtfs v =
45-
readTVarIO v >>= \case
46-
[] -> pure []
47-
-- if notifications available, atomically swap with empty array
48-
_ -> atomically (swapTVar v [])
37+
deleteNtfs :: NtfStore -> NotifierId -> IO Int
38+
deleteNtfs (NtfStore ns) nId = atomically (TM.lookupDelete nId ns) >>= maybe (pure 0) (fmap length . readTVarIO)
4939

5040
deleteExpiredNtfs :: NtfStore -> Int64 -> IO Int
5141
deleteExpiredNtfs (NtfStore ns) old =

src/Simplex/Messaging/Server/Stats.hs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ data ServerStats = ServerStats
7777
pMsgFwdsOwn :: ProxyStats,
7878
pMsgFwdsRecv :: IORef Int,
7979
qCount :: IORef Int,
80-
msgCount :: IORef Int
80+
msgCount :: IORef Int,
81+
ntfCount :: IORef Int
8182
}
8283

8384
data ServerStatsData = ServerStatsData
@@ -129,7 +130,8 @@ data ServerStatsData = ServerStatsData
129130
_pMsgFwdsOwn :: ProxyStatsData,
130131
_pMsgFwdsRecv :: Int,
131132
_qCount :: Int,
132-
_msgCount :: Int
133+
_msgCount :: Int,
134+
_ntfCount :: Int
133135
}
134136
deriving (Show)
135137

@@ -184,6 +186,7 @@ newServerStats ts = do
184186
pMsgFwdsRecv <- newIORef 0
185187
qCount <- newIORef 0
186188
msgCount <- newIORef 0
189+
ntfCount <- newIORef 0
187190
pure
188191
ServerStats
189192
{ fromTime,
@@ -234,7 +237,8 @@ newServerStats ts = do
234237
pMsgFwdsOwn,
235238
pMsgFwdsRecv,
236239
qCount,
237-
msgCount
240+
msgCount,
241+
ntfCount
238242
}
239243

240244
getServerStatsData :: ServerStats -> IO ServerStatsData
@@ -288,6 +292,7 @@ getServerStatsData s = do
288292
_pMsgFwdsRecv <- readIORef $ pMsgFwdsRecv s
289293
_qCount <- readIORef $ qCount s
290294
_msgCount <- readIORef $ msgCount s
295+
_ntfCount <- readIORef $ ntfCount s
291296
pure
292297
ServerStatsData
293298
{ _fromTime,
@@ -338,7 +343,8 @@ getServerStatsData s = do
338343
_pMsgFwdsOwn,
339344
_pMsgFwdsRecv,
340345
_qCount,
341-
_msgCount
346+
_msgCount,
347+
_ntfCount
342348
}
343349

344350
-- this function is not thread safe, it is used on server start only
@@ -393,6 +399,7 @@ setServerStats s d = do
393399
writeIORef (pMsgFwdsRecv s) $! _pMsgFwdsRecv d
394400
writeIORef (qCount s) $! _qCount d
395401
writeIORef (msgCount s) $! _msgCount d
402+
writeIORef (ntfCount s) $! _ntfCount d
396403

397404
instance StrEncoding ServerStatsData where
398405
strEncode d =
@@ -566,7 +573,8 @@ instance StrEncoding ServerStatsData where
566573
_pMsgFwdsOwn,
567574
_pMsgFwdsRecv,
568575
_qCount,
569-
_msgCount = 0
576+
_msgCount = 0,
577+
_ntfCount = 0
570578
}
571579
where
572580
opt s = A.string s *> strP <* A.endOfLine <|> pure 0

tests/CoreTests/StoreLogTests.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ storeLogTests =
109109

110110
testSMPStoreLog :: String -> [SMPStoreLogTestCase] -> Spec
111111
testSMPStoreLog testSuite tests =
112-
fdescribe testSuite $ forM_ tests $ \t@SLTC {name, saved} -> it name $ do
112+
describe testSuite $ forM_ tests $ \t@SLTC {name, saved} -> it name $ do
113113
l <- openWriteStoreLog testStoreLogFile
114114
mapM_ (writeStoreLogRecord l) saved
115115
closeStoreLog l

0 commit comments

Comments
 (0)