Skip to content

Commit f024ab1

Browse files
authored
ntf server: prometheus metrics (#1527)
* ntf server: save prometheus stats * info metrics * fix test
1 parent f4bc1f0 commit f024ab1

File tree

10 files changed

+421
-62
lines changed

10 files changed

+421
-62
lines changed

simplexmq.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ library
252252
Simplex.Messaging.Notifications.Server.Control
253253
Simplex.Messaging.Notifications.Server.Env
254254
Simplex.Messaging.Notifications.Server.Main
255+
Simplex.Messaging.Notifications.Server.Prometheus
255256
Simplex.Messaging.Notifications.Server.Push.APNS
256257
Simplex.Messaging.Notifications.Server.Push.APNS.Internal
257258
Simplex.Messaging.Notifications.Server.Stats

src/Simplex/Messaging/Notifications/Server.hs

Lines changed: 128 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
{-# LANGUAGE PatternSynonyms #-}
1313
{-# LANGUAGE ScopedTypeVariables #-}
1414
{-# LANGUAGE TupleSections #-}
15+
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
1516

1617
module Simplex.Messaging.Notifications.Server where
1718

19+
import Control.Concurrent (threadDelay)
1820
import Control.Logger.Simple
1921
import Control.Monad
2022
import Control.Monad.Except
@@ -27,13 +29,15 @@ import Data.Functor (($>))
2729
import Data.IORef
2830
import Data.Int (Int64)
2931
import qualified Data.IntSet as IS
30-
import Data.List (intercalate, partition, sort)
32+
import Data.List (foldl', intercalate)
3133
import Data.List.NonEmpty (NonEmpty (..))
3234
import qualified Data.List.NonEmpty as L
3335
import qualified Data.Map.Strict as M
3436
import Data.Maybe (mapMaybe)
3537
import qualified Data.Set as S
38+
import Data.Text (Text)
3639
import qualified Data.Text as T
40+
import qualified Data.Text.IO as T
3741
import Data.Text.Encoding (decodeLatin1)
3842
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
3943
import Data.Time.Clock.System (getSystemTime)
@@ -48,6 +52,7 @@ import Simplex.Messaging.Encoding.String
4852
import Simplex.Messaging.Notifications.Protocol
4953
import Simplex.Messaging.Notifications.Server.Control
5054
import Simplex.Messaging.Notifications.Server.Env
55+
import Simplex.Messaging.Notifications.Server.Prometheus
5156
import Simplex.Messaging.Notifications.Server.Push.APNS (PushNotification (..), PushProviderError (..))
5257
import Simplex.Messaging.Notifications.Server.Stats
5358
import Simplex.Messaging.Notifications.Server.Store (NtfSTMStore, TokenNtfMessageRecord (..), stmStoreTokenLastNtf)
@@ -60,13 +65,14 @@ import Simplex.Messaging.Server
6065
import Simplex.Messaging.Server.Control (CPClientRole (..))
6166
import Simplex.Messaging.Server.Env.STM (StartOptions (..))
6267
import Simplex.Messaging.Server.QueueStore (getSystemDate)
63-
import Simplex.Messaging.Server.Stats (PeriodStats (..), PeriodStatCounts (..), periodStatCounts, updatePeriodStats)
68+
import Simplex.Messaging.Server.Stats (PeriodStats (..), PeriodStatCounts (..), periodStatCounts, periodStatDataCounts, updatePeriodStats)
6469
import Simplex.Messaging.TMap (TMap)
6570
import qualified Simplex.Messaging.TMap as TM
6671
import Simplex.Messaging.Transport (ATransport (..), THandle (..), THandleAuth (..), THandleParams (..), TProxy, Transport (..), TransportPeer (..), defaultSupportedParams)
6772
import Simplex.Messaging.Transport.Buffer (trimCR)
6873
import Simplex.Messaging.Transport.Server (AddHTTP, runTransportServer, runLocalTCPServer)
6974
import Simplex.Messaging.Util
75+
import System.Environment (lookupEnv)
7076
import System.Exit (exitFailure, exitSuccess)
7177
import System.IO (BufferMode (..), hClose, hPrint, hPutStrLn, hSetBuffering, hSetNewlineMode, universalNewlineMode)
7278
import System.Mem.Weak (deRefWeak)
@@ -99,7 +105,15 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
99105
stopServer
100106
liftIO $ exitSuccess
101107
resubscribe s
102-
raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports <> serverStatsThread_ cfg <> controlPortThread_ cfg) `finally` stopServer
108+
raceAny_
109+
( ntfSubscriber s
110+
: ntfPush ps
111+
: map runServer transports
112+
<> serverStatsThread_ cfg
113+
<> prometheusMetricsThread_ cfg
114+
<> controlPortThread_ cfg
115+
)
116+
`finally` stopServer
103117
where
104118
runServer :: (ServiceName, ATransport, AddHTTP) -> M ()
105119
runServer (tcpPort, ATransport t, _addHTTP) = do
@@ -193,6 +207,90 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
193207
]
194208
liftIO $ threadDelay' interval
195209

210+
prometheusMetricsThread_ :: NtfServerConfig -> [M ()]
211+
prometheusMetricsThread_ NtfServerConfig {prometheusInterval = Just interval, prometheusMetricsFile} =
212+
[savePrometheusMetrics interval prometheusMetricsFile]
213+
prometheusMetricsThread_ _ = []
214+
215+
savePrometheusMetrics :: Int -> FilePath -> M ()
216+
savePrometheusMetrics saveInterval metricsFile = do
217+
labelMyThread "savePrometheusMetrics"
218+
liftIO $ putStrLn $ "Prometheus metrics saved every " <> show saveInterval <> " seconds to " <> metricsFile
219+
st <- asks store
220+
ss <- asks serverStats
221+
env <- ask
222+
rtsOpts <- liftIO $ maybe ("set " <> rtsOptionsEnv) T.pack <$> lookupEnv (T.unpack rtsOptionsEnv)
223+
let interval = 1000000 * saveInterval
224+
liftIO $ forever $ do
225+
threadDelay interval
226+
ts <- getCurrentTime
227+
sm <- getNtfServerMetrics st ss rtsOpts
228+
rtm <- getNtfRealTimeMetrics env
229+
T.writeFile metricsFile $ ntfPrometheusMetrics sm rtm ts
230+
231+
getNtfServerMetrics :: NtfPostgresStore -> NtfServerStats -> Text -> IO NtfServerMetrics
232+
getNtfServerMetrics st ss rtsOptions = do
233+
d <- getNtfServerStatsData ss
234+
let psTkns = periodStatDataCounts $ _activeTokens d
235+
psSubs = periodStatDataCounts $ _activeSubs d
236+
(tokenCount, approxSubCount, lastNtfCount) <- getEntityCounts st
237+
pure NtfServerMetrics {statsData = d, activeTokensCounts = psTkns, activeSubsCounts = psSubs, tokenCount, approxSubCount, lastNtfCount, rtsOptions}
238+
239+
getNtfRealTimeMetrics :: NtfEnv -> IO NtfRealTimeMetrics
240+
getNtfRealTimeMetrics NtfEnv {subscriber, pushServer} = do
241+
#if MIN_VERSION_base(4,18,0)
242+
threadsCount <- length <$> listThreads
243+
#else
244+
let threadsCount = 0
245+
#endif
246+
let NtfSubscriber {smpSubscribers, smpAgent = a} = subscriber
247+
NtfPushServer {pushQ} = pushServer
248+
SMPClientAgent {smpClients, smpSessions, srvSubs, pendingSrvSubs, smpSubWorkers} = a
249+
srvSubscribers <- getSMPWorkerMetrics a smpSubscribers
250+
srvClients <- getSMPWorkerMetrics a smpClients
251+
srvSubWorkers <- getSMPWorkerMetrics a smpSubWorkers
252+
ntfActiveSubs <- getSMPSubMetrics a srvSubs
253+
ntfPendingSubs <- getSMPSubMetrics a pendingSrvSubs
254+
smpSessionCount <- M.size <$> readTVarIO smpSessions
255+
apnsPushQLength <- fromIntegral <$> atomically (lengthTBQueue pushQ)
256+
pure NtfRealTimeMetrics {threadsCount, srvSubscribers, srvClients, srvSubWorkers, ntfActiveSubs, ntfPendingSubs, smpSessionCount, apnsPushQLength}
257+
where
258+
getSMPSubMetrics :: SMPClientAgent -> TMap SMPServer (TMap SMPSub a) -> IO NtfSMPSubMetrics
259+
getSMPSubMetrics a v = do
260+
subs <- readTVarIO v
261+
let metrics = NtfSMPSubMetrics {ownSrvSubs = M.empty, otherServers = 0, otherSrvSubCount = 0}
262+
(metrics', otherSrvs) <- foldM countSubs (metrics, S.empty) $ M.assocs subs
263+
pure (metrics' :: NtfSMPSubMetrics) {otherServers = S.size otherSrvs}
264+
where
265+
countSubs :: (NtfSMPSubMetrics, S.Set Text) -> (SMPServer, TMap SMPSub a) -> IO (NtfSMPSubMetrics, S.Set Text)
266+
countSubs acc@(metrics, !otherSrvs) (srv@(SMPServer (h :| _) _ _), srvSubs) =
267+
result . M.size <$> readTVarIO srvSubs
268+
where
269+
result 0 = acc
270+
result cnt
271+
| isOwnServer a srv =
272+
let !ownSrvSubs' = M.alter (Just . maybe cnt (+ cnt)) host ownSrvSubs
273+
metrics' = metrics {ownSrvSubs = ownSrvSubs'} :: NtfSMPSubMetrics
274+
in (metrics', otherSrvs)
275+
| otherwise =
276+
let metrics' = metrics {otherSrvSubCount = otherSrvSubCount + cnt} :: NtfSMPSubMetrics
277+
in (metrics', S.insert host otherSrvs)
278+
NtfSMPSubMetrics {ownSrvSubs, otherSrvSubCount} = metrics
279+
host = safeDecodeUtf8 $ strEncode h
280+
281+
getSMPWorkerMetrics :: SMPClientAgent -> TMap SMPServer a -> IO NtfSMPWorkerMetrics
282+
getSMPWorkerMetrics a v = workerMetrics a . M.keys <$> readTVarIO v
283+
workerMetrics :: SMPClientAgent -> [SMPServer] -> NtfSMPWorkerMetrics
284+
workerMetrics a srvs = NtfSMPWorkerMetrics {ownServers = reverse ownSrvs, otherServers}
285+
where
286+
(ownSrvs, otherServers) = foldl' countSrv ([], 0) srvs
287+
countSrv (!own, !other) srv@(SMPServer (h :| _) _ _)
288+
| isOwnServer a srv = (host : own, other)
289+
| otherwise = (own, other + 1)
290+
where
291+
host = safeDecodeUtf8 $ strEncode h
292+
293+
196294
controlPortThread_ :: NtfServerConfig -> [M ()]
197295
controlPortThread_ NtfServerConfig {controlPort = Just port} = [runCPServer port]
198296
controlPortThread_ _ = []
@@ -266,59 +364,38 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
266364
logError "Unauthorized control port command"
267365
hPutStrLn h "AUTH"
268366
r -> do
367+
NtfRealTimeMetrics {threadsCount, srvSubscribers, srvClients, srvSubWorkers, ntfActiveSubs, ntfPendingSubs, smpSessionCount, apnsPushQLength} <-
368+
getNtfRealTimeMetrics =<< unliftIO u ask
269369
#if MIN_VERSION_base(4,18,0)
270-
threads <- liftIO listThreads
271-
hPutStrLn h $ "Threads: " <> show (length threads)
370+
hPutStrLn h $ "Threads: " <> show threadsCount
272371
#else
273372
hPutStrLn h "Threads: not available on GHC 8.10"
274373
#endif
275-
NtfEnv {subscriber, pushServer} <- unliftIO u ask
276-
let NtfSubscriber {smpSubscribers, smpAgent = a} = subscriber
277-
NtfPushServer {pushQ} = pushServer
278-
SMPClientAgent {smpClients, smpSessions, srvSubs, pendingSrvSubs, smpSubWorkers} = a
279-
putSMPWorkers a "SMP subcscribers" smpSubscribers
280-
putSMPWorkers a "SMP clients" smpClients
281-
putSMPWorkers a "SMP subscription workers" smpSubWorkers
282-
sessions <- readTVarIO smpSessions
283-
hPutStrLn h $ "SMP sessions count: " <> show (M.size sessions)
284-
putSMPSubs a "SMP subscriptions" srvSubs
285-
putSMPSubs a "Pending SMP subscriptions" pendingSrvSubs
286-
sz <- atomically $ lengthTBQueue pushQ
287-
hPutStrLn h $ "Push notifications queue length: " <> show sz
374+
putSMPWorkers "SMP subcscribers" srvSubscribers
375+
putSMPWorkers "SMP clients" srvClients
376+
putSMPWorkers "SMP subscription workers" srvSubWorkers
377+
hPutStrLn h $ "SMP sessions count: " <> show smpSessionCount
378+
putSMPSubs "SMP subscriptions" ntfActiveSubs
379+
putSMPSubs "Pending SMP subscriptions" ntfPendingSubs
380+
hPutStrLn h $ "Push notifications queue length: " <> show apnsPushQLength
288381
where
289-
putSMPSubs :: SMPClientAgent -> String -> TMap SMPServer (TMap SMPSub a) -> IO ()
290-
putSMPSubs a name v = do
291-
subs <- readTVarIO v
292-
(totalCnt, ownCount, otherCnt, servers, ownByServer) <- foldM countSubs (0, 0, 0, [], M.empty) $ M.assocs subs
293-
showServers a name servers
294-
hPutStrLn h $ name <> " total: " <> show totalCnt
295-
hPutStrLn h $ name <> " on own servers: " <> show ownCount
296-
when (r == CPRAdmin && not (null ownByServer)) $
297-
forM_ (M.assocs ownByServer) $ \(SMPServer (host :| _) _ _, cnt) ->
298-
hPutStrLn h $ name <> " on " <> B.unpack (strEncode host) <> ": " <> show cnt
299-
hPutStrLn h $ name <> " on other servers: " <> show otherCnt
300-
where
301-
countSubs :: (Int, Int, Int, [SMPServer], M.Map SMPServer Int) -> (SMPServer, TMap SMPSub a) -> IO (Int, Int, Int, [SMPServer], M.Map SMPServer Int)
302-
countSubs (!totalCnt, !ownCount, !otherCnt, !servers, !ownByServer) (srv, srvSubs) = do
303-
cnt <- M.size <$> readTVarIO srvSubs
304-
let totalCnt' = totalCnt + cnt
305-
ownServer = isOwnServer a srv
306-
(ownCount', otherCnt')
307-
| ownServer = (ownCount + cnt, otherCnt)
308-
| otherwise = (ownCount, otherCnt + cnt)
309-
servers' = if cnt > 0 then srv : servers else servers
310-
ownByServer'
311-
| r == CPRAdmin && ownServer && cnt > 0 = M.alter (Just . maybe cnt (+ cnt)) srv ownByServer
312-
| otherwise = ownByServer
313-
pure (totalCnt', ownCount', otherCnt', servers', ownByServer')
314-
putSMPWorkers :: SMPClientAgent -> String -> TMap SMPServer a -> IO ()
315-
putSMPWorkers a name v = readTVarIO v >>= showServers a name . M.keys
316-
showServers :: SMPClientAgent -> String -> [SMPServer] -> IO ()
317-
showServers a name srvs = do
318-
let (ownSrvs, otherSrvs) = partition (isOwnServer a) srvs
319-
hPutStrLn h $ name <> " own servers count: " <> show (length ownSrvs)
320-
when (r == CPRAdmin) $ hPutStrLn h $ name <> " own servers: " <> intercalate "," (sort $ map (\(SMPServer (host :| _) _ _) -> B.unpack $ strEncode host) ownSrvs)
321-
hPutStrLn h $ name <> " other servers count: " <> show (length otherSrvs)
382+
putSMPSubs :: Text -> NtfSMPSubMetrics -> IO ()
383+
putSMPSubs name NtfSMPSubMetrics {ownSrvSubs, otherServers, otherSrvSubCount} = do
384+
showServers name (M.keys ownSrvSubs) otherServers
385+
let ownSrvSubCount = M.foldl' (+) 0 ownSrvSubs
386+
T.hPutStrLn h $ name <> " total: " <> tshow (ownSrvSubCount + otherSrvSubCount)
387+
T.hPutStrLn h $ name <> " on own servers: " <> tshow ownSrvSubCount
388+
when (r == CPRAdmin && not (M.null ownSrvSubs)) $
389+
forM_ (M.assocs ownSrvSubs) $ \(host, cnt) ->
390+
T.hPutStrLn h $ name <> " on " <> host <> ": " <> tshow cnt
391+
T.hPutStrLn h $ name <> " on other servers: " <> tshow otherSrvSubCount
392+
putSMPWorkers :: Text -> NtfSMPWorkerMetrics -> IO ()
393+
putSMPWorkers name NtfSMPWorkerMetrics {ownServers, otherServers} = showServers name ownServers otherServers
394+
showServers :: Text -> [Text] -> Int -> IO ()
395+
showServers name ownServers otherServers = do
396+
T.hPutStrLn h $ name <> " own servers count: " <> tshow (length ownServers)
397+
when (r == CPRAdmin) $ T.hPutStrLn h $ name <> " own servers: " <> T.intercalate "," ownServers
398+
T.hPutStrLn h $ name <> " other servers count: " <> tshow otherServers
322399
CPHelp -> hPutStrLn h "commands: stats, stats-rts, server-info, help, quit"
323400
CPQuit -> pure ()
324401
CPSkip -> pure ()

src/Simplex/Messaging/Notifications/Server/Env.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ data NtfServerConfig = NtfServerConfig
5959
logStatsStartTime :: Int64,
6060
serverStatsLogFile :: FilePath,
6161
serverStatsBackupFile :: Maybe FilePath,
62+
-- | interval and file to save prometheus metrics
63+
prometheusInterval :: Maybe Int,
64+
prometheusMetricsFile :: FilePath,
6265
ntfServerVRange :: VersionRangeNTF,
6366
transportConfig :: TransportServerConfig,
6467
startOptions :: StartOptions

src/Simplex/Messaging/Notifications/Server/Main.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ import Simplex.Messaging.Server.StoreLog (closeStoreLog)
4444
import Simplex.Messaging.Transport (ATransport, simplexMQVersion)
4545
import Simplex.Messaging.Transport.Client (TransportHost (..))
4646
import Simplex.Messaging.Transport.Server (AddHTTP, ServerCredentials (..), TransportServerConfig (..), defaultTransportServerConfig)
47-
import Simplex.Messaging.Util (ifM, tshow)
47+
import Simplex.Messaging.Util (eitherToMaybe, ifM, tshow)
4848
import System.Directory (createDirectoryIfMissing, doesFileExist, renameFile)
4949
import System.Exit (exitFailure)
5050
import System.FilePath (combine)
@@ -267,6 +267,8 @@ ntfServerCLI cfgPath logPath =
267267
logStatsStartTime = 0, -- seconds from 00:00 UTC
268268
serverStatsLogFile = combine logPath "ntf-server-stats.daily.log",
269269
serverStatsBackupFile = logStats $> combine logPath "ntf-server-stats.log",
270+
prometheusInterval = eitherToMaybe $ read . T.unpack <$> lookupValue "STORE_LOG" "prometheus_interval" ini,
271+
prometheusMetricsFile = combine logPath "ntf-server-metrics.txt",
270272
ntfServerVRange = supportedServerNTFVRange,
271273
transportConfig =
272274
defaultTransportServerConfig

0 commit comments

Comments
 (0)