11{-# LANGUAGE BlockArguments #-}
22{-# LANGUAGE DataKinds #-}
33{-# LANGUAGE DeriveAnyClass #-}
4+ {-# LANGUAGE DeriveGeneric #-}
45{-# LANGUAGE DerivingStrategies #-}
56{-# LANGUAGE FlexibleContexts #-}
67{-# LANGUAGE LambdaCase #-}
@@ -23,6 +24,7 @@ import Data.Functor ((<&>))
2324import qualified Data.Map.Strict as Map
2425import Data.Typeable (Typeable )
2526import Data.Void (Void )
27+ import GHC.Generics (Generic )
2628import Ouroboros.Consensus.Block
2729import Ouroboros.Consensus.MiniProtocol.BlockFetch.Server
2830 (blockFetchServer' )
@@ -152,6 +154,15 @@ immDBServer codecCfg encAddr decAddr immDB networkMagic = do
152154 , miniProtocolRun = ResponderProtocolOnly proto
153155 }
154156
157+ -- | The ChainSync specification requires sending a rollback instruction to the
158+ -- intersection point right after an intersection has been negotiated. (Opening
159+ -- a connection implicitly negotiates the Genesis point as the intersection.)
160+ data ChainSyncIntersection blk =
161+ JustNegotiatedIntersection ! (Point blk )
162+ | AlreadySentRollbackToIntersection
163+ deriving stock (Generic )
164+ deriving anyclass (NoThunks )
165+
155166chainSyncServer ::
156167 forall m blk a . (IOLike m , HasHeader blk )
157168 => ImmutableDB m blk
@@ -167,15 +178,24 @@ chainSyncServer immDB blockComponent registry = ChainSyncServer $ do
167178 newImmutableDBFollower = do
168179 varIterator <-
169180 newTVarIO =<< ImmutableDB. streamAll immDB registry blockComponent
181+ varIntersection <-
182+ newTVarIO $ JustNegotiatedIntersection GenesisPoint
170183
171- let followerInstructionBlocking = do
172- iterator <- readTVarIO varIterator
173- ImmutableDB. iteratorNext iterator >>= \ case
174- ImmutableDB. IteratorExhausted -> do
175- ImmutableDB. iteratorClose iterator
176- throwIO ReachedImmutableTip
177- ImmutableDB. IteratorResult a ->
178- pure $ AddBlock a
184+ let followerInstructionBlocking =
185+ readTVarIO varIntersection >>= \ case
186+ JustNegotiatedIntersection intersectionPt -> do
187+ atomically $
188+ writeTVar varIntersection AlreadySentRollbackToIntersection
189+ pure $ RollBack intersectionPt
190+ -- Otherwise, get the next block from the iterator (or fail).
191+ AlreadySentRollbackToIntersection -> do
192+ iterator <- readTVarIO varIterator
193+ ImmutableDB. iteratorNext iterator >>= \ case
194+ ImmutableDB. IteratorExhausted -> do
195+ ImmutableDB. iteratorClose iterator
196+ throwIO ReachedImmutableTip
197+ ImmutableDB. IteratorResult a ->
198+ pure $ AddBlock a
179199
180200 followerClose = ImmutableDB. iteratorClose =<< readTVarIO varIterator
181201
@@ -185,7 +205,9 @@ chainSyncServer immDB blockComponent registry = ChainSyncServer $ do
185205 Left _ -> followerForward pts
186206 Right iterator -> do
187207 followerClose
188- atomically $ writeTVar varIterator iterator
208+ atomically $ do
209+ writeTVar varIterator iterator
210+ writeTVar varIntersection $ JustNegotiatedIntersection pt
189211 pure $ Just pt
190212
191213 pure Follower {
0 commit comments