Skip to content

Commit f5eb735

Browse files
authored
servers: service stats and logging, allow services without option (removed), report errors during service message delivery, remove threads when service subscription ended (#1676)
* smp server: always allow services without option * smp server: maintain IDs hash in session subscription states * smp server: service message delivery error handling * ntf server: log subscription count and hash differences * smp server: remove delivery threads when service subscription ended/client disconnected
1 parent 8389407 commit f5eb735

File tree

17 files changed

+147
-79
lines changed

17 files changed

+147
-79
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1539,7 +1539,6 @@ resubscribeConnections' c connIds = do
15391539
[] -> pure True
15401540
rqs' -> anyM $ map (atomically . hasActiveSubscription c) rqs'
15411541

1542-
-- TODO [certs rcv] compare hash. possibly, it should return both expected and returned counts
15431542
subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType ServiceSubResult))
15441543
subscribeClientServices' c userId =
15451544
ifM useService subscribe $ throwError $ CMD PROHIBITED "no user service allowed"

src/Simplex/Messaging/Notifications/Server.hs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -576,9 +576,10 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
576576
-- TODO [certs rcv] resubscribe queues with statuses NSErr and NSService
577577
CAServiceDisconnected srv serviceSub ->
578578
logNote $ "SMP server service disconnected " <> showService srv serviceSub
579-
CAServiceSubscribed srv serviceSub@(ServiceSub _ expected _) (ServiceSub _ n _) -- TODO [certs rcv] compare hash
580-
| expected == n -> logNote msg
581-
| otherwise -> logWarn $ msg <> ", confirmed subs: " <> tshow n
579+
CAServiceSubscribed srv serviceSub@(ServiceSub _ n idsHash) (ServiceSub _ n' idsHash')
580+
| n /= n' -> logWarn $ msg <> ", confirmed subs: " <> tshow n'
581+
| idsHash /= idsHash' -> logWarn $ msg <> ", different IDs hash"
582+
| otherwise -> logNote msg
582583
where
583584
msg = "SMP server service subscribed " <> showService srv serviceSub
584585
CAServiceSubError srv serviceSub e ->
@@ -593,8 +594,7 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
593594
void $ subscribeSrvSubs ca st batchSize (srv, srvId, Nothing)
594595
Left e -> logError $ "SMP server update and resubscription error " <> tshow e
595596
where
596-
-- TODO [certs rcv] compare hash
597-
showService srv (ServiceSub serviceId n _idsHash) = showServer' srv <> ", service ID " <> decodeLatin1 (strEncode serviceId) <> ", " <> tshow n <> " subs"
597+
showService srv (ServiceSub serviceId n _) = showServer' srv <> ", service ID " <> decodeLatin1 (strEncode serviceId) <> ", " <> tshow n <> " subs"
598598

599599
logSubErrors :: SMPServer -> NonEmpty (SMP.NotifierId, NtfSubStatus) -> Int -> IO ()
600600
logSubErrors srv subs updated = forM_ (L.group $ L.sort $ L.map snd subs) $ \ss -> do

src/Simplex/Messaging/Notifications/Server/Stats.hs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import Simplex.Messaging.Server.Stats
1717
import Simplex.Messaging.TMap (TMap)
1818
import qualified Simplex.Messaging.TMap as TM
1919

20-
-- TODO [certs rcv] track service subscriptions and count/hash diffs for own and other servers + prometheus
2120
data NtfServerStats = NtfServerStats
2221
{ fromTime :: IORef UTCTime,
2322
tknCreated :: IORef Int,

src/Simplex/Messaging/Protocol.hs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ module Simplex.Messaging.Protocol
147147
serviceSubResult,
148148
queueIdsHash,
149149
queueIdHash,
150+
noIdsHash,
151+
addServiceSubs,
152+
subtractServiceSubs,
150153
MaxMessageLen,
151154
MaxRcvMessageLen,
152155
EncRcvMsgBody (..),
@@ -1526,6 +1529,14 @@ queueIdHash :: QueueId -> IdsHash
15261529
queueIdHash = IdsHash . C.md5Hash . unEntityId
15271530
{-# INLINE queueIdHash #-}
15281531

1532+
addServiceSubs :: (Int64, IdsHash) -> (Int64, IdsHash) -> (Int64, IdsHash)
1533+
addServiceSubs (n', idsHash') (n, idsHash) = (n + n', idsHash <> idsHash')
1534+
1535+
subtractServiceSubs :: (Int64, IdsHash) -> (Int64, IdsHash) -> (Int64, IdsHash)
1536+
subtractServiceSubs (n', idsHash') (n, idsHash)
1537+
| n > n' = (n - n', idsHash <> idsHash') -- concat is a reversible xor: (x `xor` y) `xor` y == x
1538+
| otherwise = (0, noIdsHash)
1539+
15291540
data ProtocolErrorType = PECmdSyntax | PECmdUnknown | PESession | PEBlock
15301541

15311542
-- | Type for protocol errors.

src/Simplex/Messaging/Server.hs

Lines changed: 52 additions & 43 deletions
Large diffs are not rendered by default.

src/Simplex/Messaging/Server/Env/STM.hs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ data ServerSubscribers s = ServerSubscribers
363363
{ subQ :: TQueue (ClientSub, ClientId),
364364
queueSubscribers :: SubscribedClients s,
365365
serviceSubscribers :: SubscribedClients s, -- service clients with long-term certificates that have subscriptions
366-
totalServiceSubs :: TVar Int64,
366+
totalServiceSubs :: TVar (Int64, IdsHash),
367367
subClients :: TVar IntSet, -- clients with individual or service subscriptions
368368
pendingEvents :: TVar (IntMap (NonEmpty (EntityId, BrokerMsg)))
369369
}
@@ -426,7 +426,7 @@ sameClient c cv = maybe False (sameClientId c) <$> readTVar cv
426426
data ClientSub
427427
= CSClient QueueId (Maybe ServiceId) (Maybe ServiceId) -- includes previous and new associated service IDs
428428
| CSDeleted QueueId (Maybe ServiceId) -- includes previously associated service IDs
429-
| CSService ServiceId Int64 -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB
429+
| CSService ServiceId (Int64, IdsHash) -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB
430430

431431
newtype ProxyAgent = ProxyAgent
432432
{ smpAgent :: SMPClientAgent 'Sender
@@ -440,8 +440,8 @@ data Client s = Client
440440
ntfSubscriptions :: TMap NotifierId (),
441441
serviceSubscribed :: TVar Bool, -- set independently of serviceSubsCount, to track whether service subscription command was received
442442
ntfServiceSubscribed :: TVar Bool,
443-
serviceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
444-
ntfServiceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
443+
serviceSubsCount :: TVar (Int64, IdsHash), -- only one service can be subscribed, based on its certificate, this is subscription count
444+
ntfServiceSubsCount :: TVar (Int64, IdsHash), -- only one service can be subscribed, based on its certificate, this is subscription count
445445
rcvQ :: TBQueue (NonEmpty (VerifiedTransmission s)),
446446
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg]),
447447
msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
@@ -502,7 +502,7 @@ newServerSubscribers = do
502502
subQ <- newTQueueIO
503503
queueSubscribers <- SubscribedClients <$> TM.emptyIO
504504
serviceSubscribers <- SubscribedClients <$> TM.emptyIO
505-
totalServiceSubs <- newTVarIO 0
505+
totalServiceSubs <- newTVarIO (0, noIdsHash)
506506
subClients <- newTVarIO IS.empty
507507
pendingEvents <- newTVarIO IM.empty
508508
pure ServerSubscribers {subQ, queueSubscribers, serviceSubscribers, totalServiceSubs, subClients, pendingEvents}
@@ -513,8 +513,8 @@ newClient clientId qSize clientTHParams createdAt = do
513513
ntfSubscriptions <- TM.emptyIO
514514
serviceSubscribed <- newTVarIO False
515515
ntfServiceSubscribed <- newTVarIO False
516-
serviceSubsCount <- newTVarIO 0
517-
ntfServiceSubsCount <- newTVarIO 0
516+
serviceSubsCount <- newTVarIO (0, noIdsHash)
517+
ntfServiceSubsCount <- newTVarIO (0, noIdsHash)
518518
rcvQ <- newTBQueueIO qSize
519519
sndQ <- newTBQueueIO qSize
520520
msgQ <- newTBQueueIO qSize

src/Simplex/Messaging/Server/Main.hs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
module Simplex.Messaging.Server.Main where
1919

2020
import Control.Concurrent.STM
21-
import Control.Exception (SomeException, finally, try)
21+
import Control.Exception (finally)
2222
import Control.Logger.Simple
2323
import Control.Monad
2424
import qualified Data.Attoparsec.ByteString.Char8 as A
@@ -28,10 +28,8 @@ import Data.Char (isAlpha, isAscii, toUpper)
2828
import Data.Either (fromRight)
2929
import Data.Functor (($>))
3030
import Data.Ini (Ini, lookupValue, readIniFile)
31-
import Data.Int (Int64)
3231
import Data.List (find, isPrefixOf)
3332
import qualified Data.List.NonEmpty as L
34-
import qualified Data.Map.Strict as M
3533
import Data.Maybe (fromMaybe, isJust, isNothing)
3634
import Data.Text (Text)
3735
import qualified Data.Text as T
@@ -61,14 +59,17 @@ import Simplex.Messaging.Transport (supportedProxyClientSMPRelayVRange, alpnSupp
6159
import Simplex.Messaging.Transport.Client (TransportHost (..), defaultSocksProxy)
6260
import Simplex.Messaging.Transport.HTTP2 (httpALPN)
6361
import Simplex.Messaging.Transport.Server (ServerCredentials (..), mkTransportServerConfig)
64-
import Simplex.Messaging.Util (eitherToMaybe, ifM, unlessM)
62+
import Simplex.Messaging.Util (eitherToMaybe, ifM)
6563
import System.Directory (createDirectoryIfMissing, doesDirectoryExist, doesFileExist)
6664
import System.Exit (exitFailure)
6765
import System.FilePath (combine)
68-
import System.IO (BufferMode (..), IOMode (..), hSetBuffering, stderr, stdout, withFile)
66+
import System.IO (BufferMode (..), hSetBuffering, stderr, stdout)
6967
import Text.Read (readMaybe)
7068

7169
#if defined(dbServerPostgres)
70+
import Control.Exception (SomeException, try)
71+
import Data.Int (Int64)
72+
import qualified Data.Map.Strict as M
7273
import Data.Semigroup (Sum (..))
7374
import Simplex.Messaging.Agent.Store.Postgres (checkSchemaExists)
7475
import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue)
@@ -79,7 +80,9 @@ import Simplex.Messaging.Server.QueueStore.Postgres (batchInsertQueues, batchIns
7980
import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore (..))
8081
import Simplex.Messaging.Server.QueueStore.Types
8182
import Simplex.Messaging.Server.StoreLog (closeStoreLog, logNewService, logCreateQueue, openWriteStoreLog)
83+
import Simplex.Messaging.Util (unlessM)
8284
import System.Directory (renameFile)
85+
import System.IO (IOMode (..), withFile)
8386
#endif
8487

8588
smpServerCLI :: FilePath -> FilePath -> IO ()
@@ -556,7 +559,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
556559
mkTransportServerConfig
557560
(fromMaybe False $ iniOnOff "TRANSPORT" "log_tls_errors" ini)
558561
(Just $ alpnSupportedSMPHandshakes <> httpALPN)
559-
(fromMaybe True $ iniOnOff "TRANSPORT" "accept_service_credentials" ini), -- TODO [certs rcv] remove this option
562+
True,
560563
controlPort = eitherToMaybe $ T.unpack <$> lookupValue "TRANSPORT" "control_port" ini,
561564
smpAgentCfg =
562565
defaultSMPClientAgentConfig

src/Simplex/Messaging/Server/MsgStore/Journal.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,9 +444,9 @@ instance MsgStoreClass (JournalMsgStore s) where
444444
getLoadedQueue :: JournalQueue s -> IO (JournalQueue s)
445445
getLoadedQueue q = fromMaybe q <$> TM.lookupIO (recipientId q) (loadedQueues $ queueStore_ ms)
446446

447-
foldRcvServiceMessages :: JournalMsgStore s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a
447+
foldRcvServiceMessages :: JournalMsgStore s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a)
448448
foldRcvServiceMessages ms serviceId f acc = case queueStore_ ms of
449-
MQStore st -> foldRcvServiceQueues st serviceId f' acc
449+
MQStore st -> fmap Right $ foldRcvServiceQueues st serviceId f' acc
450450
where
451451
f' a (q, qr) = runExceptT (tryPeekMsg ms q) >>= f a (recipientId q) . ((qr,) <$$>)
452452
#if defined(dbServerPostgres)

src/Simplex/Messaging/Server/MsgStore/Postgres.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,9 @@ instance MsgStoreClass PostgresMsgStore where
119119
toMessageStats (expiredMsgsCount, storedMsgsCount, storedQueues) =
120120
MessageStats {expiredMsgsCount, storedMsgsCount, storedQueues}
121121

122-
foldRcvServiceMessages :: PostgresMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a
122+
foldRcvServiceMessages :: PostgresMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a)
123123
foldRcvServiceMessages ms serviceId f acc =
124-
withTransaction (dbStore $ queueStore_ ms) $ \db ->
124+
runExceptT $ withDB' "foldRcvServiceMessages" (queueStore_ ms) $ \db ->
125125
DB.fold
126126
db
127127
[sql|

src/Simplex/Messaging/Server/MsgStore/STM.hs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,10 @@ instance MsgStoreClass STMMsgStore where
8787
expireOldMessages _tty ms now ttl =
8888
withLoadedQueues (queueStore_ ms) $ atomically . expireQueueMsgs ms now (now - ttl)
8989

90-
foldRcvServiceMessages :: STMMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a
91-
foldRcvServiceMessages ms serviceId f=
92-
foldRcvServiceQueues (queueStore_ ms) serviceId $ \a (q, qr) ->
93-
runExceptT (tryPeekMsg ms q) >>= f a (recipientId q) . ((qr,) <$$>)
90+
foldRcvServiceMessages :: STMMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a)
91+
foldRcvServiceMessages ms serviceId f = fmap Right . foldRcvServiceQueues (queueStore_ ms) serviceId f'
92+
where
93+
f' a (q, qr) = runExceptT (tryPeekMsg ms q) >>= f a (recipientId q) . ((qr,) <$$>)
9494

9595
logQueueStates _ = pure ()
9696
{-# INLINE logQueueStates #-}

0 commit comments

Comments
 (0)