Skip to content

Commit 886ba4d

Browse files
committed
LedgerDB: implement predictable snapshotting
1 parent 36b3c2d commit 886ba4d

File tree

9 files changed

+274
-248
lines changed

9 files changed

+274
-248
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
### Breaking
2+
3+
- LedgerDB: implemented *predictable* snapshots, i.e. different nodes with the
4+
same configuration will now create snapshots for the same slots.
5+
6+
See 'SnapshotPolicyArgs' for more details.

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
160160
(chainDB, testing, env) <- lift $ do
161161
traceWith tracer $ TraceOpenEvent (OpenedVolatileDB maxSlot)
162162
traceWith tracer $ TraceOpenEvent StartedOpeningLgrDB
163-
(lgrDB, replayed) <-
163+
(lgrDB, _replayed) <-
164164
LedgerDB.openDB
165165
argsLgrDb
166166
(ImmutableDB.streamAPI immutableDB)
@@ -281,8 +281,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
281281
, intGarbageCollect = \slot -> getEnv h $ \e -> do
282282
Background.garbageCollectBlocks e slot
283283
LedgerDB.garbageCollect (cdbLedgerDB e) slot
284-
, intTryTakeSnapshot = getEnv h $ \env' ->
285-
void $ LedgerDB.tryTakeSnapshot (cdbLedgerDB env') Nothing maxBound
284+
, intTryTakeSnapshot = getEnv h $ LedgerDB.tryTakeSnapshot . cdbLedgerDB
286285
, intAddBlockRunner = getEnv h (Background.addBlockRunner addBlockTestFuse)
287286
, intKillBgThreads = varKillBgThreads
288287
}
@@ -293,7 +292,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
293292
(castPoint $ AF.anchorPoint chain)
294293
(castPoint $ AF.headPoint chain)
295294

296-
when launchBgTasks $ Background.launchBgTasks env replayed
295+
when launchBgTasks $ Background.launchBgTasks env
297296

298297
return (chainDB, testing, env)
299298

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs

Lines changed: 17 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
{-# LANGUAGE BangPatterns #-}
22
{-# LANGUAGE DeriveAnyClass #-}
33
{-# LANGUAGE DeriveGeneric #-}
4-
{-# LANGUAGE DerivingStrategies #-}
54
{-# LANGUAGE FlexibleContexts #-}
65
{-# LANGUAGE LambdaCase #-}
76
{-# LANGUAGE NamedFieldPuns #-}
87
{-# LANGUAGE RecordWildCards #-}
98
{-# LANGUAGE ScopedTypeVariables #-}
10-
{-# LANGUAGE TupleSections #-}
119

1210
-- | Background tasks:
1311
--
@@ -53,7 +51,6 @@ import Data.Sequence.Strict (StrictSeq (..))
5351
import qualified Data.Sequence.Strict as Seq
5452
import Data.Time.Clock
5553
import Data.Void (Void)
56-
import Data.Word
5754
import GHC.Generics (Generic)
5855
import GHC.Stack (HasCallStack)
5956
import Ouroboros.Consensus.Block
@@ -76,7 +73,7 @@ import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
7673
import Ouroboros.Consensus.Util
7774
import Ouroboros.Consensus.Util.Condense
7875
import Ouroboros.Consensus.Util.IOLike
79-
import Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher)
76+
import Ouroboros.Consensus.Util.STM (Watcher (..), blockUntilJust, forkLinkedWatcher)
8077
import Ouroboros.Network.AnchoredFragment (AnchoredSeq (..))
8178
import qualified Ouroboros.Network.AnchoredFragment as AF
8279

@@ -93,15 +90,13 @@ launchBgTasks ::
9390
, HasHardForkHistory blk
9491
) =>
9592
ChainDbEnv m blk ->
96-
-- | Number of immutable blocks replayed on ledger DB startup
97-
Word64 ->
9893
m ()
99-
launchBgTasks cdb@CDB{..} replayed = do
94+
launchBgTasks cdb@CDB{..} = do
10095
!addBlockThread <-
10196
launch "ChainDB.addBlockRunner" $
10297
addBlockRunner cdbChainSelFuse cdb
10398

104-
ledgerDbTasksTrigger <- newLedgerDbTasksTrigger replayed
99+
ledgerDbTasksTrigger <- newLedgerDbTasksTrigger
105100
!ledgerDbMaintenaceThread <-
106101
forkLinkedWatcher cdbRegistry "ChainDB.ledgerDbTaskWatcher" $
107102
ledgerDbTaskWatcher cdb ledgerDbTasksTrigger
@@ -259,18 +254,17 @@ copyToImmutableDBRunner cdb@CDB{..} ledgerDbTasksTrigger gcSchedule fuse = do
259254
copyAndTrigger :: m ()
260255
copyAndTrigger = do
261256
-- Wait for the chain to grow larger than @k@
262-
numToWrite <- atomically $ do
257+
atomically $ do
263258
curChain <- icWithoutTime <$> readTVar cdbChain
264259
check $ fromIntegral (AF.length curChain) > unNonZero k
265-
return $ fromIntegral (AF.length curChain) - unNonZero k
266260

267261
-- Copy blocks to ImmutableDB
268262
--
269263
-- This is a synchronous operation: when it returns, the blocks have been
270264
-- copied to disk (though not flushed, necessarily).
271265
gcSlotNo <- withFuse fuse (copyToImmutableDB cdb)
272266

273-
triggerLedgerDbTasks ledgerDbTasksTrigger gcSlotNo numToWrite
267+
triggerLedgerDbTasks ledgerDbTasksTrigger gcSlotNo
274268
scheduleGC' gcSlotNo
275269

276270
scheduleGC' :: WithOrigin SlotNo -> m ()
@@ -292,45 +286,20 @@ copyToImmutableDBRunner cdb@CDB{..} ledgerDbTasksTrigger gcSchedule fuse = do
292286
-- | Trigger for the LedgerDB maintenance tasks, namely whenever the immutable
293287
-- DB tip slot advances when we finish copying blocks to it.
294288
newtype LedgerDbTasksTrigger m
295-
= LedgerDbTasksTrigger (StrictTVar m LedgerDbTaskState)
289+
= LedgerDbTasksTrigger (StrictTVar m (WithOrigin SlotNo))
296290

297-
data LedgerDbTaskState = LedgerDbTaskState
298-
{ ldbtsImmTip :: !(WithOrigin SlotNo)
299-
, ldbtsPrevSnapshotTime :: !(Maybe Time)
300-
, ldbtsBlocksSinceLastSnapshot :: !Word64
301-
}
302-
deriving stock Generic
303-
deriving anyclass NoThunks
304-
305-
newLedgerDbTasksTrigger ::
306-
IOLike m =>
307-
-- | Number of blocks replayed.
308-
Word64 ->
309-
m (LedgerDbTasksTrigger m)
310-
newLedgerDbTasksTrigger replayed = LedgerDbTasksTrigger <$> newTVarIO st
311-
where
312-
st =
313-
LedgerDbTaskState
314-
{ ldbtsImmTip = Origin
315-
, ldbtsPrevSnapshotTime = Nothing
316-
, ldbtsBlocksSinceLastSnapshot = replayed
317-
}
291+
newLedgerDbTasksTrigger :: IOLike m => m (LedgerDbTasksTrigger m)
292+
newLedgerDbTasksTrigger = LedgerDbTasksTrigger <$> newTVarIO Origin
318293

319294
triggerLedgerDbTasks ::
320295
forall m.
321296
IOLike m =>
322297
LedgerDbTasksTrigger m ->
323298
-- | New tip of the ImmutableDB.
324299
WithOrigin SlotNo ->
325-
-- | Number of blocks written to the ImmutableDB.
326-
Word64 ->
327300
m ()
328-
triggerLedgerDbTasks (LedgerDbTasksTrigger varSt) immTip numWritten =
329-
atomically $ modifyTVar varSt $ \st ->
330-
st
331-
{ ldbtsImmTip = immTip
332-
, ldbtsBlocksSinceLastSnapshot = ldbtsBlocksSinceLastSnapshot st + numWritten
333-
}
301+
triggerLedgerDbTasks (LedgerDbTasksTrigger varSt) =
302+
atomically . writeTVar varSt
334303

335304
-- | Run LedgerDB maintenance tasks when 'LedgerDbTasksTrigger' changes.
336305
--
@@ -342,38 +311,16 @@ ledgerDbTaskWatcher ::
342311
IOLike m =>
343312
ChainDbEnv m blk ->
344313
LedgerDbTasksTrigger m ->
345-
Watcher m LedgerDbTaskState (WithOrigin SlotNo)
314+
Watcher m SlotNo SlotNo
346315
ledgerDbTaskWatcher CDB{..} (LedgerDbTasksTrigger varSt) =
347316
Watcher
348-
{ wFingerprint = ldbtsImmTip
317+
{ wFingerprint = id
349318
, wInitial = Nothing
350-
, wReader = readTVar varSt
351-
, wNotify =
352-
\LedgerDbTaskState
353-
{ ldbtsImmTip
354-
, ldbtsBlocksSinceLastSnapshot = blocksSinceLast
355-
, ldbtsPrevSnapshotTime = prevSnapTime
356-
} ->
357-
whenJust (withOriginToMaybe ldbtsImmTip) $ \slotNo -> do
358-
LedgerDB.tryFlush cdbLedgerDB
359-
360-
now <- getMonotonicTime
361-
LedgerDB.SnapCounters
362-
{ prevSnapshotTime
363-
, ntBlocksSinceLastSnap
364-
} <-
365-
LedgerDB.tryTakeSnapshot
366-
cdbLedgerDB
367-
((,now) <$> prevSnapTime)
368-
blocksSinceLast
369-
atomically $ modifyTVar varSt $ \st ->
370-
st
371-
{ ldbtsBlocksSinceLastSnapshot =
372-
ldbtsBlocksSinceLastSnapshot st - blocksSinceLast + ntBlocksSinceLastSnap
373-
, ldbtsPrevSnapshotTime = prevSnapshotTime
374-
}
375-
376-
LedgerDB.garbageCollect cdbLedgerDB slotNo
319+
, wReader = blockUntilJust $ withOriginToMaybe <$> readTVar varSt
320+
, wNotify = \slotNo -> do
321+
LedgerDB.tryFlush cdbLedgerDB
322+
LedgerDB.tryTakeSnapshot cdbLedgerDB
323+
LedgerDB.garbageCollect cdbLedgerDB slotNo
377324
}
378325

379326
{-------------------------------------------------------------------------------

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,6 @@ module Ouroboros.Consensus.Storage.LedgerDB.API
149149
, withPrivateTipForker
150150
, withTipForker
151151

152-
-- * Snapshots
153-
, SnapCounters (..)
154-
155152
-- * Testing
156153
, TestInternals (..)
157154
, TestInternals'
@@ -160,7 +157,6 @@ module Ouroboros.Consensus.Storage.LedgerDB.API
160157

161158
import Codec.Serialise
162159
import qualified Control.Monad as Monad
163-
import Control.Monad.Class.MonadTime.SI
164160
import Control.Monad.Except
165161
import Control.ResourceRegistry
166162
import Control.Tracer
@@ -262,18 +258,12 @@ data LedgerDB m l blk = LedgerDB
262258
-- * The set of previously applied points.
263259
, tryTakeSnapshot ::
264260
l ~ ExtLedgerState blk =>
265-
Maybe (Time, Time) ->
266-
Word64 ->
267-
m SnapCounters
261+
m ()
268262
-- ^ If the 'SnapshotPolicy' with which this LedgerDB was opened indicates so,
269263
-- take a snapshot and delete stale ones.
270264
--
271-
-- The arguments are:
272-
--
273-
-- - If a snapshot has been taken already, the time at which it was taken
274-
-- and the current time.
275-
--
276-
-- - How many blocks have been processed since the last snapshot.
265+
-- For V1, this must not be called concurrently with 'garbageCollect' and/or
266+
-- 'tryFlush'.
277267
, tryFlush :: m ()
278268
-- ^ Flush V1 in-memory LedgerDB state to disk, if possible. This is a no-op
279269
-- for implementations that do not need an explicit flush function.
@@ -420,18 +410,6 @@ getReadOnlyForker ::
420410
m (Either GetForkerError (ReadOnlyForker m l blk))
421411
getReadOnlyForker ldb rr pt = fmap readOnlyForker <$> getForkerAtTarget ldb rr pt
422412

423-
{-------------------------------------------------------------------------------
424-
Snapshots
425-
-------------------------------------------------------------------------------}
426-
427-
-- | Counters to keep track of when we made the last snapshot.
428-
data SnapCounters = SnapCounters
429-
{ prevSnapshotTime :: !(Maybe Time)
430-
-- ^ When was the last time we made a snapshot
431-
, ntBlocksSinceLastSnap :: !Word64
432-
-- ^ How many blocks have we processed since the last snapshot
433-
}
434-
435413
{-------------------------------------------------------------------------------
436414
Initialization
437415
-------------------------------------------------------------------------------}

0 commit comments

Comments
 (0)