Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ library
Ouroboros.Network.Protocol.BlockFetch.Type
Ouroboros.Network.Protocol.ChainSync.Client
Ouroboros.Network.Protocol.ChainSync.ClientPipelined
Ouroboros.Network.Protocol.ChainSync.PipeClient
Ouroboros.Network.Protocol.ChainSync.Codec
Ouroboros.Network.Protocol.ChainSync.PipelineDecision
Ouroboros.Network.Protocol.ChainSync.Server
Expand Down Expand Up @@ -104,6 +105,7 @@ library
io-classes ^>=1.5.0,
nothunks,
ouroboros-network-api ^>=0.11,
pipes,
quiet,
serialise,
si-timers,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RankNTypes #-}

module Ouroboros.Network.Protocol.ChainSync.PipeClient
where

import Network.TypedProtocol
import Ouroboros.Network.Protocol.ChainSync.Client
import Ouroboros.Network.Protocol.ChainSync.Type
import Pipes

data ChainSyncResponse header point tip
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest renaming header everywhere to block. This will be intended to be used with blocks. With the node-to-client protocol, the chain sync provides blocks (whereas for the node-to-node it's used at type header).

= RollForward header tip
| RollBackward point tip
| IntersectFound point tip
| IntersectNotFound tip
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a good start but I think where we want to go next is to not include the intersect finding in the pipe event stream, but to do all the intersection finding first and then yield the pipe producer (if the intersection finding is successful) for streaming the blocks.

So in the end we want just the roll forwards and roll backwards as pipe events.

And later we might want to rename it something like:

data BlockOrRollback block = NextBlock block | Rollback Point

deriving (Show)

-- | Run a chain sync client as a 'Producer'.
-- Pass a suitable 'Driver', and a list of known 'point's. The 'Producer' will
-- initialize itself by requesting an intersection, and yielding either
-- 'IntersectFound' or 'IntersectNotFound' to indicate the server's response.
-- After that, it will indefinitely request updates and 'yield' them as
-- 'RollForward' and 'RollBackward', respectively.
chainSyncClientProducer :: forall header point tip m dstate
. ( Monad m
)
=> Driver
(ChainSync header point tip)
AsClient
dstate
(Producer (ChainSyncResponse header point tip) m)
-> [point]
-> Producer (ChainSyncResponse header point tip) m dstate
chainSyncClientProducer driver known =
snd <$> runPeerWithDriver driver peer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to admit, I'm struggling to understand if this can plausibly work in the way we want it to.

I would have thought that runPeerWithDriver will run the peer to the end and not produce anything until it is done, which would not match what we want. My intuition is we'd need to break up runPeerWithDriver and integrate it into this function (fortunately it's not long).

But on the other hand, what you have has the right type. So I'm confused :-)

We might need to just try it out.

where
peer = chainSyncClientPeer (ChainSyncClient runPeer)

runPeer :: Producer
(ChainSyncResponse header point tip)
m
(ClientStIdle header point tip (Producer (ChainSyncResponse header point tip) m) ())
runPeer =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, this is the initial intersection finding phase, before going into the main streaming phase. A little renaming would make this clearer.

pure . SendMsgFindIntersect known $
ClientStIntersect
{ recvMsgIntersectFound = \p t -> ChainSyncClient $ do
yield (IntersectFound p t)
runProducer
, recvMsgIntersectNotFound = \t -> ChainSyncClient $ do
yield (IntersectNotFound t)
runProducer
}
runProducer :: Producer
(ChainSyncResponse header point tip)
m
(ClientStIdle header point tip (Producer (ChainSyncResponse header point tip) m) ())
runProducer =
pure . SendMsgRequestNext (pure ()) $
ClientStNext
{ recvMsgRollForward = \h t -> ChainSyncClient $ do
yield (RollForward h t)
runProducer
, recvMsgRollBackward = \p t -> ChainSyncClient $ do
yield (RollBackward p t)
runProducer
}
Loading