Skip to content

Commit ba3c75e

Browse files
authored
smp server: correctly track if ntf service is subscribed and total subscribed queues count (fixes race condition between NSUB and NSUBS from notification server) (#1583)
1 parent 1062ccc commit ba3c75e

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed

src/Simplex/Messaging/Server.hs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,7 +1321,7 @@ client
13211321
-- TODO [certs rcv] rcv subscriptions
13221322
Server {subscribers, ntfSubscribers}
13231323
ms
1324-
clnt@Client {clientId, subscriptions, ntfSubscriptions, serviceSubsCount = _todo', ntfServiceSubsCount, rcvQ, sndQ, clientTHParams = thParams'@THandleParams {sessionId}, procThreads} = do
1324+
clnt@Client {clientId, subscriptions, ntfSubscriptions, ntfServiceSubscribed, serviceSubsCount = _todo', ntfServiceSubsCount, rcvQ, sndQ, clientTHParams = thParams'@THandleParams {sessionId}, procThreads} = do
13251325
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
13261326
let THandleParams {thVersion} = thParams'
13271327
service = peerClientService =<< thAuth thParams'
@@ -1721,18 +1721,19 @@ client
17211721

17221722
subscribeServiceNotifications :: THPeerClientService -> M s BrokerMsg
17231723
subscribeServiceNotifications THClientService {serviceId} = do
1724-
srvSubs <- readTVarIO ntfServiceSubsCount
1725-
if srvSubs == 0
1726-
then
1724+
subscribed <- readTVarIO ntfServiceSubscribed
1725+
if subscribed
1726+
then SOKS <$> readTVarIO ntfServiceSubsCount
1727+
else
17271728
liftIO (getNtfServiceQueueCount @(StoreQueue s) (queueStore ms) serviceId) >>= \case
17281729
Left e -> pure $ ERR e
1729-
Right count -> do
1730+
Right !count' -> do
17301731
atomically $ do
1731-
modifyTVar' ntfServiceSubsCount (+ count) -- service count
1732-
modifyTVar' (totalServiceSubs ntfSubscribers) (+ count) -- server count for all services
1732+
writeTVar ntfServiceSubscribed True
1733+
count <- swapTVar ntfServiceSubsCount count'
1734+
modifyTVar' (totalServiceSubs ntfSubscribers) (+ (count' - count)) -- server count for all services
17331735
atomically $ writeTQueue (subQ ntfSubscribers) (CSService serviceId, clientId)
1734-
pure $ SOKS count
1735-
else pure $ SOKS srvSubs
1736+
pure $ SOKS count'
17361737

17371738
acknowledgeMsg :: MsgId -> StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)
17381739
acknowledgeMsg msgId q qr = time "ACK" $ do

src/Simplex/Messaging/Server/Env/STM.hs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,8 @@ data Client s = Client
389389
{ clientId :: ClientId,
390390
subscriptions :: TMap RecipientId Sub,
391391
ntfSubscriptions :: TMap NotifierId (),
392+
serviceSubscribed :: TVar Bool, -- set independently of serviceSubsCount, to track whether service subscription command was received
393+
ntfServiceSubscribed :: TVar Bool,
392394
serviceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
393395
ntfServiceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
394396
rcvQ :: TBQueue (NonEmpty (VerifiedTransmission s)),
@@ -458,6 +460,8 @@ newClient :: ClientId -> Natural -> THandleParams SMPVersion 'TServer -> SystemT
458460
newClient clientId qSize clientTHParams createdAt = do
459461
subscriptions <- TM.emptyIO
460462
ntfSubscriptions <- TM.emptyIO
463+
serviceSubscribed <- newTVarIO False
464+
ntfServiceSubscribed <- newTVarIO False
461465
serviceSubsCount <- newTVarIO 0
462466
ntfServiceSubsCount <- newTVarIO 0
463467
rcvQ <- newTBQueueIO qSize
@@ -474,6 +478,8 @@ newClient clientId qSize clientTHParams createdAt = do
474478
{ clientId,
475479
subscriptions,
476480
ntfSubscriptions,
481+
serviceSubscribed,
482+
ntfServiceSubscribed,
477483
serviceSubsCount,
478484
ntfServiceSubsCount,
479485
rcvQ,

0 commit comments

Comments
 (0)