1
1
{-# LANGUAGE BangPatterns #-}
2
2
{-# LANGUAGE DeriveAnyClass #-}
3
3
{-# LANGUAGE DeriveGeneric #-}
4
+ {-# LANGUAGE DerivingStrategies #-}
4
5
{-# LANGUAGE FlexibleContexts #-}
5
6
{-# LANGUAGE LambdaCase #-}
6
7
{-# LANGUAGE NamedFieldPuns #-}
@@ -19,11 +20,10 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Background
19
20
launchBgTasks
20
21
21
22
-- * Copying blocks from the VolatileDB to the ImmutableDB
22
- , copyAndSnapshotRunner
23
23
, copyToImmutableDB
24
24
25
25
-- * Executing garbage collection
26
- , garbageCollect
26
+ , garbageCollectBlocks
27
27
28
28
-- * Scheduling garbage collections
29
29
, GcParams (.. )
@@ -76,6 +76,7 @@ import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
76
76
import Ouroboros.Consensus.Util
77
77
import Ouroboros.Consensus.Util.Condense
78
78
import Ouroboros.Consensus.Util.IOLike
79
+ import Ouroboros.Consensus.Util.STM (Watcher (.. ), forkLinkedWatcher )
79
80
import Ouroboros.Network.AnchoredFragment (AnchoredSeq (.. ))
80
81
import qualified Ouroboros.Network.AnchoredFragment as AF
81
82
@@ -99,17 +100,30 @@ launchBgTasks cdb@CDB{..} replayed = do
99
100
! addBlockThread <-
100
101
launch " ChainDB.addBlockRunner" $
101
102
addBlockRunner cdbChainSelFuse cdb
103
+
104
+ ledgerDbTasksTrigger <- newLedgerDbTasksTrigger replayed
105
+ ! ledgerDbMaintenaceThread <-
106
+ forkLinkedWatcher cdbRegistry " ChainDB.ledgerDbTaskWatcher" $
107
+ ledgerDbTaskWatcher cdb ledgerDbTasksTrigger
108
+
102
109
gcSchedule <- newGcSchedule
103
110
! gcThread <-
104
- launch " ChainDB.gcScheduleRunner " $
111
+ launch " ChainDB.gcBlocksScheduleRunner " $
105
112
gcScheduleRunner gcSchedule $
106
- garbageCollect cdb
107
- ! copyAndSnapshotThread <-
108
- launch " ChainDB.copyAndSnapshotRunner" $
109
- copyAndSnapshotRunner cdb gcSchedule replayed cdbCopyFuse
113
+ garbageCollectBlocks cdb
114
+
115
+ ! copyToImmutableDBThread <-
116
+ launch " ChainDB.copyToImmutableDBRunner" $
117
+ copyToImmutableDBRunner cdb ledgerDbTasksTrigger gcSchedule cdbCopyFuse
118
+
110
119
atomically $
111
120
writeTVar cdbKillBgThreads $
112
- sequence_ [addBlockThread, gcThread, copyAndSnapshotThread]
121
+ sequence_
122
+ [ addBlockThread
123
+ , cancelThread ledgerDbMaintenaceThread
124
+ , gcThread
125
+ , copyToImmutableDBThread
126
+ ]
113
127
where
114
128
launch :: String -> m Void -> m (m () )
115
129
launch = fmap cancelThread .: forkLinkedThread cdbRegistry
@@ -198,22 +212,18 @@ copyToImmutableDB CDB{..} = electric $ do
198
212
_ -> error " header to remove not on the current chain"
199
213
200
214
{- ------------------------------------------------------------------------------
201
- Snapshotting
215
+ Copy to ImmutableDB
202
216
-------------------------------------------------------------------------------}
203
217
204
- -- | Copy blocks from the VolatileDB to ImmutableDB and take snapshots of the
205
- -- LedgerDB
218
+ -- | Copy blocks from the VolatileDB to ImmutableDB and trigger further tasks in
219
+ -- other threads.
206
220
--
207
221
-- We watch the chain for changes. Whenever the chain is longer than @k@, then
208
222
-- the headers older than @k@ are copied from the VolatileDB to the ImmutableDB
209
223
-- (using 'copyToImmutableDB'). Once that is complete,
210
224
--
211
- -- * We periodically take a snapshot of the LedgerDB (depending on its config).
212
- -- When enough blocks (depending on its config) have been replayed during
213
- -- startup, a snapshot of the replayed LedgerDB will be written to disk at the
214
- -- start of this function. NOTE: After this initial snapshot we do not take a
215
- -- snapshot of the LedgerDB until the chain has changed again, irrespective of
216
- -- the LedgerDB policy.
225
+ -- * Trigger LedgerDB maintenance tasks, namely flushing, taking snapshots and
226
+ -- garbage collection.
217
227
--
218
228
-- * Schedule GC of the VolatileDB ('scheduleGC') for the 'SlotNo' of the most
219
229
-- recent block that was copied.
@@ -228,32 +238,26 @@ copyToImmutableDB CDB{..} = electric $ do
228
238
-- GC can happen, when we restart the node and schedule the /next/ GC, it will
229
239
-- /imply/ any previously scheduled GC, since GC is driven by slot number
230
240
-- ("garbage collect anything older than @x@").
231
- copyAndSnapshotRunner ::
241
+ copyToImmutableDBRunner ::
232
242
forall m blk .
233
243
( IOLike m
234
244
, LedgerSupportsProtocol blk
235
245
) =>
236
246
ChainDbEnv m blk ->
247
+ LedgerDbTasksTrigger m ->
237
248
GcSchedule m ->
238
- -- | Number of immutable blocks replayed on ledger DB startup
239
- Word64 ->
240
249
Fuse m ->
241
250
m Void
242
- copyAndSnapshotRunner cdb@ CDB {.. } gcSchedule replayed fuse = do
251
+ copyToImmutableDBRunner cdb@ CDB {.. } ledgerDbTasksTrigger gcSchedule fuse = do
243
252
-- this first flush will persist the differences that come from the initial
244
253
-- chain selection.
245
254
LedgerDB. tryFlush cdbLedgerDB
246
- loop =<< LedgerDB. tryTakeSnapshot cdbLedgerDB Nothing replayed
255
+ forever copyAndTrigger
247
256
where
248
257
SecurityParam k = configSecurityParam cdbTopLevelConfig
249
258
250
- loop :: LedgerDB. SnapCounters -> m Void
251
- loop counters = do
252
- let LedgerDB. SnapCounters
253
- { prevSnapshotTime
254
- , ntBlocksSinceLastSnap
255
- } = counters
256
-
259
+ copyAndTrigger :: m ()
260
+ copyAndTrigger = do
257
261
-- Wait for the chain to grow larger than @k@
258
262
numToWrite <- atomically $ do
259
263
curChain <- icWithoutTime <$> readTVar cdbChain
@@ -264,14 +268,10 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse = do
264
268
--
265
269
-- This is a synchronous operation: when it returns, the blocks have been
266
270
-- copied to disk (though not flushed, necessarily).
267
- withFuse fuse (copyToImmutableDB cdb) >>= scheduleGC'
271
+ gcSlotNo <- withFuse fuse (copyToImmutableDB cdb)
268
272
269
- LedgerDB. tryFlush cdbLedgerDB
270
-
271
- now <- getMonotonicTime
272
- let ntBlocksSinceLastSnap' = ntBlocksSinceLastSnap + numToWrite
273
-
274
- loop =<< LedgerDB. tryTakeSnapshot cdbLedgerDB ((,now) <$> prevSnapshotTime) ntBlocksSinceLastSnap'
273
+ triggerLedgerDbTasks ledgerDbTasksTrigger gcSlotNo numToWrite
274
+ scheduleGC' gcSlotNo
275
275
276
276
scheduleGC' :: WithOrigin SlotNo -> m ()
277
277
scheduleGC' Origin = return ()
@@ -285,16 +285,104 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse = do
285
285
}
286
286
gcSchedule
287
287
288
+ {- ------------------------------------------------------------------------------
289
+ LedgerDB maintenance tasks
290
+ -------------------------------------------------------------------------------}
291
+
292
+ -- | Trigger for the LedgerDB maintenance tasks, namely whenever the immutable
293
+ -- DB tip slot advances when we finish copying blocks to it.
294
+ newtype LedgerDbTasksTrigger m
295
+ = LedgerDbTasksTrigger (StrictTVar m LedgerDbTaskState )
296
+
297
+ data LedgerDbTaskState = LedgerDbTaskState
298
+ { ldbtsImmTip :: ! (WithOrigin SlotNo )
299
+ , ldbtsPrevSnapshotTime :: ! (Maybe Time )
300
+ , ldbtsBlocksSinceLastSnapshot :: ! Word64
301
+ }
302
+ deriving stock Generic
303
+ deriving anyclass NoThunks
304
+
305
+ newLedgerDbTasksTrigger ::
306
+ IOLike m =>
307
+ -- | Number of blocks replayed.
308
+ Word64 ->
309
+ m (LedgerDbTasksTrigger m )
310
+ newLedgerDbTasksTrigger replayed = LedgerDbTasksTrigger <$> newTVarIO st
311
+ where
312
+ st =
313
+ LedgerDbTaskState
314
+ { ldbtsImmTip = Origin
315
+ , ldbtsPrevSnapshotTime = Nothing
316
+ , ldbtsBlocksSinceLastSnapshot = replayed
317
+ }
318
+
319
+ triggerLedgerDbTasks ::
320
+ forall m .
321
+ IOLike m =>
322
+ LedgerDbTasksTrigger m ->
323
+ -- | New tip of the ImmutableDB.
324
+ WithOrigin SlotNo ->
325
+ -- | Number of blocks written to the ImmutableDB.
326
+ Word64 ->
327
+ m ()
328
+ triggerLedgerDbTasks (LedgerDbTasksTrigger varSt) immTip numWritten =
329
+ atomically $ modifyTVar varSt $ \ st ->
330
+ st
331
+ { ldbtsImmTip = immTip
332
+ , ldbtsBlocksSinceLastSnapshot = ldbtsBlocksSinceLastSnapshot st + numWritten
333
+ }
334
+
335
+ -- | Run LedgerDB maintenance tasks when 'LedgerDbTasksTrigger' changes.
336
+ --
337
+ -- * Flushing of differences.
338
+ -- * Taking snapshots.
339
+ -- * Garbage collection.
340
+ ledgerDbTaskWatcher ::
341
+ forall m blk .
342
+ IOLike m =>
343
+ ChainDbEnv m blk ->
344
+ LedgerDbTasksTrigger m ->
345
+ Watcher m LedgerDbTaskState (WithOrigin SlotNo )
346
+ ledgerDbTaskWatcher CDB {.. } (LedgerDbTasksTrigger varSt) =
347
+ Watcher
348
+ { wFingerprint = ldbtsImmTip
349
+ , wInitial = Nothing
350
+ , wReader = readTVar varSt
351
+ , wNotify =
352
+ \ LedgerDbTaskState
353
+ { ldbtsImmTip
354
+ , ldbtsBlocksSinceLastSnapshot = blocksSinceLast
355
+ , ldbtsPrevSnapshotTime = prevSnapTime
356
+ } ->
357
+ whenJust (withOriginToMaybe ldbtsImmTip) $ \ slotNo -> do
358
+ LedgerDB. tryFlush cdbLedgerDB
359
+
360
+ now <- getMonotonicTime
361
+ LedgerDB. SnapCounters
362
+ { prevSnapshotTime
363
+ , ntBlocksSinceLastSnap
364
+ } <-
365
+ LedgerDB. tryTakeSnapshot
366
+ cdbLedgerDB
367
+ ((,now) <$> prevSnapTime)
368
+ blocksSinceLast
369
+ atomically $ modifyTVar varSt $ \ st ->
370
+ st
371
+ { ldbtsBlocksSinceLastSnapshot =
372
+ ldbtsBlocksSinceLastSnapshot st - blocksSinceLast + ntBlocksSinceLastSnap
373
+ , ldbtsPrevSnapshotTime = prevSnapshotTime
374
+ }
375
+
376
+ LedgerDB. garbageCollect cdbLedgerDB slotNo
377
+ }
378
+
288
379
{- ------------------------------------------------------------------------------
289
380
Executing garbage collection
290
381
-------------------------------------------------------------------------------}
291
382
292
383
-- | Trigger a garbage collection for blocks older than the given 'SlotNo' on
293
384
-- the VolatileDB.
294
385
--
295
- -- Also removes the corresponding cached "previously applied points" from the
296
- -- LedgerDB.
297
- --
298
386
-- This is thread-safe as the VolatileDB locks itself while performing a GC.
299
387
--
300
388
-- When calling this function it is __critical__ that the blocks that will be
@@ -304,11 +392,10 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse = do
304
392
--
305
393
-- TODO will a long GC be a bottleneck? It will block any other calls to
306
394
-- @putBlock@ and @getBlock@.
307
- garbageCollect :: forall m blk . IOLike m => ChainDbEnv m blk -> SlotNo -> m ()
308
- garbageCollect CDB {.. } slotNo = do
395
+ garbageCollectBlocks :: forall m blk . IOLike m => ChainDbEnv m blk -> SlotNo -> m ()
396
+ garbageCollectBlocks CDB {.. } slotNo = do
309
397
VolatileDB. garbageCollect cdbVolatileDB slotNo
310
398
atomically $ do
311
- LedgerDB. garbageCollect cdbLedgerDB slotNo
312
399
modifyTVar cdbInvalid $ fmap $ Map. filter ((>= slotNo) . invalidBlockSlotNo)
313
400
traceWith cdbTracer $ TraceGCEvent $ PerformedGC slotNo
314
401
0 commit comments