@@ -221,7 +221,9 @@ import Simplex.Messaging.Protocol
221221 SMPMsgMeta ,
222222 SParty (.. ),
223223 SProtocolType (.. ),
224- ServiceSubResult ,
224+ ServiceSub (.. ),
225+ ServiceSubResult (.. ),
226+ ServiceSubError (.. ),
225227 SndPublicAuthKey ,
226228 SubscriptionMode (.. ),
227229 UserProtocol ,
@@ -1040,10 +1042,10 @@ newRcvConnSrv c nm userId connId enableNtfs cMode userLinkData_ clientData pqIni
10401042 createRcvQueue nonce_ qd e2eKeys = do
10411043 AgentConfig {smpClientVRange = vr} <- asks config
10421044 ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing
1043- (rq, qUri, tSess, sessId) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAllErrors` \ e -> liftIO (print e) >> throwE e
1045+ (rq, qUri, tSess, sessId, serviceId_ ) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAllErrors` \ e -> liftIO (print e) >> throwE e
10441046 atomically $ incSMPServerStat c userId srv connCreated
10451047 rq' <- withStore c $ \ db -> updateNewConnRcv db connId rq subMode
1046- lift . when (subMode == SMSubscribe ) $ addNewQueueSubscription c rq' tSess sessId
1048+ lift . when (subMode == SMSubscribe ) $ addNewQueueSubscription c rq' tSess sessId serviceId_
10471049 mapM_ (newQueueNtfSubscription c rq') ntfServer_
10481050 pure (rq', qUri)
10491051 createConnReq :: SMPQueueUri -> AM (ConnectionRequestUri c )
@@ -1291,11 +1293,11 @@ joinConnSrvAsync _c _userId _connId _enableNtfs (CRContactUri _) _cInfo _subMode
12911293createReplyQueue :: AgentClient -> NetworkRequestMode -> ConnData -> SndQueue -> SubscriptionMode -> SMPServerWithAuth -> AM SMPQueueInfo
12921294createReplyQueue c nm ConnData {userId, connId, enableNtfs} SndQueue {smpClientVersion} subMode srv = do
12931295 ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing
1294- (rq, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv (versionToRange smpClientVersion) SCMInvitation (isJust ntfServer_) subMode
1296+ (rq, qUri, tSess, sessId, serviceId_ ) <- newRcvQueue c nm userId connId srv (versionToRange smpClientVersion) SCMInvitation (isJust ntfServer_) subMode
12951297 atomically $ incSMPServerStat c userId (qServer rq) connCreated
12961298 let qInfo = toVersionT qUri smpClientVersion
12971299 rq' <- withStore c $ \ db -> upgradeSndConnToDuplex db connId rq subMode
1298- lift . when (subMode == SMSubscribe ) $ addNewQueueSubscription c rq' tSess sessId
1300+ lift . when (subMode == SMSubscribe ) $ addNewQueueSubscription c rq' tSess sessId serviceId_
12991301 mapM_ (newQueueNtfSubscription c rq') ntfServer_
13001302 pure qInfo
13011303
@@ -1451,22 +1453,14 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
14511453 Just activeUserId -> sortOn (\ (uId, _) -> if uId == activeUserId then 0 else 1 :: Int ) userSrvs
14521454 Nothing -> userSrvs
14531455 useServices <- readTVarIO $ useClientServices c
1454- -- These options are possible below:
1455- -- 1) services fully disabled:
1456- -- No service subscriptions will be attempted, and existing services and association will remain in in the database,
1457- -- but they will be ignored because of hasService parameter set to False.
1458- -- This approach preserves performance for all clients that do not use services.
1459- -- 2) at least one user ID has services enabled:
1460- -- Service will be loaded for all user/server combinations:
1461- -- a) service is enabled for user ID and service record exists: subscription will be attempted,
1462- -- b) service is disabled and record exists: service record and all associations will be removed,
1463- -- c) service is disabled or no record: no subscription attempt.
1456+ -- Service will be loaded for all user/server combinations:
1457+ -- a) service is enabled for user ID and service record exists: subscription will be attempted,
1458+ -- b) service is disabled and record exists: service record and all associations will be removed,
1459+ -- c) service is disabled or no record: no subscription attempt.
14641460 -- On successful service subscription, only unassociated queues will be subscribed.
1465- userSrvs'' <-
1466- if any id useServices
1467- then lift $ mapConcurrently (subscribeService useServices) userSrvs'
1468- else pure $ map (,False ) userSrvs'
1469- rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs''
1461+ userSrvs2 <- withStore' c $ \ db -> mapM (getService db useServices) userSrvs'
1462+ userSrvs3 <- lift $ mapConcurrently subscribeService userSrvs2
1463+ rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs3
14701464 let (errs, oks) = partitionEithers rs
14711465 logInfo $ " subscribed " <> tshow (sum oks) <> " queues"
14721466 forM_ (L. nonEmpty errs) $ notifySub c . ERRS . L. map (" " ,)
@@ -1475,16 +1469,30 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
14751469 resumeAllCommands c
14761470 where
14771471 handleErr = (`catchAllErrors` \ e -> notifySub' c " " (ERR e) >> throwE e)
1478- subscribeService :: Map UserId Bool -> (UserId , SMPServer ) -> AM' ((UserId , SMPServer ), ServiceAssoc )
1479- subscribeService useServices us@ (userId, srv) = fmap ((us,) . fromRight False ) $ tryAllErrors' $ do
1480- withStore' c ( \ db -> getSubscriptionService db userId srv) >>= \ case
1472+ getService :: DB. Connection -> Map UserId Bool -> (UserId , SMPServer ) -> IO ((UserId , SMPServer ), Maybe ServiceSub )
1473+ getService db useServices us@ (userId, srv) =
1474+ fmap (us,) $ getSubscriptionService db userId srv >>= \ case
14811475 Just serviceSub -> case M. lookup userId useServices of
1482- Just True -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \ case
1483- Left e | clientServiceError e -> unassocQueues $> False
1476+ Just True -> pure $ Just serviceSub
1477+ _ -> Nothing <$ unassocUserServerRcvQueueSubs' db userId srv
1478+ _ -> pure Nothing
1479+ subscribeService :: ((UserId , SMPServer ), Maybe ServiceSub ) -> AM' ((UserId , SMPServer ), ServiceAssoc )
1480+ subscribeService (us@ (userId, srv), serviceSub_) = fmap ((us,) . fromRight False ) $ tryAllErrors' $
1481+ case serviceSub_ of
1482+ Just serviceSub -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \ case
1483+ Right (ServiceSubResult e _) -> case e of
1484+ Just SSErrorServiceId {} -> unassocQueues
1485+ -- Possibly, we should always resubscribe all when expected is greater than subscribed
1486+ Just SSErrorQueueCount {expectedQueueCount = n, subscribedQueueCount = n'} | n > 0 && n' == 0 -> unassocQueues
14841487 _ -> pure True
1485- _ -> unassocQueues $> False
1488+ Left e -> do
1489+ atomically $ writeTBQueue (subQ c) (" " , " " , AEvt SAEConn $ ERR e)
1490+ if clientServiceError e
1491+ then unassocQueues
1492+ else pure True
14861493 where
1487- unassocQueues = withStore' c $ \ db -> unassocUserServerRcvQueueSubs db userId srv
1494+ unassocQueues :: AM Bool
1495+ unassocQueues = False <$ withStore' c (\ db -> unassocUserServerRcvQueueSubs' db userId srv)
14881496 _ -> pure False
14891497 subscribeUserServer :: Int -> TVar Int -> ((UserId , SMPServer ), ServiceAssoc ) -> AM' (Either AgentErrorType Int )
14901498 subscribeUserServer maxPending currPending ((userId, srv), hasService) = do
@@ -2219,10 +2227,10 @@ switchDuplexConnection c nm (DuplexConnection cData@ConnData {connId, userId} rq
22192227 srv' <- if srv == server then getNextSMPServer c userId [server] else pure srvAuth
22202228 -- TODO [notications] possible improvement would be to create ntf credentials here, to avoid creating them after rotation completes.
22212229 -- The problem is that currently subscription already exists, and we do not support queues with credentials but without subscriptions.
2222- (q, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv' clientVRange SCMInvitation False SMSubscribe
2230+ (q, qUri, tSess, sessId, serviceId_ ) <- newRcvQueue c nm userId connId srv' clientVRange SCMInvitation False SMSubscribe
22232231 let rq' = (q :: NewRcvQueue ) {primary = True , dbReplaceQueueId = Just dbQueueId}
22242232 rq'' <- withStore c $ \ db -> addConnRcvQueue db connId rq' SMSubscribe
2225- lift $ addNewQueueSubscription c rq'' tSess sessId
2233+ lift $ addNewQueueSubscription c rq'' tSess sessId serviceId_
22262234 void . enqueueMessages c cData sqs SMP. noMsgFlags $ QADD [(qUri, Just (server, sndId))]
22272235 rq1 <- withStore' c $ \ db -> setRcvSwitchStatus db rq $ Just RSSendingQADD
22282236 let rqs' = updatedQs rq1 rqs <> [rq'']
@@ -2908,7 +2916,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar
29082916 processSubOk :: RcvQueue -> TVar [ConnId ] -> TVar [RcvQueue ] -> Maybe SMP. ServiceId -> IO ()
29092917 processSubOk rq@ RcvQueue {connId} upConnIds serviceRQs serviceId_ =
29102918 atomically . whenM (isPendingSub rq) $ do
2911- SS. addActiveSub tSess sessId rq $ currentSubs c
2919+ SS. addActiveSub tSess sessId serviceId_ rq $ currentSubs c
29122920 modifyTVar' upConnIds (connId : )
29132921 when (isJust serviceId_ && serviceId_ == clientServiceId_) $ modifyTVar' serviceRQs (rq : )
29142922 clientServiceId_ = (\ THClientService {serviceId} -> serviceId) <$> (clientService =<< thAuth)
@@ -3115,16 +3123,26 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar
31153123 notifyEnd removed
31163124 | removed = notify END >> logServer " <--" c srv rId " END"
31173125 | otherwise = logServer " <--" c srv rId " END from disconnected client - ignored"
3118- -- TODO [certs rcv]
3119- r@ (SMP. ENDS _) -> unexpected r
3126+ SMP. ENDS n idsHash ->
3127+ atomically (ifM (activeClientSession c tSess sessId) (SS. deleteServiceSub tSess (currentSubs c) $> True ) (pure False ))
3128+ >>= notifyEnd
3129+ where
3130+ notifyEnd removed
3131+ | removed = do
3132+ forM_ clientServiceId_ $ \ serviceId ->
3133+ notify_ B. empty $ SERVICE_END srv $ ServiceSub serviceId n idsHash
3134+ logServer " <--" c srv rId " ENDS"
3135+ | otherwise = logServer " <--" c srv rId " ENDS from disconnected client - ignored"
31203136 -- TODO [certs rcv] Possibly, we need to add some flag to connection that it was deleted
31213137 SMP. DELD -> atomically (removeSubscription c tSess connId rq) >> notify DELD
31223138 SMP. ERR e -> notify $ ERR $ SMP (B. unpack $ strEncode srv) e
31233139 r -> unexpected r
31243140 where
31253141 notify :: forall e m . (AEntityI e , MonadIO m ) => AEvent e -> m ()
3126- notify msg =
3127- let t = (" " , connId, AEvt (sAEntity @ e ) msg)
3142+ notify = notify_ connId
3143+ notify_ :: forall e m . (AEntityI e , MonadIO m ) => ConnId -> AEvent e -> m ()
3144+ notify_ connId' msg =
3145+ let t = (" " , connId', AEvt (sAEntity @ e ) msg)
31283146 in atomically $ ifM (isFullTBQueue subQ) (modifyTVar' pendingMsgs (t : )) (writeTBQueue subQ t)
31293147
31303148 prohibited :: Text -> AM ()
0 commit comments