diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index f155ce77b..f44708fe6 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1539,7 +1539,6 @@ resubscribeConnections' c connIds = do [] -> pure True rqs' -> anyM $ map (atomically . hasActiveSubscription c) rqs' --- TODO [certs rcv] compare hash. possibly, it should return both expected and returned counts subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType ServiceSubResult)) subscribeClientServices' c userId = ifM useService subscribe $ throwError $ CMD PROHIBITED "no user service allowed" diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 67ed89d71..e7c1ca5f9 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -576,9 +576,10 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = -- TODO [certs rcv] resubscribe queues with statuses NSErr and NSService CAServiceDisconnected srv serviceSub -> logNote $ "SMP server service disconnected " <> showService srv serviceSub - CAServiceSubscribed srv serviceSub@(ServiceSub _ expected _) (ServiceSub _ n _) -- TODO [certs rcv] compare hash - | expected == n -> logNote msg - | otherwise -> logWarn $ msg <> ", confirmed subs: " <> tshow n + CAServiceSubscribed srv serviceSub@(ServiceSub _ n idsHash) (ServiceSub _ n' idsHash') + | n /= n' -> logWarn $ msg <> ", confirmed subs: " <> tshow n' + | idsHash /= idsHash' -> logWarn $ msg <> ", different IDs hash" + | otherwise -> logNote msg where msg = "SMP server service subscribed " <> showService srv serviceSub CAServiceSubError srv serviceSub e -> @@ -593,8 +594,7 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = void $ subscribeSrvSubs ca st batchSize (srv, srvId, Nothing) Left e -> logError $ "SMP server update and resubscription error " <> tshow e where - -- TODO [certs rcv] compare hash - showService srv (ServiceSub serviceId n _idsHash) = showServer' srv <> ", service ID " <> decodeLatin1 (strEncode serviceId) <> ", " <> tshow n <> " subs" + showService srv (ServiceSub serviceId n _) = showServer' srv <> ", service ID " <> decodeLatin1 (strEncode serviceId) <> ", " <> tshow n <> " subs" logSubErrors :: SMPServer -> NonEmpty (SMP.NotifierId, NtfSubStatus) -> Int -> IO () logSubErrors srv subs updated = forM_ (L.group $ L.sort $ L.map snd subs) $ \ss -> do diff --git a/src/Simplex/Messaging/Notifications/Server/Stats.hs b/src/Simplex/Messaging/Notifications/Server/Stats.hs index 7125ce290..a20e41c34 100644 --- a/src/Simplex/Messaging/Notifications/Server/Stats.hs +++ b/src/Simplex/Messaging/Notifications/Server/Stats.hs @@ -17,7 +17,6 @@ import Simplex.Messaging.Server.Stats import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM --- TODO [certs rcv] track service subscriptions and count/hash diffs for own and other servers + prometheus data NtfServerStats = NtfServerStats { fromTime :: IORef UTCTime, tknCreated :: IORef Int, diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index 6b232f12b..51128597c 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -147,6 +147,9 @@ module Simplex.Messaging.Protocol serviceSubResult, queueIdsHash, queueIdHash, + noIdsHash, + addServiceSubs, + subtractServiceSubs, MaxMessageLen, MaxRcvMessageLen, EncRcvMsgBody (..), @@ -1526,6 +1529,14 @@ queueIdHash :: QueueId -> IdsHash queueIdHash = IdsHash . C.md5Hash . unEntityId {-# INLINE queueIdHash #-} +addServiceSubs :: (Int64, IdsHash) -> (Int64, IdsHash) -> (Int64, IdsHash) +addServiceSubs (n', idsHash') (n, idsHash) = (n + n', idsHash <> idsHash') + +subtractServiceSubs :: (Int64, IdsHash) -> (Int64, IdsHash) -> (Int64, IdsHash) +subtractServiceSubs (n', idsHash') (n, idsHash) + | n > n' = (n - n', idsHash <> idsHash') -- concat is a reversible xor: (x `xor` y) `xor` y == x + | otherwise = (0, noIdsHash) + data ProtocolErrorType = PECmdSyntax | PECmdUnknown | PESession | PEBlock -- | Type for protocol errors. diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 0fc15b3e3..b7bb0efaa 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -166,8 +166,8 @@ type AttachHTTP = Socket -> TLS.Context -> IO () -- actions used in serverThread to reduce STM transaction scope data ClientSubAction = CSAEndSub QueueId -- end single direct queue subscription - | CSAEndServiceSub -- end service subscription to one queue - | CSADecreaseSubs Int64 -- reduce service subscriptions when cancelling. Fixed number is used to correctly handle race conditions when service resubscribes + | CSAEndServiceSub QueueId -- end service subscription to one queue + | CSADecreaseSubs (Int64, IdsHash) -- reduce service subscriptions when cancelling. Fixed number is used to correctly handle race conditions when service resubscribes type PrevClientSub s = (Client s, ClientSubAction, (EntityId, BrokerMsg)) @@ -251,7 +251,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt Server s -> (Server s -> ServerSubscribers s) -> (Client s -> TMap QueueId sub) -> - (Client s -> TVar Int64) -> + (Client s -> TVar (Int64, IdsHash)) -> Maybe (sub -> IO ()) -> M s () serverThread label srv srvSubscribers clientSubs clientServiceSubs unsub_ = do @@ -277,7 +277,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt as'' <- if prevServiceId == serviceId_ then pure [] else endServiceSub prevServiceId qId END case serviceId_ of Just serviceId -> do - modifyTVar' totalServiceSubs (+ 1) -- server count for all services + modifyTVar' totalServiceSubs $ addServiceSubs (1, queueIdHash qId) -- server count and IDs hash for all services as <- endQueueSub qId END as' <- cancelServiceSubs serviceId =<< upsertSubscribedClient serviceId c serviceSubscribers pure $ as ++ as' ++ as'' @@ -289,9 +289,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt as <- endQueueSub qId DELD as' <- endServiceSub serviceId qId DELD pure $ as ++ as' - CSService serviceId count -> do + CSService serviceId changedSubs -> do modifyTVar' subClients $ IS.insert clntId -- add ID to server's subscribed cients - modifyTVar' totalServiceSubs (+ count) -- server count for all services + modifyTVar' totalServiceSubs $ subtractServiceSubs changedSubs -- server count and IDs hash for all services cancelServiceSubs serviceId =<< upsertSubscribedClient serviceId c serviceSubscribers updateSubDisconnected = case clntSub of -- do not insert client if it is already disconnected, but send END/DELD to any other client subscribed to this queue or service @@ -309,15 +309,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt endQueueSub qId msg = prevSub qId msg (CSAEndSub qId) =<< lookupDeleteSubscribedClient qId queueSubscribers endServiceSub :: Maybe ServiceId -> QueueId -> BrokerMsg -> STM [PrevClientSub s] endServiceSub Nothing _ _ = pure [] - endServiceSub (Just serviceId) qId msg = prevSub qId msg CSAEndServiceSub =<< lookupSubscribedClient serviceId serviceSubscribers + endServiceSub (Just serviceId) qId msg = prevSub qId msg (CSAEndServiceSub qId) =<< lookupSubscribedClient serviceId serviceSubscribers prevSub :: QueueId -> BrokerMsg -> ClientSubAction -> Maybe (Client s) -> STM [PrevClientSub s] prevSub qId msg action = checkAnotherClient $ \c -> pure [(c, action, (qId, msg))] cancelServiceSubs :: ServiceId -> Maybe (Client s) -> STM [PrevClientSub s] cancelServiceSubs serviceId = checkAnotherClient $ \c -> do - n <- swapTVar (clientServiceSubs c) 0 - pure [(c, CSADecreaseSubs n, (serviceId, ENDS n))] + changedSubs@(n, _) <- swapTVar (clientServiceSubs c) (0, noIdsHash) + pure [(c, CSADecreaseSubs changedSubs, (serviceId, ENDS n))] checkAnotherClient :: (Client s -> STM [PrevClientSub s]) -> Maybe (Client s) -> STM [PrevClientSub s] checkAnotherClient mkSub = \case Just c@Client {clientId, connected} | clntId /= clientId -> @@ -332,20 +332,21 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt where a (Just unsub) (Just s) = unsub s a _ _ = pure () - CSAEndServiceSub -> atomically $ do + CSAEndServiceSub qId -> atomically $ do modifyTVar' (clientServiceSubs c) decrease modifyTVar' totalServiceSubs decrease where - decrease n = max 0 (n - 1) - -- TODO [certs rcv] for SMP subscriptions CSADecreaseSubs should also remove all delivery threads of the passed client - CSADecreaseSubs n' -> atomically $ modifyTVar' totalServiceSubs $ \n -> max 0 (n - n') + decrease = subtractServiceSubs (1, queueIdHash qId) + CSADecreaseSubs changedSubs -> do + atomically $ modifyTVar' totalServiceSubs $ subtractServiceSubs changedSubs + forM_ unsub_ $ \unsub -> atomically (swapTVar (clientSubs c) M.empty) >>= mapM_ unsub where endSub :: Client s -> QueueId -> STM (Maybe sub) endSub c qId = TM.lookupDelete qId (clientSubs c) >>= (removeWhenNoSubs c $>) -- remove client from server's subscribed cients removeWhenNoSubs c = do noClientSubs <- null <$> readTVar (clientSubs c) - noServiceSubs <- (0 ==) <$> readTVar (clientServiceSubs c) + noServiceSubs <- ((0 ==) . fst) <$> readTVar (clientServiceSubs c) when (noClientSubs && noServiceSubs) $ modifyTVar' subClients $ IS.delete (clientId c) deliverNtfsThread :: Server s -> M s () @@ -1112,10 +1113,10 @@ clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, serviceS updateSubscribers subs ServerSubscribers {queueSubscribers, subClients} = do mapM_ (\qId -> deleteSubcribedClient qId c queueSubscribers) (M.keys subs) atomically $ modifyTVar' subClients $ IS.delete clientId - updateServiceSubs :: ServiceId -> TVar Int64 -> ServerSubscribers s -> IO () + updateServiceSubs :: ServiceId -> TVar (Int64, IdsHash) -> ServerSubscribers s -> IO () updateServiceSubs serviceId subsCount ServerSubscribers {totalServiceSubs, serviceSubscribers} = do deleteSubcribedClient serviceId c serviceSubscribers - atomically . modifyTVar' totalServiceSubs . subtract =<< readTVarIO subsCount + atomically . modifyTVar' totalServiceSubs . subtractServiceSubs =<< readTVarIO subsCount cancelSub :: Sub -> IO () cancelSub s = case subThread s of @@ -1357,7 +1358,6 @@ forkClient Client {endThreads, endThreadSeq} label action = do client :: forall s. MsgStoreClass s => Server s -> s -> Client s -> M s () client - -- TODO [certs rcv] rcv subscriptions Server {subscribers, ntfSubscribers} ms clnt@Client {clientId, rcvQ, sndQ, msgQ, clientTHParams = thParams'@THandleParams {sessionId}, procThreads} = do @@ -1661,7 +1661,7 @@ client subscribeNewQueue :: RecipientId -> QueueRec -> M s () subscribeNewQueue rId QueueRec {rcvServiceId} = do case rcvServiceId of - Just _ -> atomically $ modifyTVar' (serviceSubsCount clnt) (+ 1) + Just _ -> atomically $ modifyTVar' (serviceSubsCount clnt) $ addServiceSubs (1, queueIdHash rId) Nothing -> do sub <- atomically $ newSubscription NoSub atomically $ TM.insert rId sub $ subscriptions clnt @@ -1741,7 +1741,7 @@ client Maybe ServiceId -> ServerSubscribers s -> (Client s -> TMap QueueId sub) -> - (Client s -> TVar Int64) -> + (Client s -> TVar (Int64, IdsHash)) -> STM sub -> (ServerStats -> ServiceStats) -> M s (Either ErrorType (Bool, Maybe sub)) @@ -1771,9 +1771,9 @@ client incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId pure (hasSub, Nothing) where - hasServiceSub = (0 /=) <$> readTVar (clientServiceSubs clnt) + hasServiceSub = ((0 /=) . fst) <$> readTVar (clientServiceSubs clnt) -- This function is used when queue association with the service is created. - incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) (+ 1) -- service count + incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) $ addServiceSubs (1, queueIdHash (recipientId q)) -- service count and IDs hash Nothing -> case queueServiceId of Just _ -> runExceptT $ do ExceptT $ setQueueService (queueStore ms) q party Nothing @@ -1801,27 +1801,36 @@ client sharedSubscribeService SRecipientService serviceId expected subscribers serviceSubscribed serviceSubsCount rcvServices >>= \case Left e -> pure $ ERR e Right (hasSub, (count, idsHash)) -> do - unless hasSub $ forkClient clnt "deliverServiceMessages" $ liftIO $ deliverServiceMessages count + stats <- asks serverStats + unless hasSub $ forkClient clnt "deliverServiceMessages" $ liftIO $ deliverServiceMessages stats count pure $ SOKS count idsHash where - deliverServiceMessages expectedCnt = do - (qCnt, _msgCnt, _dupCnt, _errCnt) <- foldRcvServiceMessages ms serviceId deliverQueueMsg (0, 0, 0, 0) - atomically $ writeTBQueue msgQ [(NoCorrId, NoEntity, ALLS)] - -- TODO [certs rcv] compare with expected - logNote $ "Service subscriptions for " <> tshow serviceId <> " (" <> tshow qCnt <> " queues)" - deliverQueueMsg :: (Int, Int, Int, Int) -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO (Int, Int, Int, Int) - deliverQueueMsg (!qCnt, !msgCnt, !dupCnt, !errCnt) rId = \case - Left e -> pure (qCnt + 1, msgCnt, dupCnt, errCnt + 1) -- TODO [certs rcv] deliver subscription error + deliverServiceMessages stats expectedCnt = do + foldRcvServiceMessages ms serviceId deliverQueueMsg (0, 0, 0, [(NoCorrId, NoEntity, ALLS)]) >>= \case + Right (qCnt, msgCnt, dupCnt, evts) -> do + atomically $ writeTBQueue msgQ evts + atomicModifyIORef'_ (rcvServicesSubMsg stats) (+ msgCnt) + atomicModifyIORef'_ (rcvServicesSubDuplicate stats) (+ dupCnt) + let logMsg = "Subscribed service " <> tshow serviceId <> " (" + if qCnt == expectedCnt + then logNote $ logMsg <> tshow qCnt <> " queues)" + else logError $ logMsg <> "expected " <> tshow expectedCnt <> "," <> tshow qCnt <> " queues)" + Left e -> do + logError $ "Service subscription error for " <> tshow serviceId <> ": " <> tshow e + atomically $ writeTBQueue msgQ [(NoCorrId, NoEntity, ERR e)] + deliverQueueMsg :: (Int64, Int, Int, NonEmpty (Transmission BrokerMsg)) -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO (Int64, Int, Int, NonEmpty (Transmission BrokerMsg)) + deliverQueueMsg (!qCnt, !msgCnt, !dupCnt, evts) rId = \case + Left e -> pure (qCnt + 1, msgCnt, dupCnt, (NoCorrId, rId, ERR e) <| evts) Right qMsg_ -> case qMsg_ of - Nothing -> pure (qCnt + 1, msgCnt, dupCnt, errCnt) + Nothing -> pure (qCnt + 1, msgCnt, dupCnt, evts) Just (qr, msg) -> atomically (getSubscription rId) >>= \case - Nothing -> pure (qCnt + 1, msgCnt, dupCnt + 1, errCnt) + Nothing -> pure (qCnt + 1, msgCnt, dupCnt + 1, evts) Just sub -> do ts <- getSystemSeconds atomically $ setDelivered sub msg ts atomically $ writeTBQueue msgQ [(NoCorrId, rId, MSG (encryptMsg qr msg))] - pure (qCnt + 1, msgCnt + 1, dupCnt, errCnt) + pure (qCnt + 1, msgCnt + 1, dupCnt, evts) getSubscription rId = TM.lookup rId (subscriptions clnt) >>= \case -- If delivery subscription already exists, then there is no need to deliver message. @@ -1836,28 +1845,28 @@ client subscribeServiceNotifications serviceId expected = either ERR (uncurry SOKS . snd) <$> sharedSubscribeService SNotifierService serviceId expected ntfSubscribers ntfServiceSubscribed ntfServiceSubsCount ntfServices - sharedSubscribeService :: (PartyI p, ServiceParty p) => SParty p -> ServiceId -> (Int64, IdsHash) -> ServerSubscribers s -> (Client s -> TVar Bool) -> (Client s -> TVar Int64) -> (ServerStats -> ServiceStats) -> M s (Either ErrorType (Bool, (Int64, IdsHash))) + sharedSubscribeService :: (PartyI p, ServiceParty p) => SParty p -> ServiceId -> (Int64, IdsHash) -> ServerSubscribers s -> (Client s -> TVar Bool) -> (Client s -> TVar (Int64, IdsHash)) -> (ServerStats -> ServiceStats) -> M s (Either ErrorType (Bool, (Int64, IdsHash))) sharedSubscribeService party serviceId (count, idsHash) srvSubscribers clientServiceSubscribed clientServiceSubs servicesSel = do subscribed <- readTVarIO $ clientServiceSubscribed clnt stats <- asks serverStats liftIO $ runExceptT $ (subscribed,) <$> if subscribed - then (,mempty) <$> readTVarIO (clientServiceSubs clnt) -- TODO [certs rcv] get IDs hash + then readTVarIO $ clientServiceSubs clnt else do - (count', idsHash') <- ExceptT $ getServiceQueueCountHash @(StoreQueue s) (queueStore ms) party serviceId - incCount <- atomically $ do + subs'@(count', idsHash') <- ExceptT $ getServiceQueueCountHash @(StoreQueue s) (queueStore ms) party serviceId + subsChange <- atomically $ do writeTVar (clientServiceSubscribed clnt) True - currCount <- swapTVar (clientServiceSubs clnt) count' -- TODO [certs rcv] maintain IDs hash here? - pure $ count' - currCount + currSubs <- swapTVar (clientServiceSubs clnt) subs' + pure $ subtractServiceSubs currSubs subs' let incSrvStat sel n = liftIO $ atomicModifyIORef'_ (sel $ servicesSel stats) (+ n) diff = fromIntegral $ count' - count - if -- TODO [certs rcv] account for not provided counts/hashes (expected n = -1) - | diff == 0 && idsHash == idsHash' -> incSrvStat srvSubOk 1 + if -- `count == -1` only for subscriptions by old NTF servers + | count == -1 && (diff == 0 && idsHash == idsHash') -> incSrvStat srvSubOk 1 | diff > 0 -> incSrvStat srvSubMore 1 >> incSrvStat srvSubMoreTotal diff | diff < 0 -> incSrvStat srvSubFewer 1 >> incSrvStat srvSubFewerTotal (- diff) | otherwise -> incSrvStat srvSubDiff 1 - atomically $ writeTQueue (subQ srvSubscribers) (CSService serviceId incCount, clientId) + atomically $ writeTQueue (subQ srvSubscribers) (CSService serviceId subsChange, clientId) pure (count', idsHash') acknowledgeMsg :: MsgId -> StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg) @@ -2133,7 +2142,7 @@ client -- we delete subscription here, so the client with no subscriptions can be disconnected. sub <- atomically $ TM.lookupDelete entId $ subscriptions clnt liftIO $ mapM_ cancelSub sub - when (isJust rcvServiceId) $ atomically $ modifyTVar' (serviceSubsCount clnt) $ \n -> max 0 (n - 1) + when (isJust rcvServiceId) $ atomically $ modifyTVar' (serviceSubsCount clnt) $ subtractServiceSubs (1, queueIdHash (recipientId q)) atomically $ writeTQueue (subQ subscribers) (CSDeleted entId rcvServiceId, clientId) forM_ (notifier qr) $ \NtfCreds {notifierId = nId, ntfServiceId} -> do -- queue is deleted by a different client from the one subscribed to notifications, diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 24cd6dfcc..02cf136c7 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -363,7 +363,7 @@ data ServerSubscribers s = ServerSubscribers { subQ :: TQueue (ClientSub, ClientId), queueSubscribers :: SubscribedClients s, serviceSubscribers :: SubscribedClients s, -- service clients with long-term certificates that have subscriptions - totalServiceSubs :: TVar Int64, + totalServiceSubs :: TVar (Int64, IdsHash), subClients :: TVar IntSet, -- clients with individual or service subscriptions pendingEvents :: TVar (IntMap (NonEmpty (EntityId, BrokerMsg))) } @@ -426,7 +426,7 @@ sameClient c cv = maybe False (sameClientId c) <$> readTVar cv data ClientSub = CSClient QueueId (Maybe ServiceId) (Maybe ServiceId) -- includes previous and new associated service IDs | CSDeleted QueueId (Maybe ServiceId) -- includes previously associated service IDs - | CSService ServiceId Int64 -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB + | CSService ServiceId (Int64, IdsHash) -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB newtype ProxyAgent = ProxyAgent { smpAgent :: SMPClientAgent 'Sender @@ -440,8 +440,8 @@ data Client s = Client ntfSubscriptions :: TMap NotifierId (), serviceSubscribed :: TVar Bool, -- set independently of serviceSubsCount, to track whether service subscription command was received ntfServiceSubscribed :: TVar Bool, - serviceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count - ntfServiceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count + serviceSubsCount :: TVar (Int64, IdsHash), -- only one service can be subscribed, based on its certificate, this is subscription count + ntfServiceSubsCount :: TVar (Int64, IdsHash), -- only one service can be subscribed, based on its certificate, this is subscription count rcvQ :: TBQueue (NonEmpty (VerifiedTransmission s)), sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg]), msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)), @@ -502,7 +502,7 @@ newServerSubscribers = do subQ <- newTQueueIO queueSubscribers <- SubscribedClients <$> TM.emptyIO serviceSubscribers <- SubscribedClients <$> TM.emptyIO - totalServiceSubs <- newTVarIO 0 + totalServiceSubs <- newTVarIO (0, noIdsHash) subClients <- newTVarIO IS.empty pendingEvents <- newTVarIO IM.empty pure ServerSubscribers {subQ, queueSubscribers, serviceSubscribers, totalServiceSubs, subClients, pendingEvents} @@ -513,8 +513,8 @@ newClient clientId qSize clientTHParams createdAt = do ntfSubscriptions <- TM.emptyIO serviceSubscribed <- newTVarIO False ntfServiceSubscribed <- newTVarIO False - serviceSubsCount <- newTVarIO 0 - ntfServiceSubsCount <- newTVarIO 0 + serviceSubsCount <- newTVarIO (0, noIdsHash) + ntfServiceSubsCount <- newTVarIO (0, noIdsHash) rcvQ <- newTBQueueIO qSize sndQ <- newTBQueueIO qSize msgQ <- newTBQueueIO qSize diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 7de966c36..86ff3d4a9 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -18,7 +18,7 @@ module Simplex.Messaging.Server.Main where import Control.Concurrent.STM -import Control.Exception (SomeException, finally, try) +import Control.Exception (finally) import Control.Logger.Simple import Control.Monad import qualified Data.Attoparsec.ByteString.Char8 as A @@ -28,10 +28,8 @@ import Data.Char (isAlpha, isAscii, toUpper) import Data.Either (fromRight) import Data.Functor (($>)) import Data.Ini (Ini, lookupValue, readIniFile) -import Data.Int (Int64) import Data.List (find, isPrefixOf) import qualified Data.List.NonEmpty as L -import qualified Data.Map.Strict as M import Data.Maybe (fromMaybe, isJust, isNothing) import Data.Text (Text) import qualified Data.Text as T @@ -61,14 +59,17 @@ import Simplex.Messaging.Transport (supportedProxyClientSMPRelayVRange, alpnSupp import Simplex.Messaging.Transport.Client (TransportHost (..), defaultSocksProxy) import Simplex.Messaging.Transport.HTTP2 (httpALPN) import Simplex.Messaging.Transport.Server (ServerCredentials (..), mkTransportServerConfig) -import Simplex.Messaging.Util (eitherToMaybe, ifM, unlessM) +import Simplex.Messaging.Util (eitherToMaybe, ifM) import System.Directory (createDirectoryIfMissing, doesDirectoryExist, doesFileExist) import System.Exit (exitFailure) import System.FilePath (combine) -import System.IO (BufferMode (..), IOMode (..), hSetBuffering, stderr, stdout, withFile) +import System.IO (BufferMode (..), hSetBuffering, stderr, stdout) import Text.Read (readMaybe) #if defined(dbServerPostgres) +import Control.Exception (SomeException, try) +import Data.Int (Int64) +import qualified Data.Map.Strict as M import Data.Semigroup (Sum (..)) import Simplex.Messaging.Agent.Store.Postgres (checkSchemaExists) import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue) @@ -79,7 +80,9 @@ import Simplex.Messaging.Server.QueueStore.Postgres (batchInsertQueues, batchIns import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore (..)) import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.Server.StoreLog (closeStoreLog, logNewService, logCreateQueue, openWriteStoreLog) +import Simplex.Messaging.Util (unlessM) import System.Directory (renameFile) +import System.IO (IOMode (..), withFile) #endif smpServerCLI :: FilePath -> FilePath -> IO () @@ -556,7 +559,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = mkTransportServerConfig (fromMaybe False $ iniOnOff "TRANSPORT" "log_tls_errors" ini) (Just $ alpnSupportedSMPHandshakes <> httpALPN) - (fromMaybe True $ iniOnOff "TRANSPORT" "accept_service_credentials" ini), -- TODO [certs rcv] remove this option + True, controlPort = eitherToMaybe $ T.unpack <$> lookupValue "TRANSPORT" "control_port" ini, smpAgentCfg = defaultSMPClientAgentConfig diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 89e9f0383..c65660c93 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -444,9 +444,9 @@ instance MsgStoreClass (JournalMsgStore s) where getLoadedQueue :: JournalQueue s -> IO (JournalQueue s) getLoadedQueue q = fromMaybe q <$> TM.lookupIO (recipientId q) (loadedQueues $ queueStore_ ms) - foldRcvServiceMessages :: JournalMsgStore s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a + foldRcvServiceMessages :: JournalMsgStore s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a) foldRcvServiceMessages ms serviceId f acc = case queueStore_ ms of - MQStore st -> foldRcvServiceQueues st serviceId f' acc + MQStore st -> fmap Right $ foldRcvServiceQueues st serviceId f' acc where f' a (q, qr) = runExceptT (tryPeekMsg ms q) >>= f a (recipientId q) . ((qr,) <$$>) #if defined(dbServerPostgres) diff --git a/src/Simplex/Messaging/Server/MsgStore/Postgres.hs b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs index f3000811b..edf7f481c 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs @@ -119,9 +119,9 @@ instance MsgStoreClass PostgresMsgStore where toMessageStats (expiredMsgsCount, storedMsgsCount, storedQueues) = MessageStats {expiredMsgsCount, storedMsgsCount, storedQueues} - foldRcvServiceMessages :: PostgresMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a + foldRcvServiceMessages :: PostgresMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a) foldRcvServiceMessages ms serviceId f acc = - withTransaction (dbStore $ queueStore_ ms) $ \db -> + runExceptT $ withDB' "foldRcvServiceMessages" (queueStore_ ms) $ \db -> DB.fold db [sql| diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 24d489acc..f118e007c 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -87,10 +87,10 @@ instance MsgStoreClass STMMsgStore where expireOldMessages _tty ms now ttl = withLoadedQueues (queueStore_ ms) $ atomically . expireQueueMsgs ms now (now - ttl) - foldRcvServiceMessages :: STMMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a - foldRcvServiceMessages ms serviceId f= - foldRcvServiceQueues (queueStore_ ms) serviceId $ \a (q, qr) -> - runExceptT (tryPeekMsg ms q) >>= f a (recipientId q) . ((qr,) <$$>) + foldRcvServiceMessages :: STMMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a) + foldRcvServiceMessages ms serviceId f = fmap Right . foldRcvServiceQueues (queueStore_ ms) serviceId f' + where + f' a (q, qr) = runExceptT (tryPeekMsg ms q) >>= f a (recipientId q) . ((qr,) <$$>) logQueueStates _ = pure () {-# INLINE logQueueStates #-} diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index e186da05a..fc97bbc20 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -45,7 +45,7 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M unsafeWithAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a -- tty, store, now, ttl expireOldMessages :: Bool -> s -> Int64 -> Int64 -> IO MessageStats - foldRcvServiceMessages :: s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a + foldRcvServiceMessages :: s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a) logQueueStates :: s -> IO () logQueueState :: StoreQueue s -> StoreMonad s () queueStore :: s -> QueueStore s diff --git a/src/Simplex/Messaging/Server/Prometheus.hs b/src/Simplex/Messaging/Server/Prometheus.hs index e4d6a2774..1e3c5132d 100644 --- a/src/Simplex/Messaging/Server/Prometheus.hs +++ b/src/Simplex/Messaging/Server/Prometheus.hs @@ -21,7 +21,6 @@ import Simplex.Messaging.Transport (simplexMQVersion) import Simplex.Messaging.Transport.Server (SocketStats (..)) import Simplex.Messaging.Util (tshow) --- TODO [certs rcv] add service subscriptions and count/hash diffs data ServerMetrics = ServerMetrics { statsData :: ServerStatsData, activeQueueCounts :: PeriodStatCounts, @@ -118,6 +117,8 @@ prometheusMetrics sm rtm ts = _pMsgFwdsRecv, _rcvServices, _ntfServices, + _rcvServicesSubMsg, + _rcvServicesSubDuplicate, _qCount, _msgCount, _ntfCount @@ -383,6 +384,14 @@ prometheusMetrics sm rtm ts = \# HELP simplex_smp_ntf_services_queues_count The count of queues associated with notification services.\n\ \# TYPE simplex_smp_ntf_services_queues_count gauge\n\ \simplex_smp_ntf_services_queues_count " <> mshow (ntfServiceQueuesCount entityCounts) <> "\n# ntfServiceQueuesCount\n\ + \\n\ + \# HELP simplex_smp_rcv_services_sub_msg The count of subscribed service queues with messages.\n\ + \# TYPE simplex_smp_rcv_services_sub_msg counter\n\ + \simplex_smp_rcv_services_sub_msg " <> mshow _rcvServicesSubMsg <> "\n# rcvServicesSubMsg\n\ + \\n\ + \# HELP simplex_smp_rcv_services_sub_duplicate The count of duplicate subscribed service queues.\n\ + \# TYPE simplex_smp_rcv_services_sub_duplicate counter\n\ + \simplex_smp_rcv_services_sub_duplicate " <> mshow _rcvServicesSubDuplicate <> "\n# rcvServicesSubDuplicate\n\ \\n" <> showServices _rcvServices "rcv" "receiving" <> showServices _ntfServices "ntf" "notification" @@ -418,6 +427,30 @@ prometheusMetrics sm rtm ts = \# HELP simplex_smp_" <> pfx <> "_services_sub_end Ended subscriptions with " <> name <> " services.\n\ \# TYPE simplex_smp_" <> pfx <> "_services_sub_end gauge\n\ \simplex_smp_" <> pfx <> "_services_sub_end " <> mshow (_srvSubEnd ss) <> "\n# " <> pfx <> ".srvSubEnd\n\ + \\n\ + \# HELP simplex_smp_" <> pfx <> "_services_sub_ok Service subscriptions for " <> name <> " services.\n\ + \# TYPE simplex_smp_" <> pfx <> "_services_sub_ok gauge\n\ + \simplex_smp_" <> pfx <> "_services_sub_ok " <> mshow (_srvSubOk ss) <> "\n# " <> pfx <> ".srvSubOk\n\ + \\n\ + \# HELP simplex_smp_" <> pfx <> "_services_sub_more Service subscriptions for " <> name <> " services with more queues than in the client.\n\ + \# TYPE simplex_smp_" <> pfx <> "_services_sub_more gauge\n\ + \simplex_smp_" <> pfx <> "_services_sub_more " <> mshow (_srvSubMore ss) <> "\n# " <> pfx <> ".srvSubMore\n\ + \\n\ + \# HELP simplex_smp_" <> pfx <> "_services_sub_fewer Service subscriptions for " <> name <> " services with fewer queues than in the client.\n\ + \# TYPE simplex_smp_" <> pfx <> "_services_sub_fewer gauge\n\ + \simplex_smp_" <> pfx <> "_services_sub_fewer " <> mshow (_srvSubFewer ss) <> "\n# " <> pfx <> ".srvSubFewer\n\ + \\n\ + \# HELP simplex_smp_" <> pfx <> "_services_sub_diff Service subscriptions for " <> name <> " services with different hash than in the client.\n\ + \# TYPE simplex_smp_" <> pfx <> "_services_sub_diff gauge\n\ + \simplex_smp_" <> pfx <> "_services_sub_diff " <> mshow (_srvSubDiff ss) <> "\n# " <> pfx <> ".srvSubDiff\n\ + \\n\ + \# HELP simplex_smp_" <> pfx <> "_services_sub_more_total Service subscriptions for " <> name <> " services with more queues than in the client total.\n\ + \# TYPE simplex_smp_" <> pfx <> "_services_sub_more_total gauge\n\ + \simplex_smp_" <> pfx <> "_services_sub_more_total " <> mshow (_srvSubMoreTotal ss) <> "\n# " <> pfx <> ".srvSubMoreTotal\n\ + \\n\ + \# HELP simplex_smp_" <> pfx <> "_services_sub_fewer_total Service subscriptions for " <> name <> " services with fewer queues than in the client total.\n\ + \# TYPE simplex_smp_" <> pfx <> "_services_sub_fewer_total gauge\n\ + \simplex_smp_" <> pfx <> "_services_sub_fewer_total " <> mshow (_srvSubFewerTotal ss) <> "\n# " <> pfx <> ".srvSubFewerTotal\n\ \\n" info = "# Info\n\ diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index eb1ba3b2c..a8c8c040a 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -581,9 +581,9 @@ foldServiceRecs st f = DB.fold_ db "SELECT service_id, service_role, service_cert, service_cert_hash, created_at FROM services" mempty $ \ !acc -> fmap (acc <>) . f . rowToServiceRec -foldRcvServiceQueueRecs :: PostgresQueueStore q -> ServiceId -> (a -> (RecipientId, QueueRec) -> IO a) -> a -> IO a +foldRcvServiceQueueRecs :: PostgresQueueStore q -> ServiceId -> (a -> (RecipientId, QueueRec) -> IO a) -> a -> IO (Either ErrorType a) foldRcvServiceQueueRecs st serviceId f acc = - withTransaction (dbStore st) $ \db -> + runExceptT $ withDB' "foldRcvServiceQueueRecs" st $ \db -> DB.fold db (queueRecQuery <> " WHERE rcv_service_id = ? AND deleted_at IS NULL") (Only serviceId) acc $ \a -> f a . rowToQueueRec foldQueueRecs :: Monoid a => Bool -> Bool -> PostgresQueueStore q -> ((RecipientId, QueueRec) -> IO a) -> IO a diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 8b64db55a..3a236076c 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -63,8 +63,8 @@ data STMQueueStore q = STMQueueStore data STMService = STMService { serviceRec :: ServiceRec, - serviceRcvQueues :: TVar (Set RecipientId, IdsHash), -- TODO [certs rcv] get/maintain hash - serviceNtfQueues :: TVar (Set NotifierId, IdsHash) -- TODO [certs rcv] get/maintain hash + serviceRcvQueues :: TVar (Set RecipientId, IdsHash), + serviceNtfQueues :: TVar (Set NotifierId, IdsHash) } setStoreLog :: STMQueueStore q -> StoreLog 'WriteMode -> IO () diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 120fad7b6..613c5e8be 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -86,6 +86,8 @@ data ServerStats = ServerStats pMsgFwdsRecv :: IORef Int, rcvServices :: ServiceStats, ntfServices :: ServiceStats, + rcvServicesSubMsg :: IORef Int, + rcvServicesSubDuplicate :: IORef Int, qCount :: IORef Int, msgCount :: IORef Int, ntfCount :: IORef Int @@ -145,6 +147,8 @@ data ServerStatsData = ServerStatsData _pMsgFwdsRecv :: Int, _ntfServices :: ServiceStatsData, _rcvServices :: ServiceStatsData, + _rcvServicesSubMsg :: Int, + _rcvServicesSubDuplicate :: Int, _qCount :: Int, _msgCount :: Int, _ntfCount :: Int @@ -206,6 +210,8 @@ newServerStats ts = do pMsgFwdsRecv <- newIORef 0 rcvServices <- newServiceStats ntfServices <- newServiceStats + rcvServicesSubMsg <- newIORef 0 + rcvServicesSubDuplicate <- newIORef 0 qCount <- newIORef 0 msgCount <- newIORef 0 ntfCount <- newIORef 0 @@ -264,6 +270,8 @@ newServerStats ts = do pMsgFwdsRecv, rcvServices, ntfServices, + rcvServicesSubMsg, + rcvServicesSubDuplicate, qCount, msgCount, ntfCount @@ -324,6 +332,8 @@ getServerStatsData s = do _pMsgFwdsRecv <- readIORef $ pMsgFwdsRecv s _rcvServices <- getServiceStatsData $ rcvServices s _ntfServices <- getServiceStatsData $ ntfServices s + _rcvServicesSubMsg <- readIORef $ rcvServicesSubMsg s + _rcvServicesSubDuplicate <- readIORef $ rcvServicesSubDuplicate s _qCount <- readIORef $ qCount s _msgCount <- readIORef $ msgCount s _ntfCount <- readIORef $ ntfCount s @@ -382,6 +392,8 @@ getServerStatsData s = do _pMsgFwdsRecv, _rcvServices, _ntfServices, + _rcvServicesSubMsg, + _rcvServicesSubDuplicate, _qCount, _msgCount, _ntfCount @@ -443,6 +455,8 @@ setServerStats s d = do writeIORef (pMsgFwdsRecv s) $! _pMsgFwdsRecv d setServiceStats (rcvServices s) $! _rcvServices d setServiceStats (ntfServices s) $! _ntfServices d + writeIORef (rcvServicesSubMsg s) $! _rcvServicesSubMsg d + writeIORef (rcvServicesSubDuplicate s) $! _rcvServicesSubDuplicate d writeIORef (qCount s) $! _qCount d writeIORef (msgCount s) $! _msgCount d writeIORef (ntfCount s) $! _ntfCount d @@ -636,6 +650,8 @@ instance StrEncoding ServerStatsData where _pMsgFwdsRecv, _rcvServices, _ntfServices, + _rcvServicesSubMsg = 0, + _rcvServicesSubDuplicate = 0, _qCount, _msgCount = 0, _ntfCount = 0 diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index a14118ce4..f1eb1a8bd 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -560,7 +560,6 @@ data SMPClientHandshake = SMPClientHandshake keyHash :: C.KeyHash, -- | pub key to agree shared secret for entity ID encryption, shared secret for command authorization is agreed using per-queue keys. authPubKey :: Maybe C.PublicKeyX25519, - -- TODO [certs rcv] remove proxyServer, as serviceInfo includes it as clientRole -- | Whether connecting client is a proxy server (send from SMP v12). -- This property, if True, disables additional transport encrytion inside TLS. -- (Proxy server connection already has additional encryption, so this layer is not needed there). diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 31967917a..b63e4cb48 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -3693,7 +3693,6 @@ testClientServiceConnection ps = do ("", "", DOWN _ [_]) <- nGet user ("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 1 qIdHash')) <- nGet service qIdHash' `shouldBe` qIdHash - -- TODO [certs rcv] how to integrate service counts into stats withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do ("", "", UP _ [_]) <- nGet user -- Nothing in ServiceSubResult confirms that both counts and IDs hash match