Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ openLedgerDB ::
, LedgerDB.TestInternals' IO blk
)
openLedgerDB [email protected]{LedgerDB.lgrFlavorArgs = LedgerDB.LedgerDbFlavorArgsV1 bss} = do
(ledgerDB, _, intLedgerDB) <-
(ledgerDB, intLedgerDB) <-
LedgerDB.openDBInternal
lgrDbArgs
( LedgerDB.V1.mkInitDb
Expand All @@ -76,7 +76,7 @@ openLedgerDB [email protected]{LedgerDB.lgrFlavorArgs = LedgerDB.L
genesisPoint
pure (ledgerDB, intLedgerDB)
openLedgerDB [email protected]{LedgerDB.lgrFlavorArgs = LedgerDB.LedgerDbFlavorArgsV2 args} = do
(ledgerDB, _, intLedgerDB) <-
(ledgerDB, intLedgerDB) <-
LedgerDB.openDBInternal
lgrDbArgs
( LedgerDB.V2.mkInitDb
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
### Breaking

- LedgerDB: implemented *predictable* snapshots, i.e. different nodes with the
same configuration will now create snapshots for the same slots.

See 'SnapshotPolicyArgs' for more details.
4 changes: 2 additions & 2 deletions ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,7 @@ test-suite storage-test
Test.Ouroboros.Storage.ChainDB.FollowerPromptness
Test.Ouroboros.Storage.ChainDB.GcSchedule
Test.Ouroboros.Storage.ChainDB.Iterator
Test.Ouroboros.Storage.ChainDB.LedgerSnapshots
Test.Ouroboros.Storage.ChainDB.Model
Test.Ouroboros.Storage.ChainDB.Model.Test
Test.Ouroboros.Storage.ChainDB.Paths
Expand All @@ -698,7 +699,6 @@ test-suite storage-test
Test.Ouroboros.Storage.ImmutableDB.StateMachine
Test.Ouroboros.Storage.LedgerDB
Test.Ouroboros.Storage.LedgerDB.Serialisation
Test.Ouroboros.Storage.LedgerDB.SnapshotPolicy
Test.Ouroboros.Storage.LedgerDB.Snapshots
Test.Ouroboros.Storage.LedgerDB.StateMachine
Test.Ouroboros.Storage.LedgerDB.StateMachine.TestBlock
Expand All @@ -721,7 +721,7 @@ test-suite storage-test
bytestring,
cardano-binary,
cardano-ledger-binary:testlib,
cardano-ledger-core:{cardano-ledger-core, testlib},
cardano-ledger-core:cardano-ledger-core,
cardano-slotting:{cardano-slotting, testlib},
cardano-strict-containers,
cborg,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
(chainDB, testing, env) <- lift $ do
traceWith tracer $ TraceOpenEvent (OpenedVolatileDB maxSlot)
traceWith tracer $ TraceOpenEvent StartedOpeningLgrDB
(lgrDB, replayed) <-
lgrDB <-
LedgerDB.openDB
argsLgrDb
(ImmutableDB.streamAPI immutableDB)
Expand Down Expand Up @@ -281,8 +281,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
, intGarbageCollect = \slot -> getEnv h $ \e -> do
Background.garbageCollectBlocks e slot
LedgerDB.garbageCollect (cdbLedgerDB e) slot
, intTryTakeSnapshot = getEnv h $ \env' ->
void $ LedgerDB.tryTakeSnapshot (cdbLedgerDB env') Nothing maxBound
, intTryTakeSnapshot = getEnv h $ LedgerDB.tryTakeSnapshot . cdbLedgerDB
, intAddBlockRunner = getEnv h (Background.addBlockRunner addBlockTestFuse)
, intKillBgThreads = varKillBgThreads
}
Expand All @@ -293,7 +292,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
(castPoint $ AF.anchorPoint chain)
(castPoint $ AF.headPoint chain)

when launchBgTasks $ Background.launchBgTasks env replayed
when launchBgTasks $ Background.launchBgTasks env

return (chainDB, testing, env)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}

-- | Background tasks:
--
Expand Down Expand Up @@ -53,7 +51,6 @@ import Data.Sequence.Strict (StrictSeq (..))
import qualified Data.Sequence.Strict as Seq
import Data.Time.Clock
import Data.Void (Void)
import Data.Word
import GHC.Generics (Generic)
import GHC.Stack (HasCallStack)
import Ouroboros.Consensus.Block
Expand All @@ -76,7 +73,7 @@ import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
import Ouroboros.Consensus.Util
import Ouroboros.Consensus.Util.Condense
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher)
import Ouroboros.Consensus.Util.STM (Watcher (..), blockUntilJust, forkLinkedWatcher)
import Ouroboros.Network.AnchoredFragment (AnchoredSeq (..))
import qualified Ouroboros.Network.AnchoredFragment as AF

Expand All @@ -93,15 +90,13 @@ launchBgTasks ::
, HasHardForkHistory blk
) =>
ChainDbEnv m blk ->
-- | Number of immutable blocks replayed on ledger DB startup
Word64 ->
m ()
launchBgTasks cdb@CDB{..} replayed = do
launchBgTasks cdb@CDB{..} = do
!addBlockThread <-
launch "ChainDB.addBlockRunner" $
addBlockRunner cdbChainSelFuse cdb

ledgerDbTasksTrigger <- newLedgerDbTasksTrigger replayed
ledgerDbTasksTrigger <- newLedgerDbTasksTrigger
!ledgerDbMaintenaceThread <-
forkLinkedWatcher cdbRegistry "ChainDB.ledgerDbTaskWatcher" $
ledgerDbTaskWatcher cdb ledgerDbTasksTrigger
Expand Down Expand Up @@ -259,18 +254,17 @@ copyToImmutableDBRunner cdb@CDB{..} ledgerDbTasksTrigger gcSchedule fuse = do
copyAndTrigger :: m ()
copyAndTrigger = do
-- Wait for the chain to grow larger than @k@
numToWrite <- atomically $ do
atomically $ do
curChain <- icWithoutTime <$> readTVar cdbChain
check $ fromIntegral (AF.length curChain) > unNonZero k
return $ fromIntegral (AF.length curChain) - unNonZero k

-- Copy blocks to ImmutableDB
--
-- This is a synchronous operation: when it returns, the blocks have been
-- copied to disk (though not flushed, necessarily).
gcSlotNo <- withFuse fuse (copyToImmutableDB cdb)

triggerLedgerDbTasks ledgerDbTasksTrigger gcSlotNo numToWrite
triggerLedgerDbTasks ledgerDbTasksTrigger gcSlotNo
scheduleGC' gcSlotNo

scheduleGC' :: WithOrigin SlotNo -> m ()
Expand All @@ -292,45 +286,20 @@ copyToImmutableDBRunner cdb@CDB{..} ledgerDbTasksTrigger gcSchedule fuse = do
-- | Trigger for the LedgerDB maintenance tasks, namely whenever the immutable
-- DB tip slot advances when we finish copying blocks to it.
newtype LedgerDbTasksTrigger m
= LedgerDbTasksTrigger (StrictTVar m LedgerDbTaskState)
= LedgerDbTasksTrigger (StrictTVar m (WithOrigin SlotNo))

data LedgerDbTaskState = LedgerDbTaskState
{ ldbtsImmTip :: !(WithOrigin SlotNo)
, ldbtsPrevSnapshotTime :: !(Maybe Time)
, ldbtsBlocksSinceLastSnapshot :: !Word64
}
deriving stock Generic
deriving anyclass NoThunks

newLedgerDbTasksTrigger ::
IOLike m =>
-- | Number of blocks replayed.
Word64 ->
m (LedgerDbTasksTrigger m)
newLedgerDbTasksTrigger replayed = LedgerDbTasksTrigger <$> newTVarIO st
where
st =
LedgerDbTaskState
{ ldbtsImmTip = Origin
, ldbtsPrevSnapshotTime = Nothing
, ldbtsBlocksSinceLastSnapshot = replayed
}
newLedgerDbTasksTrigger :: IOLike m => m (LedgerDbTasksTrigger m)
newLedgerDbTasksTrigger = LedgerDbTasksTrigger <$> newTVarIO Origin

triggerLedgerDbTasks ::
forall m.
IOLike m =>
LedgerDbTasksTrigger m ->
-- | New tip of the ImmutableDB.
WithOrigin SlotNo ->
-- | Number of blocks written to the ImmutableDB.
Word64 ->
m ()
triggerLedgerDbTasks (LedgerDbTasksTrigger varSt) immTip numWritten =
atomically $ modifyTVar varSt $ \st ->
st
{ ldbtsImmTip = immTip
, ldbtsBlocksSinceLastSnapshot = ldbtsBlocksSinceLastSnapshot st + numWritten
}
triggerLedgerDbTasks (LedgerDbTasksTrigger varSt) =
atomically . writeTVar varSt

-- | Run LedgerDB maintenance tasks when 'LedgerDbTasksTrigger' changes.
--
Expand All @@ -342,38 +311,16 @@ ledgerDbTaskWatcher ::
IOLike m =>
ChainDbEnv m blk ->
LedgerDbTasksTrigger m ->
Watcher m LedgerDbTaskState (WithOrigin SlotNo)
Watcher m SlotNo SlotNo
ledgerDbTaskWatcher CDB{..} (LedgerDbTasksTrigger varSt) =
Watcher
{ wFingerprint = ldbtsImmTip
{ wFingerprint = id
, wInitial = Nothing
, wReader = readTVar varSt
, wNotify =
\LedgerDbTaskState
{ ldbtsImmTip
, ldbtsBlocksSinceLastSnapshot = blocksSinceLast
, ldbtsPrevSnapshotTime = prevSnapTime
} ->
whenJust (withOriginToMaybe ldbtsImmTip) $ \slotNo -> do
LedgerDB.tryFlush cdbLedgerDB

now <- getMonotonicTime
LedgerDB.SnapCounters
{ prevSnapshotTime
, ntBlocksSinceLastSnap
} <-
LedgerDB.tryTakeSnapshot
cdbLedgerDB
((,now) <$> prevSnapTime)
blocksSinceLast
atomically $ modifyTVar varSt $ \st ->
st
{ ldbtsBlocksSinceLastSnapshot =
ldbtsBlocksSinceLastSnapshot st - blocksSinceLast + ntBlocksSinceLastSnap
, ldbtsPrevSnapshotTime = prevSnapshotTime
}

LedgerDB.garbageCollect cdbLedgerDB slotNo
, wReader = blockUntilJust $ withOriginToMaybe <$> readTVar varSt
, wNotify = \slotNo -> do
LedgerDB.tryFlush cdbLedgerDB
LedgerDB.tryTakeSnapshot cdbLedgerDB
LedgerDB.garbageCollect cdbLedgerDB slotNo
}

{-------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ module Ouroboros.Consensus.Storage.LedgerDB
) where

import Data.Functor.Contravariant ((>$<))
import Data.Word
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.HardFork.Abstract
import Ouroboros.Consensus.Ledger.Inspect
Expand Down Expand Up @@ -58,7 +57,7 @@ openDB ::
Point blk ->
-- | How to get blocks from the ChainDB
ResolveBlock m blk ->
m (LedgerDB' m blk, Word64)
m (LedgerDB' m blk)
openDB
args
stream
Expand Down Expand Up @@ -94,11 +93,9 @@ doOpenDB ::
InitDB db m blk ->
StreamAPI m blk blk ->
Point blk ->
m (LedgerDB' m blk, Word64)
m (LedgerDB' m blk)
doOpenDB args initDb stream replayGoal =
f <$> openDBInternal args initDb stream replayGoal
where
f (ldb, replayCounter, _) = (ldb, replayCounter)
fst <$> openDBInternal args initDb stream replayGoal

-- | Open the ledger DB and expose internals for testing purposes
openDBInternal ::
Expand All @@ -111,10 +108,10 @@ openDBInternal ::
InitDB db m blk ->
StreamAPI m blk blk ->
Point blk ->
m (LedgerDB' m blk, Word64, TestInternals' m blk)
m (LedgerDB' m blk, TestInternals' m blk)
openDBInternal args@(LedgerDbArgs{lgrHasFS = SomeHasFS fs}) initDb stream replayGoal = do
createDirectoryIfMissing fs True (mkFsPath [])
(_initLog, db, replayCounter) <-
(_initLog, db) <-
initialize
replayTracer
snapTracer
Expand All @@ -125,7 +122,7 @@ openDBInternal args@(LedgerDbArgs{lgrHasFS = SomeHasFS fs}) initDb stream replay
initDb
lgrStartSnapshot
(ledgerDb, internal) <- mkLedgerDb initDb db
return (ledgerDb, replayCounter, internal)
return (ledgerDb, internal)
where
LedgerDbArgs
{ lgrConfig
Expand Down
Loading
Loading