Skip to content
Merged
1 change: 0 additions & 1 deletion src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1539,7 +1539,6 @@ resubscribeConnections' c connIds = do
[] -> pure True
rqs' -> anyM $ map (atomically . hasActiveSubscription c) rqs'

-- TODO [certs rcv] compare hash. possibly, it should return both expected and returned counts
subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType ServiceSubResult))
subscribeClientServices' c userId =
ifM useService subscribe $ throwError $ CMD PROHIBITED "no user service allowed"
Expand Down
10 changes: 5 additions & 5 deletions src/Simplex/Messaging/Notifications/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -576,9 +576,10 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
-- TODO [certs rcv] resubscribe queues with statuses NSErr and NSService
CAServiceDisconnected srv serviceSub ->
logNote $ "SMP server service disconnected " <> showService srv serviceSub
CAServiceSubscribed srv serviceSub@(ServiceSub _ expected _) (ServiceSub _ n _) -- TODO [certs rcv] compare hash
| expected == n -> logNote msg
| otherwise -> logWarn $ msg <> ", confirmed subs: " <> tshow n
CAServiceSubscribed srv serviceSub@(ServiceSub _ n idsHash) (ServiceSub _ n' idsHash')
| n /= n' -> logWarn $ msg <> ", confirmed subs: " <> tshow n'
| idsHash /= idsHash' -> logWarn $ msg <> ", different IDs hash"
| otherwise -> logNote msg
where
msg = "SMP server service subscribed " <> showService srv serviceSub
CAServiceSubError srv serviceSub e ->
Expand All @@ -593,8 +594,7 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
void $ subscribeSrvSubs ca st batchSize (srv, srvId, Nothing)
Left e -> logError $ "SMP server update and resubscription error " <> tshow e
where
-- TODO [certs rcv] compare hash
showService srv (ServiceSub serviceId n _idsHash) = showServer' srv <> ", service ID " <> decodeLatin1 (strEncode serviceId) <> ", " <> tshow n <> " subs"
showService srv (ServiceSub serviceId n _) = showServer' srv <> ", service ID " <> decodeLatin1 (strEncode serviceId) <> ", " <> tshow n <> " subs"

logSubErrors :: SMPServer -> NonEmpty (SMP.NotifierId, NtfSubStatus) -> Int -> IO ()
logSubErrors srv subs updated = forM_ (L.group $ L.sort $ L.map snd subs) $ \ss -> do
Expand Down
1 change: 0 additions & 1 deletion src/Simplex/Messaging/Notifications/Server/Stats.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import Simplex.Messaging.Server.Stats
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM

-- TODO [certs rcv] track service subscriptions and count/hash diffs for own and other servers + prometheus
data NtfServerStats = NtfServerStats
{ fromTime :: IORef UTCTime,
tknCreated :: IORef Int,
Expand Down
11 changes: 11 additions & 0 deletions src/Simplex/Messaging/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ module Simplex.Messaging.Protocol
serviceSubResult,
queueIdsHash,
queueIdHash,
noIdsHash,
addServiceSubs,
subtractServiceSubs,
MaxMessageLen,
MaxRcvMessageLen,
EncRcvMsgBody (..),
Expand Down Expand Up @@ -1526,6 +1529,14 @@ queueIdHash :: QueueId -> IdsHash
queueIdHash = IdsHash . C.md5Hash . unEntityId
{-# INLINE queueIdHash #-}

addServiceSubs :: (Int64, IdsHash) -> (Int64, IdsHash) -> (Int64, IdsHash)
addServiceSubs (n', idsHash') (n, idsHash) = (n + n', idsHash <> idsHash')

subtractServiceSubs :: (Int64, IdsHash) -> (Int64, IdsHash) -> (Int64, IdsHash)
subtractServiceSubs (n', idsHash') (n, idsHash)
| n > n' = (n - n', idsHash <> idsHash') -- concat is a reversible xor: (x `xor` y) `xor` y == x
| otherwise = (0, noIdsHash)

data ProtocolErrorType = PECmdSyntax | PECmdUnknown | PESession | PEBlock

-- | Type for protocol errors.
Expand Down
95 changes: 52 additions & 43 deletions src/Simplex/Messaging/Server.hs

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions src/Simplex/Messaging/Server/Env/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ data ServerSubscribers s = ServerSubscribers
{ subQ :: TQueue (ClientSub, ClientId),
queueSubscribers :: SubscribedClients s,
serviceSubscribers :: SubscribedClients s, -- service clients with long-term certificates that have subscriptions
totalServiceSubs :: TVar Int64,
totalServiceSubs :: TVar (Int64, IdsHash),
subClients :: TVar IntSet, -- clients with individual or service subscriptions
pendingEvents :: TVar (IntMap (NonEmpty (EntityId, BrokerMsg)))
}
Expand Down Expand Up @@ -426,7 +426,7 @@ sameClient c cv = maybe False (sameClientId c) <$> readTVar cv
data ClientSub
= CSClient QueueId (Maybe ServiceId) (Maybe ServiceId) -- includes previous and new associated service IDs
| CSDeleted QueueId (Maybe ServiceId) -- includes previously associated service IDs
| CSService ServiceId Int64 -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB
| CSService ServiceId (Int64, IdsHash) -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB

newtype ProxyAgent = ProxyAgent
{ smpAgent :: SMPClientAgent 'Sender
Expand All @@ -440,8 +440,8 @@ data Client s = Client
ntfSubscriptions :: TMap NotifierId (),
serviceSubscribed :: TVar Bool, -- set independently of serviceSubsCount, to track whether service subscription command was received
ntfServiceSubscribed :: TVar Bool,
serviceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
ntfServiceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
serviceSubsCount :: TVar (Int64, IdsHash), -- only one service can be subscribed, based on its certificate, this is subscription count
ntfServiceSubsCount :: TVar (Int64, IdsHash), -- only one service can be subscribed, based on its certificate, this is subscription count
rcvQ :: TBQueue (NonEmpty (VerifiedTransmission s)),
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg]),
msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
Expand Down Expand Up @@ -502,7 +502,7 @@ newServerSubscribers = do
subQ <- newTQueueIO
queueSubscribers <- SubscribedClients <$> TM.emptyIO
serviceSubscribers <- SubscribedClients <$> TM.emptyIO
totalServiceSubs <- newTVarIO 0
totalServiceSubs <- newTVarIO (0, noIdsHash)
subClients <- newTVarIO IS.empty
pendingEvents <- newTVarIO IM.empty
pure ServerSubscribers {subQ, queueSubscribers, serviceSubscribers, totalServiceSubs, subClients, pendingEvents}
Expand All @@ -513,8 +513,8 @@ newClient clientId qSize clientTHParams createdAt = do
ntfSubscriptions <- TM.emptyIO
serviceSubscribed <- newTVarIO False
ntfServiceSubscribed <- newTVarIO False
serviceSubsCount <- newTVarIO 0
ntfServiceSubsCount <- newTVarIO 0
serviceSubsCount <- newTVarIO (0, noIdsHash)
ntfServiceSubsCount <- newTVarIO (0, noIdsHash)
rcvQ <- newTBQueueIO qSize
sndQ <- newTBQueueIO qSize
msgQ <- newTBQueueIO qSize
Expand Down
15 changes: 9 additions & 6 deletions src/Simplex/Messaging/Server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
module Simplex.Messaging.Server.Main where

import Control.Concurrent.STM
import Control.Exception (SomeException, finally, try)
import Control.Exception (finally)
import Control.Logger.Simple
import Control.Monad
import qualified Data.Attoparsec.ByteString.Char8 as A
Expand All @@ -28,10 +28,8 @@ import Data.Char (isAlpha, isAscii, toUpper)
import Data.Either (fromRight)
import Data.Functor (($>))
import Data.Ini (Ini, lookupValue, readIniFile)
import Data.Int (Int64)
import Data.List (find, isPrefixOf)
import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
import Data.Maybe (fromMaybe, isJust, isNothing)
import Data.Text (Text)
import qualified Data.Text as T
Expand Down Expand Up @@ -61,14 +59,17 @@ import Simplex.Messaging.Transport (supportedProxyClientSMPRelayVRange, alpnSupp
import Simplex.Messaging.Transport.Client (TransportHost (..), defaultSocksProxy)
import Simplex.Messaging.Transport.HTTP2 (httpALPN)
import Simplex.Messaging.Transport.Server (ServerCredentials (..), mkTransportServerConfig)
import Simplex.Messaging.Util (eitherToMaybe, ifM, unlessM)
import Simplex.Messaging.Util (eitherToMaybe, ifM)
import System.Directory (createDirectoryIfMissing, doesDirectoryExist, doesFileExist)
import System.Exit (exitFailure)
import System.FilePath (combine)
import System.IO (BufferMode (..), IOMode (..), hSetBuffering, stderr, stdout, withFile)
import System.IO (BufferMode (..), hSetBuffering, stderr, stdout)
import Text.Read (readMaybe)

#if defined(dbServerPostgres)
import Control.Exception (SomeException, try)
import Data.Int (Int64)
import qualified Data.Map.Strict as M
import Data.Semigroup (Sum (..))
import Simplex.Messaging.Agent.Store.Postgres (checkSchemaExists)
import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue)
Expand All @@ -79,7 +80,9 @@ import Simplex.Messaging.Server.QueueStore.Postgres (batchInsertQueues, batchIns
import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore (..))
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.StoreLog (closeStoreLog, logNewService, logCreateQueue, openWriteStoreLog)
import Simplex.Messaging.Util (unlessM)
import System.Directory (renameFile)
import System.IO (IOMode (..), withFile)
#endif

smpServerCLI :: FilePath -> FilePath -> IO ()
Expand Down Expand Up @@ -556,7 +559,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
mkTransportServerConfig
(fromMaybe False $ iniOnOff "TRANSPORT" "log_tls_errors" ini)
(Just $ alpnSupportedSMPHandshakes <> httpALPN)
(fromMaybe True $ iniOnOff "TRANSPORT" "accept_service_credentials" ini), -- TODO [certs rcv] remove this option
True,
controlPort = eitherToMaybe $ T.unpack <$> lookupValue "TRANSPORT" "control_port" ini,
smpAgentCfg =
defaultSMPClientAgentConfig
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Server/MsgStore/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,9 @@ instance MsgStoreClass (JournalMsgStore s) where
getLoadedQueue :: JournalQueue s -> IO (JournalQueue s)
getLoadedQueue q = fromMaybe q <$> TM.lookupIO (recipientId q) (loadedQueues $ queueStore_ ms)

foldRcvServiceMessages :: JournalMsgStore s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a
foldRcvServiceMessages :: JournalMsgStore s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a)
foldRcvServiceMessages ms serviceId f acc = case queueStore_ ms of
MQStore st -> foldRcvServiceQueues st serviceId f' acc
MQStore st -> fmap Right $ foldRcvServiceQueues st serviceId f' acc
where
f' a (q, qr) = runExceptT (tryPeekMsg ms q) >>= f a (recipientId q) . ((qr,) <$$>)
#if defined(dbServerPostgres)
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Server/MsgStore/Postgres.hs
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ instance MsgStoreClass PostgresMsgStore where
toMessageStats (expiredMsgsCount, storedMsgsCount, storedQueues) =
MessageStats {expiredMsgsCount, storedMsgsCount, storedQueues}

foldRcvServiceMessages :: PostgresMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a
foldRcvServiceMessages :: PostgresMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a)
foldRcvServiceMessages ms serviceId f acc =
withTransaction (dbStore $ queueStore_ ms) $ \db ->
runExceptT $ withDB' "foldRcvServiceMessages" (queueStore_ ms) $ \db ->
DB.fold
db
[sql|
Expand Down
8 changes: 4 additions & 4 deletions src/Simplex/Messaging/Server/MsgStore/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ instance MsgStoreClass STMMsgStore where
expireOldMessages _tty ms now ttl =
withLoadedQueues (queueStore_ ms) $ atomically . expireQueueMsgs ms now (now - ttl)

foldRcvServiceMessages :: STMMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a
foldRcvServiceMessages ms serviceId f=
foldRcvServiceQueues (queueStore_ ms) serviceId $ \a (q, qr) ->
runExceptT (tryPeekMsg ms q) >>= f a (recipientId q) . ((qr,) <$$>)
foldRcvServiceMessages :: STMMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a)
foldRcvServiceMessages ms serviceId f = fmap Right . foldRcvServiceQueues (queueStore_ ms) serviceId f'
where
f' a (q, qr) = runExceptT (tryPeekMsg ms q) >>= f a (recipientId q) . ((qr,) <$$>)

logQueueStates _ = pure ()
{-# INLINE logQueueStates #-}
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Server/MsgStore/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M
unsafeWithAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a
-- tty, store, now, ttl
expireOldMessages :: Bool -> s -> Int64 -> Int64 -> IO MessageStats
foldRcvServiceMessages :: s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a
foldRcvServiceMessages :: s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a)
logQueueStates :: s -> IO ()
logQueueState :: StoreQueue s -> StoreMonad s ()
queueStore :: s -> QueueStore s
Expand Down
35 changes: 34 additions & 1 deletion src/Simplex/Messaging/Server/Prometheus.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import Simplex.Messaging.Transport (simplexMQVersion)
import Simplex.Messaging.Transport.Server (SocketStats (..))
import Simplex.Messaging.Util (tshow)

-- TODO [certs rcv] add service subscriptions and count/hash diffs
data ServerMetrics = ServerMetrics
{ statsData :: ServerStatsData,
activeQueueCounts :: PeriodStatCounts,
Expand Down Expand Up @@ -118,6 +117,8 @@ prometheusMetrics sm rtm ts =
_pMsgFwdsRecv,
_rcvServices,
_ntfServices,
_rcvServicesSubMsg,
_rcvServicesSubDuplicate,
_qCount,
_msgCount,
_ntfCount
Expand Down Expand Up @@ -383,6 +384,14 @@ prometheusMetrics sm rtm ts =
\# HELP simplex_smp_ntf_services_queues_count The count of queues associated with notification services.\n\
\# TYPE simplex_smp_ntf_services_queues_count gauge\n\
\simplex_smp_ntf_services_queues_count " <> mshow (ntfServiceQueuesCount entityCounts) <> "\n# ntfServiceQueuesCount\n\
\\n\
\# HELP simplex_smp_rcv_services_sub_msg The count of subscribed service queues with messages.\n\
\# TYPE simplex_smp_rcv_services_sub_msg counter\n\
\simplex_smp_rcv_services_sub_msg " <> mshow _rcvServicesSubMsg <> "\n# rcvServicesSubMsg\n\
\\n\
\# HELP simplex_smp_rcv_services_sub_duplicate The count of duplicate subscribed service queues.\n\
\# TYPE simplex_smp_rcv_services_sub_duplicate counter\n\
\simplex_smp_rcv_services_sub_duplicate " <> mshow _rcvServicesSubDuplicate <> "\n# rcvServicesSubDuplicate\n\
\\n"
<> showServices _rcvServices "rcv" "receiving"
<> showServices _ntfServices "ntf" "notification"
Expand Down Expand Up @@ -418,6 +427,30 @@ prometheusMetrics sm rtm ts =
\# HELP simplex_smp_" <> pfx <> "_services_sub_end Ended subscriptions with " <> name <> " services.\n\
\# TYPE simplex_smp_" <> pfx <> "_services_sub_end gauge\n\
\simplex_smp_" <> pfx <> "_services_sub_end " <> mshow (_srvSubEnd ss) <> "\n# " <> pfx <> ".srvSubEnd\n\
\\n\
\# HELP simplex_smp_" <> pfx <> "_services_sub_ok Service subscriptions for " <> name <> " services.\n\
\# TYPE simplex_smp_" <> pfx <> "_services_sub_ok gauge\n\
\simplex_smp_" <> pfx <> "_services_sub_ok " <> mshow (_srvSubOk ss) <> "\n# " <> pfx <> ".srvSubOk\n\
\\n\
\# HELP simplex_smp_" <> pfx <> "_services_sub_more Service subscriptions for " <> name <> " services with more queues than in the client.\n\
\# TYPE simplex_smp_" <> pfx <> "_services_sub_more gauge\n\
\simplex_smp_" <> pfx <> "_services_sub_more " <> mshow (_srvSubMore ss) <> "\n# " <> pfx <> ".srvSubMore\n\
\\n\
\# HELP simplex_smp_" <> pfx <> "_services_sub_fewer Service subscriptions for " <> name <> " services with fewer queues than in the client.\n\
\# TYPE simplex_smp_" <> pfx <> "_services_sub_fewer gauge\n\
\simplex_smp_" <> pfx <> "_services_sub_fewer " <> mshow (_srvSubFewer ss) <> "\n# " <> pfx <> ".srvSubFewer\n\
\\n\
\# HELP simplex_smp_" <> pfx <> "_services_sub_diff Service subscriptions for " <> name <> " services with different hash than in the client.\n\
\# TYPE simplex_smp_" <> pfx <> "_services_sub_diff gauge\n\
\simplex_smp_" <> pfx <> "_services_sub_diff " <> mshow (_srvSubDiff ss) <> "\n# " <> pfx <> ".srvSubDiff\n\
\\n\
\# HELP simplex_smp_" <> pfx <> "_services_sub_more_total Service subscriptions for " <> name <> " services with more queues than in the client total.\n\
\# TYPE simplex_smp_" <> pfx <> "_services_sub_more_total gauge\n\
\simplex_smp_" <> pfx <> "_services_sub_more_total " <> mshow (_srvSubMoreTotal ss) <> "\n# " <> pfx <> ".srvSubMoreTotal\n\
\\n\
\# HELP simplex_smp_" <> pfx <> "_services_sub_fewer_total Service subscriptions for " <> name <> " services with fewer queues than in the client total.\n\
\# TYPE simplex_smp_" <> pfx <> "_services_sub_fewer_total gauge\n\
\simplex_smp_" <> pfx <> "_services_sub_fewer_total " <> mshow (_srvSubFewerTotal ss) <> "\n# " <> pfx <> ".srvSubFewerTotal\n\
\\n"
info =
"# Info\n\
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Server/QueueStore/Postgres.hs
Original file line number Diff line number Diff line change
Expand Up @@ -581,9 +581,9 @@ foldServiceRecs st f =
DB.fold_ db "SELECT service_id, service_role, service_cert, service_cert_hash, created_at FROM services" mempty $
\ !acc -> fmap (acc <>) . f . rowToServiceRec

foldRcvServiceQueueRecs :: PostgresQueueStore q -> ServiceId -> (a -> (RecipientId, QueueRec) -> IO a) -> a -> IO a
foldRcvServiceQueueRecs :: PostgresQueueStore q -> ServiceId -> (a -> (RecipientId, QueueRec) -> IO a) -> a -> IO (Either ErrorType a)
foldRcvServiceQueueRecs st serviceId f acc =
withTransaction (dbStore st) $ \db ->
runExceptT $ withDB' "foldRcvServiceQueueRecs" st $ \db ->
DB.fold db (queueRecQuery <> " WHERE rcv_service_id = ? AND deleted_at IS NULL") (Only serviceId) acc $ \a -> f a . rowToQueueRec

foldQueueRecs :: Monoid a => Bool -> Bool -> PostgresQueueStore q -> ((RecipientId, QueueRec) -> IO a) -> IO a
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Server/QueueStore/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ data STMQueueStore q = STMQueueStore

data STMService = STMService
{ serviceRec :: ServiceRec,
serviceRcvQueues :: TVar (Set RecipientId, IdsHash), -- TODO [certs rcv] get/maintain hash
serviceNtfQueues :: TVar (Set NotifierId, IdsHash) -- TODO [certs rcv] get/maintain hash
serviceRcvQueues :: TVar (Set RecipientId, IdsHash),
serviceNtfQueues :: TVar (Set NotifierId, IdsHash)
}

setStoreLog :: STMQueueStore q -> StoreLog 'WriteMode -> IO ()
Expand Down
Loading
Loading