Skip to content

Commit 4647d69

Browse files
authored
smp server: do not include previously blocked queues in stats, prevent leak of client threads (#1593)
1 parent 9ab071d commit 4647d69

File tree

2 files changed

+11
-8
lines changed

2 files changed

+11
-8
lines changed

src/Simplex/Messaging/Server.hs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -989,21 +989,23 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
989989
then liftIO $ hPutStrLn h $ "error: reached limit of " <> show quota <> " queues blocked daily"
990990
else do
991991
r <- liftIO $ runExceptT $ do
992-
q <- ExceptT $ getQueue st SSender sId
993-
ExceptT $ blockQueue (queueStore st) q info
992+
(q, QueueRec {status}) <- ExceptT $ getQueueRec st SSender sId
993+
when (status == EntityActive) $ ExceptT $ blockQueue (queueStore st) q info
994+
pure status
994995
case r of
995996
Left e -> liftIO $ hPutStrLn h $ "error: " <> show e
996-
Right () -> do
997+
Right EntityActive -> do
997998
incStat $ qBlocked stats
998-
liftIO $ hPutStrLn h "ok"
999+
liftIO $ hPutStrLn h "ok, queue blocked"
1000+
Right status -> liftIO $ hPutStrLn h $ "ok, already inactive: " <> show status
9991001
CPUnblock sId -> withUserRole $ unliftIO u $ do
10001002
st <- asks msgStore
10011003
r <- liftIO $ runExceptT $ do
10021004
q <- ExceptT $ getQueue st SSender sId
10031005
ExceptT $ unblockQueue (queueStore st) q
10041006
liftIO $ hPutStrLn h $ case r of
10051007
Left e -> "error: " <> show e
1006-
Right () -> "ok"
1008+
Right () -> "ok, queue unblocked"
10071009
CPSave -> withAdminRole $ withLock' (savingLock srv) "control" $ do
10081010
hPutStrLn h "saving server state..."
10091011
unliftIO u $ saveServer False

src/Simplex/Messaging/Transport/Server.hs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ import Network.Socket
4949
import qualified Network.TLS as T
5050
import Simplex.Messaging.Transport
5151
import Simplex.Messaging.Transport.Shared
52-
import Simplex.Messaging.Util (catchAll_, labelMyThread, tshow)
52+
import Simplex.Messaging.Util (catchAll_, labelMyThread, tshow, unlessM)
5353
import System.Exit (exitFailure)
5454
import System.IO.Error (tryIOError)
5555
import System.Mem.Weak (Weak, deRefWeak)
@@ -172,12 +172,13 @@ runTCPServerSocket (accepted, gracefullyClosed, clients) started getSocket serve
172172
E.bracket getSocket (closeServer started clients) $ \sock ->
173173
forever . E.bracketOnError (safeAccept sock) (close . fst) $ \(conn, _peer) -> do
174174
cId <- atomically $ stateTVar accepted $ \cId -> let cId' = cId + 1 in cId' `seq` (cId', cId')
175+
closed <- newTVarIO False
175176
let closeConn _ = do
176-
atomically $ modifyTVar' clients $ IM.delete cId
177+
atomically $ writeTVar closed True >> modifyTVar' clients (IM.delete cId)
177178
gracefulClose conn 5000 `catchAll_` pure () -- catchAll_ is needed here in case the connection was closed earlier
178179
atomically $ modifyTVar' gracefullyClosed (+ 1)
179180
tId <- mkWeakThreadId =<< server conn `forkFinally` closeConn
180-
atomically $ modifyTVar' clients $ IM.insert cId tId
181+
atomically $ unlessM (readTVar closed) $ modifyTVar' clients $ IM.insert cId tId
181182

182183
-- | Recover from errors in `accept` whenever it is safe.
183184
-- Some errors are safe to ignore, while blindly restaring `accept` may trigger a busy loop.

0 commit comments

Comments
 (0)