Skip to content

Commit 11ae20e

Browse files
authored
ntf server: use different client certs for each SMP server, remove support for store log (#1681)
* ntf server: remove support for store log * ntf server: use different client certificates for each SMP server
1 parent a1277bf commit 11ae20e

File tree

17 files changed

+322
-673
lines changed

17 files changed

+322
-673
lines changed

simplexmq.cabal

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,6 @@ library
275275
Simplex.Messaging.Notifications.Server.Store.Migrations
276276
Simplex.Messaging.Notifications.Server.Store.Postgres
277277
Simplex.Messaging.Notifications.Server.Store.Types
278-
Simplex.Messaging.Notifications.Server.StoreLog
279278
Simplex.Messaging.Server.MsgStore.Postgres
280279
Simplex.Messaging.Server.QueueStore.Postgres
281280
Simplex.Messaging.Server.QueueStore.Postgres.Migrations

src/Simplex/FileTransfer/Client.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
module Simplex.FileTransfer.Client where
1313

14+
import qualified Control.Exception as E
1415
import Control.Logger.Simple
1516
import Control.Monad
1617
import Control.Monad.Except
@@ -264,7 +265,7 @@ downloadXFTPChunk g c@XFTPClient {config} rpKey fId chunkSpec@XFTPRcvChunkSpec {
264265
where
265266
errors =
266267
[ Handler $ \(e :: H.HTTP2Error) -> pure $ Left $ PCENetworkError $ NEConnectError $ displayException e,
267-
Handler $ \(e :: IOException) -> pure $ Left $ PCEIOError e,
268+
Handler $ \(e :: IOException) -> pure $ Left $ PCEIOError $ E.displayException e,
268269
Handler $ \(e :: SomeException) -> pure $ Left $ PCENetworkError $ toNetworkError e
269270
]
270271
download cbState =

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -751,8 +751,8 @@ smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm
751751
atomically $ SS.setSessionId tSess (sessionId $ thParams smp) $ currentSubs c
752752
updateClientService service smp
753753
pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs}
754-
updateClientService service smp = case (service, smpClientService smp) of
755-
(Just (_, serviceId_), Just THClientService {serviceId}) -> withStore' c $ \db -> do
754+
updateClientService service smp = case (service, smpClientServiceId smp) of
755+
(Just (_, serviceId_), Just serviceId) -> withStore' c $ \db -> do
756756
setClientServiceId db userId srv serviceId
757757
forM_ serviceId_ $ \sId -> when (sId /= serviceId) $ removeRcvServiceAssocs db userId srv
758758
(Just _, Nothing) -> withStore' c $ \db -> deleteClientService db userId srv -- e.g., server version downgrade
@@ -1255,7 +1255,7 @@ protocolClientError protocolError_ host = \case
12551255
PCETransportError e -> BROKER host $ TRANSPORT e
12561256
e@PCECryptoError {} -> INTERNAL $ show e
12571257
PCEServiceUnavailable {} -> BROKER host NO_SERVICE
1258-
PCEIOError e -> BROKER host $ NETWORK $ NEConnectError $ E.displayException e
1258+
PCEIOError e -> BROKER host $ NETWORK $ NEConnectError e
12591259

12601260
-- it is consistent with smpClientServiceError
12611261
clientServiceError :: AgentErrorType -> Bool
@@ -1546,6 +1546,7 @@ processSubResults c tSess@(userId, srv, _) sessId serviceId_ rs = do
15461546
Left e -> case smpErrorClientNotice e of
15471547
Just notice_ -> (failed', subscribed, (rq, notice_) : notices, ignored)
15481548
where
1549+
-- TODO [certs rcv] not used?
15491550
notices' = if isJust notice_ || isJust clientNoticeId then (rq, notice_) : notices else notices
15501551
Nothing
15511552
| temporaryClientError e -> acc
@@ -1678,7 +1679,7 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c
16781679
(active, (serviceQs, notices)) <- atomically $ do
16791680
r@(_, (_, notices)) <- ifM
16801681
(activeClientSession c tSess sessId)
1681-
((True,) <$> processSubResults c tSess sessId smpServiceId rs)
1682+
((True,) <$> processSubResults c tSess sessId (smpClientServiceId smp) rs)
16821683
((False, ([], [])) <$ incSMPServerStat' c userId srv connSubIgnored (length rs))
16831684
unless (null notices) $ takeTMVar $ clientNoticesLock c
16841685
pure r
@@ -1704,7 +1705,6 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c
17041705
where
17051706
tSess = transportSession' smp
17061707
sessId = sessionId $ thParams smp
1707-
smpServiceId = (\THClientService {serviceId} -> serviceId) <$> smpClientService smp
17081708

17091709
processRcvServiceAssocs :: SMPQueue q => AgentClient -> [q] -> AM' ()
17101710
processRcvServiceAssocs _ [] = pure ()
@@ -1752,7 +1752,7 @@ subscribeClientService c withEvent userId srv (ServiceSub _ n idsHash) =
17521752
withServiceClient :: AgentClient -> SMPTransportSession -> (SMPClient -> ServiceId -> ExceptT SMPClientError IO a) -> AM a
17531753
withServiceClient c tSess subscribe =
17541754
withLogClient c NRMBackground tSess B.empty "SUBS" $ \(SMPConnectedClient smp _) ->
1755-
case (\THClientService {serviceId} -> serviceId) <$> smpClientService smp of
1755+
case smpClientServiceId smp of
17561756
Just smpServiceId -> subscribe smp smpServiceId
17571757
Nothing -> throwE PCEServiceUnavailable
17581758

src/Simplex/Messaging/Agent/Store/AgentStore.hs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -472,15 +472,21 @@ toServerService (host, port, kh, serviceId, n, Binary idsHash) =
472472
(SMPServer host port kh, ServiceSub serviceId n (IdsHash idsHash))
473473

474474
setClientServiceId :: DB.Connection -> UserId -> SMPServer -> ServiceId -> IO ()
475-
setClientServiceId db userId srv serviceId =
475+
setClientServiceId db userId (SMPServer h p kh) serviceId =
476476
DB.execute
477477
db
478478
[sql|
479479
UPDATE client_services
480480
SET service_id = ?
481-
WHERE user_id = ? AND host = ? AND port = ?
481+
FROM servers s
482+
WHERE client_services.user_id = ?
483+
AND client_services.host = ?
484+
AND client_services.port = ?
485+
AND s.host = client_services.host
486+
AND s.port = client_services.port
487+
AND COALESCE(client_services.server_key_hash, s.key_hash) = ?
482488
|]
483-
(serviceId, userId, host srv, port srv)
489+
(serviceId, userId, h, p, kh)
484490

485491
deleteClientService :: DB.Connection -> UserId -> SMPServer -> IO ()
486492
deleteClientService db userId (SMPServer h p kh) =
@@ -2307,7 +2313,7 @@ unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe =
23072313
setRcvServiceAssocs :: SMPQueue q => DB.Connection -> [q] -> IO ()
23082314
setRcvServiceAssocs db rqs =
23092315
#if defined(dbPostgres)
2310-
DB.execute db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id IN " $ Only $ In (map queueId rqs)
2316+
DB.execute db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id IN ?" $ Only $ In (map queueId rqs)
23112317
#else
23122318
DB.executeMany db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id = ?" $ map (Only . queueId) rqs
23132319
#endif

src/Simplex/Messaging/Client.hs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ module Simplex.Messaging.Client
5252
subscribeSMPQueuesNtfs,
5353
subscribeService,
5454
smpClientService,
55+
smpClientServiceId,
5556
secureSMPQueue,
5657
secureSndSMPQueue,
5758
proxySecureSndSMPQueue,
@@ -128,7 +129,8 @@ import Control.Applicative ((<|>))
128129
import Control.Concurrent (ThreadId, forkFinally, forkIO, killThread, mkWeakThreadId)
129130
import Control.Concurrent.Async
130131
import Control.Concurrent.STM
131-
import Control.Exception
132+
import Control.Exception (Exception, SomeException)
133+
import qualified Control.Exception as E
132134
import Control.Logger.Simple
133135
import Control.Monad
134136
import Control.Monad.Except
@@ -565,7 +567,7 @@ getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qS
565567
case chooseTransportHost networkConfig (host srv) of
566568
Right useHost ->
567569
(getCurrentTime >>= mkProtocolClient useHost >>= runClient useTransport useHost)
568-
`catch` \(e :: IOException) -> pure . Left $ PCEIOError e
570+
`E.catch` \(e :: SomeException) -> pure $ Left $ PCEIOError $ E.displayException e
569571
Left e -> pure $ Left e
570572
where
571573
NetworkConfig {tcpConnectTimeout, tcpTimeout, smpPingInterval} = networkConfig
@@ -638,7 +640,7 @@ getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qS
638640
writeTVar (connected c) True
639641
putTMVar cVar $ Right c'
640642
raceAny_ ([send c' th, process c', receive c' th] <> [monitor c' | smpPingInterval > 0])
641-
`finally` disconnected c'
643+
`E.finally` disconnected c'
642644

643645
send :: Transport c => ProtocolClient v err msg -> THandle v c 'TClient -> IO ()
644646
send ProtocolClient {client_ = PClient {sndQ}} h = forever $ atomically (readTBQueue sndQ) >>= sendPending
@@ -765,7 +767,7 @@ data ProtocolClientError err
765767
| -- | Error when cryptographically "signing" the command or when initializing crypto_box.
766768
PCECryptoError C.CryptoError
767769
| -- | IO Error
768-
PCEIOError IOException
770+
PCEIOError String
769771
deriving (Eq, Show, Exception)
770772

771773
type SMPClientError = ProtocolClientError ErrorType
@@ -926,6 +928,10 @@ smpClientService :: SMPClient -> Maybe THClientService
926928
smpClientService = thAuth . thParams >=> clientService
927929
{-# INLINE smpClientService #-}
928930

931+
smpClientServiceId :: SMPClient -> Maybe ServiceId
932+
smpClientServiceId = fmap (\THClientService {serviceId} -> serviceId) . smpClientService
933+
{-# INLINE smpClientServiceId #-}
934+
929935
enablePings :: SMPClient -> IO ()
930936
enablePings ProtocolClient {client_ = PClient {sendPings}} = atomically $ writeTVar sendPings True
931937
{-# INLINE enablePings #-}

src/Simplex/Messaging/Client/Agent.hs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ module Simplex.Messaging.Client.Agent
1515
( SMPClientAgent (..),
1616
SMPClientAgentConfig (..),
1717
SMPClientAgentEvent (..),
18+
DBService (..),
1819
OwnServer,
1920
defaultSMPClientAgentConfig,
2021
newSMPClientAgent,
@@ -133,6 +134,7 @@ defaultSMPClientAgentConfig =
133134
data SMPClientAgent p = SMPClientAgent
134135
{ agentCfg :: SMPClientAgentConfig,
135136
agentParty :: SParty p,
137+
dbService :: Maybe DBService,
136138
active :: TVar Bool,
137139
startedAt :: UTCTime,
138140
msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg),
@@ -155,8 +157,8 @@ data SMPClientAgent p = SMPClientAgent
155157

156158
type OwnServer = Bool
157159

158-
newSMPClientAgent :: SParty p -> SMPClientAgentConfig -> TVar ChaChaDRG -> IO (SMPClientAgent p)
159-
newSMPClientAgent agentParty agentCfg@SMPClientAgentConfig {msgQSize, agentQSize} randomDrg = do
160+
newSMPClientAgent :: SParty p -> SMPClientAgentConfig -> Maybe DBService -> TVar ChaChaDRG -> IO (SMPClientAgent p)
161+
newSMPClientAgent agentParty agentCfg@SMPClientAgentConfig {msgQSize, agentQSize} dbService randomDrg = do
160162
active <- newTVarIO True
161163
startedAt <- getCurrentTime
162164
msgQ <- newTBQueueIO msgQSize
@@ -173,6 +175,7 @@ newSMPClientAgent agentParty agentCfg@SMPClientAgentConfig {msgQSize, agentQSize
173175
SMPClientAgent
174176
{ agentCfg,
175177
agentParty,
178+
dbService,
176179
active,
177180
startedAt,
178181
msgQ,
@@ -188,6 +191,11 @@ newSMPClientAgent agentParty agentCfg@SMPClientAgentConfig {msgQSize, agentQSize
188191
workerSeq
189192
}
190193

194+
data DBService = DBService
195+
{ getCredentials :: SMPServer -> IO (Either SMPClientError ServiceCredentials),
196+
updateServiceId :: SMPServer -> Maybe ServiceId -> IO (Either SMPClientError ())
197+
}
198+
191199
-- | Get or create SMP client for SMPServer
192200
getSMPServerClient' :: SMPClientAgent p -> SMPServer -> ExceptT SMPClientError IO SMPClient
193201
getSMPServerClient' ca srv = snd <$> getSMPServerClient'' ca srv
@@ -218,7 +226,7 @@ getSMPServerClient'' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, worke
218226

219227
newSMPClient :: SMPClientVar -> IO (Either SMPClientError (OwnServer, SMPClient))
220228
newSMPClient v = do
221-
r <- connectClient ca srv v `E.catch` (pure . Left . PCEIOError)
229+
r <- connectClient ca srv v `E.catch` \(e :: E.SomeException) -> pure $ Left $ PCEIOError $ E.displayException e
222230
case r of
223231
Right smp -> do
224232
logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv
@@ -227,8 +235,7 @@ getSMPServerClient'' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, worke
227235
atomically $ do
228236
putTMVar (sessionVar v) (Right c)
229237
TM.insert (sessionId $ thParams smp) c smpSessions
230-
let serviceId_ = (\THClientService {serviceId} -> serviceId) <$> smpClientService smp
231-
notify ca $ CAConnected srv serviceId_
238+
notify ca $ CAConnected srv $ smpClientServiceId smp
232239
pure $ Right c
233240
Left e -> do
234241
let ei = persistErrorInterval agentCfg
@@ -249,9 +256,18 @@ isOwnServer SMPClientAgent {agentCfg} ProtocolServer {host} =
249256

250257
-- | Run an SMP client for SMPClientVar
251258
connectClient :: SMPClientAgent p -> SMPServer -> SMPClientVar -> IO (Either SMPClientError SMPClient)
252-
connectClient ca@SMPClientAgent {agentCfg, smpClients, smpSessions, msgQ, randomDrg, startedAt} srv v =
253-
getProtocolClient randomDrg NRMBackground (1, srv, Nothing) (smpCfg agentCfg) [] (Just msgQ) startedAt clientDisconnected
259+
connectClient ca@SMPClientAgent {agentCfg, dbService, smpClients, smpSessions, msgQ, randomDrg, startedAt} srv v = case dbService of
260+
Just dbs -> runExceptT $ do
261+
creds <- ExceptT $ getCredentials dbs srv
262+
smp <- ExceptT $ getClient cfg {serviceCredentials = Just creds}
263+
whenM (atomically $ activeClientSession ca smp srv) $
264+
ExceptT $ updateServiceId dbs srv $ smpClientServiceId smp
265+
pure smp
266+
Nothing -> getClient cfg
254267
where
268+
cfg = smpCfg agentCfg
269+
getClient cfg' = getProtocolClient randomDrg NRMBackground (1, srv, Nothing) cfg' [] (Just msgQ) startedAt clientDisconnected
270+
255271
clientDisconnected :: SMPClient -> IO ()
256272
clientDisconnected smp = do
257273
removeClientAndSubs smp >>= serverDown
@@ -435,7 +451,7 @@ smpSubscribeQueues ca smp srv subs = do
435451
unless (null notPending) $ removePendingSubs ca srv notPending
436452
pure acc
437453
sessId = sessionId $ thParams smp
438-
smpServiceId = (\THClientService {serviceId} -> serviceId) <$> smpClientService smp
454+
smpServiceId = smpClientServiceId smp
439455
groupSub ::
440456
Map QueueId C.APrivateAuthKey ->
441457
((QueueId, C.APrivateAuthKey), Either SMPClientError (Maybe ServiceId)) ->

src/Simplex/Messaging/Notifications/Server.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
588588
logError $ "SMP server service subscription error " <> showService srv serviceSub <> ": " <> tshow e
589589
CAServiceUnavailable srv serviceSub -> do
590590
logError $ "SMP server service unavailable: " <> showService srv serviceSub
591-
removeServiceAssociation st srv >>= \case
591+
removeServiceAndAssociations st srv >>= \case
592592
Right (srvId, updated) -> do
593593
logSubStatus srv "removed service association" updated updated
594594
void $ subscribeSrvSubs ca st batchSize (srv, srvId, Nothing)

0 commit comments

Comments
 (0)