@@ -45,7 +45,6 @@ import Crypto.Random (ChaChaDRG)
4545import Data.ByteString.Char8 (ByteString )
4646import qualified Data.ByteString.Char8 as B
4747import Data.Constraint (Dict (.. ))
48- import Data.Int (Int64 )
4948import Data.List.NonEmpty (NonEmpty )
5049import qualified Data.List.NonEmpty as L
5150import Data.Map.Strict (Map )
@@ -69,10 +68,12 @@ import Simplex.Messaging.Protocol
6968 ProtocolServer (.. ),
7069 QueueId ,
7170 SMPServer ,
71+ ServiceSub (.. ),
7272 SParty (.. ),
7373 ServiceParty ,
7474 serviceParty ,
75- partyServiceRole
75+ partyServiceRole ,
76+ queueIdsHash ,
7677 )
7778import Simplex.Messaging.Session
7879import Simplex.Messaging.TMap (TMap )
@@ -91,14 +92,14 @@ data SMPClientAgentEvent
9192 | CADisconnected SMPServer (NonEmpty QueueId )
9293 | CASubscribed SMPServer (Maybe ServiceId ) (NonEmpty QueueId )
9394 | CASubError SMPServer (NonEmpty (QueueId , SMPClientError ))
94- | CAServiceDisconnected SMPServer ( ServiceId , Int64 )
95- | CAServiceSubscribed SMPServer ( ServiceId , Int64 ) Int64
96- | CAServiceSubError SMPServer ( ServiceId , Int64 ) SMPClientError
95+ | CAServiceDisconnected SMPServer ServiceSub
96+ | CAServiceSubscribed { subServer :: SMPServer , expected :: ServiceSub , subscribed :: ServiceSub }
97+ | CAServiceSubError SMPServer ServiceSub SMPClientError
9798 -- CAServiceUnavailable is used when service ID in pending subscription is different from the current service in connection.
9899 -- This will require resubscribing to all queues associated with this service ID individually, creating new associations.
99100 -- It may happen if, for example, SMP server deletes service information (e.g. via downgrade and upgrade)
100101 -- and assigns different service ID to the service certificate.
101- | CAServiceUnavailable SMPServer ( ServiceId , Int64 )
102+ | CAServiceUnavailable SMPServer ServiceSub
102103
103104data SMPClientAgentConfig = SMPClientAgentConfig
104105 { smpCfg :: ProtocolClientConfig SMPVersion ,
@@ -142,11 +143,11 @@ data SMPClientAgent p = SMPClientAgent
142143 -- Only one service subscription can exist per server with this agent.
143144 -- With correctly functioning SMP server, queue and service subscriptions can't be
144145 -- active at the same time.
145- activeServiceSubs :: TMap SMPServer (TVar (Maybe (( ServiceId , Int64 ) , SessionId ))),
146+ activeServiceSubs :: TMap SMPServer (TVar (Maybe (ServiceSub , SessionId ))),
146147 activeQueueSubs :: TMap SMPServer (TMap QueueId (SessionId , C. APrivateAuthKey )),
147148 -- Pending service subscriptions can co-exist with pending queue subscriptions
148149 -- on the same SMP server during subscriptions being transitioned from per-queue to service.
149- pendingServiceSubs :: TMap SMPServer (TVar (Maybe ( ServiceId , Int64 ) )),
150+ pendingServiceSubs :: TMap SMPServer (TVar (Maybe ServiceSub )),
150151 pendingQueueSubs :: TMap SMPServer (TMap QueueId C. APrivateAuthKey ),
151152 smpSubWorkers :: TMap SMPServer (SessionVar (Async () )),
152153 workerSeq :: TVar Int
@@ -256,7 +257,7 @@ connectClient ca@SMPClientAgent {agentCfg, smpClients, smpSessions, msgQ, random
256257 removeClientAndSubs smp >>= serverDown
257258 logInfo . decodeUtf8 $ " Agent disconnected from " <> showServer srv
258259
259- removeClientAndSubs :: SMPClient -> IO (Maybe ( ServiceId , Int64 ) , Maybe (Map QueueId C. APrivateAuthKey ))
260+ removeClientAndSubs :: SMPClient -> IO (Maybe ServiceSub , Maybe (Map QueueId C. APrivateAuthKey ))
260261 removeClientAndSubs smp = do
261262 -- Looking up subscription vars outside of STM transaction to reduce re-evaluation.
262263 -- It is possible because these vars are never removed, they are only added.
@@ -287,7 +288,7 @@ connectClient ca@SMPClientAgent {agentCfg, smpClients, smpSessions, msgQ, random
287288 then pure Nothing
288289 else Just subs <$ addSubs_ (pendingQueueSubs ca) srv subs
289290
290- serverDown :: (Maybe ( ServiceId , Int64 ) , Maybe (Map QueueId C. APrivateAuthKey )) -> IO ()
291+ serverDown :: (Maybe ServiceSub , Maybe (Map QueueId C. APrivateAuthKey )) -> IO ()
291292 serverDown (sSub, qSubs) = do
292293 mapM_ (notify ca . CAServiceDisconnected srv) sSub
293294 let qIds = L. nonEmpty . M. keys =<< qSubs
@@ -317,7 +318,7 @@ reconnectClient ca@SMPClientAgent {active, agentCfg, smpSubWorkers, workerSeq} s
317318 loop
318319 ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = smpCfg agentCfg
319320 noPending (sSub, qSubs) = isNothing sSub && maybe True M. null qSubs
320- getPending :: Monad m => (forall a . SMPServer -> TMap SMPServer a -> m (Maybe a )) -> (forall a . TVar a -> m a ) -> m (Maybe ( ServiceId , Int64 ) , Maybe (Map QueueId C. APrivateAuthKey ))
321+ getPending :: Monad m => (forall a . SMPServer -> TMap SMPServer a -> m (Maybe a )) -> (forall a . TVar a -> m a ) -> m (Maybe ServiceSub , Maybe (Map QueueId C. APrivateAuthKey ))
321322 getPending lkup rd = do
322323 sSub <- lkup srv (pendingServiceSubs ca) $>>= rd
323324 qSubs <- lkup srv (pendingQueueSubs ca) >>= mapM rd
@@ -329,7 +330,7 @@ reconnectClient ca@SMPClientAgent {active, agentCfg, smpSubWorkers, workerSeq} s
329330 whenM (isEmptyTMVar $ sessionVar v) retry
330331 removeSessVar v srv smpSubWorkers
331332
332- reconnectSMPClient :: forall p . SMPClientAgent p -> SMPServer -> (Maybe ( ServiceId , Int64 ) , Maybe (Map QueueId C. APrivateAuthKey )) -> ExceptT SMPClientError IO ()
333+ reconnectSMPClient :: forall p . SMPClientAgent p -> SMPServer -> (Maybe ServiceSub , Maybe (Map QueueId C. APrivateAuthKey )) -> ExceptT SMPClientError IO ()
333334reconnectSMPClient ca@ SMPClientAgent {agentCfg, agentParty} srv (sSub_, qSubs_) =
334335 withSMP ca srv $ \ smp -> liftIO $ case serviceParty agentParty of
335336 Just Dict -> resubscribe smp
@@ -430,7 +431,7 @@ smpSubscribeQueues ca smp srv subs = do
430431 let acc@ (_, _, (qOks, sQs), notPending) = foldr (groupSub pending) (False , [] , ([] , [] ), [] ) (L. zip subs rs)
431432 unless (null qOks) $ addActiveSubs ca srv qOks
432433 unless (null sQs) $ forM_ smpServiceId $ \ serviceId ->
433- updateActiveServiceSub ca srv (( serviceId, fromIntegral $ length sQs), sessId)
434+ updateActiveServiceSub ca srv (ServiceSub serviceId ( fromIntegral $ length sQs) (queueIdsHash sQs), sessId)
434435 unless (null notPending) $ removePendingSubs ca srv notPending
435436 pure acc
436437 sessId = sessionId $ thParams smp
@@ -454,40 +455,40 @@ smpSubscribeQueues ca smp srv subs = do
454455 notify_ :: (SMPServer -> NonEmpty a -> SMPClientAgentEvent ) -> [a ] -> IO ()
455456 notify_ evt qs = mapM_ (notify ca . evt srv) $ L. nonEmpty qs
456457
457- subscribeServiceNtfs :: SMPClientAgent 'NotifierService -> SMPServer -> ( ServiceId , Int64 ) -> IO ()
458+ subscribeServiceNtfs :: SMPClientAgent 'NotifierService -> SMPServer -> ServiceSub -> IO ()
458459subscribeServiceNtfs = subscribeService_
459460{-# INLINE subscribeServiceNtfs #-}
460461
461- subscribeService_ :: (PartyI p , ServiceParty p ) => SMPClientAgent p -> SMPServer -> ( ServiceId , Int64 ) -> IO ()
462+ subscribeService_ :: (PartyI p , ServiceParty p ) => SMPClientAgent p -> SMPServer -> ServiceSub -> IO ()
462463subscribeService_ ca srv serviceSub = do
463464 atomically $ setPendingServiceSub ca srv $ Just serviceSub
464465 runExceptT (getSMPServerClient' ca srv) >>= \ case
465466 Right smp -> smpSubscribeService ca smp srv serviceSub
466467 Left _ -> pure () -- no call to reconnectClient - failing getSMPServerClient' does that
467468
468- smpSubscribeService :: (PartyI p , ServiceParty p ) => SMPClientAgent p -> SMPClient -> SMPServer -> ( ServiceId , Int64 ) -> IO ()
469- smpSubscribeService ca smp srv serviceSub@ (serviceId, _ ) = case smpClientService smp of
469+ smpSubscribeService :: (PartyI p , ServiceParty p ) => SMPClientAgent p -> SMPClient -> SMPServer -> ServiceSub -> IO ()
470+ smpSubscribeService ca smp srv serviceSub@ (ServiceSub serviceId n idsHash ) = case smpClientService smp of
470471 Just service | serviceAvailable service -> subscribe
471472 _ -> notifyUnavailable
472473 where
473474 subscribe = do
474- r <- runExceptT $ subscribeService smp $ agentParty ca
475+ r <- runExceptT $ subscribeService smp ( agentParty ca) n idsHash
475476 ok <-
476477 atomically $
477478 ifM
478479 (activeClientSession ca smp srv)
479480 (True <$ processSubscription r)
480481 (pure False )
481482 if ok
482- then case r of -- TODO [certs rcv] compare hash
483- Right (n, _idsHash) -> notify ca $ CAServiceSubscribed srv serviceSub n
483+ then case r of
484+ Right serviceSub' -> notify ca $ CAServiceSubscribed srv serviceSub serviceSub'
484485 Left e
485486 | smpClientServiceError e -> notifyUnavailable
486487 | temporaryClientError e -> reconnectClient ca srv
487488 | otherwise -> notify ca $ CAServiceSubError srv serviceSub e
488489 else reconnectClient ca srv
489- processSubscription = mapM_ $ \ (n, _idsHash) -> do -- TODO [certs rcv] validate hash here?
490- setActiveServiceSub ca srv $ Just ((serviceId, n) , sessId)
490+ processSubscription = mapM_ $ \ serviceSub' -> do -- TODO [certs rcv] validate hash here?
491+ setActiveServiceSub ca srv $ Just (serviceSub' , sessId)
491492 setPendingServiceSub ca srv Nothing
492493 serviceAvailable THClientService {serviceRole, serviceId = serviceId'} =
493494 serviceId == serviceId' && partyServiceRole (agentParty ca) == serviceRole
@@ -529,11 +530,11 @@ addSubs_ subs srv ss =
529530 Just m -> TM. union ss m
530531 _ -> TM. insertM srv (newTVar ss) subs
531532
532- setActiveServiceSub :: SMPClientAgent p -> SMPServer -> Maybe (( ServiceId , Int64 ) , SessionId ) -> STM ()
533+ setActiveServiceSub :: SMPClientAgent p -> SMPServer -> Maybe (ServiceSub , SessionId ) -> STM ()
533534setActiveServiceSub = setServiceSub_ activeServiceSubs
534535{-# INLINE setActiveServiceSub #-}
535536
536- setPendingServiceSub :: SMPClientAgent p -> SMPServer -> Maybe ( ServiceId , Int64 ) -> STM ()
537+ setPendingServiceSub :: SMPClientAgent p -> SMPServer -> Maybe ServiceSub -> STM ()
537538setPendingServiceSub = setServiceSub_ pendingServiceSubs
538539{-# INLINE setPendingServiceSub #-}
539540
@@ -548,12 +549,12 @@ setServiceSub_ subsSel ca srv sub =
548549 Just v -> writeTVar v sub
549550 Nothing -> TM. insertM srv (newTVar sub) (subsSel ca)
550551
551- updateActiveServiceSub :: SMPClientAgent p -> SMPServer -> (( ServiceId , Int64 ) , SessionId ) -> STM ()
552- updateActiveServiceSub ca srv sub@ (( serviceId', n') , sessId') =
552+ updateActiveServiceSub :: SMPClientAgent p -> SMPServer -> (ServiceSub , SessionId ) -> STM ()
553+ updateActiveServiceSub ca srv sub@ (ServiceSub serviceId' n' idsHash' , sessId') =
553554 TM. lookup srv (activeServiceSubs ca) >>= \ case
554555 Just v -> modifyTVar' v $ \ case
555- Just (( serviceId, n) , sessId) | serviceId == serviceId' && sessId == sessId' ->
556- Just (( serviceId, n + n'), sessId)
556+ Just (ServiceSub serviceId n idsHash , sessId) | serviceId == serviceId' && sessId == sessId' ->
557+ Just (ServiceSub serviceId ( n + n') (idsHash <> idsHash '), sessId)
557558 _ -> Just sub
558559 Nothing -> TM. insertM srv (newTVar $ Just sub) (activeServiceSubs ca)
559560
0 commit comments