Skip to content

Commit 76217aa

Browse files
facundominguezneilmayhew
authored andcommitted
Implement a call to rotate dynamos in CSJ
1 parent f596da3 commit 76217aa

File tree

1 file changed

+119
-38
lines changed
  • ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client

1 file changed

+119
-38
lines changed

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/Jumping.hs

Lines changed: 119 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@
7474
-- when the client should pause, download headers, or ask about agreement with
7575
-- a given point (jumping). See the 'Jumping' type for more details.
7676
--
77+
-- Interactions with the BlockFetch logic
78+
-- --------------------------------------
79+
--
80+
-- When syncing, the BlockFetch logic will fetch blocks from the dynamo. If the
81+
-- dynamo is responding too slowly, the BlockFetch logic can ask to change the
82+
-- dynamo with a call to 'rotateDynamo'.
83+
--
7784
-- Interactions with the Limit on Patience
7885
-- ---------------------------------------
7986
--
@@ -100,15 +107,15 @@
100107
--
101108
-- > j ╔════════╗
102109
-- > ╭────────── ║ Dynamo ║ ◀─────────╮
103-
-- > │ ╚════════╝ │f
104-
-- > ▼ ▲ │
105-
-- > ┌────────────┐ │ k ┌──────────┐
106-
-- > │ Disengaged │ ◀──────────│────────── │ Objector │
107-
-- > └────────────┘ ╭─────│────────── └──────────┘
108-
-- > │ │ ▲ ▲ │
109-
-- > g│ │e b │ │ │
110-
-- > │ │ ╭─────╯ i│ │c
111-
-- > ╭╌╌╌╌╌╌▼╌╌╌╌╌╌╌╌╌╌╌╌╌│╌╌╌╌╌╌╌╌╌╌│╌▼╌╌╌╮
110+
-- > │ ╭──╚════════╝ │f
111+
-- > ▼ ▲ │
112+
-- > ┌────────────┐ │ k ┌──────────┐
113+
-- > │ Disengaged │ ◀──────────│────────── │ Objector │
114+
-- > └────────────┘ ╭─────│────────── └──────────┘
115+
-- > │ │ ▲ ▲ │
116+
-- > l│ g│ │e b │ │ │
117+
-- > │ │ ╭─────╯ i│ │c
118+
-- > ╭╌╌╌╌╌╌▼╌╌╌╌╌╌╌╌╌╌╌╌╌│╌╌╌╌╌╌╌╌╌╌│╌▼╌╌╌╮
112119
-- > ┆ ╔═══════╗ a ┌──────┐ d ┌─────┐ |
113120
-- > ┆ ║ Happy ║ ───▶ │ LFI* │ ───▶ │ FI* │ |
114121
-- > ┆ ╚═══════╝ ◀─╮ └──────┘ └─────┘ |
@@ -147,26 +154,34 @@
147154
-- If dynamo or objector claim to have no more headers, they are disengaged
148155
-- (j|k).
149156
--
157+
-- The BlockFetch logic can ask to change the dynamo if it is not serving blocks
158+
-- fast enough. If there are other non-disengaged peers the dynamo is demoted to
159+
-- a jumper (l) and a new dynamo is elected.
160+
--
150161
module Ouroboros.Consensus.MiniProtocol.ChainSync.Client.Jumping (
151162
Context
152163
, ContextWith (..)
153164
, Instruction (..)
154165
, JumpInstruction (..)
155166
, JumpResult (..)
156167
, Jumping (..)
168+
, getDynamo
157169
, makeContext
158170
, mkJumping
159171
, noJumping
160172
, registerClient
173+
, rotateDynamo
161174
, unregisterClient
162175
) where
163176

164177
import Cardano.Slotting.Slot (SlotNo (..), WithOrigin (..))
165-
import Control.Monad (forM, forM_, when)
178+
import Control.Monad (forM, forM_, void, when)
166179
import Data.Foldable (toList)
167180
import Data.List (sortOn)
181+
import qualified Data.Map as Map
168182
import Data.Maybe (catMaybes, fromMaybe)
169183
import Data.Maybe.Strict (StrictMaybe (..))
184+
import Data.Sequence.Strict (StrictSeq)
170185
import qualified Data.Sequence.Strict as Seq
171186
import GHC.Generics (Generic)
172187
import Ouroboros.Consensus.Block (HasHeader (getHeaderFields), Header,
@@ -460,7 +475,7 @@ onRollBackward context slot =
460475
Dynamo _ lastJumpSlot
461476
| slot < lastJumpSlot -> do
462477
disengage (handle context)
463-
electNewDynamo (stripContext context)
478+
void $ electNewDynamo (stripContext context)
464479
| otherwise -> pure ()
465480

466481
-- | This function is called when we receive a 'MsgAwaitReply' message.
@@ -478,7 +493,7 @@ onAwaitReply context =
478493
readTVar (cschJumping (handle context)) >>= \case
479494
Dynamo{} -> do
480495
disengage (handle context)
481-
electNewDynamo (stripContext context)
496+
void $ electNewDynamo (stripContext context)
482497
Objector{} -> do
483498
disengage (handle context)
484499
electNewObjector (stripContext context)
@@ -511,7 +526,7 @@ processJumpResult context jumpResult =
511526
updateChainSyncState (handle context) jumpInfo
512527
RejectedJump JumpToGoodPoint{} -> do
513528
startDisengaging (handle context)
514-
electNewDynamo (stripContext context)
529+
void $ electNewDynamo (stripContext context)
515530

516531
-- Not interesting in the dynamo state
517532
AcceptedJump JumpTo{} -> pure ()
@@ -662,10 +677,10 @@ updateJumpInfo context jumpInfo =
662677
getDynamo ::
663678
(MonadSTM m) =>
664679
ChainSyncClientHandleCollection peer m blk ->
665-
STM m (Maybe (ChainSyncClientHandle m blk))
680+
STM m (Maybe (peer, ChainSyncClientHandle m blk))
666681
getDynamo handlesCol = do
667682
handles <- cschcSeq handlesCol
668-
fmap snd <$> findM (\(_, handle) -> isDynamo <$> readTVar (cschJumping handle)) handles
683+
findM (\(_, handle) -> isDynamo <$> readTVar (cschJumping handle)) handles
669684
where
670685
isDynamo Dynamo{} = True
671686
isDynamo _ = False
@@ -720,7 +735,7 @@ registerClient context peer csState mkHandle = do
720735
Nothing -> do
721736
fragment <- csCandidate <$> readTVar csState
722737
pure $ Dynamo DynamoStarted $ pointSlot $ AF.anchorPoint fragment
723-
Just handle -> do
738+
Just (_, handle) -> do
724739
mJustInfo <- readTVar (cschJumpInfo handle)
725740
newJumper mJustInfo (Happy FreshJumper Nothing)
726741
cschJumping <- newTVar csjState
@@ -744,7 +759,52 @@ unregisterClient context = do
744759
Disengaged{} -> pure ()
745760
Jumper{} -> pure ()
746761
Objector{} -> electNewObjector context'
747-
Dynamo{} -> electNewDynamo context'
762+
Dynamo{} -> void $ electNewDynamo context'
763+
764+
-- | Elects a new dynamo by demoting the given dynamo to a jumper, moving the
765+
-- peer to the end of the queue of chain sync handles and electing a new dynamo.
766+
--
767+
-- It does nothing if there is no other engaged peer to elect or if the given
768+
-- peer is not the dynamo.
769+
--
770+
-- Yields the new dynamo, if there is one.
771+
rotateDynamo ::
772+
( Ord peer,
773+
LedgerSupportsProtocol blk,
774+
MonadSTM m
775+
) =>
776+
ChainSyncClientHandleCollection peer m blk ->
777+
peer ->
778+
STM m (Maybe (peer, ChainSyncClientHandle m blk))
779+
rotateDynamo handlesCol peer = do
780+
handles <- cschcMap handlesCol
781+
case handles Map.!? peer of
782+
Nothing ->
783+
-- Do not re-elect a dynamo if the peer has been disconnected.
784+
getDynamo handlesCol
785+
Just oldDynHandle ->
786+
readTVar (cschJumping oldDynHandle) >>= \case
787+
Dynamo{} -> do
788+
cschcRotateHandle handlesCol peer
789+
peerStates <- cschcSeq handlesCol
790+
mEngaged <- findNonDisengaged peerStates
791+
case mEngaged of
792+
Nothing ->
793+
-- There are no engaged peers. This case cannot happen, as the
794+
-- dynamo is always engaged.
795+
error "rotateDynamo: no engaged peer found"
796+
Just (newDynamoId, newDynHandle)
797+
| newDynamoId == peer ->
798+
-- The old dynamo is the only engaged peer left.
799+
pure $ Just (newDynamoId, newDynHandle)
800+
| otherwise -> do
801+
newJumper Nothing (Happy FreshJumper Nothing)
802+
>>= writeTVar (cschJumping oldDynHandle)
803+
promoteToDynamo peerStates newDynamoId newDynHandle
804+
pure $ Just (newDynamoId, newDynHandle)
805+
_ ->
806+
-- Do not re-elect a dynamo if the peer is not the dynamo.
807+
getDynamo handlesCol
748808

749809
-- | Choose an unspecified new non-idling dynamo and demote all other peers to
750810
-- jumpers.
@@ -754,32 +814,53 @@ electNewDynamo ::
754814
LedgerSupportsProtocol blk
755815
) =>
756816
Context m peer blk ->
757-
STM m ()
817+
STM m (Maybe (peer, ChainSyncClientHandle m blk))
758818
electNewDynamo context = do
759819
peerStates <- cschcSeq (handlesCol context)
760820
mDynamo <- findNonDisengaged peerStates
761821
case mDynamo of
762-
Nothing -> pure ()
822+
Nothing -> pure Nothing
763823
Just (dynId, dynamo) -> do
764-
fragment <- csCandidate <$> readTVar (cschState dynamo)
765-
mJumpInfo <- readTVar (cschJumpInfo dynamo)
766-
-- If there is no jump info, the dynamo must be just starting and
767-
-- there is no need to set the intersection of the ChainSync server.
768-
let dynamoInitState = maybe DynamoStarted DynamoStarting mJumpInfo
769-
writeTVar (cschJumping dynamo) $
770-
Dynamo dynamoInitState $ pointSlot $ AF.headPoint fragment
771-
-- Demote all other peers to jumpers
772-
forM_ peerStates $ \(peer, st) ->
773-
when (peer /= dynId) $ do
774-
jumpingState <- readTVar (cschJumping st)
775-
when (not (isDisengaged jumpingState)) $
776-
newJumper mJumpInfo (Happy FreshJumper Nothing)
777-
>>= writeTVar (cschJumping st)
778-
where
779-
findNonDisengaged =
780-
findM $ \(_, st) -> not . isDisengaged <$> readTVar (cschJumping st)
781-
isDisengaged Disengaged{} = True
782-
isDisengaged _ = False
824+
promoteToDynamo peerStates dynId dynamo
825+
pure $ Just (dynId, dynamo)
826+
827+
-- | Promote the given peer to dynamo and demote all other peers to jumpers.
828+
promoteToDynamo ::
829+
( MonadSTM m,
830+
Eq peer,
831+
LedgerSupportsProtocol blk
832+
) =>
833+
StrictSeq (peer, ChainSyncClientHandle m blk) ->
834+
peer ->
835+
ChainSyncClientHandle m blk ->
836+
STM m ()
837+
promoteToDynamo peerStates dynId dynamo = do
838+
fragment <- csCandidate <$> readTVar (cschState dynamo)
839+
mJumpInfo <- readTVar (cschJumpInfo dynamo)
840+
-- If there is no jump info, the dynamo must be just starting and
841+
-- there is no need to set the intersection of the ChainSync server.
842+
let dynamoInitState = maybe DynamoStarted DynamoStarting mJumpInfo
843+
writeTVar (cschJumping dynamo) $
844+
Dynamo dynamoInitState $ pointSlot $ AF.headPoint fragment
845+
-- Demote all other peers to jumpers
846+
forM_ peerStates $ \(peer, st) ->
847+
when (peer /= dynId) $ do
848+
jumpingState <- readTVar (cschJumping st)
849+
when (not (isDisengaged jumpingState)) $
850+
newJumper mJumpInfo (Happy FreshJumper Nothing)
851+
>>= writeTVar (cschJumping st)
852+
853+
-- | Find a non-disengaged peer in the given sequence
854+
findNonDisengaged ::
855+
(MonadSTM m) =>
856+
StrictSeq (peer, ChainSyncClientHandle m blk) ->
857+
STM m (Maybe (peer, ChainSyncClientHandle m blk))
858+
findNonDisengaged =
859+
findM $ \(_, st) -> not . isDisengaged <$> readTVar (cschJumping st)
860+
861+
isDisengaged :: ChainSyncJumpingState m blk -> Bool
862+
isDisengaged Disengaged{} = True
863+
isDisengaged _ = False
783864

784865
findM :: (Foldable f, Monad m) => (a -> m Bool) -> f a -> m (Maybe a)
785866
findM p =

0 commit comments

Comments
 (0)