@@ -28,9 +28,9 @@ import Control.Monad (forM_, forever)
2828import Control.Monad.Class.MonadFork (MonadFork (forkIO ))
2929import Control.Tracer (Tracer (Tracer ), traceWith )
3030import Data.Array (Array , listArray , (!) )
31- import Data.Kind
3231import Data.Foldable (traverse_ )
3332import Data.Functor.Const (Const (Const ), getConst )
33+ import Data.Kind
3434import STMCompat
3535
3636class ConnectionBundle bundle where
@@ -48,8 +48,7 @@ class ConnectionBundle bundle where
4848-- 'toBundleMsg'. For example, a valid implementation would be:
4949--
5050-- > ToFromBundleMsg toDynamic (fromJust . fromDynamic)
51- data ToFromBundleMsg mm a
52- = ToFromBundleMsg
51+ data ToFromBundleMsg mm a = ToFromBundleMsg
5352 { toBundleMsg :: a -> mm
5453 , fromBundleMsg :: mm -> a
5554 }
@@ -68,9 +67,10 @@ data BorneMsg a = BorneMsg !Int a
6867-- The mini protocols never see this, so this type is not exported. It does
6968-- occur in the argument types of some exported functions, but the caller
7069-- should be using parametric functions to generate those arguments.
71- data BearerMsg a = BearerMsg ! Bytes [BorneMsg a ]
72- -- ^ the cumulative size of the slices the borne messages whose /final/ slice
73- -- is in this message
70+ data BearerMsg a
71+ = -- | the cumulative size of the slices the borne messages whose /final/ slice
72+ -- is in this message
73+ BearerMsg ! Bytes [BorneMsg a ]
7474
7575instance MessageSize (BearerMsg a ) where
7676 messageSizeBytes (BearerMsg sz _) = 1 + sz
@@ -140,7 +140,7 @@ newMuxChanSingle
140140 takeMVar sendLock
141141 atomically $
142142 writeTQueue sendQueue $
143- (sendLock, messageSizeBytes bundleMsg, muxmsg)
143+ (sendLock, messageSizeBytes bundleMsg, muxmsg)
144144 }
145145
146146data RecvQueue m mm where
@@ -167,20 +167,20 @@ muxer ::
167167 TQueue m (MVar m () , Bytes , BorneMsg (BundleMsg bundle )) ->
168168 m ()
169169muxer bearer sendQueue =
170- forever $ do
171- x <- atomically (readTQueue sendQueue)
172- (muxmsg, locks) <- go 0 [] [] x
173- mapM_ (flip putMVar () ) locks
174- writeChan bearer muxmsg
175- where
176- --- from ouroboros-network's @Network.Mux.Bearer.makeSocketBearer'@
177- sliceBytes = 12288
178- loafBytes = 131072
179-
180- go ! accBytes acc locks (lock, bytes, msg) = do
181- let ! accBytes' = accBytes + min sliceBytes bytes
182- (acc', locks') <-
183- if bytes <= sliceBytes
170+ forever $ do
171+ x <- atomically (readTQueue sendQueue)
172+ (muxmsg, locks) <- go 0 [] [] x
173+ mapM_ (flip putMVar () ) locks
174+ writeChan bearer muxmsg
175+ where
176+ --- from ouroboros-network's @Network.Mux.Bearer.makeSocketBearer'@
177+ sliceBytes = 12288
178+ loafBytes = 131072
179+
180+ go ! accBytes acc locks (lock, bytes, msg) = do
181+ let ! accBytes' = accBytes + min sliceBytes bytes
182+ (acc', locks') <-
183+ if bytes <= sliceBytes
184184 then do
185185 -- We do not release the lock before finalizing the loaf because a
186186 -- single loaf should include slices from at most one borne message
@@ -192,8 +192,10 @@ muxer bearer sendQueue =
192192 atomically $ writeTQueue sendQueue (lock, bytes', msg)
193193 pure (acc, locks)
194194
195- let result = (BearerMsg accBytes' acc', locks')
196- if accBytes' >= loafBytes then pure result else do
195+ let result = (BearerMsg accBytes' acc', locks')
196+ if accBytes' >= loafBytes
197+ then pure result
198+ else do
197199 atomically (tryReadTQueue sendQueue) >>= \ case
198200 Nothing -> pure result
199201 Just x -> go accBytes' acc' locks' x
0 commit comments