1010{-# LANGUAGE TypeFamilies #-}
1111
1212module Chan.Mux (
13- ToFromMuxMsg (.. ),
14- MuxBundle (.. ),
15- newConnectionBundleTCP ,
13+ ToFromBundleMsg (.. ),
14+ ConnectionBundle (.. ),
15+ fromBearerMsg ,
16+ newMuxChan ,
1617) where
1718
1819import Chan.Core (Chan (.. ))
19- import Chan.TCP (
20- LabelTcpDir ,
21- MessageSize (.. ),
22- TcpConnProps ,
23- TcpEvent ,
24- newConnectionTCP ,
25- )
20+ import Chan.TCP (MessageSize (.. ))
2621import qualified Control.Category as Cat
27- import Control.Concurrent.Class.MonadMVar (
28- MonadMVar (MVar , newMVar , withMVar ),
29- )
22+ import Control.Concurrent.Class.MonadMVar (MonadMVar (.. ))
3023import Control.Monad (forever )
31- import Control.Monad.Class.MonadAsync (MonadAsync )
3224import Control.Monad.Class.MonadFork (MonadFork (forkIO ))
33- import Control.Tracer (Contravariant (contramap ), Tracer )
3425import Data.Array (Array , listArray , (!) )
35- import Data.Dynamic (Dynamic , Typeable , fromDynamic , toDyn )
36- import Data.Maybe (fromJust )
3726import STMCompat
38- import TimeCompat
3927
40- class MuxBundle bundle where
41- type MuxMsg bundle
42- toFromMuxMsgBundle :: bundle (ToFromMuxMsg ( MuxMsg bundle ))
28+ class ConnectionBundle bundle where
29+ type BundleMsg bundle
30+ toFromBundleMsgBundle :: bundle (ToFromBundleMsg ( BundleMsg bundle ))
4331
44- traverseMuxBundle ::
32+ traverseConnectionBundle ::
4533 Monad m =>
4634 (forall a . f a -> m (g a )) ->
4735 bundle f ->
@@ -50,24 +38,24 @@ class MuxBundle bundle where
5038-- | Injection, projection, between a common mux message type, and an
5139-- individual message type. The following must hold:
5240--
53- -- > fromMuxMsg (toMuxMsg x) = x
41+ -- > fromBundleMsg (toBundleMsg x) = x
5442--
55- -- But 'fromMuxMsg ' is not required to be defined outside of the image of
56- -- 'toMuxMsg '. For example, a valid implementation would be:
43+ -- But 'fromBundleMsg ' is not required to be defined outside of the image of
44+ -- 'toBundleMsg '. For example, a valid implementation would be:
5745--
58- -- > ToFromMuxMsg toDynamic (fromJust . fromDynamic)
59- data ToFromMuxMsg mm a
60- = ToFromMuxMsg
61- { toMuxMsg :: a -> mm
62- , fromMuxMsg :: mm -> a
46+ -- > ToFromBundleMsg toDynamic (fromJust . fromDynamic)
47+ data ToFromBundleMsg mm a
48+ = ToFromBundleMsg
49+ { toBundleMsg :: a -> mm
50+ , fromBundleMsg :: mm -> a
6351 }
6452
65- instance Cat. Category ToFromMuxMsg where
66- id = ToFromMuxMsg id id
67- (.) (ToFromMuxMsg f f') (ToFromMuxMsg g g') = ToFromMuxMsg (g . f) (f' . g')
53+ instance Cat. Category ToFromBundleMsg where
54+ id = ToFromBundleMsg id id
55+ (.) (ToFromBundleMsg f f') (ToFromBundleMsg g g') = ToFromBundleMsg (g . f) (f' . g')
6856
69- -- dynToFromMuxMsg :: Typeable a => ToFromMuxMsg Dynamic a
70- -- dynToFromMuxMsg = ToFromMuxMsg toDyn (fromJust . fromDynamic)
57+ -- dynToFromBundleMsg :: Typeable a => ToFromBundleMsg Dynamic a
58+ -- dynToFromBundleMsg = ToFromBundleMsg toDyn (fromJust . fromDynamic)
7159
7260data BearerMsg a = BearerMsg ! Int a
7361
@@ -79,24 +67,24 @@ instance MessageSize a => MessageSize (BearerMsg a) where
7967
8068newMuxChan ::
8169 forall bundle m .
82- (MuxBundle bundle , MonadMVar m , MonadSTM m , MonadFork m ) =>
83- Chan m (BearerMsg (MuxMsg bundle )) ->
70+ (ConnectionBundle bundle , MonadMVar m , MonadSTM m , MonadFork m ) =>
71+ Chan m (BearerMsg (BundleMsg bundle )) ->
8472 m (bundle (Chan m ))
8573newMuxChan bearer = do
8674 sendLock <- newMVar ()
87- -- Bit of a hack to use these TVars, could run the traverseMuxBundle
75+ -- Bit of a hack to use these TVars, could run the traverseConnectionBundle
8876 -- in a reader+state monad instead. That'd be cleaner.
8977 recvQueuesAccum <- newTVarIO []
9078 recvQueuesIx <- newTVarIO 0
9179 chans <-
92- traverseMuxBundle
80+ traverseConnectionBundle
9381 ( newMuxChanSingle @ bundle
9482 bearer
9583 sendLock
9684 recvQueuesIx
9785 recvQueuesAccum
9886 )
99- toFromMuxMsgBundle
87+ toFromBundleMsgBundle
10088 recvQueues <- reverse <$> readTVarIO recvQueuesAccum
10189 let recvQueues' = listArray (0 , length recvQueues - 1 ) recvQueues
10290 _ <- forkIO $ demuxer @ bundle bearer recvQueues'
@@ -105,29 +93,29 @@ newMuxChan bearer = do
10593newMuxChanSingle ::
10694 forall bundle m a .
10795 (MonadMVar m , MonadSTM m ) =>
108- Chan m (BearerMsg (MuxMsg bundle )) ->
96+ Chan m (BearerMsg (BundleMsg bundle )) ->
10997 MVar m () ->
11098 TVar m Int ->
111- TVar m [RecvQueue m (MuxMsg bundle )] ->
112- ToFromMuxMsg ( MuxMsg bundle ) a ->
99+ TVar m [RecvQueue m (BundleMsg bundle )] ->
100+ ToFromBundleMsg ( BundleMsg bundle ) a ->
113101 m (Chan m a )
114102newMuxChanSingle
115103 bearer
116104 sendLock
117105 recvQueuesIx
118106 recvQueuesAccum
119- ToFromMuxMsg {.. } = do
107+ ToFromBundleMsg {.. } = do
120108 queue <- newTQueueIO
121109 i <- atomically $ do
122- modifyTVar recvQueuesAccum (RecvQueue fromMuxMsg queue : )
110+ modifyTVar recvQueuesAccum (RecvQueue fromBundleMsg queue : )
123111 i <- readTVar recvQueuesIx
124112 writeTVar recvQueuesIx $! (i + 1 )
125113 return i
126114 return
127115 Chan
128116 { readChan = atomically (readTQueue queue)
129117 , writeChan = \ msg ->
130- let ! muxmsg = BearerMsg i (toMuxMsg msg)
118+ let ! muxmsg = BearerMsg i (toBundleMsg msg)
131119 in withMVar sendLock $ \ _ -> writeChan bearer muxmsg
132120 }
133121
@@ -137,8 +125,8 @@ data RecvQueue m mm where
137125demuxer ::
138126 forall bundle m .
139127 MonadSTM m =>
140- Chan m (BearerMsg (MuxMsg bundle )) ->
141- Array Int (RecvQueue m (MuxMsg bundle )) ->
128+ Chan m (BearerMsg (BundleMsg bundle )) ->
129+ Array Int (RecvQueue m (BundleMsg bundle )) ->
142130 m ()
143131demuxer bearer queues =
144132 forever $ do
@@ -147,17 +135,6 @@ demuxer bearer queues =
147135 RecvQueue convert queue ->
148136 atomically $ writeTQueue queue $! convert msg
149137
150- newConnectionBundleTCP ::
151- forall bundle m .
152- (MuxBundle bundle , MonadTime m , MonadMonotonicTimeNSec m , MonadDelay m , MonadAsync m , MessageSize (MuxMsg bundle ), MonadMVar m , MonadFork m ) =>
153- Tracer m (LabelTcpDir (TcpEvent (MuxMsg bundle ))) ->
154- TcpConnProps ->
155- m (bundle (Chan m ), bundle (Chan m ))
156- newConnectionBundleTCP tracer tcpprops = do
157- let tracer' = contramap ((fmap . fmap ) fromBearerMsg) tracer
158- (mA, mB) <- newConnectionTCP tracer' tcpprops
159- (,) <$> newMuxChan mA <*> newMuxChan mB
160-
161138data ExampleBundle f = ExampleBundle
162139 { exampleFoo :: f Int
163140 , exampleBar :: f Bool
@@ -167,16 +144,16 @@ data ExampleMsg
167144 = MsgFoo { fromMsgFoo :: Int }
168145 | MsgBar { fromMsgBar :: Bool }
169146
170- instance MuxBundle ExampleBundle where
171- type MuxMsg ExampleBundle = ExampleMsg
147+ instance ConnectionBundle ExampleBundle where
148+ type BundleMsg ExampleBundle = ExampleMsg
172149
173- toFromMuxMsgBundle =
150+ toFromBundleMsgBundle =
174151 ExampleBundle
175- { exampleFoo = ToFromMuxMsg MsgFoo fromMsgFoo
176- , exampleBar = ToFromMuxMsg MsgBar fromMsgBar
152+ { exampleFoo = ToFromBundleMsg MsgFoo fromMsgFoo
153+ , exampleBar = ToFromBundleMsg MsgBar fromMsgBar
177154 }
178155
179- traverseMuxBundle f ExampleBundle {.. } =
156+ traverseConnectionBundle f ExampleBundle {.. } =
180157 ExampleBundle
181158 <$> f exampleFoo
182159 <*> f exampleBar
0 commit comments