Skip to content

Commit 384a335

Browse files
authored
ntf server: more efficient status update query (#1584)
1 parent 7a04fdf commit 384a335

File tree

3 files changed

+18
-17
lines changed

3 files changed

+18
-17
lines changed

src/Simplex/Messaging/Notifications/Server.hs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -480,8 +480,8 @@ subscribeSrvSubs ca st batchSize (srv, srvId, service_) = do
480480

481481
-- this function is concurrency-safe - only onle subscriber per server can be created at a time,
482482
-- other threads would wait for the first thread to create it.
483-
subscribeNtfs :: NtfSubscriber -> NtfPostgresStore -> SMPServer -> ServerNtfSub -> IO ()
484-
subscribeNtfs NtfSubscriber {smpSubscribers, subscriberSeq, smpAgent = ca} st smpServer ntfSub =
483+
subscribeNtfs :: NtfSubscriber -> NtfPostgresStore -> SMPServer -> Int64 -> ServerNtfSub -> IO ()
484+
subscribeNtfs NtfSubscriber {smpSubscribers, subscriberSeq, smpAgent = ca} st smpServer srvId ntfSub =
485485
getSubscriberVar
486486
>>= either createSMPSubscriber waitForSMPSubscriber
487487
>>= mapM_ (\sub -> atomically $ writeTQueue (subscriberSubQ sub) ntfSub)
@@ -493,8 +493,8 @@ subscribeNtfs NtfSubscriber {smpSubscribers, subscriberSeq, smpAgent = ca} st sm
493493
createSMPSubscriber v =
494494
E.handle (\(e :: SomeException) -> logError ("SMP subscriber exception: " <> tshow e) >> removeSubscriber v) $ do
495495
q <- newTQueueIO
496-
tId <- mkWeakThreadId =<< forkIO (runSMPSubscriber q)
497-
let sub = SMPSubscriber {smpServer, subscriberSubQ = q, subThreadId = tId}
496+
tId <- mkWeakThreadId =<< forkIO (runSMPSubscriber smpServer srvId q)
497+
let sub = SMPSubscriber {smpServer, smpServerId = srvId, subscriberSubQ = q, subThreadId = tId}
498498
atomically $ putTMVar (sessionVar v) sub -- this makes it available for other threads
499499
pure $ Just sub
500500

@@ -510,13 +510,13 @@ subscribeNtfs NtfSubscriber {smpSubscribers, subscriberSeq, smpAgent = ca} st sm
510510
atomically $ removeSessVar v smpServer smpSubscribers
511511
pure Nothing
512512

513-
runSMPSubscriber :: TQueue ServerNtfSub -> IO ()
514-
runSMPSubscriber q = forever $ do
513+
runSMPSubscriber :: SMPServer -> Int64 -> TQueue ServerNtfSub -> IO ()
514+
runSMPSubscriber smpServer' srvId' q = forever $ do
515515
-- TODO [ntfdb] possibly, the subscriptions can be batched here and sent every say 5 seconds
516516
-- this should be analysed once we have prometheus stats
517517
(nId, sub) <- atomically $ readTQueue q
518-
void $ updateSubStatus st nId NSPending
519-
subscribeQueuesNtfs ca smpServer [sub]
518+
void $ updateSubStatus st srvId' nId NSPending
519+
subscribeQueuesNtfs ca smpServer' [sub]
520520

521521
ntfSubscriber :: NtfSubscriber -> M ()
522522
ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
@@ -866,12 +866,12 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ
866866
let sub = mkNtfSubRec subId newSub
867867
resp <-
868868
withNtfStore (`addNtfSubscription` sub) $ \case
869-
True -> do
869+
(srvId, True) -> do
870870
st <- asks store
871-
liftIO $ subscribeNtfs ns st srv (subId, (nId, nKey))
871+
liftIO $ subscribeNtfs ns st srv srvId (subId, (nId, nKey))
872872
incNtfStat subCreated
873873
pure $ NRSubId subId
874-
False -> pure $ NRErr AUTH
874+
(_, False) -> pure $ NRErr AUTH
875875
pure (corrId, NoEntity, resp)
876876
NtfReqCmd SSubscription (NtfSub NtfSubRec {ntfSubId, smpQueue = SMPQueueNtf {smpServer, notifierId}, notifierKey = registeredNKey, subStatus}) (corrId, subId, cmd) -> do
877877
(corrId,subId,) <$> case cmd of

src/Simplex/Messaging/Notifications/Server/Env.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ newNtfSubscriber smpAgentCfg random = do
141141

142142
data SMPSubscriber = SMPSubscriber
143143
{ smpServer :: SMPServer,
144+
smpServerId :: Int64,
144145
subscriberSubQ :: TQueue ServerNtfSub,
145146
subThreadId :: Weak ThreadId
146147
}

src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -401,13 +401,13 @@ updateTokenCronSentAt st tknId now =
401401
withDB' "updateTokenCronSentAt" st $ \db ->
402402
void $ DB.execute db "UPDATE tokens t SET cron_sent_at = ? WHERE token_id = ?" (now, tknId)
403403

404-
addNtfSubscription :: NtfPostgresStore -> NtfSubRec -> IO (Either ErrorType Bool)
404+
addNtfSubscription :: NtfPostgresStore -> NtfSubRec -> IO (Either ErrorType (Int64, Bool))
405405
addNtfSubscription st sub =
406406
withFastDB "addNtfSubscription" st $ \db -> runExceptT $ do
407407
srvId :: Int64 <- ExceptT $ upsertServer db $ ntfSubServer' sub
408408
n <- liftIO $ DB.execute db insertNtfSubQuery $ ntfSubToRow srvId sub
409409
withLog "addNtfSubscription" st (`logCreateSubscription` sub)
410-
pure $ n > 0
410+
pure (srvId, n > 0)
411411
where
412412
-- It is possible to combine these two statements into one with CTEs,
413413
-- to reduce roundtrips in case of `insert`, but it would be making 2 queries in all cases.
@@ -454,19 +454,19 @@ deleteNtfSubscription st subId =
454454
DB.execute db "DELETE FROM subscriptions WHERE subscription_id = ?" (Only subId)
455455
withLog "deleteNtfSubscription" st (`logDeleteSubscription` subId)
456456

457-
updateSubStatus :: NtfPostgresStore -> NotifierId -> NtfSubStatus -> IO (Either ErrorType ())
458-
updateSubStatus st nId status =
457+
updateSubStatus :: NtfPostgresStore -> Int64 -> NotifierId -> NtfSubStatus -> IO (Either ErrorType ())
458+
updateSubStatus st srvId nId status =
459459
withFastDB' "updateSubStatus" st $ \db -> do
460460
sub_ :: Maybe (NtfSubscriptionId, NtfAssociatedService) <-
461461
maybeFirstRow id $
462462
DB.query
463463
db
464464
[sql|
465465
UPDATE subscriptions SET status = ?
466-
WHERE smp_notifier_id = ? AND status != ?
466+
WHERE smp_server_id = ? AND smp_notifier_id = ? AND status != ?
467467
RETURNING subscription_id, ntf_service_assoc
468468
|]
469-
(status, nId, status)
469+
(status, srvId, nId, status)
470470
forM_ sub_ $ \(subId, serviceAssoc) ->
471471
withLog "updateSubStatus" st $ \sl -> logSubscriptionStatus sl (subId, status, serviceAssoc)
472472

0 commit comments

Comments
 (0)