1
1
{-# LANGUAGE BlockArguments #-}
2
2
{-# LANGUAGE DataKinds #-}
3
3
{-# LANGUAGE DeriveAnyClass #-}
4
+ {-# LANGUAGE DeriveGeneric #-}
4
5
{-# LANGUAGE DerivingStrategies #-}
5
6
{-# LANGUAGE FlexibleContexts #-}
6
7
{-# LANGUAGE LambdaCase #-}
@@ -23,6 +24,7 @@ import Data.Functor ((<&>))
23
24
import qualified Data.Map.Strict as Map
24
25
import Data.Typeable (Typeable )
25
26
import Data.Void (Void )
27
+ import GHC.Generics (Generic )
26
28
import Ouroboros.Consensus.Block
27
29
import Ouroboros.Consensus.MiniProtocol.BlockFetch.Server
28
30
(blockFetchServer' )
@@ -152,6 +154,15 @@ immDBServer codecCfg encAddr decAddr immDB networkMagic = do
152
154
, miniProtocolRun = ResponderProtocolOnly proto
153
155
}
154
156
157
+ -- | The ChainSync specification requires sending a rollback instruction to the
158
+ -- intersection point right after an intersection has been negotiated. (Opening
159
+ -- a connection implicitly negotiates the Genesis point as the intersection.)
160
+ data ChainSyncIntersection blk =
161
+ JustNegotiatedIntersection ! (Point blk )
162
+ | AlreadySentRollbackToIntersection
163
+ deriving stock (Generic )
164
+ deriving anyclass (NoThunks )
165
+
155
166
chainSyncServer ::
156
167
forall m blk a . (IOLike m , HasHeader blk )
157
168
=> ImmutableDB m blk
@@ -167,15 +178,24 @@ chainSyncServer immDB blockComponent registry = ChainSyncServer $ do
167
178
newImmutableDBFollower = do
168
179
varIterator <-
169
180
newTVarIO =<< ImmutableDB. streamAll immDB registry blockComponent
181
+ varIntersection <-
182
+ newTVarIO $ JustNegotiatedIntersection GenesisPoint
170
183
171
- let followerInstructionBlocking = do
172
- iterator <- readTVarIO varIterator
173
- ImmutableDB. iteratorNext iterator >>= \ case
174
- ImmutableDB. IteratorExhausted -> do
175
- ImmutableDB. iteratorClose iterator
176
- throwIO ReachedImmutableTip
177
- ImmutableDB. IteratorResult a ->
178
- pure $ AddBlock a
184
+ let followerInstructionBlocking =
185
+ readTVarIO varIntersection >>= \ case
186
+ JustNegotiatedIntersection intersectionPt -> do
187
+ atomically $
188
+ writeTVar varIntersection AlreadySentRollbackToIntersection
189
+ pure $ RollBack intersectionPt
190
+ -- Otherwise, get the next block from the iterator (or fail).
191
+ AlreadySentRollbackToIntersection -> do
192
+ iterator <- readTVarIO varIterator
193
+ ImmutableDB. iteratorNext iterator >>= \ case
194
+ ImmutableDB. IteratorExhausted -> do
195
+ ImmutableDB. iteratorClose iterator
196
+ throwIO ReachedImmutableTip
197
+ ImmutableDB. IteratorResult a ->
198
+ pure $ AddBlock a
179
199
180
200
followerClose = ImmutableDB. iteratorClose =<< readTVarIO varIterator
181
201
@@ -185,7 +205,9 @@ chainSyncServer immDB blockComponent registry = ChainSyncServer $ do
185
205
Left _ -> followerForward pts
186
206
Right iterator -> do
187
207
followerClose
188
- atomically $ writeTVar varIterator iterator
208
+ atomically $ do
209
+ writeTVar varIterator iterator
210
+ writeTVar varIntersection $ JustNegotiatedIntersection pt
189
211
pure $ Just pt
190
212
191
213
pure Follower {
0 commit comments