@@ -136,9 +136,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
136136 expired <- restoreServerMessages
137137 restoreServerStats expired
138138 raceAny_
139- ( serverThread s " server subscribedQ" subscribedQ subscribers pendingENDs subscriptions cancelSub
140- : serverThread s " server ntfSubscribedQ" ntfSubscribedQ Env. notifiers pendingNtfENDs ntfSubscriptions (\ _ -> pure () )
141- : sendPendingENDsThread s
139+ ( serverThread s " server subscribedQ" True subscribedQ subscribers pendingENDs subscriptions cancelSub
140+ : serverThread s " server deletedQ" False deletedQ subscribers pendingDELDs subscriptions cancelSub
141+ : serverThread s " server ntfSubscribedQ" True ntfSubscribedQ Env. notifiers pendingNtfENDs ntfSubscriptions (\ _ -> pure () )
142+ : serverThread s " server ntfDeletedQ" False ntfDeletedQ Env. notifiers pendingNtfDELDs ntfSubscriptions (\ _ -> pure () )
143+ : sendPendingEvtsThread s
142144 : receiveFromProxyAgent pa
143145 : map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg
144146 )
@@ -163,22 +165,23 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
163165 forall s .
164166 Server ->
165167 String ->
166- (Server -> TQueue (QueueId , ClientId , Subscribed )) ->
168+ Subscribed ->
169+ (Server -> TQueue (QueueId , ClientId )) ->
167170 (Server -> TMap QueueId (TVar Client )) ->
168171 (Server -> TVar (IM. IntMap (NonEmpty RecipientId ))) ->
169172 (Client -> TMap QueueId s ) ->
170173 (s -> IO () ) ->
171174 M ()
172- serverThread s label subQ subs ends clientSubs unsub = do
175+ serverThread s label subscribed subQ subs pendingEvts clientSubs unsub = do
173176 labelMyThread label
174177 cls <- asks clients
175178 liftIO . forever $
176179 (atomically (readTQueue $ subQ s) >>= atomically . updateSubscribers cls)
177180 $>>= endPreviousSubscriptions
178181 >>= mapM_ unsub
179182 where
180- updateSubscribers :: TVar (IM. IntMap (Maybe Client )) -> (QueueId , ClientId , Bool ) -> STM (Maybe (QueueId , Client ))
181- updateSubscribers cls (qId, clntId, subscribed ) =
183+ updateSubscribers :: TVar (IM. IntMap (Maybe Client )) -> (QueueId , ClientId ) -> STM (Maybe (QueueId , Client ))
184+ updateSubscribers cls (qId, clntId) =
182185 -- Client lookup by ID is in the same STM transaction.
183186 -- In case client disconnects during the transaction,
184187 -- it will be re-evaluated, and the client won't be stored as subscribed.
@@ -201,37 +204,41 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
201204 | otherwise = (\ yes -> if yes then Just (qId, c') else Nothing ) <$> readTVar (connected c')
202205 endPreviousSubscriptions :: (QueueId , Client ) -> IO (Maybe s )
203206 endPreviousSubscriptions (qId, c) = do
204- atomically $ modifyTVar' (ends s) $ IM. alter (Just . maybe [qId] (qId <| )) (clientId c)
207+ atomically $ modifyTVar' (pendingEvts s) $ IM. alter (Just . maybe [qId] (qId <| )) (clientId c)
205208 atomically $ TM. lookupDelete qId (clientSubs c)
206209
207- sendPendingENDsThread :: Server -> M ()
208- sendPendingENDsThread s = do
210+ sendPendingEvtsThread :: Server -> M ()
211+ sendPendingEvtsThread s = do
209212 endInt <- asks $ pendingENDInterval . config
210213 cls <- asks clients
211214 forever $ do
212215 threadDelay endInt
213- sendPending cls $ pendingENDs s
214- sendPending cls $ pendingNtfENDs s
216+ sendPending cls END $ pendingENDs s
217+ sendPending cls DELD $ pendingDELDs s
218+ sendPending cls END $ pendingNtfENDs s
219+ sendPending cls DELD $ pendingNtfDELDs s
215220 where
216- sendPending cls ref = do
221+ sendPending cls evt ref = do
217222 ends <- atomically $ swapTVar ref IM. empty
218223 unless (null ends) $ forM_ (IM. assocs ends) $ \ (cId, qIds) ->
219- mapM_ (queueENDs qIds) . join . IM. lookup cId =<< readTVarIO cls
220- queueENDs qIds c@ Client {connected, sndQ = q} =
224+ mapM_ (queueEvts qIds evt ) . join . IM. lookup cId =<< readTVarIO cls
225+ queueEvts qIds evt c@ Client {connected, sndQ = q} =
221226 whenM (readTVarIO connected) $ do
222227 sent <- atomically $ ifM (isFullTBQueue q) (pure False ) (writeTBQueue q ts $> True )
223228 if sent
224229 then updateEndStats
225230 else -- if queue is full it can block
226- forkClient c (" sendPendingENDsThread.queueENDs " ) $
231+ forkClient c (" sendPendingEvtsThread.queueEvts " ) $
227232 atomically (writeTBQueue q ts) >> updateEndStats
228233 where
229- ts = L. map (CorrId " " ,,END ) qIds
230- updateEndStats = do
231- stats <- asks serverStats
232- let len = L. length qIds
233- liftIO $ atomicModifyIORef'_ (qSubEnd stats) (+ len)
234- liftIO $ atomicModifyIORef'_ (qSubEndB stats) (+ (len `div` 255 + 1 )) -- up to 255 ENDs in the batch
234+ ts = L. map (CorrId " " ,,evt) qIds
235+ updateEndStats = case evt of
236+ END -> do
237+ stats <- asks serverStats
238+ let len = L. length qIds
239+ liftIO $ atomicModifyIORef'_ (qSubEnd stats) (+ len)
240+ liftIO $ atomicModifyIORef'_ (qSubEndB stats) (+ (len `div` 255 + 1 )) -- up to 255 ENDs in the batch
241+ _ -> pure ()
235242
236243 receiveFromProxyAgent :: ProxyAgent -> M ()
237244 receiveFromProxyAgent ProxyAgent {smpAgent = SMPClientAgent {agentQ}} =
@@ -892,7 +899,7 @@ forkClient Client {endThreads, endThreadSeq} label action = do
892899 mkWeakThreadId t >>= atomically . modifyTVar' endThreads . IM. insert tId
893900
894901client :: THandleParams SMPVersion 'TServer -> Client -> Server -> M ()
895- client thParams' clnt@ Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, procThreads} Server {subscribedQ, ntfSubscribedQ, subscribers, notifiers} = do
902+ client thParams' clnt@ Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, procThreads} Server {subscribedQ, deletedQ, ntfSubscribedQ, ntfDeletedQ , subscribers, notifiers} = do
896903 labelMyThread . B. unpack $ " client $" <> encode sessionId <> " commands"
897904 forever $
898905 atomically (readTBQueue rcvQ)
@@ -985,11 +992,9 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
985992 processCommand (qr_, (corrId, entId, cmd)) = case cmd of
986993 Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command)
987994 Cmd SSender command -> Just <$> case command of
988- SKEY sKey -> (corrId,entId,) <$> case qr_ of
989- Just qr@ QueueRec {sndSecure}
990- | sndSecure -> secureQueue_ " SKEY" qr sKey
991- | otherwise -> pure $ ERR AUTH
992- Nothing -> pure $ ERR INTERNAL
995+ SKEY sKey ->
996+ withQueue $ \ QueueRec {sndSecure, recipientId} ->
997+ (corrId,entId,) <$> if sndSecure then secureQueue_ " SKEY" recipientId sKey else pure $ ERR AUTH
993998 SEND flags msgBody -> withQueue $ \ qr -> sendMessage qr flags msgBody
994999 PING -> pure (corrId, NoEntity , PONG )
9951000 RFWD encBlock -> (corrId, NoEntity ,) <$> processForwardedCommand encBlock
@@ -1009,9 +1014,9 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
10091014 SUB -> withQueue (`subscribeQueue` entId)
10101015 GET -> withQueue getMessage
10111016 ACK msgId -> withQueue (`acknowledgeMsg` msgId)
1012- KEY sKey -> (corrId,entId,) <$> case qr_ of
1013- Just qr -> secureQueue_ " KEY " qr sKey
1014- Nothing -> pure $ ERR INTERNAL
1017+ KEY sKey ->
1018+ withQueue $ \ QueueRec {recipientId} ->
1019+ (corrId,entId,) <$> secureQueue_ " KEY " recipientId sKey
10151020 NKEY nKey dhKey -> addQueueNotifier_ st nKey dhKey
10161021 NDEL -> deleteQueueNotifier_ st
10171022 OFF -> suspendQueue_ st
@@ -1063,10 +1068,9 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
10631068 n <- asks $ queueIdBytes . config
10641069 liftM2 (,) (randomId n) (randomId n)
10651070
1066- secureQueue_ :: T. Text -> QueueRec -> SndPublicAuthKey -> M BrokerMsg
1067- secureQueue_ name qr @ QueueRec {recipientId = rId} sKey = time name $ do
1071+ secureQueue_ :: T. Text -> RecipientId -> SndPublicAuthKey -> M BrokerMsg
1072+ secureQueue_ name rId sKey = time name $ do
10681073 withLog $ \ s -> logSecureQueue s rId sKey
1069- updateQueueDate qr
10701074 st <- asks queueStore
10711075 stats <- asks serverStats
10721076 incStat $ qSecured stats
@@ -1086,20 +1090,22 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
10861090 liftIO (addQueueNotifier st entId ntfCreds) >>= \ case
10871091 Left DUPLICATE_ -> addNotifierRetry (n - 1 ) rcvPublicDhKey rcvNtfDhSecret
10881092 Left e -> pure $ ERR e
1089- Right _ -> do
1093+ Right nId_ -> do
10901094 withLog $ \ s -> logAddNotifier s entId ntfCreds
10911095 incStat . ntfCreated =<< asks serverStats
1096+ forM_ nId_ $ \ nId -> atomically $ writeTQueue ntfDeletedQ (nId, clientId)
10921097 pure $ NID notifierId rcvPublicDhKey
10931098
10941099 deleteQueueNotifier_ :: QueueStore -> M (Transmission BrokerMsg )
10951100 deleteQueueNotifier_ st = do
10961101 withLog (`logDeleteNotifier` entId)
10971102 liftIO (deleteQueueNotifier st entId) >>= \ case
1098- Right () -> do
1103+ Right (Just nId ) -> do
10991104 -- Possibly, the same should be done if the queue is suspended, but currently we do not use it
1100- atomically $ writeTQueue ntfSubscribedQ (entId , clientId, False )
1105+ atomically $ writeTQueue ntfDeletedQ (nId , clientId)
11011106 incStat . ntfDeleted =<< asks serverStats
11021107 pure ok
1108+ Right Nothing -> pure ok
11031109 Left e -> pure $ err e
11041110
11051111 suspendQueue_ :: QueueStore -> M (Transmission BrokerMsg )
@@ -1124,7 +1130,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
11241130 where
11251131 newSub :: M Sub
11261132 newSub = time " SUB newSub" . atomically $ do
1127- writeTQueue subscribedQ (rId, clientId, True )
1133+ writeTQueue subscribedQ (rId, clientId)
11281134 sub <- newSubscription NoSub
11291135 TM. insert rId sub subscriptions
11301136 pure sub
@@ -1199,7 +1205,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
11991205 pure ok
12001206 where
12011207 newSub = do
1202- writeTQueue ntfSubscribedQ (entId, clientId, True )
1208+ writeTQueue ntfSubscribedQ (entId, clientId)
12031209 TM. insert entId () ntfSubscriptions
12041210
12051211 acknowledgeMsg :: QueueRec -> MsgId -> M (Transmission BrokerMsg )
@@ -1480,9 +1486,9 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
14801486 liftIO (deleteQueue st entId $>>= \ q -> delMsgQueue ms entId $> Right q) >>= \ case
14811487 Right q -> do
14821488 -- Possibly, the same should be done if the queue is suspended, but currently we do not use it
1483- atomically $ writeTQueue subscribedQ (entId, clientId, False )
1489+ atomically $ writeTQueue deletedQ (entId, clientId)
14841490 forM_ (notifierId <$> notifier q) $ \ nId ->
1485- atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False )
1491+ atomically $ writeTQueue ntfDeletedQ (nId, clientId)
14861492 updateDeletedStats q
14871493 pure ok
14881494 Left e -> pure $ err e
0 commit comments