Skip to content

Commit 55ff581

Browse files
authored
SMP client: dont block on writing to sending queues (#1454)
* SMP client: dont block on writing to sending queues * only fork if full
1 parent 560b257 commit 55ff581

File tree

1 file changed

+9
-4
lines changed

1 file changed

+9
-4
lines changed

src/Simplex/Messaging/Client.hs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ module Simplex.Messaging.Client
105105
where
106106

107107
import Control.Applicative ((<|>))
108-
import Control.Concurrent (ThreadId, forkFinally, killThread, mkWeakThreadId)
108+
import Control.Concurrent (ThreadId, forkFinally, forkIO, killThread, mkWeakThreadId)
109109
import Control.Concurrent.Async
110110
import Control.Concurrent.STM
111111
import Control.Exception
@@ -1086,11 +1086,11 @@ sendBatch c@ProtocolClient {client_ = PClient {sndQ}} b = do
10861086
pure [Response entityId $ Left $ PCETransportError e]
10871087
TBTransmissions s n rs
10881088
| n > 0 -> do
1089-
atomically $ writeTBQueue sndQ (Nothing, s) -- do not expire batched responses
1089+
nonBlockingWriteTBQueue sndQ (Nothing, s) -- do not expire batched responses
10901090
mapConcurrently (getResponse c Nothing) rs
10911091
| otherwise -> pure []
10921092
TBTransmission s r -> do
1093-
atomically $ writeTBQueue sndQ (Nothing, s)
1093+
nonBlockingWriteTBQueue sndQ (Nothing, s)
10941094
(: []) <$> getResponse c Nothing r
10951095

10961096
-- | Send Protocol command
@@ -1112,13 +1112,18 @@ sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THan
11121112
Right t
11131113
| B.length s > blockSize - 2 -> pure . Left $ PCETransportError TELargeMsg
11141114
| otherwise -> do
1115-
atomically $ writeTBQueue sndQ (Just r, s)
1115+
nonBlockingWriteTBQueue sndQ (Just r, s)
11161116
response <$> getResponse c tOut r
11171117
where
11181118
s
11191119
| batch = tEncodeBatch1 t
11201120
| otherwise = tEncode t
11211121

1122+
nonBlockingWriteTBQueue :: TBQueue a -> a -> IO ()
1123+
nonBlockingWriteTBQueue q x = do
1124+
sent <- atomically $ ifM (isFullTBQueue q) (pure False) (writeTBQueue q x $> True)
1125+
unless sent $ void $ forkIO $ atomically $ writeTBQueue q x
1126+
11221127
getResponse :: ProtocolClient v err msg -> Maybe Int -> Request err msg -> IO (Response err msg)
11231128
getResponse ProtocolClient {client_ = PClient {tcpTimeout, timeoutErrorCount}} tOut Request {entityId, pending, responseVar} = do
11241129
r <- fromMaybe tcpTimeout tOut `timeout` atomically (takeTMVar responseVar)

0 commit comments

Comments
 (0)