Skip to content
1 change: 1 addition & 0 deletions simplexmq.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ library
Simplex.Messaging.Server.MsgStore.STM
Simplex.Messaging.Server.MsgStore.Types
Simplex.Messaging.Server.NtfStore
Simplex.Messaging.Server.Prometheus
Simplex.Messaging.Server.QueueStore
Simplex.Messaging.Server.QueueStore.STM
Simplex.Messaging.Server.Stats
Expand Down
64 changes: 57 additions & 7 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import Data.Semigroup (Sum (..))
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
import qualified Data.Text.IO as T
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Data.Time.Format.ISO8601 (iso8601Show)
Expand Down Expand Up @@ -98,6 +99,7 @@
import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.NtfStore
import Simplex.Messaging.Server.Prometheus
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.QueueInfo
import Simplex.Messaging.Server.QueueStore.STM
Expand Down Expand Up @@ -176,7 +178,11 @@
: receiveFromProxyAgent pa
: expireNtfsThread cfg
: sigIntHandlerThread
: map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg
: map runServer transports
<> expireMessagesThread_ cfg
<> serverStatsThread_ cfg
<> prometheusMetricsThread_ cfg
<> controlPortThread_ cfg
)
`finally` stopServer s
where
Expand Down Expand Up @@ -555,6 +561,50 @@
-- Renders the five proxy counters as strings, in the fixed order:
-- requests, successes, connect errors, compatibility errors, other errors.
showProxyStats
  ProxyStatsData
    { _pRequests = requests,
      _pSuccesses = successes,
      _pErrorsConnect = connectErrs,
      _pErrorsCompat = compatErrs,
      _pErrorsOther = otherErrs
    } =
    [ show requests,
      show successes,
      show connectErrs,
      show compatErrs,
      show otherErrs
    ]

-- | Optional thread that saves Prometheus metrics.
--
-- Yields a single 'savePrometheusMetrics' action when 'prometheusInterval'
-- is set in the configuration, and no actions at all otherwise, so the
-- result can be appended directly to the list of server threads.
prometheusMetricsThread_ :: ServerConfig -> [M ()]
prometheusMetricsThread_ ServerConfig {prometheusInterval, prometheusMetricsFile} =
  maybe [] startWith prometheusInterval
  where
    startWith seconds = [savePrometheusMetrics seconds prometheusMetricsFile]

-- | Periodically writes a Prometheus metrics snapshot to @metricsFile@.
--
-- @saveInterval@ is given in seconds. Values below 1 are clamped to 1 second:
-- with a zero or negative delay 'threadDelay' returns immediately, and the
-- loop below would spin, recomputing metrics and rewriting the file without
-- pause. The start-up message reports the effective (clamped) interval.
--
-- The loop runs forever; each iteration waits for the interval, collects
-- server and real-time metrics, and overwrites @metricsFile@ with the
-- rendered text.
savePrometheusMetrics :: Int -> FilePath -> M ()
savePrometheusMetrics saveInterval metricsFile = do
  labelMyThread "savePrometheusMetrics"
  liftIO $ putStrLn $ "Prometheus metrics saved every " <> show seconds <> " seconds to " <> metricsFile
  AMS _ st <- asks msgStore
  ss <- asks serverStats
  env <- ask
  liftIO $ forever $ do
    -- wait first, so the first snapshot is written one full interval after start
    threadDelay delayMicros
    ts <- getCurrentTime
    sm <- getServerMetrics st ss
    rtm <- getRealTimeMetrics env
    T.writeFile metricsFile $ prometheusMetrics sm rtm ts
  where
    -- effective interval in seconds, never below one second
    seconds = max 1 saveInterval
    -- threadDelay takes microseconds
    delayMicros = 1000000 * seconds

-- | Builds a 'ServerMetrics' snapshot from the queue store and server stats.
--
-- Reads the stats data from @ss@, derives the period counts for active queues
-- and active notification queues from it, and takes the sizes of the store's
-- @activeMsgQueues@ and @notifiers'@ maps. Each 'TVar' is read on its own
-- (not in one transaction), so the values are taken at slightly different
-- moments.
getServerMetrics :: STMQueueStore s => s -> ServerStats -> IO ServerMetrics
getServerMetrics st ss = do
  stats <- getServerStatsData ss
  nQueues <- M.size <$> readTVarIO (activeMsgQueues st)
  nNotifiers <- M.size <$> readTVarIO (notifiers' st)
  pure
    ServerMetrics
      { statsData = stats,
        activeQueueCounts = periodStatDataCounts (_activeQueues stats),
        activeNtfCounts = periodStatDataCounts (_activeQueuesNtf stats),
        queueCount = nQueues,
        notifierCount = nNotifiers
      }

-- | Collects current counters from the server environment.
--
-- Gathers per-port socket statistics, the number of threads, and the sizes of
-- the client, subscriber and notifier collections held in 'Env'. Every 'TVar'
-- is read separately, so the figures are not a single atomic snapshot.
-- When built against base older than 4.18, @listThreads@ is not used and the
-- thread count is reported as 0.
getRealTimeMetrics :: Env -> IO RealTimeMetrics
getRealTimeMetrics Env {clients, sockets, server = Server {subscribers, notifiers, subClients, ntfSubClients}} = do
  portStats <- readTVarIO sockets >>= mapM (traverse getSocketStats)
#if MIN_VERSION_base(4,18,0)
  nThreads <- length <$> listThreads
#else
  let nThreads = 0
#endif
  nClients <- IM.size <$> readTVarIO clients
  nSmpSubs <- M.size <$> readTVarIO subscribers
  nSmpSubClients <- IM.size <$> readTVarIO subClients
  nNtfSubs <- M.size <$> readTVarIO notifiers
  nNtfSubClients <- IM.size <$> readTVarIO ntfSubClients
  pure
    RealTimeMetrics
      { socketStats = portStats,
        threadsCount = nThreads,
        clientsCount = nClients,
        smpSubsCount = nSmpSubs,
        smpSubClientsCount = nSmpSubClients,
        ntfSubsCount = nNtfSubs,
        ntfSubClientsCount = nNtfSubClients
      }

runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M ()
runClient signKey tp h = do
kh <- asks serverIdentity
Expand Down Expand Up @@ -695,13 +745,13 @@
#endif
CPSockets -> withUserRole $ unliftIO u (asks sockets) >>= readTVarIO >>= mapM_ putSockets
where
-- Prints the socket counters for one listening port to the control-port
-- handle @h@. The counters are obtained through getSocketStats, the same
-- helper getRealTimeMetrics uses, rather than by reading the individual
-- TVars here.
-- NOTE(review): resolved from an unmarked diff in which the old and new
-- lines were interleaved — confirm against the repository.
putSockets (tcpPort, socketsState) = do
  ss <- getSocketStats socketsState
  hPutStrLn h $ "Sockets for port " <> tcpPort <> ":"
  hPutStrLn h $ "accepted: " <> show (socketsAccepted ss)
  hPutStrLn h $ "closed: " <> show (socketsClosed ss)
  hPutStrLn h $ "active: " <> show (socketsActive ss)
  hPutStrLn h $ "leaked: " <> show (socketsLeaked ss)
CPSocketThreads -> withAdminRole $ do
#if MIN_VERSION_base(4,18,0)
unliftIO u (asks sockets) >>= readTVarIO >>= mapM_ putSocketThreads
Expand Down Expand Up @@ -1045,7 +1095,7 @@

-- These dummy keys are used with `dummyVerify` function to mitigate timing attacks
-- by having the same time of the response whether a queue exists or not, for all valid key/signature sizes
dummySignKey :: C.SignatureAlgorithm a => C.SAlgorithm a -> C.PublicKey a

Check warning on line 1098 in src/Simplex/Messaging/Server.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Redundant constraint: C.SignatureAlgorithm a

Check warning on line 1098 in src/Simplex/Messaging/Server.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Redundant constraint: C.SignatureAlgorithm a
dummySignKey = \case
C.SEd25519 -> dummyKeyEd25519
C.SEd448 -> dummyKeyEd448
Expand Down
3 changes: 3 additions & 0 deletions src/Simplex/Messaging/Server/Env/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ data ServerConfig = ServerConfig
serverStatsLogFile :: FilePath,
-- | file to save and restore stats
serverStatsBackupFile :: Maybe FilePath,
-- | interval and file to save prometheus metrics
prometheusInterval :: Maybe Int,
prometheusMetricsFile :: FilePath,
-- | notification delivery interval
ntfDeliveryInterval :: Int,
-- | interval between sending pending END events to unsubscribed clients, seconds
Expand Down
6 changes: 5 additions & 1 deletion src/Simplex/Messaging/Server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

Check warning on line 12 in src/Simplex/Messaging/Server/Main.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

module Simplex.Messaging.Server.Main where

Expand Down Expand Up @@ -253,7 +253,9 @@
<> ("expire_ntfs_hours: " <> tshow defNtfExpirationHours <> "\n\n")
<> "# Log daily server statistics to CSV file\n"
<> ("log_stats: " <> onOff logStats <> "\n\n")
<> "[AUTH]\n\
<> "# Log interval for real-time Prometheus metrics\n\
\# prometheus_interval: 300\n\n\
\[AUTH]\n\
\# Set new_queues option to off to completely prohibit creating new messaging queues.\n\
\# This can be useful when you want to decommission the server, but not all connections are switched yet.\n\
\new_queues: on\n\n\
Expand Down Expand Up @@ -431,6 +433,8 @@
logStatsStartTime = 0, -- seconds from 00:00 UTC
serverStatsLogFile = combine logPath "smp-server-stats.daily.log",
serverStatsBackupFile = logStats $> combine logPath "smp-server-stats.log",
prometheusInterval = eitherToMaybe $ read . T.unpack <$> lookupValue "STORE_LOG" "prometheus_interval" ini,
prometheusMetricsFile = combine logPath "smp-server-metrics.txt",
pendingENDInterval = 15000000, -- 15 seconds
ntfDeliveryInterval = 3000000, -- 3 seconds
smpServerVRange = supportedServerSMPRelayVRange,
Expand Down
Loading
Loading