55{-# LANGUAGE FlexibleContexts #-}
66{-# LANGUAGE FlexibleInstances #-}
77{-# LANGUAGE GADTSyntax #-}
8+ {-# LANGUAGE LambdaCase #-}
89{-# LANGUAGE RankNTypes #-}
910{-# LANGUAGE RecordWildCards #-}
1011{-# LANGUAGE ScopedTypeVariables #-}
1415module Chan.Mux (
1516 ToFromBundleMsg (.. ),
1617 ConnectionBundle (.. ),
17- fromBearerMsg ,
18+ mapBearerTracer ,
1819 newMuxChan ,
1920) where
2021
2122import Chan.Core (Chan (.. ))
22- import Chan.TCP (MessageSize (.. ))
23+ import Chan.TCP (Bytes , MessageSize (.. ))
2324import qualified Control.Category as Cat
2425import Control.Concurrent.Class.MonadMVar (MonadMVar (.. ))
25- import Control.Monad (forever )
26+ import qualified Control.Lens as Lens
27+ import Control.Monad (forM_ , forever )
2628import Control.Monad.Class.MonadFork (MonadFork (forkIO ))
29+ import Control.Tracer (Tracer (Tracer ), traceWith )
2730import Data.Array (Array , listArray , (!) )
2831import Data.Kind
32+ import Data.Foldable (traverse_ )
33+ import Data.Functor.Const (Const (Const ), getConst )
2934import STMCompat
3035
3136class ConnectionBundle bundle where
@@ -56,66 +61,86 @@ instance Cat.Category ToFromBundleMsg where
5661-- dynToFromBundleMsg :: Typeable a => ToFromBundleMsg Dynamic a
5762-- dynToFromBundleMsg = ToFromBundleMsg toDyn (fromJust . fromDynamic)
5863
59- data BearerMsg a = BearerMsg ! Int a
64+ data BorneMsg a = BorneMsg ! Int a
6065
61- fromBearerMsg :: BearerMsg a -> a
62- fromBearerMsg (BearerMsg _ a) = a
63-
64- instance MessageSize a => MessageSize (BearerMsg a ) where
65- messageSizeBytes (BearerMsg _ a) = 1 + messageSizeBytes a
66+ -- | Each bearer message is some slices of various 'BorneMsg's
67+ --
68+ -- The mini protocols never see this, so this type is not exported. It does
69+ -- occur in the argument types of some exported functions, but the caller
70+ -- should be using parametric functions to generate those arguments.
71+ data BearerMsg a = BearerMsg ! Bytes [BorneMsg a ]
72+ -- ^ the cumulative size of the slices the borne messages whose /final/ slice
73+ -- is in this message
74+
75+ instance MessageSize (BearerMsg a ) where
76+ messageSizeBytes (BearerMsg sz _) = 1 + sz
77+
78+ mapBearerTracer ::
79+ Applicative m =>
80+ Lens. Lens s t (BearerMsg a ) a ->
81+ Tracer m t ->
82+ Tracer m s
83+ mapBearerTracer lens tracer = Tracer $ \ x -> do
84+ let BearerMsg _ msgs = getConst $ lens Const x -- why doesn't Lens.view lens x type check?
85+ flip traverse_ msgs $ \ (BorneMsg _ a) -> do
86+ traceWith tracer $ Lens. set lens a x
6687
6788newMuxChan ::
6889 forall bundle m .
69- (ConnectionBundle bundle , MonadMVar m , MonadSTM m , MonadFork m ) =>
90+ (ConnectionBundle bundle , MonadMVar m , MonadSTM m , MonadFork m , MessageSize ( BundleMsg bundle ) ) =>
7091 Chan m (BearerMsg (BundleMsg bundle )) ->
7192 m (bundle (Chan m ))
7293newMuxChan bearer = do
73- sendLock <- newMVar ()
7494 -- Bit of a hack to use these TVars, could run the traverseConnectionBundle
7595 -- in a reader+state monad instead. That'd be cleaner.
7696 recvQueuesAccum <- newTVarIO []
7797 recvQueuesIx <- newTVarIO (0 :: Int )
98+ sendQueue <- newTQueueIO
7899 chans <-
79100 traverseConnectionBundle
80101 ( newMuxChanSingle @ bundle
81- bearer
82- sendLock
102+ sendQueue
83103 recvQueuesIx
84104 recvQueuesAccum
85105 )
86106 toFromBundleMsgBundle
87107 recvQueues <- reverse <$> readTVarIO recvQueuesAccum
88108 let recvQueues' = listArray (0 , length recvQueues - 1 ) recvQueues
89109 _ <- forkIO $ demuxer @ bundle bearer recvQueues'
110+ _ <- forkIO $ muxer @ bundle bearer sendQueue
90111 return chans
91112
92113newMuxChanSingle ::
93114 forall bundle m a .
94- (MonadMVar m , MonadSTM m ) =>
95- Chan m (BearerMsg (BundleMsg bundle )) ->
96- MVar m () ->
115+ (MonadMVar m , MonadSTM m , MessageSize (BundleMsg bundle )) =>
116+ TQueue m (MVar m () , Bytes , BorneMsg (BundleMsg bundle )) ->
97117 TVar m Int ->
98118 TVar m [RecvQueue m (BundleMsg bundle )] ->
99119 ToFromBundleMsg (BundleMsg bundle ) a ->
100120 m (Chan m a )
101121newMuxChanSingle
102- bearer
103- sendLock
122+ sendQueue
104123 recvQueuesIx
105124 recvQueuesAccum
106125 ToFromBundleMsg {.. } = do
107- queue <- newTQueueIO
126+ recvQueue <- newTQueueIO
127+ -- A mini protocol can have at most one message in the send buffer.
128+ sendLock <- newMVar ()
108129 i <- atomically $ do
109- modifyTVar recvQueuesAccum (RecvQueue fromBundleMsg queue : )
130+ modifyTVar recvQueuesAccum (RecvQueue fromBundleMsg recvQueue : )
110131 i <- readTVar recvQueuesIx
111132 writeTVar recvQueuesIx $! (i + 1 )
112133 return i
113134 return
114135 Chan
115- { readChan = atomically (readTQueue queue)
116- , writeChan = \ msg ->
117- let ! muxmsg = BearerMsg i (toBundleMsg msg)
118- in withMVar sendLock $ \ _ -> writeChan bearer muxmsg
136+ { readChan = atomically (readTQueue recvQueue)
137+ , writeChan = \ msg -> do
138+ let ! bundleMsg = toBundleMsg msg
139+ ! muxmsg = BorneMsg i bundleMsg
140+ takeMVar sendLock
141+ atomically $
142+ writeTQueue sendQueue $
143+ (sendLock, messageSizeBytes bundleMsg, muxmsg)
119144 }
120145
121146data RecvQueue m mm where
@@ -129,10 +154,49 @@ demuxer ::
129154 m ()
130155demuxer bearer queues =
131156 forever $ do
132- BearerMsg i msg <- readChan bearer
133- case queues ! i of
134- RecvQueue convert queue ->
135- atomically $ writeTQueue queue $! convert msg
157+ BearerMsg _ msgs <- readChan bearer
158+ forM_ msgs $ \ (BorneMsg i msg) ->
159+ case queues ! i of
160+ RecvQueue convert queue ->
161+ atomically $ writeTQueue queue $! convert msg
162+
163+ muxer ::
164+ forall bundle m .
165+ (MonadMVar m , MonadSTM m ) =>
166+ Chan m (BearerMsg (BundleMsg bundle )) ->
167+ TQueue m (MVar m () , Bytes , BorneMsg (BundleMsg bundle )) ->
168+ m ()
169+ muxer bearer sendQueue =
170+ forever $ do
171+ x <- atomically (readTQueue sendQueue)
172+ (muxmsg, locks) <- go 0 [] [] x
173+ mapM_ (flip putMVar () ) locks
174+ writeChan bearer muxmsg
175+ where
176+ --- from ouroboros-network's @Network.Mux.Bearer.makeSocketBearer'@
177+ sliceBytes = 12288
178+ loafBytes = 131072
179+
180+ go ! accBytes acc locks (lock, bytes, msg) = do
181+ let ! accBytes' = accBytes + min sliceBytes bytes
182+ (acc', locks') <-
183+ if bytes <= sliceBytes
184+ then do
185+ -- We do not release the lock before finalizing the loaf because a
186+ -- single loaf should include slices from at most one borne message
187+ -- per protocol.
188+ pure (msg : acc, lock : locks)
189+ else do
190+ -- reenqueue the rest of the message
191+ let ! bytes' = bytes - sliceBytes
192+ atomically $ writeTQueue sendQueue (lock, bytes', msg)
193+ pure (acc, locks)
194+
195+ let result = (BearerMsg accBytes' acc', locks')
196+ if accBytes' >= loafBytes then pure result else do
197+ atomically (tryReadTQueue sendQueue) >>= \ case
198+ Nothing -> pure result
199+ Just x -> go accBytes' acc' locks' x
136200
137201data ExampleBundle f = ExampleBundle
138202 { exampleFoo :: f Int
0 commit comments