Skip to content

Commit 5dbe633

Browse files
ntf server: additional statistics, new invalid token reasons (#1451)
* agent: check ntf token status on registration * remove check * update on check * refactor * version * fix * ntf server: additional statistics * swap * version * more stats * test, verify invalid * rename * exclude test token from stats * increase delay * handle invalid token in retry, more reasons * focus tests * disable new tests in CI * fix --------- Co-authored-by: spaced4ndy <[email protected]>
1 parent b633f89 commit 5dbe633

File tree

5 files changed

+214
-36
lines changed

5 files changed

+214
-36
lines changed

src/Simplex/Messaging/Notifications/Protocol.hs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -557,19 +557,21 @@ instance StrEncoding NTInvalidReason where
557557
strEncode = smpEncode
558558
strP = smpP
559559

560-
data NTInvalidReason = NTIRBadToken | NTIRTokenNotForTopic | NTIRGone410
560+
data NTInvalidReason = NTIRBadToken | NTIRTokenNotForTopic | NTIRExpiredToken | NTIRUnregistered
561561
deriving (Eq, Show)
562562

563563
instance Encoding NTInvalidReason where
564564
smpEncode = \case
565565
NTIRBadToken -> "BAD"
566566
NTIRTokenNotForTopic -> "TOPIC"
567-
NTIRGone410 -> "GONE"
567+
NTIRExpiredToken -> "EXPIRED"
568+
NTIRUnregistered -> "UNREGISTERED"
568569
smpP =
569570
A.takeTill (== ' ') >>= \case
570571
"BAD" -> pure NTIRBadToken
571572
"TOPIC" -> pure NTIRTokenNotForTopic
572-
"GONE" -> pure NTIRGone410
573+
"EXPIRED" -> pure NTIRExpiredToken
574+
"UNREGISTERED" -> pure NTIRUnregistered
573575
_ -> fail "bad NTInvalidReason"
574576

575577
instance StrEncoding NtfTknStatus where

src/Simplex/Messaging/Notifications/Server.hs

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg} started = do
136136
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
137137
logInfo $ "server stats log enabled: " <> T.pack statsFilePath
138138
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
139-
NtfServerStats {fromTime, tknCreated, tknVerified, tknDeleted, subCreated, subDeleted, ntfReceived, ntfDelivered, activeTokens, activeSubs} <- asks serverStats
139+
NtfServerStats {fromTime, tknCreated, tknVerified, tknDeleted, tknReplaced, subCreated, subDeleted, ntfReceived, ntfDelivered, ntfFailed, ntfCronDelivered, ntfCronFailed, ntfVrfQueued, ntfVrfDelivered, ntfVrfFailed, ntfVrfInvalidTkn, activeTokens, activeSubs} <-
140+
asks serverStats
140141
let interval = 1000000 * logInterval
141142
forever $ do
142143
withFile statsFilePath AppendMode $ \h -> liftIO $ do
@@ -146,10 +147,18 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg} started = do
146147
tknCreated' <- atomicSwapIORef tknCreated 0
147148
tknVerified' <- atomicSwapIORef tknVerified 0
148149
tknDeleted' <- atomicSwapIORef tknDeleted 0
150+
tknReplaced' <- atomicSwapIORef tknReplaced 0
149151
subCreated' <- atomicSwapIORef subCreated 0
150152
subDeleted' <- atomicSwapIORef subDeleted 0
151153
ntfReceived' <- atomicSwapIORef ntfReceived 0
152154
ntfDelivered' <- atomicSwapIORef ntfDelivered 0
155+
ntfFailed' <- atomicSwapIORef ntfFailed 0
156+
ntfCronDelivered' <- atomicSwapIORef ntfCronDelivered 0
157+
ntfCronFailed' <- atomicSwapIORef ntfCronFailed 0
158+
ntfVrfQueued' <- atomicSwapIORef ntfVrfQueued 0
159+
ntfVrfDelivered' <- atomicSwapIORef ntfVrfDelivered 0
160+
ntfVrfFailed' <- atomicSwapIORef ntfVrfFailed 0
161+
ntfVrfInvalidTkn' <- atomicSwapIORef ntfVrfInvalidTkn 0
153162
tkn <- liftIO $ periodStatCounts activeTokens ts
154163
sub <- liftIO $ periodStatCounts activeSubs ts
155164
hPutStrLn h $
@@ -168,7 +177,15 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg} started = do
168177
monthCount tkn,
169178
dayCount sub,
170179
weekCount sub,
171-
monthCount sub
180+
monthCount sub,
181+
show tknReplaced',
182+
show ntfFailed',
183+
show ntfCronDelivered',
184+
show ntfCronFailed',
185+
show ntfVrfQueued',
186+
show ntfVrfDelivered',
187+
show ntfVrfFailed',
188+
show ntfVrfInvalidTkn'
172189
]
173190
liftIO $ threadDelay' interval
174191

@@ -225,9 +242,18 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg} started = do
225242
putStat "tknCreated" tknCreated
226243
putStat "tknVerified" tknVerified
227244
putStat "tknDeleted" tknDeleted
245+
putStat "tknReplaced" tknReplaced
228246
putStat "subCreated" subCreated
229247
putStat "subDeleted" subDeleted
230248
putStat "ntfReceived" ntfReceived
249+
putStat "ntfDelivered" ntfDelivered
250+
putStat "ntfFailed" ntfFailed
251+
putStat "ntfCronDelivered" ntfCronDelivered
252+
putStat "ntfCronFailed" ntfCronFailed
253+
putStat "ntfVrfQueued" ntfVrfQueued
254+
putStat "ntfVrfDelivered" ntfVrfDelivered
255+
putStat "ntfVrfFailed" ntfVrfFailed
256+
putStat "ntfVrfInvalidTkn" ntfVrfInvalidTkn
231257
getStat (day . activeTokens) >>= \v -> hPutStrLn h $ "daily active tokens: " <> show (IS.size v)
232258
getStat (day . activeSubs) >>= \v -> hPutStrLn h $ "daily active subscriptions: " <> show (IS.size v)
233259
CPStatsRTS -> tryAny getRTSStats >>= either (hPrint h) (hPrint h)
@@ -242,15 +268,19 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg} started = do
242268
#else
243269
hPutStrLn h "Threads: not available on GHC 8.10"
244270
#endif
245-
NtfSubscriber {smpSubscribers, smpAgent = a} <- unliftIO u $ asks subscriber
271+
NtfEnv {subscriber, pushServer} <- unliftIO u ask
272+
let NtfSubscriber {smpSubscribers, smpAgent = a} = subscriber
273+
NtfPushServer {pushQ} = pushServer
274+
SMPClientAgent {smpClients, smpSessions, srvSubs, pendingSrvSubs, smpSubWorkers} = a
246275
putSMPWorkers a "SMP subcscribers" smpSubscribers
247-
let SMPClientAgent {smpClients, smpSessions, srvSubs, pendingSrvSubs, smpSubWorkers} = a
248276
putSMPWorkers a "SMP clients" smpClients
249277
putSMPWorkers a "SMP subscription workers" smpSubWorkers
250278
sessions <- readTVarIO smpSessions
251279
hPutStrLn h $ "SMP sessions count: " <> show (M.size sessions)
252280
putSMPSubs a "SMP subscriptions" srvSubs
253281
putSMPSubs a "Pending SMP subscriptions" pendingSrvSubs
282+
sz <- atomically $ lengthTBQueue pushQ
283+
hPutStrLn h $ "Push notifications queue length: " <> show sz
254284
where
255285
putSMPSubs :: SMPClientAgent -> String -> TMap SMPServer (TMap SMPSub a) -> IO ()
256286
putSMPSubs a name v = do
@@ -432,7 +462,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
432462

433463
ntfPush :: NtfPushServer -> M ()
434464
ntfPush s@NtfPushServer {pushQ} = forever $ do
435-
(tkn@NtfTknData {ntfTknId, token = DeviceToken pp _, tknStatus}, ntf) <- atomically (readTBQueue pushQ)
465+
(tkn@NtfTknData {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ)
436466
liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp)
437467
status <- readTVarIO tknStatus
438468
case ntf of
@@ -444,14 +474,16 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do
444474
NTConfirmed -> (Nothing, NTConfirmed)
445475
_ -> (Just NTConfirmed, NTConfirmed)
446476
forM_ status_ $ \status' -> withNtfLog $ \sl -> logTokenStatus sl ntfTknId status'
447-
_ -> pure ()
477+
incNtfStatT t ntfVrfDelivered
478+
Left _ -> incNtfStatT t ntfVrfFailed
448479
PNCheckMessages -> checkActiveTkn status $ do
449-
void $ deliverNotification pp tkn ntf
480+
deliverNotification pp tkn ntf
481+
>>= incNtfStatT t . (\case Left _ -> ntfCronFailed; Right () -> ntfCronDelivered)
450482
PNMessage {} -> checkActiveTkn status $ do
451483
stats <- asks serverStats
452484
liftIO $ updatePeriodStats (activeTokens stats) ntfTknId
453-
void $ deliverNotification pp tkn ntf
454-
incNtfStat ntfDelivered
485+
deliverNotification pp tkn ntf
486+
>>= incNtfStatT t . (\case Left _ -> ntfFailed; Right () -> ntfDelivered)
455487
where
456488
checkActiveTkn :: NtfTknStatus -> M () -> M ()
457489
checkActiveTkn status action
@@ -466,14 +498,18 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do
466498
PPConnection _ -> retryDeliver
467499
PPRetryLater -> retryDeliver
468500
PPCryptoError _ -> err e
469-
PPResponseError _ _ -> err e
501+
PPResponseError {} -> err e
470502
PPTokenInvalid r -> updateTknStatus tkn (NTInvalid $ Just r) >> err e
471503
PPPermanentError -> err e
472504
where
473505
retryDeliver :: M (Either PushProviderError ())
474506
retryDeliver = do
475507
deliver <- liftIO $ newPushClient s pp
476-
liftIO (runExceptT $ deliver tkn ntf) >>= either err (pure . Right)
508+
liftIO (runExceptT $ deliver tkn ntf) >>= \case
509+
Right _ -> pure $ Right ()
510+
Left e -> case e of
511+
PPTokenInvalid r -> updateTknStatus tkn (NTInvalid $ Just r) >> err e
512+
_ -> err e
477513
err e = logError ("Push provider error (" <> tshow pp <> ", " <> tshow ntfTknId <> "): " <> tshow e) $> Left e
478514

479515
updateTknStatus :: NtfTknData -> NtfTknStatus -> M ()
@@ -593,7 +629,6 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
593629
let t' = Just ts'
594630
t <- atomically $ swapTVar tknUpdatedAt t'
595631
unless (t' == t) $ withNtfLog $ \s -> logUpdateTokenTime s ntfTknId ts'
596-
597632
processCommand :: NtfRequest -> M (Transmission NtfResponse)
598633
processCommand = \case
599634
NtfReqNew corrId (ANE SToken newTkn@(NewNtfTkn token _ dhPubKey)) -> do
@@ -607,6 +642,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
607642
tkn <- liftIO $ mkNtfTknData tknId newTkn ks dhSecret regCode ts
608643
atomically $ addNtfToken st tknId tkn
609644
atomically $ writeTBQueue pushQ (tkn, PNVerification regCode)
645+
incNtfStatT token ntfVrfQueued
610646
withNtfLog (`logCreateToken` tkn)
611647
incNtfStatT token tknCreated
612648
pure (corrId, NoEntity, NRTknId tknId srvDhPubKey)
@@ -620,6 +656,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
620656
if tknDhSecret == dhSecret
621657
then do
622658
atomically $ writeTBQueue pushQ (tkn, PNVerification tknRegCode)
659+
incNtfStatT token ntfVrfQueued
623660
pure $ NRTknId ntfTknId srvDhPubKey
624661
else pure $ NRErr AUTH
625662
TVFY code -- this allows repeated verification for cases when client connection dropped before server response
@@ -647,9 +684,9 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
647684
let tkn' = tkn {token = token', tknRegCode = regCode}
648685
addNtfToken st tknId tkn'
649686
writeTBQueue pushQ (tkn', PNVerification regCode)
687+
incNtfStatT token ntfVrfQueued
650688
withNtfLog $ \s -> logUpdateToken s tknId token' regCode
651-
incNtfStatT token tknDeleted
652-
incNtfStatT token tknCreated
689+
incNtfStatT token tknReplaced
653690
pure NROk
654691
TDEL -> do
655692
logDebug "TDEL"

src/Simplex/Messaging/Notifications/Server/Main.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ ntfServerCLI cfgPath logPath =
154154
regCodeBytes = 32,
155155
clientQSize = 64,
156156
subQSize = 512,
157-
pushQSize = 1048,
157+
pushQSize = 16384,
158158
smpAgentCfg =
159159
defaultSMPClientAgentConfig
160160
{ smpCfg =

src/Simplex/Messaging/Notifications/Server/Push/APNS.hs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -337,19 +337,20 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknData {toke
337337
result status reason'
338338
| status == Just N.ok200 = pure ()
339339
| status == Just N.badRequest400 =
340-
case reason' of
341-
"BadDeviceToken" -> throwE $ PPTokenInvalid NTIRBadToken
342-
"DeviceTokenNotForTopic" -> throwE $ PPTokenInvalid NTIRTokenNotForTopic
343-
"TopicDisallowed" -> throwE PPPermanentError
344-
_ -> err status reason'
345-
| status == Just N.forbidden403 = case reason' of
346-
"ExpiredProviderToken" -> throwE PPPermanentError -- there should be no point retrying it as the token was refreshed
347-
"InvalidProviderToken" -> throwE PPPermanentError
348-
_ -> err status reason'
349-
| status == Just N.gone410 = throwE $ PPTokenInvalid NTIRGone410
340+
throwE $ case reason' of
341+
"BadDeviceToken" -> PPTokenInvalid NTIRBadToken
342+
"DeviceTokenNotForTopic" -> PPTokenInvalid NTIRTokenNotForTopic
343+
"TopicDisallowed" -> PPPermanentError
344+
_ -> PPResponseError status reason'
345+
| status == Just N.forbidden403 = throwE $ case reason' of
346+
"ExpiredProviderToken" -> PPPermanentError -- there should be no point retrying it as the token was refreshed
347+
"InvalidProviderToken" -> PPPermanentError
348+
_ -> PPResponseError status reason'
349+
| status == Just N.gone410 = throwE $ case reason' of
350+
"ExpiredToken" -> PPTokenInvalid NTIRExpiredToken
351+
"Unregistered" -> PPTokenInvalid NTIRUnregistered
352+
_ -> PPRetryLater
350353
| status == Just N.serviceUnavailable503 = liftIO (disconnectApnsHTTP2Client c) >> throwE PPRetryLater
351354
-- Just tooManyRequests429 -> TooManyRequests - too many requests for the same token
352-
| otherwise = err status reason'
353-
err :: Maybe Status -> Text -> ExceptT PushProviderError IO ()
354-
err s r = throwE $ PPResponseError s r
355+
| otherwise = throwE $ PPResponseError status reason'
355356
liftHTTPS2 a = ExceptT $ first PPConnection <$> a

0 commit comments

Comments
 (0)