Skip to content

Commit 02f6ff7

Browse files
tbagrel1amesgen
authored andcommitted
[WIP] Clarify and generalize agency for ObjectDiffusion protocol
1 parent 0bc55d2 commit 02f6ff7

File tree

5 files changed

+261
-238
lines changed

5 files changed

+261
-238
lines changed

ouroboros-network-protocols/ouroboros-network-protocols.cabal

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ library
6969
Ouroboros.Network.Protocol.TxSubmission2.Codec
7070
Ouroboros.Network.Protocol.TxSubmission2.Server
7171
Ouroboros.Network.Protocol.TxSubmission2.Type
72-
Ouroboros.Network.Protocol.ObjectDiffusion.Client
72+
Ouroboros.Network.Protocol.ObjectDiffusion.Outbound
7373
Ouroboros.Network.Protocol.ObjectDiffusion.Codec
74-
Ouroboros.Network.Protocol.ObjectDiffusion.Server
74+
Ouroboros.Network.Protocol.ObjectDiffusion.Inbound
7575
Ouroboros.Network.Protocol.ObjectDiffusion.Type
7676

7777
default-language: Haskell2010

ouroboros-network-protocols/src/Ouroboros/Network/Protocol/ObjectDiffusion/Client.hs

Lines changed: 0 additions & 120 deletions
This file was deleted.
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE GADTs #-}
3+
{-# LANGUAGE KindSignatures #-}
4+
{-# LANGUAGE LambdaCase #-}
5+
{-# LANGUAGE ScopedTypeVariables #-}
6+
{-# LANGUAGE TypeApplications #-}
7+
8+
-- | A view of the object diffusion protocol from the point of view of
9+
-- the inbound.
10+
--
11+
-- This provides a view that uses less complex types and should be easier to
12+
-- use than the underlying typed protocol itself.
13+
--
14+
-- For execution, a conversion into the typed protocol is provided.
15+
module Ouroboros.Network.Protocol.ObjectDiffusion.Inbound
16+
( -- * Protocol type for the inbound
17+
18+
-- | The protocol states from the point of view of the inbound.
19+
ObjectDiffusionInboundPipelined (..),
20+
InboundStIdle (..),
21+
Collect (..),
22+
23+
-- * Execution as a typed protocol
24+
objectDiffusionClientInboundPeerPipelined,
25+
objectDiffusionServerInboundPeerPipelined,
26+
)
27+
where
28+
29+
import Data.List.NonEmpty (NonEmpty)
30+
import Network.TypedProtocol.Core
31+
import Network.TypedProtocol.Peer
32+
import Ouroboros.Network.Protocol.ObjectDiffusion.Type
33+
34+
data ObjectDiffusionInboundPipelined objectId object m a where
35+
ObjectDiffusionInboundPipelined ::
36+
m (InboundStIdle Z objectId object m a) ->
37+
ObjectDiffusionInboundPipelined objectId object m a
38+
39+
-- | This is the type of the pipelined results, collected by 'CollectPipelined'.
40+
-- This protocol can pipeline requests for object ids and objects,
41+
-- so we use a sum of either for collecting the responses.
42+
data Collect objectId object
43+
= -- | The result of 'SendMsgRequestObjectIdsPipelined'. It also carries
44+
-- the number of objectIds originally requested.
45+
CollectObjectIds NumObjectIdsToReq [(objectId, SizeInBytes)]
46+
| -- | The result of 'SendMsgRequestObjectsPipelined'. The actual reply only
47+
-- contains the objects sent, but this pairs them up with the
48+
-- objects requested. This is because the peer can determine that
49+
-- some objects are no longer needed.
50+
CollectObjects [objectId] [object]
51+
52+
data InboundStIdle (n :: N) objectId object m a where
53+
SendMsgRequestObjectIdsBlocking ::
54+
-- | number of objectIds to acknowledge
55+
NumObjectIdsToAck ->
56+
-- | number of objectIds to request
57+
NumObjectIdsToReq ->
58+
-- | Result if done
59+
m a ->
60+
( NonEmpty (objectId, SizeInBytes) ->
61+
m (InboundStIdle Z objectId object m a)
62+
) ->
63+
InboundStIdle Z objectId object m a
64+
SendMsgRequestObjectIdsPipelined ::
65+
NumObjectIdsToAck ->
66+
NumObjectIdsToReq ->
67+
m (InboundStIdle (S n) objectId object m a) ->
68+
InboundStIdle n objectId object m a
69+
SendMsgRequestObjectsPipelined ::
70+
[objectId] ->
71+
m (InboundStIdle (S n) objectId object m a) ->
72+
InboundStIdle n objectId object m a
73+
-- | Collect a pipelined result.
74+
CollectPipelined ::
75+
Maybe (InboundStIdle (S n) objectId object m a) ->
76+
(Collect objectId object -> m (InboundStIdle n objectId object m a)) ->
77+
InboundStIdle (S n) objectId object m a
78+
79+
inboundRun :: forall (pr :: PeerRole) (n :: N) objectId object m a.
80+
(Functor m) =>
81+
InboundStIdle n objectId object m a ->
82+
Peer (ObjectDiffusion objectId object) pr (Pipelined n (Collect objectId object)) StIdle m a
83+
84+
inboundRun (SendMsgRequestObjectIdsBlocking ackNo reqNo kDone k) =
85+
Yield undefined
86+
(MsgRequestObjectIds SingBlocking ackNo reqNo)
87+
$ Await undefined
88+
$ \case
89+
MsgDone -> Effect (Done undefined <$> kDone)
90+
MsgReplyObjectIds (BlockingReply objectIds) -> Effect (inboundRun <$> k objectIds)
91+
inboundRun (SendMsgRequestObjectIdsPipelined ackNo reqNo k) =
92+
YieldPipelined undefined
93+
(MsgRequestObjectIds SingNonBlocking ackNo reqNo)
94+
(ReceiverAwait undefined $ \(MsgReplyObjectIds (NonBlockingReply objectIds)) -> ReceiverDone (CollectObjectIds reqNo objectIds))
95+
(Effect (inboundRun <$> k))
96+
inboundRun (SendMsgRequestObjectsPipelined objectIds k) =
97+
YieldPipelined undefined
98+
(MsgRequestObjects objectIds)
99+
(ReceiverAwait undefined $ \(MsgReplyObjects objects) -> ReceiverDone (CollectObjects objectIds objects))
100+
(Effect (inboundRun <$> k))
101+
inboundRun (CollectPipelined mNone collect) =
102+
Collect
103+
(fmap inboundRun mNone)
104+
(Effect . fmap inboundRun . collect)
105+
106+
-- | Transform a 'ObjectDiffusionInboundPipelined' into a 'PeerPipelined'.
107+
objectDiffusionClientInboundPeerPipelined ::
108+
forall objectId object m a.
109+
(Functor m) =>
110+
ObjectDiffusionInboundPipelined objectId object m a ->
111+
PeerPipelined (ObjectDiffusion objectId object) 'AsClient StInit m a
112+
objectDiffusionClientInboundPeerPipelined (ObjectDiffusionInboundPipelined inboundSt) =
113+
PeerPipelined $
114+
Yield undefined MsgInit $
115+
Effect $
116+
inboundRun <$> inboundSt
117+
118+
objectDiffusionServerInboundPeerPipelined ::
119+
forall objectId object m a.
120+
(Functor m) =>
121+
ObjectDiffusionInboundPipelined objectId object m a ->
122+
PeerPipelined (ObjectDiffusion objectId object) 'AsServer StInit m a
123+
objectDiffusionServerInboundPeerPipelined (ObjectDiffusionInboundPipelined inboundSt) =
124+
PeerPipelined $
125+
Await @_ @_ @(Pipelined Z (Collect objectId object)) undefined
126+
(\MsgInit -> Effect (inboundRun <$> inboundSt))
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE GADTs #-}
3+
{-# LANGUAGE KindSignatures #-}
4+
{-# LANGUAGE NamedFieldPuns #-}
5+
{-# LANGUAGE RankNTypes #-}
6+
{-# LANGUAGE ScopedTypeVariables #-}
7+
8+
-- | A view of the object diffusion protocol from the point of view of
9+
-- the outbound peer.
10+
--
11+
-- This provides a view that uses less complex types and should be easier to
12+
-- use than the underlying typed protocol itself.
13+
--
14+
-- For execution, 'objectDiffusionOutboundPeer' is provided for conversion
15+
-- into the typed protocol.
16+
module Ouroboros.Network.Protocol.ObjectDiffusion.Outbound
17+
( -- * Protocol type for the outbound
18+
19+
-- | The protocol states from the point of view of the outbound.
20+
ObjectDiffusionOutbound (..),
21+
OutboundStIdle (..),
22+
OutboundStObjectIds (..),
23+
OutboundStObjects (..),
24+
SingBlockingStyle (..),
25+
BlockingReplyList (..),
26+
27+
-- * Execution as a typed protocol
28+
objectDiffusionServerOutboundPeer,
29+
objectDiffusionClientOutboundPeer,
30+
)
31+
where
32+
33+
import Network.TypedProtocol.Core
34+
import Network.TypedProtocol.Peer
35+
import Ouroboros.Network.Protocol.ObjectDiffusion.Type
36+
37+
-- | The outbound side of the object diffusion protocol.
38+
--
39+
-- The peer in the outbound role submits objects to the peer in the server
40+
-- role.
41+
newtype ObjectDiffusionOutbound txid tx m a = ObjectDiffusionOutbound
42+
{ runObjectDiffusionOutbound :: m (OutboundStIdle txid tx m a)
43+
}
44+
45+
-- | In the 'StIdle' protocol state, the outbound does not have agency. Instead
46+
-- it is waiting for:
47+
--
48+
-- * a request for object ids (blocking or non-blocking)
49+
-- * a request for a given list of objects
50+
-- * a termination message
51+
--
52+
-- It must be prepared to handle any of these.
53+
data OutboundStIdle txid tx m a = OutboundStIdle
54+
{ recvMsgRequestObjectIds ::
55+
forall blocking.
56+
SingBlockingStyle blocking ->
57+
NumObjectIdsToAck ->
58+
NumObjectIdsToReq ->
59+
m (OutboundStObjectIds blocking txid tx m a),
60+
recvMsgRequestObjects ::
61+
[txid] ->
62+
m (OutboundStObjects txid tx m a)
63+
}
64+
65+
data OutboundStObjectIds blocking txid tx m a where
66+
SendMsgReplyObjectIds ::
67+
BlockingReplyList blocking (txid, SizeInBytes) ->
68+
OutboundStIdle txid tx m a ->
69+
OutboundStObjectIds blocking txid tx m a
70+
-- | In the blocking case, the outbound can terminate the protocol. This could
71+
-- be used when the outbound knows there will be no more objects to submit.
72+
SendMsgDone :: a -> OutboundStObjectIds StBlocking txid tx m a
73+
74+
data OutboundStObjects txid tx m a where
75+
SendMsgReplyObjects ::
76+
[tx] ->
77+
OutboundStIdle txid tx m a ->
78+
OutboundStObjects txid tx m a
79+
80+
outboundRun ::
81+
forall (pr :: PeerRole) txid tx m a.
82+
(Monad m) =>
83+
OutboundStIdle txid tx m a ->
84+
Peer (ObjectDiffusion txid tx) pr NonPipelined StIdle m a
85+
outboundRun OutboundStIdle {recvMsgRequestObjectIds, recvMsgRequestObjects} =
86+
Await undefined $ \msg -> case msg of
87+
MsgRequestObjectIds blocking ackNo reqNo -> Effect $ do
88+
reply <- recvMsgRequestObjectIds blocking ackNo reqNo
89+
case reply of
90+
SendMsgReplyObjectIds txids k ->
91+
-- TODO: investigate why GHC cannot infer `SingI`; it used to in
92+
-- `coot/typed-protocols-rewrite` branch
93+
return $ case blocking of
94+
SingBlocking ->
95+
Yield undefined
96+
(MsgReplyObjectIds txids)
97+
(outboundRun k)
98+
SingNonBlocking ->
99+
Yield undefined
100+
(MsgReplyObjectIds txids)
101+
(outboundRun k)
102+
SendMsgDone result ->
103+
return $
104+
Yield undefined
105+
MsgDone
106+
(Done undefined result)
107+
MsgRequestObjects txids -> Effect $ do
108+
SendMsgReplyObjects txs k <- recvMsgRequestObjects txids
109+
return $
110+
Yield undefined
111+
(MsgReplyObjects txs)
112+
(outboundRun k)
113+
114+
-- | A non-pipelined 'Peer' representing the 'ObjectDiffusionOutbound'.
115+
objectDiffusionServerOutboundPeer ::
116+
forall txid tx m a.
117+
(Monad m) =>
118+
ObjectDiffusionOutbound txid tx m a ->
119+
Peer (ObjectDiffusion txid tx) 'AsServer NonPipelined StInit m a
120+
objectDiffusionServerOutboundPeer (ObjectDiffusionOutbound outboundSt) =
121+
-- We need to assist GHC to infer the existentially quantified `c` as
122+
-- `Collect objectId object`
123+
Await undefined
124+
(\MsgInit -> Effect (outboundRun <$> outboundSt))
125+
126+
objectDiffusionClientOutboundPeer ::
127+
forall txid tx m a.
128+
(Monad m) =>
129+
ObjectDiffusionOutbound txid tx m a ->
130+
Peer (ObjectDiffusion txid tx) 'AsClient NonPipelined StInit m a
131+
objectDiffusionClientOutboundPeer (ObjectDiffusionOutbound outboundSt) =
132+
Yield undefined MsgInit $
133+
Effect $ outboundRun <$> outboundSt

0 commit comments

Comments
 (0)