Skip to content

Commit c63d93d

Browse files
committed
smp server: maintain IDs hash in session subscription states
1 parent bdf8e7e commit c63d93d

File tree

4 files changed

+55
-41
lines changed

4 files changed

+55
-41
lines changed

src/Simplex/Messaging/Protocol.hs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ module Simplex.Messaging.Protocol
147147
serviceSubResult,
148148
queueIdsHash,
149149
queueIdHash,
150+
noIdsHash,
151+
addServiceSubs,
152+
subtractServiceSubs,
150153
MaxMessageLen,
151154
MaxRcvMessageLen,
152155
EncRcvMsgBody (..),
@@ -1526,6 +1529,14 @@ queueIdHash :: QueueId -> IdsHash
15261529
queueIdHash = IdsHash . C.md5Hash . unEntityId
15271530
{-# INLINE queueIdHash #-}
15281531

1532+
addServiceSubs :: (Int64, IdsHash) -> (Int64, IdsHash) -> (Int64, IdsHash)
1533+
addServiceSubs (n', idsHash') (n, idsHash) = (n + n', idsHash <> idsHash')
1534+
1535+
subtractServiceSubs :: (Int64, IdsHash) -> (Int64, IdsHash) -> (Int64, IdsHash)
1536+
subtractServiceSubs (n', idsHash') (n, idsHash)
1537+
| n > n' = (n - n', idsHash <> idsHash') -- concat is a reversible xor: (x `xor` y) `xor` y == x
1538+
| otherwise = (0, noIdsHash)
1539+
15291540
data ProtocolErrorType = PECmdSyntax | PECmdUnknown | PESession | PEBlock
15301541

15311542
-- | Type for protocol errors.

src/Simplex/Messaging/Server.hs

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,8 @@ type AttachHTTP = Socket -> TLS.Context -> IO ()
166166
-- actions used in serverThread to reduce STM transaction scope
167167
data ClientSubAction
168168
= CSAEndSub QueueId -- end single direct queue subscription
169-
| CSAEndServiceSub -- end service subscription to one queue
170-
| CSADecreaseSubs Int64 -- reduce service subscriptions when cancelling. Fixed number is used to correctly handle race conditions when service resubscribes
169+
| CSAEndServiceSub QueueId -- end service subscription to one queue
170+
| CSADecreaseSubs (Int64, IdsHash) -- reduce service subscriptions when cancelling. Fixed number is used to correctly handle race conditions when service resubscribes
171171

172172
type PrevClientSub s = (Client s, ClientSubAction, (EntityId, BrokerMsg))
173173

@@ -251,7 +251,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
251251
Server s ->
252252
(Server s -> ServerSubscribers s) ->
253253
(Client s -> TMap QueueId sub) ->
254-
(Client s -> TVar Int64) ->
254+
(Client s -> TVar (Int64, IdsHash)) ->
255255
Maybe (sub -> IO ()) ->
256256
M s ()
257257
serverThread label srv srvSubscribers clientSubs clientServiceSubs unsub_ = do
@@ -277,7 +277,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
277277
as'' <- if prevServiceId == serviceId_ then pure [] else endServiceSub prevServiceId qId END
278278
case serviceId_ of
279279
Just serviceId -> do
280-
modifyTVar' totalServiceSubs (+ 1) -- server count for all services
280+
modifyTVar' totalServiceSubs $ addServiceSubs (1, queueIdHash qId) -- server count and IDs hash for all services
281281
as <- endQueueSub qId END
282282
as' <- cancelServiceSubs serviceId =<< upsertSubscribedClient serviceId c serviceSubscribers
283283
pure $ as ++ as' ++ as''
@@ -289,9 +289,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
289289
as <- endQueueSub qId DELD
290290
as' <- endServiceSub serviceId qId DELD
291291
pure $ as ++ as'
292-
CSService serviceId count -> do
292+
CSService serviceId changedSubs -> do
293293
modifyTVar' subClients $ IS.insert clntId -- add ID to server's subscribed cients
294-
modifyTVar' totalServiceSubs (+ count) -- server count for all services
294+
modifyTVar' totalServiceSubs $ subtractServiceSubs changedSubs -- server count and IDs hash for all services
295295
cancelServiceSubs serviceId =<< upsertSubscribedClient serviceId c serviceSubscribers
296296
updateSubDisconnected = case clntSub of
297297
-- do not insert client if it is already disconnected, but send END/DELD to any other client subscribed to this queue or service
@@ -309,15 +309,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
309309
endQueueSub qId msg = prevSub qId msg (CSAEndSub qId) =<< lookupDeleteSubscribedClient qId queueSubscribers
310310
endServiceSub :: Maybe ServiceId -> QueueId -> BrokerMsg -> STM [PrevClientSub s]
311311
endServiceSub Nothing _ _ = pure []
312-
endServiceSub (Just serviceId) qId msg = prevSub qId msg CSAEndServiceSub =<< lookupSubscribedClient serviceId serviceSubscribers
312+
endServiceSub (Just serviceId) qId msg = prevSub qId msg (CSAEndServiceSub qId) =<< lookupSubscribedClient serviceId serviceSubscribers
313313
prevSub :: QueueId -> BrokerMsg -> ClientSubAction -> Maybe (Client s) -> STM [PrevClientSub s]
314314
prevSub qId msg action =
315315
checkAnotherClient $ \c -> pure [(c, action, (qId, msg))]
316316
cancelServiceSubs :: ServiceId -> Maybe (Client s) -> STM [PrevClientSub s]
317317
cancelServiceSubs serviceId =
318318
checkAnotherClient $ \c -> do
319-
n <- swapTVar (clientServiceSubs c) 0
320-
pure [(c, CSADecreaseSubs n, (serviceId, ENDS n))]
319+
changedSubs@(n, _) <- swapTVar (clientServiceSubs c) (0, noIdsHash)
320+
pure [(c, CSADecreaseSubs changedSubs, (serviceId, ENDS n))]
321321
checkAnotherClient :: (Client s -> STM [PrevClientSub s]) -> Maybe (Client s) -> STM [PrevClientSub s]
322322
checkAnotherClient mkSub = \case
323323
Just c@Client {clientId, connected} | clntId /= clientId ->
@@ -332,20 +332,20 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
332332
where
333333
a (Just unsub) (Just s) = unsub s
334334
a _ _ = pure ()
335-
CSAEndServiceSub -> atomically $ do
335+
CSAEndServiceSub qId -> atomically $ do
336336
modifyTVar' (clientServiceSubs c) decrease
337337
modifyTVar' totalServiceSubs decrease
338338
where
339-
decrease n = max 0 (n - 1)
339+
decrease = subtractServiceSubs (1, queueIdHash qId)
340340
-- TODO [certs rcv] for SMP subscriptions CSADecreaseSubs should also remove all delivery threads of the passed client
341-
CSADecreaseSubs n' -> atomically $ modifyTVar' totalServiceSubs $ \n -> max 0 (n - n')
341+
CSADecreaseSubs changedSubs -> atomically $ modifyTVar' totalServiceSubs $ subtractServiceSubs changedSubs
342342
where
343343
endSub :: Client s -> QueueId -> STM (Maybe sub)
344344
endSub c qId = TM.lookupDelete qId (clientSubs c) >>= (removeWhenNoSubs c $>)
345345
-- remove client from server's subscribed cients
346346
removeWhenNoSubs c = do
347347
noClientSubs <- null <$> readTVar (clientSubs c)
348-
noServiceSubs <- (0 ==) <$> readTVar (clientServiceSubs c)
348+
noServiceSubs <- ((0 ==) . fst) <$> readTVar (clientServiceSubs c)
349349
when (noClientSubs && noServiceSubs) $ modifyTVar' subClients $ IS.delete (clientId c)
350350

351351
deliverNtfsThread :: Server s -> M s ()
@@ -1112,10 +1112,10 @@ clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, serviceS
11121112
updateSubscribers subs ServerSubscribers {queueSubscribers, subClients} = do
11131113
mapM_ (\qId -> deleteSubcribedClient qId c queueSubscribers) (M.keys subs)
11141114
atomically $ modifyTVar' subClients $ IS.delete clientId
1115-
updateServiceSubs :: ServiceId -> TVar Int64 -> ServerSubscribers s -> IO ()
1115+
updateServiceSubs :: ServiceId -> TVar (Int64, IdsHash) -> ServerSubscribers s -> IO ()
11161116
updateServiceSubs serviceId subsCount ServerSubscribers {totalServiceSubs, serviceSubscribers} = do
11171117
deleteSubcribedClient serviceId c serviceSubscribers
1118-
atomically . modifyTVar' totalServiceSubs . subtract =<< readTVarIO subsCount
1118+
atomically . modifyTVar' totalServiceSubs . subtractServiceSubs =<< readTVarIO subsCount
11191119

11201120
cancelSub :: Sub -> IO ()
11211121
cancelSub s = case subThread s of
@@ -1661,7 +1661,7 @@ client
16611661
subscribeNewQueue :: RecipientId -> QueueRec -> M s ()
16621662
subscribeNewQueue rId QueueRec {rcvServiceId} = do
16631663
case rcvServiceId of
1664-
Just _ -> atomically $ modifyTVar' (serviceSubsCount clnt) (+ 1)
1664+
Just _ -> atomically $ modifyTVar' (serviceSubsCount clnt) $ addServiceSubs (1, queueIdHash rId)
16651665
Nothing -> do
16661666
sub <- atomically $ newSubscription NoSub
16671667
atomically $ TM.insert rId sub $ subscriptions clnt
@@ -1741,7 +1741,7 @@ client
17411741
Maybe ServiceId ->
17421742
ServerSubscribers s ->
17431743
(Client s -> TMap QueueId sub) ->
1744-
(Client s -> TVar Int64) ->
1744+
(Client s -> TVar (Int64, IdsHash)) ->
17451745
STM sub ->
17461746
(ServerStats -> ServiceStats) ->
17471747
M s (Either ErrorType (Bool, Maybe sub))
@@ -1771,9 +1771,9 @@ client
17711771
incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId
17721772
pure (hasSub, Nothing)
17731773
where
1774-
hasServiceSub = (0 /=) <$> readTVar (clientServiceSubs clnt)
1774+
hasServiceSub = ((0 /=) . fst) <$> readTVar (clientServiceSubs clnt)
17751775
-- This function is used when queue association with the service is created.
1776-
incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) (+ 1) -- service count
1776+
incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) $ addServiceSubs (1, queueIdHash (recipientId q)) -- service count and IDs hash
17771777
Nothing -> case queueServiceId of
17781778
Just _ -> runExceptT $ do
17791779
ExceptT $ setQueueService (queueStore ms) q party Nothing
@@ -1836,28 +1836,28 @@ client
18361836
subscribeServiceNotifications serviceId expected =
18371837
either ERR (uncurry SOKS . snd) <$> sharedSubscribeService SNotifierService serviceId expected ntfSubscribers ntfServiceSubscribed ntfServiceSubsCount ntfServices
18381838

1839-
sharedSubscribeService :: (PartyI p, ServiceParty p) => SParty p -> ServiceId -> (Int64, IdsHash) -> ServerSubscribers s -> (Client s -> TVar Bool) -> (Client s -> TVar Int64) -> (ServerStats -> ServiceStats) -> M s (Either ErrorType (Bool, (Int64, IdsHash)))
1839+
sharedSubscribeService :: (PartyI p, ServiceParty p) => SParty p -> ServiceId -> (Int64, IdsHash) -> ServerSubscribers s -> (Client s -> TVar Bool) -> (Client s -> TVar (Int64, IdsHash)) -> (ServerStats -> ServiceStats) -> M s (Either ErrorType (Bool, (Int64, IdsHash)))
18401840
sharedSubscribeService party serviceId (count, idsHash) srvSubscribers clientServiceSubscribed clientServiceSubs servicesSel = do
18411841
subscribed <- readTVarIO $ clientServiceSubscribed clnt
18421842
stats <- asks serverStats
18431843
liftIO $ runExceptT $
18441844
(subscribed,)
18451845
<$> if subscribed
1846-
then (,mempty) <$> readTVarIO (clientServiceSubs clnt) -- TODO [certs rcv] get IDs hash
1846+
then readTVarIO $ clientServiceSubs clnt
18471847
else do
1848-
(count', idsHash') <- ExceptT $ getServiceQueueCountHash @(StoreQueue s) (queueStore ms) party serviceId
1849-
incCount <- atomically $ do
1848+
subs'@(count', idsHash') <- ExceptT $ getServiceQueueCountHash @(StoreQueue s) (queueStore ms) party serviceId
1849+
subsChange <- atomically $ do
18501850
writeTVar (clientServiceSubscribed clnt) True
1851-
currCount <- swapTVar (clientServiceSubs clnt) count' -- TODO [certs rcv] maintain IDs hash here?
1852-
pure $ count' - currCount
1851+
currSubs <- swapTVar (clientServiceSubs clnt) subs'
1852+
pure $ subtractServiceSubs currSubs subs'
18531853
let incSrvStat sel n = liftIO $ atomicModifyIORef'_ (sel $ servicesSel stats) (+ n)
18541854
diff = fromIntegral $ count' - count
1855-
if -- TODO [certs rcv] account for not provided counts/hashes (expected n = -1)
1856-
| diff == 0 && idsHash == idsHash' -> incSrvStat srvSubOk 1
1855+
if -- `count == -1` only for subscriptions by old NTF servers
1856+
| count == -1 && (diff == 0 && idsHash == idsHash') -> incSrvStat srvSubOk 1
18571857
| diff > 0 -> incSrvStat srvSubMore 1 >> incSrvStat srvSubMoreTotal diff
18581858
| diff < 0 -> incSrvStat srvSubFewer 1 >> incSrvStat srvSubFewerTotal (- diff)
18591859
| otherwise -> incSrvStat srvSubDiff 1
1860-
atomically $ writeTQueue (subQ srvSubscribers) (CSService serviceId incCount, clientId)
1860+
atomically $ writeTQueue (subQ srvSubscribers) (CSService serviceId subsChange, clientId)
18611861
pure (count', idsHash')
18621862

18631863
acknowledgeMsg :: MsgId -> StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)
@@ -2133,7 +2133,7 @@ client
21332133
-- we delete subscription here, so the client with no subscriptions can be disconnected.
21342134
sub <- atomically $ TM.lookupDelete entId $ subscriptions clnt
21352135
liftIO $ mapM_ cancelSub sub
2136-
when (isJust rcvServiceId) $ atomically $ modifyTVar' (serviceSubsCount clnt) $ \n -> max 0 (n - 1)
2136+
when (isJust rcvServiceId) $ atomically $ modifyTVar' (serviceSubsCount clnt) $ subtractServiceSubs (1, queueIdHash (recipientId q))
21372137
atomically $ writeTQueue (subQ subscribers) (CSDeleted entId rcvServiceId, clientId)
21382138
forM_ (notifier qr) $ \NtfCreds {notifierId = nId, ntfServiceId} -> do
21392139
-- queue is deleted by a different client from the one subscribed to notifications,

src/Simplex/Messaging/Server/Env/STM.hs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ data ServerSubscribers s = ServerSubscribers
363363
{ subQ :: TQueue (ClientSub, ClientId),
364364
queueSubscribers :: SubscribedClients s,
365365
serviceSubscribers :: SubscribedClients s, -- service clients with long-term certificates that have subscriptions
366-
totalServiceSubs :: TVar Int64,
366+
totalServiceSubs :: TVar (Int64, IdsHash),
367367
subClients :: TVar IntSet, -- clients with individual or service subscriptions
368368
pendingEvents :: TVar (IntMap (NonEmpty (EntityId, BrokerMsg)))
369369
}
@@ -426,7 +426,7 @@ sameClient c cv = maybe False (sameClientId c) <$> readTVar cv
426426
data ClientSub
427427
= CSClient QueueId (Maybe ServiceId) (Maybe ServiceId) -- includes previous and new associated service IDs
428428
| CSDeleted QueueId (Maybe ServiceId) -- includes previously associated service IDs
429-
| CSService ServiceId Int64 -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB
429+
| CSService ServiceId (Int64, IdsHash) -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB
430430

431431
newtype ProxyAgent = ProxyAgent
432432
{ smpAgent :: SMPClientAgent 'Sender
@@ -440,8 +440,8 @@ data Client s = Client
440440
ntfSubscriptions :: TMap NotifierId (),
441441
serviceSubscribed :: TVar Bool, -- set independently of serviceSubsCount, to track whether service subscription command was received
442442
ntfServiceSubscribed :: TVar Bool,
443-
serviceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
444-
ntfServiceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
443+
serviceSubsCount :: TVar (Int64, IdsHash), -- only one service can be subscribed, based on its certificate, this is subscription count
444+
ntfServiceSubsCount :: TVar (Int64, IdsHash), -- only one service can be subscribed, based on its certificate, this is subscription count
445445
rcvQ :: TBQueue (NonEmpty (VerifiedTransmission s)),
446446
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg]),
447447
msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
@@ -502,7 +502,7 @@ newServerSubscribers = do
502502
subQ <- newTQueueIO
503503
queueSubscribers <- SubscribedClients <$> TM.emptyIO
504504
serviceSubscribers <- SubscribedClients <$> TM.emptyIO
505-
totalServiceSubs <- newTVarIO 0
505+
totalServiceSubs <- newTVarIO (0, noIdsHash)
506506
subClients <- newTVarIO IS.empty
507507
pendingEvents <- newTVarIO IM.empty
508508
pure ServerSubscribers {subQ, queueSubscribers, serviceSubscribers, totalServiceSubs, subClients, pendingEvents}
@@ -513,8 +513,8 @@ newClient clientId qSize clientTHParams createdAt = do
513513
ntfSubscriptions <- TM.emptyIO
514514
serviceSubscribed <- newTVarIO False
515515
ntfServiceSubscribed <- newTVarIO False
516-
serviceSubsCount <- newTVarIO 0
517-
ntfServiceSubsCount <- newTVarIO 0
516+
serviceSubsCount <- newTVarIO (0, noIdsHash)
517+
ntfServiceSubsCount <- newTVarIO (0, noIdsHash)
518518
rcvQ <- newTBQueueIO qSize
519519
sndQ <- newTBQueueIO qSize
520520
msgQ <- newTBQueueIO qSize

src/Simplex/Messaging/Server/Main.hs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
module Simplex.Messaging.Server.Main where
1919

2020
import Control.Concurrent.STM
21-
import Control.Exception (SomeException, finally, try)
21+
import Control.Exception (finally)
2222
import Control.Logger.Simple
2323
import Control.Monad
2424
import qualified Data.Attoparsec.ByteString.Char8 as A
@@ -28,10 +28,8 @@ import Data.Char (isAlpha, isAscii, toUpper)
2828
import Data.Either (fromRight)
2929
import Data.Functor (($>))
3030
import Data.Ini (Ini, lookupValue, readIniFile)
31-
import Data.Int (Int64)
3231
import Data.List (find, isPrefixOf)
3332
import qualified Data.List.NonEmpty as L
34-
import qualified Data.Map.Strict as M
3533
import Data.Maybe (fromMaybe, isJust, isNothing)
3634
import Data.Text (Text)
3735
import qualified Data.Text as T
@@ -61,14 +59,17 @@ import Simplex.Messaging.Transport (supportedProxyClientSMPRelayVRange, alpnSupp
6159
import Simplex.Messaging.Transport.Client (TransportHost (..), defaultSocksProxy)
6260
import Simplex.Messaging.Transport.HTTP2 (httpALPN)
6361
import Simplex.Messaging.Transport.Server (ServerCredentials (..), mkTransportServerConfig)
64-
import Simplex.Messaging.Util (eitherToMaybe, ifM, unlessM)
62+
import Simplex.Messaging.Util (eitherToMaybe, ifM)
6563
import System.Directory (createDirectoryIfMissing, doesDirectoryExist, doesFileExist)
6664
import System.Exit (exitFailure)
6765
import System.FilePath (combine)
68-
import System.IO (BufferMode (..), IOMode (..), hSetBuffering, stderr, stdout, withFile)
66+
import System.IO (BufferMode (..), hSetBuffering, stderr, stdout)
6967
import Text.Read (readMaybe)
7068

7169
#if defined(dbServerPostgres)
70+
import Control.Exception (SomeException, try)
71+
import Data.Int (Int64)
72+
import qualified Data.Map.Strict as M
7273
import Data.Semigroup (Sum (..))
7374
import Simplex.Messaging.Agent.Store.Postgres (checkSchemaExists)
7475
import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue)
@@ -79,7 +80,9 @@ import Simplex.Messaging.Server.QueueStore.Postgres (batchInsertQueues, batchIns
7980
import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore (..))
8081
import Simplex.Messaging.Server.QueueStore.Types
8182
import Simplex.Messaging.Server.StoreLog (closeStoreLog, logNewService, logCreateQueue, openWriteStoreLog)
83+
import Simplex.Messaging.Util (unlessM)
8284
import System.Directory (renameFile)
85+
import System.IO (IOMode (..), withFile)
8386
#endif
8487

8588
smpServerCLI :: FilePath -> FilePath -> IO ()

0 commit comments

Comments
 (0)