Skip to content

Commit a0c0ae0

Browse files
committed
LedgerDB: implement predictable snapshotting
1 parent c70fd54 commit a0c0ae0

File tree

9 files changed

+274
-248
lines changed

9 files changed

+274
-248
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
### Breaking
2+
3+
- LedgerDB: implemented *predictable* snapshots, i.e. different nodes with the
4+
same configuration will now create snapshots for the same slots.
5+
6+
See 'SnapshotPolicyArgs' for more details.

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
160160
(chainDB, testing, env) <- lift $ do
161161
traceWith tracer $ TraceOpenEvent (OpenedVolatileDB maxSlot)
162162
traceWith tracer $ TraceOpenEvent StartedOpeningLgrDB
163-
(lgrDB, replayed) <-
163+
(lgrDB, _replayed) <-
164164
LedgerDB.openDB
165165
argsLgrDb
166166
(ImmutableDB.streamAPI immutableDB)
@@ -281,8 +281,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
281281
, intGarbageCollect = \slot -> getEnv h $ \e -> do
282282
Background.garbageCollectBlocks e slot
283283
LedgerDB.garbageCollect (cdbLedgerDB e) slot
284-
, intTryTakeSnapshot = getEnv h $ \env' ->
285-
void $ LedgerDB.tryTakeSnapshot (cdbLedgerDB env') Nothing maxBound
284+
, intTryTakeSnapshot = getEnv h $ LedgerDB.tryTakeSnapshot . cdbLedgerDB
286285
, intAddBlockRunner = getEnv h (Background.addBlockRunner addBlockTestFuse)
287286
, intKillBgThreads = varKillBgThreads
288287
}
@@ -293,7 +292,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
293292
(castPoint $ AF.anchorPoint chain)
294293
(castPoint $ AF.headPoint chain)
295294

296-
when launchBgTasks $ Background.launchBgTasks env replayed
295+
when launchBgTasks $ Background.launchBgTasks env
297296

298297
return (chainDB, testing, env)
299298

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs

Lines changed: 17 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
{-# LANGUAGE BangPatterns #-}
22
{-# LANGUAGE DeriveAnyClass #-}
33
{-# LANGUAGE DeriveGeneric #-}
4-
{-# LANGUAGE DerivingStrategies #-}
54
{-# LANGUAGE FlexibleContexts #-}
65
{-# LANGUAGE LambdaCase #-}
76
{-# LANGUAGE NamedFieldPuns #-}
87
{-# LANGUAGE RecordWildCards #-}
98
{-# LANGUAGE ScopedTypeVariables #-}
10-
{-# LANGUAGE TupleSections #-}
119

1210
-- | Background tasks:
1311
--
@@ -53,7 +51,6 @@ import Data.Sequence.Strict (StrictSeq (..))
5351
import qualified Data.Sequence.Strict as Seq
5452
import Data.Time.Clock
5553
import Data.Void (Void)
56-
import Data.Word
5754
import GHC.Generics (Generic)
5855
import GHC.Stack (HasCallStack)
5956
import Ouroboros.Consensus.Block
@@ -77,7 +74,7 @@ import Ouroboros.Consensus.Util
7774
import Ouroboros.Consensus.Util.Condense
7875
import Ouroboros.Consensus.Util.Enclose (Enclosing' (..))
7976
import Ouroboros.Consensus.Util.IOLike
80-
import Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher)
77+
import Ouroboros.Consensus.Util.STM (Watcher (..), blockUntilJust, forkLinkedWatcher)
8178
import Ouroboros.Network.AnchoredFragment (AnchoredSeq (..))
8279
import qualified Ouroboros.Network.AnchoredFragment as AF
8380

@@ -94,15 +91,13 @@ launchBgTasks ::
9491
, HasHardForkHistory blk
9592
) =>
9693
ChainDbEnv m blk ->
97-
-- | Number of immutable blocks replayed on ledger DB startup
98-
Word64 ->
9994
m ()
100-
launchBgTasks cdb@CDB{..} replayed = do
95+
launchBgTasks cdb@CDB{..} = do
10196
!addBlockThread <-
10297
launch "ChainDB.addBlockRunner" $
10398
addBlockRunner cdbChainSelFuse cdb
10499

105-
ledgerDbTasksTrigger <- newLedgerDbTasksTrigger replayed
100+
ledgerDbTasksTrigger <- newLedgerDbTasksTrigger
106101
!ledgerDbMaintenaceThread <-
107102
forkLinkedWatcher cdbRegistry "ChainDB.ledgerDbTasksTasks" $
108103
ledgerDbTasksTasks cdb ledgerDbTasksTrigger
@@ -260,18 +255,17 @@ copyToImmutableDBRunner cdb@CDB{..} ledgerDbTasksTrigger gcSchedule fuse = do
260255
copyAndTrigger :: m ()
261256
copyAndTrigger = do
262257
-- Wait for the chain to grow larger than @k@
263-
numToWrite <- atomically $ do
258+
atomically $ do
264259
curChain <- icWithoutTime <$> readTVar cdbChain
265260
check $ fromIntegral (AF.length curChain) > unNonZero k
266-
return $ fromIntegral (AF.length curChain) - unNonZero k
267261

268262
-- Copy blocks to ImmutableDB
269263
--
270264
-- This is a synchronous operation: when it returns, the blocks have been
271265
-- copied to disk (though not flushed, necessarily).
272266
gcSlotNo <- withFuse fuse (copyToImmutableDB cdb)
273267

274-
triggerLedgerDbTasks ledgerDbTasksTrigger gcSlotNo numToWrite
268+
triggerLedgerDbTasks ledgerDbTasksTrigger gcSlotNo
275269
scheduleGC' gcSlotNo
276270

277271
scheduleGC' :: WithOrigin SlotNo -> m ()
@@ -293,45 +287,20 @@ copyToImmutableDBRunner cdb@CDB{..} ledgerDbTasksTrigger gcSchedule fuse = do
293287
-- | Trigger for the LedgerDB maintenance tasks, namely whenever the immutable
294288
-- DB tip slot advances when we finish copying blocks to it.
295289
newtype LedgerDbTasksTrigger m
296-
= LedgerDbTasksTrigger (StrictTVar m LedgerDbTaskState)
290+
= LedgerDbTasksTrigger (StrictTVar m (WithOrigin SlotNo))
297291

298-
data LedgerDbTaskState = LedgerDbTaskState
299-
{ ldbtsImmTip :: !(WithOrigin SlotNo)
300-
, ldbtsPrevSnapshotTime :: !(Maybe Time)
301-
, ldbtsBlocksSinceLastSnapshot :: !Word64
302-
}
303-
deriving stock Generic
304-
deriving anyclass NoThunks
305-
306-
newLedgerDbTasksTrigger ::
307-
IOLike m =>
308-
-- | Number of blocks replayed.
309-
Word64 ->
310-
m (LedgerDbTasksTrigger m)
311-
newLedgerDbTasksTrigger replayed = LedgerDbTasksTrigger <$> newTVarIO st
312-
where
313-
st =
314-
LedgerDbTaskState
315-
{ ldbtsImmTip = Origin
316-
, ldbtsPrevSnapshotTime = Nothing
317-
, ldbtsBlocksSinceLastSnapshot = replayed
318-
}
292+
newLedgerDbTasksTrigger :: IOLike m => m (LedgerDbTasksTrigger m)
293+
newLedgerDbTasksTrigger = LedgerDbTasksTrigger <$> newTVarIO Origin
319294

320295
triggerLedgerDbTasks ::
321296
forall m.
322297
IOLike m =>
323298
LedgerDbTasksTrigger m ->
324299
-- | New tip of the ImmutableDB.
325300
WithOrigin SlotNo ->
326-
-- | Number of blocks written to the ImmutableDB.
327-
Word64 ->
328301
m ()
329-
triggerLedgerDbTasks (LedgerDbTasksTrigger varSt) immTip numWritten =
330-
atomically $ modifyTVar varSt $ \st ->
331-
st
332-
{ ldbtsImmTip = immTip
333-
, ldbtsBlocksSinceLastSnapshot = ldbtsBlocksSinceLastSnapshot st + numWritten
334-
}
302+
triggerLedgerDbTasks (LedgerDbTasksTrigger varSt) =
303+
atomically . writeTVar varSt
335304

336305
-- | Run LedgerDB maintenance tasks when 'LedgerDbTasksTrigger' changes.
337306
--
@@ -343,38 +312,16 @@ ledgerDbTasksTasks ::
343312
IOLike m =>
344313
ChainDbEnv m blk ->
345314
LedgerDbTasksTrigger m ->
346-
Watcher m LedgerDbTaskState (WithOrigin SlotNo)
315+
Watcher m SlotNo SlotNo
347316
ledgerDbTasksTasks CDB{..} (LedgerDbTasksTrigger varSt) =
348317
Watcher
349-
{ wFingerprint = ldbtsImmTip
318+
{ wFingerprint = id
350319
, wInitial = Nothing
351-
, wReader = readTVar varSt
352-
, wNotify =
353-
\LedgerDbTaskState
354-
{ ldbtsImmTip
355-
, ldbtsBlocksSinceLastSnapshot = blocksSinceLast
356-
, ldbtsPrevSnapshotTime = prevSnapTime
357-
} ->
358-
whenJust (withOriginToMaybe ldbtsImmTip) $ \slotNo -> do
359-
LedgerDB.tryFlush cdbLedgerDB
360-
361-
now <- getMonotonicTime
362-
LedgerDB.SnapCounters
363-
{ prevSnapshotTime
364-
, ntBlocksSinceLastSnap
365-
} <-
366-
LedgerDB.tryTakeSnapshot
367-
cdbLedgerDB
368-
((,now) <$> prevSnapTime)
369-
blocksSinceLast
370-
atomically $ modifyTVar varSt $ \st ->
371-
st
372-
{ ldbtsBlocksSinceLastSnapshot =
373-
ldbtsBlocksSinceLastSnapshot st - blocksSinceLast + ntBlocksSinceLastSnap
374-
, ldbtsPrevSnapshotTime = prevSnapshotTime
375-
}
376-
377-
LedgerDB.garbageCollect cdbLedgerDB slotNo
320+
, wReader = blockUntilJust $ withOriginToMaybe <$> readTVar varSt
321+
, wNotify = \slotNo -> do
322+
LedgerDB.tryFlush cdbLedgerDB
323+
LedgerDB.tryTakeSnapshot cdbLedgerDB
324+
LedgerDB.garbageCollect cdbLedgerDB slotNo
378325
}
379326

380327
{-------------------------------------------------------------------------------

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,6 @@ module Ouroboros.Consensus.Storage.LedgerDB.API
149149
, withPrivateTipForker
150150
, withTipForker
151151

152-
-- * Snapshots
153-
, SnapCounters (..)
154-
155152
-- * Testing
156153
, TestInternals (..)
157154
, TestInternals'
@@ -160,7 +157,6 @@ module Ouroboros.Consensus.Storage.LedgerDB.API
160157

161158
import Codec.Serialise
162159
import qualified Control.Monad as Monad
163-
import Control.Monad.Class.MonadTime.SI
164160
import Control.Monad.Except
165161
import Control.ResourceRegistry
166162
import Control.Tracer
@@ -262,18 +258,12 @@ data LedgerDB m l blk = LedgerDB
262258
-- * The set of previously applied points.
263259
, tryTakeSnapshot ::
264260
l ~ ExtLedgerState blk =>
265-
Maybe (Time, Time) ->
266-
Word64 ->
267-
m SnapCounters
261+
m ()
268262
-- ^ If the provided arguments indicate so (based on the SnapshotPolicy with
269263
-- which this LedgerDB was opened), take a snapshot and delete stale ones.
270264
--
271-
-- The arguments are:
272-
--
273-
-- - If a snapshot has been taken already, the time at which it was taken
274-
-- and the current time.
275-
--
276-
-- - How many blocks have been processed since the last snapshot.
265+
-- For V1, this must not be called concurrently with 'garbageCollect' and/or
266+
-- 'tryFlush'.
277267
, tryFlush :: m ()
278268
-- ^ Flush V1 in-memory LedgerDB state to disk, if possible. This is a no-op
279269
-- for implementations that do not need an explicit flush function.
@@ -420,18 +410,6 @@ getReadOnlyForker ::
420410
m (Either GetForkerError (ReadOnlyForker m l blk))
421411
getReadOnlyForker ldb rr pt = fmap readOnlyForker <$> getForkerAtTarget ldb rr pt
422412

423-
{-------------------------------------------------------------------------------
424-
Snapshots
425-
-------------------------------------------------------------------------------}
426-
427-
-- | Counters to keep track of when we made the last snapshot.
428-
data SnapCounters = SnapCounters
429-
{ prevSnapshotTime :: !(Maybe Time)
430-
-- ^ When was the last time we made a snapshot
431-
, ntBlocksSinceLastSnap :: !Word64
432-
-- ^ How many blocks have we processed since the last snapshot
433-
}
434-
435413
{-------------------------------------------------------------------------------
436414
Initialization
437415
-------------------------------------------------------------------------------}

0 commit comments

Comments
 (0)