Skip to content

Commit 46ff37c

Browse files
authored
ntf server: additional statistics (#1558)
* ntf server: additional statistics * version * fix stats * add stats to track notifications without active token * refactor * fix stats parser * version
1 parent 3df2425 commit 46ff37c

File tree

6 files changed

+158
-33
lines changed

6 files changed

+158
-33
lines changed

src/Simplex/Messaging/Client/Agent.hs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ import Simplex.Messaging.Protocol
7474
subscriberParty,
7575
subscriberServiceRole
7676
)
77-
import qualified Simplex.Messaging.Protocol as SMP
7877
import Simplex.Messaging.Session
7978
import Simplex.Messaging.TMap (TMap)
8079
import qualified Simplex.Messaging.TMap as TM

src/Simplex/Messaging/Encoding/String.hs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import qualified Data.X509 as X
4343
import qualified Data.X509.Validation as XV
4444
import Simplex.Messaging.Encoding
4545
import Simplex.Messaging.Parsers (parseAll)
46-
import Simplex.Messaging.Util (bshow, (<$?>))
46+
import Simplex.Messaging.Util (bshow, safeDecodeUtf8, (<$?>))
4747

4848
class TextEncoding a where
4949
textEncode :: a -> Text
@@ -91,6 +91,10 @@ instance StrEncoding String where
9191
strEncode = strEncode . B.pack
9292
strP = B.unpack <$> strP
9393

94+
instance StrEncoding Text where
95+
strEncode = encodeUtf8
96+
strP = safeDecodeUtf8 <$> A.takeTill (\c -> c == ' ' || c == '\n')
97+
9498
instance ToJSON Str where
9599
toJSON (Str s) = strToJSON s
96100
toEncoding (Str s) = strToJEncoding s

src/Simplex/Messaging/Notifications/Server.hs

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
281281
getSMPServiceSubMetrics a sel subQueueCount = getSubMetrics_ a sel countSubs
282282
where
283283
countSubs :: (NtfSMPSubMetrics, S.Set Text) -> (SMPServer, TVar (Maybe sub)) -> IO (NtfSMPSubMetrics, S.Set Text)
284-
countSubs acc (srv, serviceSubs) = maybe acc (subMetricsResult a acc srv . fromIntegral . subQueueCount) <$> readTVarIO serviceSubs
284+
countSubs acc (srv, serviceSubs) = subMetricsResult a acc srv . fromIntegral . maybe 0 subQueueCount <$> readTVarIO serviceSubs
285285

286286
getSMPSubMetrics :: SMPClientAgent 'Notifier -> (SMPClientAgent 'Notifier -> TMap SMPServer (TMap NotifierId a)) -> IO NtfSMPSubMetrics
287287
getSMPSubMetrics a sel = getSubMetrics_ a sel countSubs
@@ -305,11 +305,11 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
305305
| isOwnServer a srv =
306306
let !ownSrvSubs' = M.alter (Just . maybe cnt (+ cnt)) host ownSrvSubs
307307
metrics' = metrics {ownSrvSubs = ownSrvSubs'} :: NtfSMPSubMetrics
308-
in (metrics', otherSrvs)
308+
in (metrics', otherSrvs)
309309
| cnt == 0 = acc
310310
| otherwise =
311311
let metrics' = metrics {otherSrvSubCount = otherSrvSubCount + cnt} :: NtfSMPSubMetrics
312-
in (metrics', S.insert host otherSrvs)
312+
in (metrics', S.insert host otherSrvs)
313313
where
314314
NtfSMPSubMetrics {ownSrvSubs, otherSrvSubCount} = metrics
315315
host = safeDecodeUtf8 $ strEncode h
@@ -527,7 +527,7 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
527527
NtfPushServer {pushQ} <- asks pushServer
528528
stats <- asks serverStats
529529
liftIO $ forever $ do
530-
((_, srv, _), _thVersion, sessionId, ts) <- atomically $ readTBQueue msgQ
530+
((_, srv@(SMPServer (h :| _) _ _), _), _thVersion, sessionId, ts) <- atomically $ readTBQueue msgQ
531531
forM ts $ \(ntfId, t) -> case t of
532532
STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen
533533
STResponse {} -> pure () -- it was already reported as timeout error
@@ -538,9 +538,16 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
538538
ntfTs <- getSystemTime
539539
updatePeriodStats (activeSubs stats) ntfId
540540
let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}
541-
ntfs_ <- addTokenLastNtf st newNtf
542-
forM_ ntfs_ $ \(tkn, lastNtfs) -> atomically $ writeTBQueue pushQ (tkn, PNMessage lastNtfs)
543-
incNtfStat_ stats ntfReceived
541+
srvHost_ = if isOwnServer ca srv then Just (safeDecodeUtf8 $ strEncode h) else Nothing
542+
addTokenLastNtf st newNtf >>= \case
543+
Right (tkn, lastNtfs) -> do
544+
atomically $ writeTBQueue pushQ (srvHost_, tkn, PNMessage lastNtfs)
545+
incNtfStat_ stats ntfReceived
546+
mapM_ (`incServerStat` ntfReceivedOwn stats) srvHost_
547+
Left AUTH -> do
548+
incNtfStat_ stats ntfReceivedAuth
549+
mapM_ (`incServerStat` ntfReceivedAuthOwn stats) srvHost_
550+
Left _ -> pure ()
544551
Right SMP.END ->
545552
whenM (atomically $ activeClientSession' ca sessionId srv) $
546553
void $ updateSrvSubStatus st smpQueue NSEnd
@@ -625,7 +632,7 @@ showServer' = decodeLatin1 . strEncode . host
625632

626633
ntfPush :: NtfPushServer -> M ()
627634
ntfPush s@NtfPushServer {pushQ} = forever $ do
628-
(tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ)
635+
(srvHost_, tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ)
629636
liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp)
630637
st <- asks store
631638
case ntf of
@@ -644,8 +651,14 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do
644651
PNMessage {} -> checkActiveTkn tknStatus $ do
645652
stats <- asks serverStats
646653
liftIO $ updatePeriodStats (activeTokens stats) ntfTknId
647-
liftIO (deliverNotification st pp tkn ntf)
648-
>>= incNtfStatT t . (\case Left _ -> ntfFailed; Right () -> ntfDelivered)
654+
liftIO (deliverNotification st pp tkn ntf) >>= \case
655+
Left _ -> do
656+
incNtfStatT t ntfFailed
657+
liftIO $ mapM_ (`incServerStat` ntfFailedOwn stats) srvHost_
658+
Right () -> do
659+
incNtfStatT t ntfDelivered
660+
liftIO $ mapM_ (`incServerStat` ntfDeliveredOwn stats) srvHost_
661+
649662
where
650663
checkActiveTkn :: NtfTknStatus -> M () -> M ()
651664
checkActiveTkn status action
@@ -686,7 +699,7 @@ periodicNtfsThread NtfPushServer {pushQ} = do
686699
liftIO $ forever $ do
687700
threadDelay interval
688701
now <- systemSeconds <$> getSystemTime
689-
cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue pushQ (tkn, PNCheckMessages)
702+
cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue pushQ (Nothing, tkn, PNCheckMessages)
690703
logNote $ "Scheduled periodic notifications: " <> tshow cnt
691704

692705
runNtfClientTransport :: Transport c => THandleNTF c 'TServer -> M ()
@@ -794,7 +807,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ
794807
ts <- liftIO $ getSystemDate
795808
let tkn = mkNtfTknRec tknId newTkn srvDhPrivKey dhSecret regCode ts
796809
withNtfStore (`addNtfToken` tkn) $ \_ -> do
797-
atomically $ writeTBQueue pushQ (tkn, PNVerification regCode)
810+
atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification regCode)
798811
incNtfStatT token ntfVrfQueued
799812
incNtfStatT token tknCreated
800813
pure $ NRTknId tknId srvDhPubKey
@@ -810,7 +823,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ
810823
| otherwise -> withNtfStore (\st -> updateTknStatus st tkn NTRegistered) $ \_ -> sendVerification
811824
where
812825
sendVerification = do
813-
atomically $ writeTBQueue pushQ (tkn, PNVerification tknRegCode)
826+
atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification tknRegCode)
814827
incNtfStatT token ntfVrfQueued
815828
pure $ NRTknId ntfTknId $ C.publicKey tknDhPrivKey
816829
TVFY code -- this allows repeated verification for cases when client connection dropped before server response
@@ -828,7 +841,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ
828841
regCode <- getRegCode
829842
let tkn' = tkn {token = token', tknStatus = NTRegistered, tknRegCode = regCode}
830843
withNtfStore (`replaceNtfToken` tkn') $ \_ -> do
831-
atomically $ writeTBQueue pushQ (tkn', PNVerification regCode)
844+
atomically $ writeTBQueue pushQ (Nothing, tkn', PNVerification regCode)
832845
incNtfStatT token ntfVrfQueued
833846
incNtfStatT token tknReplaced
834847
pure NROk

src/Simplex/Messaging/Notifications/Server/Env.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ data SMPSubscriber = SMPSubscriber
146146
}
147147

148148
data NtfPushServer = NtfPushServer
149-
{ pushQ :: TBQueue (NtfTknRec, PushNotification),
149+
{ pushQ :: TBQueue (Maybe T.Text, NtfTknRec, PushNotification), -- Maybe Text is a hostname of "own" server
150150
pushClients :: TMap PushProvider PushProviderClient,
151151
apnsConfig :: APNSPushClientConfig
152152
}

src/Simplex/Messaging/Notifications/Server/Prometheus.hs

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,13 @@ ntfPrometheusMetrics sm rtm ts =
7676
_subCreated,
7777
_subDeleted,
7878
_ntfReceived,
79+
_ntfReceivedAuth,
7980
_ntfDelivered,
8081
_ntfFailed,
82+
_ntfReceivedOwn,
83+
_ntfReceivedAuthOwn,
84+
_ntfDeliveredOwn,
85+
_ntfFailedOwn,
8186
_ntfCronDelivered,
8287
_ntfCronFailed,
8388
_ntfVrfQueued,
@@ -165,6 +170,10 @@ ntfPrometheusMetrics sm rtm ts =
165170
\# TYPE simplex_ntf_notifications_received counter\n\
166171
\simplex_ntf_notifications_received " <> mshow _ntfReceived <> "\n# ntfReceived\n\
167172
\\n\
173+
\# HELP simplex_ntf_notifications_received_auth Received notifications without token or subscription (AUTH error)\n\
174+
\# TYPE simplex_ntf_notifications_received_auth counter\n\
175+
\simplex_ntf_notifications_received_auth " <> mshow _ntfReceivedAuth <> "\n# ntfReceivedAuth\n\
176+
\\n\
168177
\# HELP simplex_ntf_notifications_delivered Delivered notifications\n\
169178
\# TYPE simplex_ntf_notifications_delivered counter\n\
170179
\simplex_ntf_notifications_delivered " <> mshow _ntfDelivered <> "\n# ntfDelivered\n\
@@ -201,6 +210,10 @@ ntfPrometheusMetrics sm rtm ts =
201210
\# TYPE simplex_ntf_notifications_total gauge\n\
202211
\simplex_ntf_notifications_total " <> mshow lastNtfCount <> "\n# lastNtfCount\n\
203212
\\n"
213+
<> showNtfsByServer _ntfReceivedOwn "simplex_ntf_notifications_received_own" "Received notifications" "ntfReceivedOwn"
214+
<> showNtfsByServer _ntfReceivedAuthOwn "simplex_ntf_notifications_received_auth_own" "Received notifications without token or subscription (AUTH error)" "ntfReceivedAuthOwn"
215+
<> showNtfsByServer _ntfDeliveredOwn "simplex_ntf_notifications_delivered_own" "Delivered notifications" "ntfDeliveredOwn"
216+
<> showNtfsByServer _ntfFailedOwn "simplex_ntf_notifications_failed_own" "Failed notifications" "ntfFailedOwn"
204217
info =
205218
"# Info\n\
206219
\# ----\n\
@@ -228,28 +241,41 @@ ntfPrometheusMetrics sm rtm ts =
228241
showOwnSrvSubs <> showOtherSrvSubs
229242
where
230243
showOwnSrvSubs
231-
| M.null ownSrvSubs = showOwn_ "" 0 0
232-
| otherwise = T.concat $ map (\(host, cnt) -> showOwn_ (metricHost host) 1 cnt) $ M.assocs ownSrvSubs
233-
showOwn_ param srvCnt subCnt =
234-
gaugeMetric (mPfx <> "server_count_own") param srvCnt (descrPfx <> " SMP subscriptions, own server count") "ownSrvSubs server"
235-
<> gaugeMetric (mPfx <> "sub_count_own") param subCnt (descrPfx <> " SMP subscriptions count for own servers") "ownSrvSubs count"
244+
| M.null ownSrvSubs = ""
245+
| otherwise =
246+
gaugeMetrics (mPfx <> "server_count_own") srvMetrics (descrPfx <> " SMP subscriptions, own server count") "ownSrvSubs server"
247+
<> gaugeMetrics (mPfx <> "sub_count_own") subMetrics (descrPfx <> " SMP subscriptions count for own servers") "ownSrvSubs count"
248+
where
249+
subs = M.assocs ownSrvSubs
250+
srvMetrics = map (\(host, _) -> (metricHost host, 1)) subs
251+
subMetrics = map (\(host, cnt) -> (metricHost host, cnt)) subs
236252
showOtherSrvSubs =
237-
gaugeMetric (mPfx <> "server_count_other") "" otherServers (descrPfx <> " SMP subscriptions, other server count") "otherServers"
238-
<> gaugeMetric (mPfx <> "sub_count_other") "" otherSrvSubCount (descrPfx <> " SMP subscriptions count for other servers") "otherSrvSubCount"
253+
gaugeMetrics (mPfx <> "server_count_other") [("", otherServers)] (descrPfx <> " SMP subscriptions, other server count") "otherServers"
254+
<> gaugeMetrics (mPfx <> "sub_count_other") [("", otherSrvSubCount)] (descrPfx <> " SMP subscriptions count for other servers") "otherSrvSubCount"
255+
showNtfsByServer (StatsByServerData srvNtfs) mName descr varName
256+
| null srvNtfs = ""
257+
| otherwise =
258+
"# HELP " <> mName <> " " <> descr <> "\n\
259+
\# TYPE " <> mName <> " counter\n"
260+
<> showNtfMetrics
261+
<> "# " <> varName <> "\n\n"
262+
where
263+
showNtfMetrics = T.concat $ map (\(host, value) -> mName <> metricHost host <> " " <> mshow value <> "\n") srvNtfs
239264
showWorkerMetric NtfSMPWorkerMetrics {ownServers, otherServers} mPfx descrPfx =
240265
showOwnServers <> showOtherServers
241266
where
242267
showOwnServers
243-
| null ownServers = showOwn_ "" 0
244-
| otherwise = T.concat $ map (\host -> showOwn_ (metricHost host) 1) ownServers
245-
showOwn_ param cnt = gaugeMetric (mPfx <> "count_own") param cnt (descrPfx <> " count for own servers") "ownServers"
246-
showOtherServers = gaugeMetric (mPfx <> "count_other") "" otherServers (descrPfx <> " count for other servers") "otherServers"
247-
gaugeMetric :: Text -> Text -> Int -> Text -> Text -> Text
248-
gaugeMetric name param value descr codeRef =
268+
| null ownServers = ""
269+
| otherwise = gaugeMetrics (mPfx <> "count_own") subMetrics (descrPfx <> " count for own servers") "ownServers"
270+
where
271+
subMetrics = map (\host -> (metricHost host, 1)) ownServers
272+
showOtherServers = gaugeMetrics (mPfx <> "count_other") [("", otherServers)] (descrPfx <> " count for other servers") "otherServers"
273+
gaugeMetrics :: Text -> [(Text, Int)] -> Text -> Text -> Text
274+
gaugeMetrics name subMetrics descr codeRef =
249275
"# HELP " <> name <> " " <> descr <> "\n\
250-
\# TYPE " <> name <> " gauge\n\
251-
\" <> name <> param <> " " <> mshow value <> "\n# " <> codeRef <> "\n\
252-
\\n"
276+
\# TYPE " <> name <> " gauge\n"
277+
<> T.concat (map (\(param, value) -> name <> param <> " " <> mshow value <> "\n") subMetrics)
278+
<> "# " <> codeRef <> "\n\n"
253279
metricHost host = "{server=\"" <> host <> "\"}"
254280
mstr a = a <> " " <> tsEpoch
255281
mshow :: Show a => a -> Text

0 commit comments

Comments
 (0)