@@ -42,11 +42,16 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2.LSM
4242 ) where
4343
4444import Codec.Serialise (decode )
45+ import Control.Exception (assert )
4546import qualified Control.Monad as Monad
4647import Control.Monad.Trans (lift )
4748import Control.Monad.Trans.Except
49+ import Control.Monad.Trans.Maybe (MaybeT (.. ), maybeToExceptT )
4850import Control.ResourceRegistry
4951import Control.Tracer
52+ import Data.ByteString (toStrict )
53+ import qualified Data.ByteString.Builder as BS
54+ import Data.ByteString.Char8 (readInt )
5055import qualified Data.Foldable as Foldable
5156import Data.Functor.Contravariant ((>$<) )
5257import qualified Data.List as List
@@ -63,6 +68,7 @@ import qualified Data.Vector as V
6368import qualified Data.Vector.Mutable as VM
6469import qualified Data.Vector.Primitive as VP
6570import Data.Void
71+ import Data.Word
6672import Database.LSMTree (Salt , Session , Table )
6773import qualified Database.LSMTree as LSM
6874import GHC.Generics
@@ -87,6 +93,7 @@ import Ouroboros.Consensus.Util.IndexedMemPack
8793import qualified Streaming as S
8894import qualified Streaming.Prelude as S
8995import System.FS.API
96+ import System.FS.API.Lazy (hGetAll , hPutAll )
9097import qualified System.FS.BlockIO.API as BIO
9198import System.FS.BlockIO.IO
9299import System.FilePath (splitDirectories , splitFileName )
@@ -170,21 +177,24 @@ newLSMLedgerTablesHandle ::
170177 , IndexedMemPack (l EmptyMK ) (TxOut l )
171178 ) =>
172179 Tracer m LedgerDBV2Trace ->
180+ -- | The size of the tables
181+ Word64 ->
173182 (ResourceKey m , UTxOTable m ) ->
174183 m (LedgerTablesHandle m l )
175- newLSMLedgerTablesHandle tracer (origResKey, t) =
184+ newLSMLedgerTablesHandle tracer origSize (origResKey, t) =
176185 encloseTimedWith (TraceLedgerTablesHandleCreate >$< tracer) $ do
177186 tv <- newTVarIO origResKey
187+ tsize <- newTVarIO origSize
178188 pure
179189 LedgerTablesHandle
180190 { close = implClose tv
181- , duplicate = \ rr -> implDuplicate rr t tracer
191+ , duplicate = \ rr -> implDuplicate rr tsize t tracer
182192 , read = implRead tracer t
183193 , readRange = implReadRange t
184194 , readAll = implReadAll t
185- , pushDiffs = implPushDiffs tracer t
195+ , pushDiffs = implPushDiffs tracer t tsize
186196 , takeHandleSnapshot = implTakeHandleSnapshot tracer t
187- , tablesSize = pure Nothing
197+ , tablesSize = fromIntegral <$> readTVarIO tsize
188198 , transfer = atomically . writeTVar tv
189199 }
190200
@@ -206,16 +216,18 @@ implDuplicate ::
206216 , IndexedMemPack (l EmptyMK ) (TxOut l )
207217 ) =>
208218 ResourceRegistry m ->
219+ StrictTVar m Word64 ->
209220 UTxOTable m ->
210221 Tracer m LedgerDBV2Trace ->
211222 m (ResourceKey m , LedgerTablesHandle m l )
212- implDuplicate rr t tracer = do
223+ implDuplicate rr sizeTVar t tracer = do
213224 (rk, table) <-
214225 allocate
215226 rr
216227 (\ _ -> encloseTimedWith (TraceLedgerTablesHandleDuplicate >$< tracer) $ LSM. duplicate t)
217228 (encloseTimedWith (TraceLedgerTablesHandleClose >$< tracer) . LSM. closeTable)
218- (rk,) <$> newLSMLedgerTablesHandle tracer (rk, table)
229+ size <- readTVarIO sizeTVar
230+ (rk,) <$> newLSMLedgerTablesHandle tracer size (rk, table)
219231
220232implRead ::
221233 forall m l .
@@ -303,8 +315,8 @@ implPushDiffs ::
303315 , HasLedgerTables l
304316 , IndexedMemPack (l EmptyMK ) (TxOut l )
305317 ) =>
306- Tracer m LedgerDBV2Trace -> UTxOTable m -> l mk -> l DiffMK -> m ()
307- implPushDiffs tracer t _ ! st1 =
318+ Tracer m LedgerDBV2Trace -> UTxOTable m -> StrictTVar m Word64 -> l mk -> l DiffMK -> m ()
319+ implPushDiffs tracer t sizeTVar _ ! st1 =
308320 encloseTimedWith (TraceLedgerTablesHandleRead >$< tracer) $ do
309321 let LedgerTables (DiffMK (Diff. Diff diffs)) = projectLedgerTables st1
310322 let vec = V. create $ do
@@ -314,6 +326,22 @@ implPushDiffs tracer t _ !st1 =
314326 0
315327 $ Map. toList diffs
316328 pure vec'
329+ let (ins, dels) =
330+ Map. foldl'
331+ ( \ (i, d) delta -> case delta of
332+ Diff. Insert {} -> (i + 1 , d)
333+ Diff. Delete -> (i, d + 1 )
334+ )
335+ (0 , 0 )
336+ diffs
337+ atomically $
338+ modifyTVar
339+ sizeTVar
340+ ( \ x ->
341+ assert (x + ins >= x) $
342+ assert (x + ins - dels <= x + ins) $
343+ x + ins - dels
344+ )
317345 encloseTimedWith (BackendTrace . SomeBackendTrace . LSMUpdate >$< tracer) $ LSM. updates t vec
318346 where
319347 f (Diff. Insert v) = LSM. Insert (toTxOutBytes (forgetLedgerTables st1) v) Nothing
@@ -394,21 +422,50 @@ implTakeSnapshot ccfg tracer shfs@(SomeHasFS hasFs) suffix st =
394422 then
395423 return Nothing
396424 else do
425+ sz <- tablesSize (tables st)
397426 encloseTimedWith (TookSnapshot snapshot t >$< tracer) $
398- writeSnapshot snapshot
427+ writeSnapshot sz snapshot
399428 return $ Just (snapshot, t)
400429 where
401- writeSnapshot ds = do
430+ writeSnapshot sz ds = do
402431 createDirectoryIfMissing hasFs True $ snapshotToDirPath ds
403432 crc1 <- writeExtLedgerState shfs (encodeDiskExtLedgerState ccfg) (snapshotToStatePath ds) $ state st
404433 crc2 <- takeHandleSnapshot (tables st) (state st) $ snapshotToDirName ds
434+ writeUTxOSizeFile hasFs (snapshotToUTxOSizeFilePath ds) sz
405435 writeSnapshotMetadata shfs ds $
406436 SnapshotMetadata
407437 { snapshotBackend = UTxOHDLSMSnapshot
408438 , snapshotChecksum = maybe crc1 (crcOfConcat crc1) crc2
409439 , snapshotTablesCodecVersion = TablesCodecVersion1
410440 }
411441
442+ snapshotToUTxOSizeFilePath :: DiskSnapshot -> FsPath
443+ snapshotToUTxOSizeFilePath ds = snapshotToDirPath ds </> mkFsPath [" utxoSize" ]
444+
445+ writeUTxOSizeFile :: MonadThrow f => HasFS f h -> FsPath -> Int -> f ()
446+ writeUTxOSizeFile hasFs p sz =
447+ Monad. void $ withFile hasFs p (WriteMode MustBeNew ) $ \ h ->
448+ hPutAll hasFs h $ BS. toLazyByteString $ BS. intDec sz
449+
450+ readUTxOSizeFile :: MonadThrow m => HasFS m h -> FsPath -> ExceptT (SnapshotFailure blk ) m Word64
451+ readUTxOSizeFile hfs p = do
452+ exists <- lift $ doesFileExist hfs p
453+ Monad. unless exists $ throwE (InitFailureRead ReadSnapshotDataCorruption )
454+ maybeToExceptT (InitFailureRead ReadSnapshotDataCorruption ) $
455+ MaybeT $
456+ withFile hfs p ReadMode $ \ h ->
457+ ( \ case
458+ Nothing -> Nothing
459+ Just i ->
460+ if i < 0
461+ then Nothing
462+ else Just (fromIntegral i)
463+ )
464+ . fmap fst
465+ . readInt
466+ . toStrict
467+ <$> hGetAll hfs h
468+
412469-- | Delete snapshot from disk and also from the LSM tree database.
413470implDeleteSnapshot ::
414471 IOLike m =>
@@ -471,6 +528,7 @@ loadSnapshot tracer rr ccfg fs@(SomeHasFS hfs) session ds =
471528 withExceptT
472529 (InitFailureRead . ReadSnapshotFailed )
473530 $ readExtLedgerState fs (decodeDiskExtLedgerState ccfg) decode (snapshotToStatePath ds)
531+ msz <- readUTxOSizeFile hfs (snapshotToUTxOSizeFilePath ds)
474532 case pointToWithOriginRealPoint (castPoint (getTip extLedgerSt)) of
475533 Origin -> throwE InitFailureGenesis
476534 NotOrigin pt -> do
@@ -492,7 +550,7 @@ loadSnapshot tracer rr ccfg fs@(SomeHasFS hfs) session ds =
492550 $ InitFailureRead
493551 ReadSnapshotDataCorruption
494552 (,pt)
495- <$> lift (empty extLedgerSt (rk, values) (newLSMLedgerTablesHandle tracer))
553+ <$> lift (empty extLedgerSt (rk, values) (newLSMLedgerTablesHandle tracer msz ))
496554
497555-- | Create the initial LSM table from values, which should happen only at
498556-- Genesis.
@@ -504,7 +562,7 @@ tableFromValuesMK ::
504562 Session m ->
505563 l EmptyMK ->
506564 LedgerTables l ValuesMK ->
507- m (ResourceKey m , UTxOTable m )
565+ m (ResourceKey m , UTxOTable m , Word64 )
508566tableFromValuesMK tracer rr session st (LedgerTables (ValuesMK values)) = do
509567 (rk, table) <-
510568 allocate
@@ -515,7 +573,7 @@ tableFromValuesMK tracer rr session st (LedgerTables (ValuesMK values)) = do
515573 )
516574 (encloseTimedWith (TraceLedgerTablesHandleClose >$< tracer) . LSM. closeTable)
517575 mapM_ (go table) $ chunks 1000 $ Map. toList values
518- pure (rk, table)
576+ pure (rk, table, fromIntegral $ Map. size values )
519577 where
520578 go table items =
521579 LSM. inserts table $
@@ -613,9 +671,9 @@ instance
613671 loadSnapshot trcr reg ccfg shfs (sessionResource res) ds
614672
615673 newHandleFromValues trcr reg res st = do
616- table <-
674+ (rk, table, sz) <-
617675 tableFromValuesMK trcr reg (sessionResource res) (forgetLedgerTables st) (ltprj st)
618- newLSMLedgerTablesHandle trcr table
676+ newLSMLedgerTablesHandle trcr sz (rk, table)
619677
620678 snapshotManager _ res = Ouroboros.Consensus.Storage.LedgerDB.V2.LSM. snapshotManager (sessionResource res)
621679
@@ -636,13 +694,15 @@ instance
636694 = SinkLSM
637695 -- \| Chunk size
638696 Int
639- -- \| Snap name
640- String
697+ -- \| LedgerDB snapshot fs
698+ (SomeHasFS m )
699+ -- \| DiskSnapshot
700+ DiskSnapshot
641701 (Session m )
642702
643703 yield _ (YieldLSM chunkSize hdl ) = yieldLsmS chunkSize hdl
644704
645- sink _ (SinkLSM chunkSize snapName session ) = sinkLsmS chunkSize snapName session
705+ sink _ (SinkLSM chunkSize shfs ds session ) = sinkLsmS chunkSize shfs ds session
646706
647707data SomeHasFSAndBlockIO m where
648708 SomeHasFSAndBlockIO ::
@@ -684,35 +744,37 @@ sinkLsmS ::
684744 , IndexedMemPack (l EmptyMK ) (TxOut l )
685745 ) =>
686746 Int ->
687- String ->
747+ SomeHasFS m ->
748+ DiskSnapshot ->
688749 Session m ->
689750 Sink m l
690- sinkLsmS writeChunkSize snapName session st s = do
691- tb :: UTxOTable m <- lift $ LSM. newTable session
692- r <- go tb writeChunkSize mempty s
751+ sinkLsmS writeChunkSize ( SomeHasFS hfs) ds session st stream = do
752+ lsmTable :: UTxOTable m <- lift $ LSM. newTable session
753+ (r, utxosSize) <- go ( 0 :: Int ) lsmTable writeChunkSize mempty stream
693754 lift $
694755 LSM. saveSnapshot
695- (LSM. toSnapshotName snapName )
756+ (LSM. toSnapshotName (snapshotToDirName ds) )
696757 (LSM. SnapshotLabel $ T. pack " UTxO table" )
697- tb
698- lift $ LSM. closeTable tb
758+ lsmTable
759+ lift $ LSM. closeTable lsmTable
760+ lift $ writeUTxOSizeFile hfs (snapshotToUTxOSizeFilePath ds) utxosSize
699761 pure (fmap (,Nothing ) r)
700762 where
701- go tb 0 m s' = do
702- lift $
703- LSM. inserts tb $
704- V. fromList [(toTxInBytes (Proxy @ l ) k, toTxOutBytes st v, Nothing ) | (k, v) <- m]
705- go tb writeChunkSize mempty s'
706- go tb n m s' = do
707- mbs <- S. uncons s'
708- case mbs of
763+ writeToTable lsmTable accUTxOs =
764+ LSM. inserts lsmTable $
765+ V. fromList
766+ [(toTxInBytes (Proxy @ l ) txin, toTxOutBytes st txout, Nothing ) | (txin, txout) <- accUTxOs]
767+
768+ go utxosSize lsmTable 0 accUTxOs stream' = do
769+ lift $ writeToTable lsmTable accUTxOs
770+ go utxosSize lsmTable writeChunkSize mempty stream'
771+ go utxosSize lsmTable numToRead accUTxOs stream' = do
772+ mItem <- S. uncons stream'
773+ case mItem of
709774 Nothing -> do
710- lift $
711- LSM. inserts tb $
712- V. fromList
713- [(toTxInBytes (Proxy @ l ) k, toTxOutBytes st v, Nothing ) | (k, v) <- m]
714- S. effects s'
715- Just (item, s'') -> go tb (n - 1 ) (item : m) s''
775+ lift $ writeToTable lsmTable accUTxOs
776+ (,utxosSize) <$> S. effects stream'
777+ Just (item, stream'') -> go (utxosSize + 1 ) lsmTable (numToRead - 1 ) (item : accUTxOs) stream''
716778
717779-- | Create Yield arguments for LSM
718780mkLSMYieldArgs ::
@@ -746,15 +808,17 @@ mkLSMYieldArgs fp snapName mkFS mkGen _ reg = do
746808 (LSM. SnapshotLabel $ T. pack " UTxO table" )
747809 )
748810 LSM. closeTable
749- YieldLSM 1000 <$> newLSMLedgerTablesHandle nullTracer tb
811+ YieldLSM 1000 <$> newLSMLedgerTablesHandle nullTracer 0 tb
750812
751813-- | Create Sink arguments for LSM
752814mkLSMSinkArgs ::
753815 IOLike m =>
754816 -- | The filepath in which the LSM database should be opened. Must not have a trailing slash!
755817 FilePath ->
756- -- | The complete name of the snapshot to be created, so @<slotno>[_<suffix>]@.
757- String ->
818+ -- | The filepath to the snapshot to be created, so @.../.../ledger/<slotno>[_<suffix>]@.
819+ FilePath ->
820+ -- | Usually 'ioHasFS'
821+ (MountPoint -> SomeHasFS m ) ->
758822 -- | Usually 'stdMkBlockIOFS'
759823 (FilePath -> ResourceRegistry m -> m (a , SomeHasFSAndBlockIO m )) ->
760824 -- | Usually 'newStdGen'
@@ -764,18 +828,20 @@ mkLSMSinkArgs ::
764828 m (SinkArgs m LSM l )
765829mkLSMSinkArgs
766830 (splitFileName -> (fp, lsmDir))
767- snapName
768- mkFS
831+ snapFP
832+ mkFs
833+ mkBlockIOFS
769834 mkGen
770835 _
771836 reg =
772837 do
773- (_, SomeHasFSAndBlockIO hasFS blockIO) <- mkFS fp reg
838+ (_, SomeHasFSAndBlockIO hasFS blockIO) <- mkBlockIOFS fp reg
774839 removeDirectoryRecursive hasFS lsmFsPath
775840 createDirectory hasFS lsmFsPath
776841 salt <- fst . genWord64 <$> mkGen
777842 (_, session) <-
778843 allocate reg (\ _ -> LSM. newSession nullTracer hasFS blockIO salt lsmFsPath) LSM. closeSession
779- pure (SinkLSM 1000 snapName session)
844+ let snapFS = mkFs (MountPoint snapFP)
845+ pure (SinkLSM 1000 snapFS (fromJust $ snapshotFromPath $ last $ splitDirectories snapFP) session)
780846 where
781847 lsmFsPath = mkFsPath [lsmDir]
0 commit comments