@@ -71,6 +71,7 @@ import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing)
7171import Data.Semigroup (Sum (.. ))
7272import qualified Data.Text as T
7373import Data.Text.Encoding (decodeLatin1 )
74+ import qualified Data.Text.IO as T
7475import Data.Time.Clock (UTCTime (.. ), diffTimeToPicoseconds , getCurrentTime )
7576import Data.Time.Clock.System (SystemTime (.. ), getSystemTime )
7677import Data.Time.Format.ISO8601 (iso8601Show )
@@ -98,6 +99,7 @@ import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue, closeMsgQueue)
9899import Simplex.Messaging.Server.MsgStore.STM
99100import Simplex.Messaging.Server.MsgStore.Types
100101import Simplex.Messaging.Server.NtfStore
102+ import Simplex.Messaging.Server.Prometheus
101103import Simplex.Messaging.Server.QueueStore
102104import Simplex.Messaging.Server.QueueStore.QueueInfo
103105import Simplex.Messaging.Server.QueueStore.STM
@@ -176,7 +178,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
176178 : receiveFromProxyAgent pa
177179 : expireNtfsThread cfg
178180 : sigIntHandlerThread
179- : map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg
181+ : map runServer transports
182+ <> expireMessagesThread_ cfg
183+ <> serverStatsThread_ cfg
184+ <> prometheusMetricsThread_ cfg
185+ <> controlPortThread_ cfg
180186 )
181187 `finally` stopServer s
182188 where
@@ -555,6 +561,50 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
555561 showProxyStats ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} =
556562 [show _pRequests, show _pSuccesses, show _pErrorsConnect, show _pErrorsCompat, show _pErrorsOther]
557563
564+ prometheusMetricsThread_ :: ServerConfig -> [M () ]
565+ prometheusMetricsThread_ ServerConfig {prometheusInterval = Just interval, prometheusMetricsFile} =
566+ [savePrometheusMetrics interval prometheusMetricsFile]
567+ prometheusMetricsThread_ _ = []
568+
569+ savePrometheusMetrics :: Int -> FilePath -> M ()
570+ savePrometheusMetrics saveInterval metricsFile = do
571+ labelMyThread " savePrometheusMetrics"
572+ liftIO $ putStrLn $ " Prometheus metrics saved every " <> show saveInterval <> " seconds to " <> metricsFile
573+ AMS _ st <- asks msgStore
574+ ss <- asks serverStats
575+ env <- ask
576+ let interval = 1000000 * saveInterval
577+ liftIO $ forever $ do
578+ threadDelay interval
579+ ts <- getCurrentTime
580+ sm <- getServerMetrics st ss
581+ rtm <- getRealTimeMetrics env
582+ T. writeFile metricsFile $ prometheusMetrics sm rtm ts
583+
584+ getServerMetrics :: STMQueueStore s => s -> ServerStats -> IO ServerMetrics
585+ getServerMetrics st ss = do
586+ d <- getServerStatsData ss
587+ let ps = periodStatDataCounts $ _activeQueues d
588+ psNtf = periodStatDataCounts $ _activeQueuesNtf d
589+ queueCount <- M. size <$> readTVarIO (activeMsgQueues st)
590+ notifierCount <- M. size <$> readTVarIO (notifiers' st)
591+ pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount}
592+
593+ getRealTimeMetrics :: Env -> IO RealTimeMetrics
594+ getRealTimeMetrics Env {clients, sockets, server = Server {subscribers, notifiers, subClients, ntfSubClients}} = do
595+ socketStats <- mapM (traverse getSocketStats) =<< readTVarIO sockets
596+ #if MIN_VERSION_base(4,18,0)
597+ threadsCount <- length <$> listThreads
598+ #else
599+ let threadsCount = 0
600+ #endif
601+ clientsCount <- IM. size <$> readTVarIO clients
602+ smpSubsCount <- M. size <$> readTVarIO subscribers
603+ smpSubClientsCount <- IM. size <$> readTVarIO subClients
604+ ntfSubsCount <- M. size <$> readTVarIO notifiers
605+ ntfSubClientsCount <- IM. size <$> readTVarIO ntfSubClients
606+ pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubsCount, smpSubClientsCount, ntfSubsCount, ntfSubClientsCount}
607+
558608 runClient :: Transport c => C. APrivateSignKey -> TProxy c -> c -> M ()
559609 runClient signKey tp h = do
560610 kh <- asks serverIdentity
@@ -695,13 +745,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
695745#endif
696746 CPSockets -> withUserRole $ unliftIO u (asks sockets) >>= readTVarIO >>= mapM_ putSockets
697747 where
698- putSockets (tcpPort, (accepted', closed', active') ) = do
699- (accepted, closed, active) <- (,,) <$> readTVarIO accepted' <*> readTVarIO closed' <*> readTVarIO active'
748+ putSockets (tcpPort, socketsState ) = do
749+ ss <- getSocketStats socketsState
700750 hPutStrLn h $ " Sockets for port " <> tcpPort <> " :"
701- hPutStrLn h $ " accepted: " <> show accepted
702- hPutStrLn h $ " closed: " <> show closed
703- hPutStrLn h $ " active: " <> show (IM. size active )
704- hPutStrLn h $ " leaked: " <> show (accepted - closed - IM. size active )
751+ hPutStrLn h $ " accepted: " <> show (socketsAccepted ss)
752+ hPutStrLn h $ " closed: " <> show (socketsClosed ss)
753+ hPutStrLn h $ " active: " <> show (socketsActive ss )
754+ hPutStrLn h $ " leaked: " <> show (socketsLeaked ss )
705755 CPSocketThreads -> withAdminRole $ do
706756#if MIN_VERSION_base(4,18,0)
707757 unliftIO u (asks sockets) >>= readTVarIO >>= mapM_ putSocketThreads
0 commit comments