Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ openLedgerDB [email protected]{LedgerDB.lgrFlavorArgs = LedgerDB.L
bss
(\_ -> error "no replay")
snapManager
(LedgerDB.praosGetVolatileSuffix $ LedgerDB.ledgerDbCfgSecParam $ LedgerDB.lgrConfig lgrDbArgs)
)
snapManager
emptyStream
Expand All @@ -92,6 +93,7 @@ openLedgerDB [email protected]{LedgerDB.lgrFlavorArgs = LedgerDB.L
bss'
(\_ -> error "no replay")
snapManager
(LedgerDB.praosGetVolatileSuffix $ LedgerDB.ledgerDbCfgSecParam $ LedgerDB.lgrConfig lgrDbArgs)
)
snapManager
emptyStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,8 @@ traceChainDBEventTestBlockWith tracer = \case
trace $ "Switched to a fork; now: " ++ terseHFragment newFragment
StoreButDontChange point ->
trace $ "Did not select block due to LoE: " ++ terseRealPoint point
IgnoreBlockOlderThanK point ->
trace $ "Ignored block older than k: " ++ terseRealPoint point
IgnoreBlockOlderThanImmTip point ->
trace $ "Ignored block older than imm tip: " ++ terseRealPoint point
ChainSelectionLoEDebug curChain (LoEEnabled loeFrag0) -> do
trace $ "Current chain: " ++ terseHFragment curChain
trace $ "LoE fragment: " ++ terseHFragment loeFrag0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Breaking

- Renamed `IgnoreBlockOlderThanK` to `IgnoreBlockOlderThanImmTip` for future-proofing.
- Renamed and simplified `olderThanK` to `olderThanImmTip`.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
### Breaking

- LedgerDB: generalized over the criterion used to determine which states are
volatile/immutable, in preparation for Ouroboros Peras.

Concretely, `LedgerDB.openDB` takes a new argument, `GetVolatileSuffix m blk`.
For Praos behavior, use `praosGetVolatileSuffix`.
Original file line number Diff line number Diff line change
Expand Up @@ -690,12 +690,10 @@ checkKnownIntersectionInvariants ::
( HasHeader blk
, HasHeader (Header blk)
, HasAnnTip blk
, ConsensusProtocol (BlockProtocol blk)
) =>
ConsensusConfig (BlockProtocol blk) ->
KnownIntersectionState blk ->
Either String ()
checkKnownIntersectionInvariants cfg kis
checkKnownIntersectionInvariants kis
-- 'theirHeaderStateHistory' invariant
| let HeaderStateHistory snapshots = theirHeaderStateHistory
historyTips :: [WithOrigin (AnnTip blk)]
Expand All @@ -722,19 +720,6 @@ checkKnownIntersectionInvariants cfg kis
, show fragmentAnchorPoint
]
-- 'ourFrag' invariants
| let nbHeaders = AF.length ourFrag
ourAnchorPoint = AF.anchorPoint ourFrag
, nbHeaders < fromIntegral (unNonZero k)
, ourAnchorPoint /= GenesisPoint =
throwError $
unwords
[ "ourFrag contains fewer than k headers and not close to genesis:"
, show nbHeaders
, "vs"
, show k
, "with anchor"
, show ourAnchorPoint
]
| let ourFragAnchor = AF.anchorPoint ourFrag
theirFragAnchor = AF.anchorPoint theirFrag
, ourFragAnchor /= castPoint theirFragAnchor =
Expand All @@ -760,8 +745,6 @@ checkKnownIntersectionInvariants cfg kis
| otherwise =
return ()
where
SecurityParam k = protocolSecurityParam cfg

KnownIntersectionState
{ mostRecentIntersection
, ourFrag
Expand All @@ -773,14 +756,12 @@ assertKnownIntersectionInvariants ::
( HasHeader blk
, HasHeader (Header blk)
, HasAnnTip blk
, ConsensusProtocol (BlockProtocol blk)
, HasCallStack
) =>
ConsensusConfig (BlockProtocol blk) ->
KnownIntersectionState blk ->
KnownIntersectionState blk
assertKnownIntersectionInvariants cfg kis =
assertWithMsg (checkKnownIntersectionInvariants cfg kis) kis
assertKnownIntersectionInvariants kis =
assertWithMsg (checkKnownIntersectionInvariants kis) kis

{-------------------------------------------------------------------------------
The ChainSync client definition
Expand Down Expand Up @@ -891,8 +872,7 @@ chainSyncClient cfgEnv dynEnv =
(ForkTooDeep GenesisPoint)
where
ConfigEnv
{ cfg
, chainDbView
{ chainDbView
, tracer
} = cfgEnv

Expand Down Expand Up @@ -994,7 +974,7 @@ chainSyncClient cfgEnv dynEnv =
-- we will /never/ adopt them, which is handled in the "no
-- more intersection case".
StillIntersects () $
assertKnownIntersectionInvariants (configConsensus cfg) $
assertKnownIntersectionInvariants $
KnownIntersectionState
{ mostRecentIntersection = castPoint intersection
, ourFrag = ourFrag'
Expand Down Expand Up @@ -1157,7 +1137,7 @@ findIntersectionTop cfgEnv dynEnv intEnv =
(ourTipFromChain ourFrag)
theirTip
let kis =
assertKnownIntersectionInvariants (configConsensus cfg) $
assertKnownIntersectionInvariants $
KnownIntersectionState
{ mostRecentIntersection = intersection
, ourFrag
Expand Down Expand Up @@ -1233,7 +1213,6 @@ knownIntersectionStateTop cfgEnv dynEnv intEnv =
ConfigEnv
{ mkPipelineDecision0
, tracer
, cfg
, historicityCheck
} = cfgEnv

Expand Down Expand Up @@ -1621,9 +1600,8 @@ knownIntersectionStateTop cfgEnv dynEnv intEnv =
else mostRecentIntersection

kis' =
assertKnownIntersectionInvariants
(configConsensus cfg)
$ KnownIntersectionState
assertKnownIntersectionInvariants $
KnownIntersectionState
{ mostRecentIntersection = mostRecentIntersection'
, ourFrag = ourFrag
, theirFrag = theirFrag'
Expand Down Expand Up @@ -1960,7 +1938,7 @@ checkValid cfgEnv intEnv hdr hdrSlotTime theirTip kis ledgerView = do
traceWith (tracer cfgEnv) $ TraceValidatedHeader hdr

pure $
assertKnownIntersectionInvariants (configConsensus cfg) $
assertKnownIntersectionInvariants $
KnownIntersectionState
{ mostRecentIntersection = mostRecentIntersection'
, ourFrag = ourFrag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import Data.Functor.Contravariant ((>$<))
import qualified Data.Map.Strict as Map
import Data.Maybe.Strict (StrictMaybe (..))
import GHC.Stack (HasCallStack)
import NoThunks.Class
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config
import qualified Ouroboros.Consensus.Fragment.Validated as VF
Expand Down Expand Up @@ -86,6 +87,7 @@ import Ouroboros.Consensus.Util.STM
( Fingerprint (..)
, WithFingerprint (..)
)
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.BlockFetch.ConsensusInterface
( ChainSelStarvation (..)
Expand Down Expand Up @@ -160,12 +162,15 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
(chainDB, testing, env) <- lift $ do
traceWith tracer $ TraceOpenEvent (OpenedVolatileDB maxSlot)
traceWith tracer $ TraceOpenEvent StartedOpeningLgrDB
(ledgerDbGetVolatileSuffix, setGetCurrentChainForLedgerDB) <-
mkLedgerDbGetVolatileSuffix
(lgrDB, replayed) <-
LedgerDB.openDB
argsLgrDb
(ImmutableDB.streamAPI immutableDB)
immutableDbTipPoint
(Query.getAnyKnownBlock immutableDB volatileDB)
ledgerDbGetVolatileSuffix
traceWith tracer $ TraceOpenEvent OpenedLgrDB

varInvalid <- newTVarIO (WithFingerprint Map.empty (Fingerprint 0))
Expand Down Expand Up @@ -246,6 +251,9 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
, cdbLoE = Args.cdbsLoE cdbSpecificArgs
, cdbChainSelStarvation = varChainSelStarvation
}

setGetCurrentChainForLedgerDB $ Query.getCurrentChain env

h <- fmap CDBHandle $ newTVarIO $ ChainDbOpen env
let chainDB =
API.ChainDB
Expand Down Expand Up @@ -304,6 +312,38 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
tracer = Args.cdbsTracer cdbSpecificArgs
Args.ChainDbArgs argsImmutableDb argsVolatileDb argsLgrDb cdbSpecificArgs = args

-- The LedgerDB requires a criterion ('LedgerDB.GetVolatileSuffix')
-- determining which of its states are volatile/immutable. Once we have
-- initialized the ChainDB we can defer this decision to
-- 'Query.getCurrentChain'.
--
-- However, we initialize the LedgerDB before the ChainDB (for initial chain
-- selection), so during that period, we temporarily consider no state (apart
-- from the anchor state) as immutable. This is fine as we don't perform eg
-- any rollbacks during this period.
mkLedgerDbGetVolatileSuffix ::
m
( LedgerDB.GetVolatileSuffix m blk
, STM m (AnchoredFragment (Header blk)) -> m ()
)
mkLedgerDbGetVolatileSuffix = do
varGetCurrentChain ::
StrictTMVar m (OnlyCheckWhnf (STM m (AnchoredFragment (Header blk)))) <-
newEmptyTMVarIO
let getVolatileSuffix =
LedgerDB.GetVolatileSuffix $
tryReadTMVar varGetCurrentChain >>= \case
-- If @setVarChain@ has not yet been invoked, return the entire
-- suffix as volatile.
Nothing -> pure id
-- Otherwise, return the suffix with the same length as the
-- current chain.
Just (OnlyCheckWhnf getCurrentChain) -> do
curChainLen <- AF.length <$> getCurrentChain
pure $ AF.anchorNewest (fromIntegral curChainLen)
setVarChain = atomically . writeTMVar varGetCurrentChain . OnlyCheckWhnf
pure (getVolatileSuffix, setVarChain)

-- | We use 'runInnerWithTempRegistry' for the component databases.
innerOpenCont ::
IOLike m =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Background
, addBlockRunner
) where

import Cardano.Ledger.BaseTypes (unNonZero)
import Control.Exception (assert)
import Control.Monad (forM_, forever, void)
import Control.Monad.Trans.Class (lift)
Expand All @@ -57,7 +56,6 @@ import Data.Word
import GHC.Generics (Generic)
import GHC.Stack (HasCallStack)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config
import Ouroboros.Consensus.HardFork.Abstract
import Ouroboros.Consensus.Ledger.Inspect
import Ouroboros.Consensus.Ledger.SupportsProtocol
Expand All @@ -69,6 +67,7 @@ import Ouroboros.Consensus.Storage.ChainDB.API
import Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel
( chainSelSync
)
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.Query as Query
import Ouroboros.Consensus.Storage.ChainDB.Impl.Types
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
import qualified Ouroboros.Consensus.Storage.LedgerDB as LedgerDB
Expand Down Expand Up @@ -132,10 +131,11 @@ launchBgTasks cdb@CDB{..} replayed = do
Copying blocks from the VolatileDB to the ImmutableDB
-------------------------------------------------------------------------------}

-- | Copy the blocks older than @k@ from the VolatileDB to the ImmutableDB.
-- | Copy the blocks older than the immutable tip from the VolatileDB to the
-- ImmutableDB.
--
-- These headers of these blocks can be retrieved by dropping the @k@ most
-- recent blocks from the fragment stored in 'cdbChain'.
-- The headers of these blocks can be retrieved by considering headers in
-- 'cdbChain' that are not also in 'getCurrentChain' (a suffix of 'cdbChain').
--
-- The copied blocks are removed from the fragment stored in 'cdbChain'.
--
Expand All @@ -153,10 +153,11 @@ copyToImmutableDB ::
) =>
ChainDbEnv m blk ->
Electric m (WithOrigin SlotNo)
copyToImmutableDB CDB{..} = electric $ do
copyToImmutableDB cdb@CDB{..} = electric $ do
toCopy <- atomically $ do
curChain <- icWithoutTime <$> readTVar cdbChain
let nbToCopy = max 0 (AF.length curChain - fromIntegral (unNonZero k))
curChainVolSuffix <- Query.getCurrentChain cdb
let nbToCopy = max 0 $ AF.length curChain - AF.length curChainVolSuffix
toCopy :: [Point blk]
toCopy =
map headerPoint $
Expand All @@ -165,10 +166,10 @@ copyToImmutableDB CDB{..} = electric $ do
return toCopy

if null toCopy
-- This can't happen in practice, as we're only called when the fragment
-- is longer than @k@. However, in the tests, we will be calling this
-- function manually, which means it might be called when there are no
-- blocks to copy.
-- This can't happen in practice, as we're only called when there are new
-- immutable blocks. However, in the tests, we will be calling this function
-- manually, which means it might be called when there are no blocks to
-- copy.
then trace NoBlocksToCopyToImmutableDB
else forM_ toCopy $ \pt -> do
let hash = case pointHash pt of
Expand All @@ -193,7 +194,6 @@ copyToImmutableDB CDB{..} = electric $ do
-- Get the /possibly/ updated tip of the ImmutableDB
atomically $ ImmutableDB.getTipSlot cdbImmutableDB
where
SecurityParam k = configSecurityParam cdbTopLevelConfig
trace = traceWith (contramap TraceCopyToImmutableDBEvent cdbTracer)

-- \| Remove the header corresponding to the given point from the beginning
Expand All @@ -218,9 +218,11 @@ copyToImmutableDB CDB{..} = electric $ do
-- | Copy blocks from the VolatileDB to ImmutableDB and trigger further tasks in
-- other threads.
--
-- We watch the chain for changes. Whenever the chain is longer than @k@, then
-- the headers older than @k@ are copied from the VolatileDB to the ImmutableDB
-- (using 'copyToImmutableDB'). Once that is complete,
-- Wait until the current chain ('cdbChain') is longer than its volatile suffix
-- ('getCurrentChain'). When this occurs, it indicates that new blocks have
-- become immutable. These newly immutable blocks are then copied from the
-- VolatileDB to the ImmutableDB (using 'copyToImmutableDB'). Once that is
-- complete,
--
-- * Trigger LedgerDB maintenance tasks, namely flushing, taking snapshots and
-- garbage collection.
Expand Down Expand Up @@ -254,15 +256,15 @@ copyToImmutableDBRunner cdb@CDB{..} ledgerDbTasksTrigger gcSchedule fuse = do
LedgerDB.tryFlush cdbLedgerDB
forever copyAndTrigger
where
SecurityParam k = configSecurityParam cdbTopLevelConfig

copyAndTrigger :: m ()
copyAndTrigger = do
-- Wait for the chain to grow larger than @k@
-- Wait for 'cdbChain' to become longer than 'getCurrentChain'.
numToWrite <- atomically $ do
curChain <- icWithoutTime <$> readTVar cdbChain
check $ fromIntegral (AF.length curChain) > unNonZero k
return $ fromIntegral (AF.length curChain) - unNonZero k
curChainVolSuffix <- Query.getCurrentChain cdb
let numToWrite = AF.length curChain - AF.length curChainVolSuffix
check $ numToWrite > 0
return $ fromIntegral numToWrite

-- Copy blocks to ImmutableDB
--
Expand Down
Loading
Loading