From 30c149ec4c22ef490f503c845d7136ac598be032 Mon Sep 17 00:00:00 2001 From: Javier Sagredo Date: Mon, 25 Aug 2025 16:57:48 +0200 Subject: [PATCH] Implement LSM-trees based ledger tables handles in LedgerDB V2 --- cabal.project | 14 +- nix/haskell.nix | 10 +- .../app/DBAnalyser/Parsers.hs | 9 +- .../app/snapshot-converter.hs | 5 +- .../20250729_124022_javier.sagredo_lsm1.md | 25 + .../ouroboros-consensus-cardano.cabal | 1 + .../Ouroboros/Consensus/Shelley/ShelleyHFC.hs | 5 +- .../ThreadNet/Infra/ShelleyBasedHardFork.hs | 7 +- .../Cardano/Tools/DBAnalyser/Run.hs | 42 +- .../Cardano/Tools/DBAnalyser/Types.hs | 2 +- .../20250729_125052_javier.sagredo_lsm1.md | 23 + .../Ouroboros/Consensus/Node.hs | 28 +- .../20250729_124008_javier.sagredo_lsm1.md | 27 + ouroboros-consensus/ouroboros-consensus.cabal | 3 + .../Consensus/Storage/ChainDB/Impl/Types.hs | 6 - .../Ouroboros/Consensus/Storage/LedgerDB.hs | 22 +- .../Consensus/Storage/LedgerDB/API.hs | 22 +- .../Consensus/Storage/LedgerDB/Args.hs | 9 + .../Consensus/Storage/LedgerDB/Snapshots.hs | 3 + .../Consensus/Storage/LedgerDB/TraceEvent.hs | 5 +- .../Consensus/Storage/LedgerDB/V1/Args.hs | 7 - .../Consensus/Storage/LedgerDB/V2.hs | 14 +- .../Consensus/Storage/LedgerDB/V2/Args.hs | 31 +- .../Consensus/Storage/LedgerDB/V2/Forker.hs | 6 +- .../Consensus/Storage/LedgerDB/V2/InMemory.hs | 7 +- .../Consensus/Storage/LedgerDB/V2/LSM.hs | 493 ++++++++++++++++++ .../Storage/LedgerDB/V2/LedgerSeq.hs | 45 +- .../Consensus/Util/IndexedMemPack.hs | 32 +- .../Storage/LedgerDB/StateMachine.hs | 177 +++++-- 29 files changed, 939 insertions(+), 141 deletions(-) create mode 100644 ouroboros-consensus-cardano/changelog.d/20250729_124022_javier.sagredo_lsm1.md create mode 100644 ouroboros-consensus-diffusion/changelog.d/20250729_125052_javier.sagredo_lsm1.md create mode 100644 ouroboros-consensus/changelog.d/20250729_124008_javier.sagredo_lsm1.md create mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs diff --git a/cabal.project b/cabal.project index 2162410163..9ef5807c0d 100644 --- a/cabal.project +++ b/cabal.project @@ -14,7 +14,7 @@ repository cardano-haskell-packages -- update either of these. index-state: -- Bump this if you need newer packages from Hackage - , hackage.haskell.org 2025-07-22T09:13:54Z + , hackage.haskell.org 2025-08-07T11:24:08Z -- Bump this if you need newer packages from CHaP , cardano-haskell-packages 2025-08-21T09:41:03Z @@ -56,6 +56,11 @@ allow-newer: , fin:QuickCheck , bin:QuickCheck +if impl (ghc >= 9.10) + allow-newer: + -- https://github.com/phadej/regression-simple/pull/14 + , regression-simple:base + source-repository-package type: git location: https://github.com/IntersectMBO/cardano-ledger @@ -90,3 +95,10 @@ constraints: plutus-core < 1.51, plutus-ledger-api < 1.51, plutus-tx < 1.51, + +if os (windows) + source-repository-package + type: git + location: https://github.com/jasagredo/digest + tag: 329fc2a911878ffe47472751cb40aae20ab2c00a + --sha256: sha256-84f8dFee9EfWbQ5UTLZ9MrsZ3JVojNhzfTGmWof6wHU= diff --git a/nix/haskell.nix b/nix/haskell.nix index 9965d3937d..3a946ed59e 100644 --- a/nix/haskell.nix +++ b/nix/haskell.nix @@ -89,9 +89,13 @@ in nativeBuildInputs = [ final.fd final.cabal-docspec - (hsPkgs.ghcWithPackages - (ps: [ ps.latex-svg-image ] ++ lib.filter (p: p ? components.library) - (lib.attrValues (haskell-nix.haskellLib.selectProjectPackages ps)))) + (hsPkgs.shellFor { + withHoogle = false; + exactDeps = true; + packages = _: [ ]; + additional = (ps: [ ps.latex-svg-image ] ++ lib.filter (p: p ? components.library) + (lib.attrValues (haskell-nix.haskellLib.selectProjectPackages ps))); + }).ghc final.texliveFull ]; diff --git a/ouroboros-consensus-cardano/app/DBAnalyser/Parsers.hs b/ouroboros-consensus-cardano/app/DBAnalyser/Parsers.hs index 1b3587289a..710994dcee 100644 --- a/ouroboros-consensus-cardano/app/DBAnalyser/Parsers.hs +++ b/ouroboros-consensus-cardano/app/DBAnalyser/Parsers.hs @@ -46,7 +46,7 @@ parseDBAnalyserConfig = [ flag' V1InMem $ mconcat [ long "v1-in-mem" - , help "use v1 in-memory backing store" + , help "use v1 in-memory backing store [deprecated]" ] , flag' V1LMDB $ mconcat @@ -55,9 +55,14 @@ parseDBAnalyserConfig = ] , flag' V2InMem $ mconcat - [ long "v2-in-mem" + [ long "in-mem" , help "use v2 in-memory backend" ] + , flag' V2LSM $ + mconcat + [ long "lsm" + , help "use v2 LSM backend" + ] ] parseSelectDB :: Parser SelectDB diff --git a/ouroboros-consensus-cardano/app/snapshot-converter.hs b/ouroboros-consensus-cardano/app/snapshot-converter.hs index 7c3eba6b4f..bf4354a20c 100644 --- a/ouroboros-consensus-cardano/app/snapshot-converter.hs +++ b/ouroboros-consensus-cardano/app/snapshot-converter.hs @@ -38,6 +38,7 @@ import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.Impl.LMDB import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.DbChangelog as V1 import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.Lock as V1 import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.Snapshots as V1 +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.InMemory as InMemory import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.InMemory as V2 import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq as V2 import Ouroboros.Consensus.Util.CRC @@ -199,7 +200,7 @@ load config@Config{inpath = pathToDiskSnapshot -> Just (fs@(SomeHasFS hasFS), pa checkSnapshotFileStructure Mem path fs (ls, _) <- withExceptT SnapshotError $ V2.loadSnapshot nullTracer rr ccfg fs ds let h = V2.currentHandle ls - (V2.state h,) <$> Trans.lift (V2.readAll (V2.tables h)) + (V2.state h,) <$> Trans.lift (V2.readAll (V2.tables h) (V2.state h)) LMDB -> do checkSnapshotFileStructure LMDB path fs ((dbch, k, bstore), _) <- @@ -240,7 +241,7 @@ store config@Config{outpath = pathToDiskSnapshot -> Just (fs@(SomeHasFS hasFS), Mem -> do lseq <- V2.empty state tbs $ V2.newInMemoryLedgerTablesHandle nullTracer fs let h = V2.currentHandle lseq - Monad.void $ V2.implTakeSnapshot ccfg nullTracer fs suffix h + Monad.void $ InMemory.implTakeSnapshot ccfg nullTracer fs suffix h LMDB -> do chlog <- newTVarIO (V1.empty state) lock <- V1.mkLedgerDBLock diff --git a/ouroboros-consensus-cardano/changelog.d/20250729_124022_javier.sagredo_lsm1.md b/ouroboros-consensus-cardano/changelog.d/20250729_124022_javier.sagredo_lsm1.md new file mode 100644 index 0000000000..41b1fa620d --- /dev/null +++ b/ouroboros-consensus-cardano/changelog.d/20250729_124022_javier.sagredo_lsm1.md @@ -0,0 +1,25 @@ + + + + + diff --git a/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal b/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal index 47472f1d8e..eee812f8bf 100644 --- a/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal +++ b/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal @@ -588,6 +588,7 @@ library unstable-cardano-tools ouroboros-network-api, ouroboros-network-framework ^>=0.19, ouroboros-network-protocols, + random, resource-registry, singletons, sop-core, diff --git a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/ShelleyHFC.hs b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/ShelleyHFC.hs index 1ec5d58f45..528cafef8f 100644 --- a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/ShelleyHFC.hs +++ b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/ShelleyHFC.hs @@ -10,7 +10,6 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TypeOperators #-} @@ -429,7 +428,7 @@ instance { getShelleyBlockHFCTxIn :: SL.TxIn } deriving stock (Show, Eq, Ord) - deriving newtype NoThunks + deriving newtype (NoThunks, MemPack) injectCanonicalTxIn IZ txIn = ShelleyBlockHFCTxIn txIn injectCanonicalTxIn (IS idx') _ = case idx' of {} @@ -437,8 +436,6 @@ instance ejectCanonicalTxIn IZ txIn = getShelleyBlockHFCTxIn txIn ejectCanonicalTxIn (IS idx') _ = case idx' of {} -deriving newtype instance MemPack (CanonicalTxIn '[ShelleyBlock proto era]) - {------------------------------------------------------------------------------- HardForkTxOut -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus-cardano/src/unstable-cardano-testlib/Test/ThreadNet/Infra/ShelleyBasedHardFork.hs b/ouroboros-consensus-cardano/src/unstable-cardano-testlib/Test/ThreadNet/Infra/ShelleyBasedHardFork.hs index 07b5f4b5cf..d150d034fc 100644 --- a/ouroboros-consensus-cardano/src/unstable-cardano-testlib/Test/ThreadNet/Infra/ShelleyBasedHardFork.hs +++ b/ouroboros-consensus-cardano/src/unstable-cardano-testlib/Test/ThreadNet/Infra/ShelleyBasedHardFork.hs @@ -12,7 +12,6 @@ {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TypeOperators #-} @@ -498,7 +497,7 @@ instance { getShelleyHFCTxIn :: SL.TxIn } deriving stock (Show, Eq, Ord) - deriving newtype NoThunks + deriving newtype (NoThunks, MemPack) injectCanonicalTxIn IZ txIn = ShelleyHFCTxIn txIn injectCanonicalTxIn (IS IZ) txIn = ShelleyHFCTxIn (coerce txIn) @@ -508,10 +507,6 @@ instance ejectCanonicalTxIn (IS IZ) txIn = coerce (getShelleyHFCTxIn txIn) ejectCanonicalTxIn (IS (IS idx')) _ = case idx' of {} -deriving newtype instance - ShelleyBasedHardForkConstraints proto1 era1 proto2 era2 => - MemPack (CanonicalTxIn (ShelleyBasedHardForkEras proto1 era1 proto2 era2)) - instance ShelleyBasedHardForkConstraints proto1 era1 proto2 era2 => HasHardForkTxOut (ShelleyBasedHardForkEras proto1 era1 proto2 era2) diff --git a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Run.hs b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Run.hs index 2832ffd437..8ad7880afd 100644 --- a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Run.hs +++ b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Run.hs @@ -15,9 +15,9 @@ import Cardano.Tools.DBAnalyser.HasAnalysis import Cardano.Tools.DBAnalyser.Types import Control.ResourceRegistry import Control.Tracer (Tracer (..), nullTracer) +import Data.Functor.Contravariant ((>$<)) import qualified Data.SOP.Dict as Dict import Data.Singletons (Sing, SingI (..)) -import Data.Void import qualified Debug.Trace as Debug import Ouroboros.Consensus.Block import Ouroboros.Consensus.Config @@ -35,19 +35,24 @@ import qualified Ouroboros.Consensus.Storage.ChainDB as ChainDB import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.Args as ChainDB import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB import qualified Ouroboros.Consensus.Storage.ImmutableDB.Stream as ImmutableDB +import Ouroboros.Consensus.Storage.LedgerDB (TraceEvent (..)) import qualified Ouroboros.Consensus.Storage.LedgerDB as LedgerDB import qualified Ouroboros.Consensus.Storage.LedgerDB.V1 as LedgerDB.V1 import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.Args as LedgerDB.V1 import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.Impl.LMDB as LMDB -import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.Snapshots as LedgerDB.V1 +import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.Snapshots as V1 import qualified Ouroboros.Consensus.Storage.LedgerDB.V2 as LedgerDB.V2 import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.Args as LedgerDB.V2 +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.Args as V2 import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.InMemory as InMemory +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.LSM as LSM import Ouroboros.Consensus.Util.Args import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.Orphans () import Ouroboros.Network.Block (genesisPoint) +import System.FS.API import System.IO +import System.Random import Text.Printf (printf) {------------------------------------------------------------------------------- @@ -66,7 +71,7 @@ openLedgerDB :: , LedgerDB.TestInternals' IO blk ) openLedgerDB lgrDbArgs@LedgerDB.LedgerDbArgs{LedgerDB.lgrFlavorArgs = LedgerDB.LedgerDbFlavorArgsV1 bss} = do - let snapManager = LedgerDB.V1.snapshotManager lgrDbArgs + let snapManager = V1.snapshotManager lgrDbArgs (ledgerDB, _, intLedgerDB) <- LedgerDB.openDBInternal lgrDbArgs @@ -82,8 +87,27 @@ openLedgerDB lgrDbArgs@LedgerDB.LedgerDbArgs{LedgerDB.lgrFlavorArgs = LedgerDB.L pure (ledgerDB, intLedgerDB) openLedgerDB lgrDbArgs@LedgerDB.LedgerDbArgs{LedgerDB.lgrFlavorArgs = LedgerDB.LedgerDbFlavorArgsV2 args} = do (snapManager, bss') <- case args of - LedgerDB.V2.V2Args LedgerDB.V2.InMemoryHandleArgs -> pure (InMemory.snapshotManager lgrDbArgs, LedgerDB.V2.InMemoryHandleEnv) - LedgerDB.V2.V2Args (LedgerDB.V2.LSMHandleArgs (LedgerDB.V2.LSMArgs x)) -> absurd x + V2.V2Args V2.InMemoryHandleArgs -> pure (InMemory.snapshotManager lgrDbArgs, V2.InMemoryHandleEnv) + V2.V2Args (V2.LSMHandleArgs (V2.LSMArgs path salt mkFS)) -> do + (rk1, V2.SomeHasFSAndBlockIO fs' blockio) <- mkFS (LedgerDB.lgrRegistry lgrDbArgs) + session <- + allocate + (LedgerDB.lgrRegistry lgrDbArgs) + ( \_ -> + LSM.openSession + ( LedgerDBFlavorImplEvent . LedgerDB.FlavorImplSpecificTraceV2 . V2.LSMTrace + >$< LedgerDB.lgrTracer lgrDbArgs + ) + fs' + blockio + salt + path + ) + LSM.closeSession + pure + ( LSM.snapshotManager (snd session) lgrDbArgs + , V2.LSMHandleEnv (V2.LSMResources (fst session) (snd session) rk1) + ) (ledgerDB, _, intLedgerDB) <- LedgerDB.openDBInternal lgrDbArgs @@ -126,6 +150,7 @@ analyse dbaConfig args = lock <- newMVar () chainDBTracer <- mkTracer lock verbose analysisTracer <- mkTracer lock True + lsmSalt <- fst . genWord64 <$> newStdGen ProtocolInfo{pInfoInitLedger = genesisLedger, pInfoConfig = cfg} <- mkProtocolInfo args let shfs = Node.stdMkChainDbHasFS dbDir @@ -150,6 +175,13 @@ analyse dbaConfig args = V2InMem -> LedgerDB.LedgerDbFlavorArgsV2 (LedgerDB.V2.V2Args LedgerDB.V2.InMemoryHandleArgs) + V2LSM -> + LedgerDB.LedgerDbFlavorArgsV2 + ( LedgerDB.V2.V2Args + ( LedgerDB.V2.LSMHandleArgs + (LedgerDB.V2.LSMArgs (mkFsPath ["lsm"]) lsmSalt (LSM.stdMkBlockIOFS dbDir)) + ) + ) args' = ChainDB.completeChainDbArgs registry diff --git a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Types.hs b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Types.hs index a43929a250..d7bdc8fb63 100644 --- a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Types.hs +++ b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Types.hs @@ -49,7 +49,7 @@ newtype NumberOfBlocks = NumberOfBlocks {unNumberOfBlocks :: Word64} data Limit = Limit Int | Unlimited -data LedgerDBBackend = V1InMem | V1LMDB | V2InMem +data LedgerDBBackend = V1InMem | V1LMDB | V2InMem | V2LSM -- | The extent of the ChainDB on-disk files validation. This is completely -- unrelated to validation of the ledger rules. diff --git a/ouroboros-consensus-diffusion/changelog.d/20250729_125052_javier.sagredo_lsm1.md b/ouroboros-consensus-diffusion/changelog.d/20250729_125052_javier.sagredo_lsm1.md new file mode 100644 index 0000000000..147392cce5 --- /dev/null +++ b/ouroboros-consensus-diffusion/changelog.d/20250729_125052_javier.sagredo_lsm1.md @@ -0,0 +1,23 @@ + + + + +### Breaking + +- `srnLdbFlavorArgs` was renamed to `srnLedgerDbBackendArgs` and changed its type to `LedgerDBBackendArgs`. + diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs index 7c0535c1bd..d92b7f8e07 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs @@ -53,6 +53,7 @@ module Ouroboros.Consensus.Node , pattern DoDiskSnapshotChecksum , pattern NoDoDiskSnapshotChecksum , ChainSyncIdleTimeout (..) + , LedgerDbBackendArgs (..) -- * Internal helpers , mkNodeKernelArgs @@ -126,6 +127,8 @@ import qualified Ouroboros.Consensus.Storage.ChainDB as ChainDB import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.Args as ChainDB import Ouroboros.Consensus.Storage.LedgerDB.Args import Ouroboros.Consensus.Storage.LedgerDB.Snapshots +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.Args as V2 +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.LSM as LSM import Ouroboros.Consensus.Util.Args import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.Orphans () @@ -173,11 +176,11 @@ import Ouroboros.Network.Protocol.ChainSync.Codec (timeLimitsChainSync) import Ouroboros.Network.RethrowPolicy import qualified SafeWildCards import System.Exit (ExitCode (..)) -import System.FS.API (SomeHasFS (..)) +import System.FS.API (SomeHasFS (..), mkFsPath) import System.FS.API.Types (MountPoint (..)) import System.FS.IO (ioHasFS) -import System.FilePath (()) -import System.Random (StdGen, newStdGen, randomIO, split) +import System.FilePath (splitDirectories, ()) +import System.Random (StdGen, genWord64, newStdGen, randomIO, split) {------------------------------------------------------------------------------- The arguments to the Consensus Layer node functionality @@ -375,7 +378,7 @@ data , -- Ad hoc values to replace default ChainDB configurations srnSnapshotPolicyArgs :: SnapshotPolicyArgs , srnQueryBatchSize :: QueryBatchSize - , srnLdbFlavorArgs :: Complete LedgerDbFlavorArgs m + , srnLedgerDbBackendArgs :: LedgerDbBackendArgs m } {------------------------------------------------------------------------------- @@ -1004,7 +1007,7 @@ stdLowLevelRunNodeArgsIO } $(SafeWildCards.fields 'StdRunNodeArgs) = do llrnBfcSalt <- stdBfcSaltIO - llrnRng <- newStdGen + (lsmSalt, llrnRng) <- genWord64 <$> newStdGen pure LowLevelRunNodeArgs { llrnBfcSalt @@ -1050,7 +1053,20 @@ stdLowLevelRunNodeArgsIO , llrnPublicPeerSelectionStateVar = Diffusion.dcPublicPeerSelectionVar srnDiffusionConfiguration , llrnLdbFlavorArgs = - srnLdbFlavorArgs + case srnLedgerDbBackendArgs of + V1LMDB args -> LedgerDbFlavorArgsV1 args + V2InMemory -> LedgerDbFlavorArgsV2 (V2.V2Args V2.InMemoryHandleArgs) + V2LSM path -> + LedgerDbFlavorArgsV2 + ( V2.V2Args + ( V2.LSMHandleArgs + ( V2.LSMArgs + (mkFsPath $ splitDirectories path) + lsmSalt + (LSM.stdMkBlockIOFS (nonImmutableDbPath srnDatabasePath)) + ) + ) + ) } where networkMagic :: NetworkMagic diff --git a/ouroboros-consensus/changelog.d/20250729_124008_javier.sagredo_lsm1.md b/ouroboros-consensus/changelog.d/20250729_124008_javier.sagredo_lsm1.md new file mode 100644 index 0000000000..2ad1d8ab2b --- /dev/null +++ b/ouroboros-consensus/changelog.d/20250729_124008_javier.sagredo_lsm1.md @@ -0,0 +1,27 @@ + + + + +### Breaking + +- Implement LSM-trees backend for LedgerDB V2 handles. +- Define new `LedgerDbBackendArgs` that will be provided by the node. +- Drop `Eq (Ouroboros.Consensus.Storage.ChainDb.Impl.Types.TraceEvent blk)` instance. +- Delete unused `Ouroboros.Consensus.Storage.LedgerDB.V1.Args.defaultLedgerDbFlavorArgs`. +- LedgerDB V2 forker reading functions now also receive a LedgerState to deserialize values from LSM trees. +- Expose `indexedPackByteArray` and define new `indexedUnpack` mirroring `unpack` from the `mempack` package. diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 1c93bb8f26..836705b2b4 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -261,6 +261,7 @@ library Ouroboros.Consensus.Storage.LedgerDB.V2.Args Ouroboros.Consensus.Storage.LedgerDB.V2.Forker Ouroboros.Consensus.Storage.LedgerDB.V2.InMemory + Ouroboros.Consensus.Storage.LedgerDB.V2.LSM Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq Ouroboros.Consensus.Storage.Serialisation Ouroboros.Consensus.Storage.VolatileDB @@ -309,6 +310,7 @@ library base16-bytestring, bimap >=0.4 && <0.6, binary >=0.8 && <0.11, + blockio, bytestring >=0.10 && <0.13, cardano-binary, cardano-crypto-class, @@ -328,6 +330,7 @@ library fs-api ^>=0.4, hashable, io-classes:{io-classes, si-timers, strict-mvar, strict-stm} ^>=1.8.0.1, + lsm-tree, measures, mempack, monoid-subclasses, diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs index 61a3c7380d..58761a7c99 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs @@ -721,12 +721,6 @@ data TraceEvent blk | TraceChainSelStarvationEvent (TraceChainSelStarvationEvent blk) deriving Generic -deriving instance - ( Eq (Header blk) - , LedgerSupportsProtocol blk - , InspectLedger blk - ) => - Eq (TraceEvent blk) deriving instance ( Show (Header blk) , LedgerSupportsProtocol blk diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB.hs index 4b03b5e22d..51aa6b942b 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB.hs @@ -17,8 +17,8 @@ module Ouroboros.Consensus.Storage.LedgerDB , openDBInternal ) where +import Control.ResourceRegistry import Data.Functor.Contravariant ((>$<)) -import Data.Void import Data.Word import Ouroboros.Consensus.Block import Ouroboros.Consensus.HardFork.Abstract @@ -35,6 +35,7 @@ import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.Snapshots as V1 import qualified Ouroboros.Consensus.Storage.LedgerDB.V2 as V2 import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.Args as V2 import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.InMemory as InMemory +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.LSM as LSM import Ouroboros.Consensus.Util.Args import Ouroboros.Consensus.Util.CallStack import Ouroboros.Consensus.Util.IOLike @@ -80,7 +81,24 @@ openDB LedgerDbFlavorArgsV2 bss -> do (snapManager, bss') <- case bss of V2.V2Args V2.InMemoryHandleArgs -> pure (InMemory.snapshotManager args, V2.InMemoryHandleEnv) - V2.V2Args (V2.LSMHandleArgs (V2.LSMArgs x)) -> absurd x + V2.V2Args (V2.LSMHandleArgs (V2.LSMArgs path salt mkFS)) -> do + (rk1, V2.SomeHasFSAndBlockIO fs blockio) <- mkFS (lgrRegistry args) + session <- + allocate + (lgrRegistry args) + ( \_ -> + LSM.openSession + (LedgerDBFlavorImplEvent . FlavorImplSpecificTraceV2 . V2.LSMTrace >$< lgrTracer args) + fs + blockio + salt + path + ) + LSM.closeSession + pure + ( LSM.snapshotManager (snd session) args + , V2.LSMHandleEnv (V2.LSMResources (fst session) (snd session) rk1) + ) let initDb = V2.mkInitDb args diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs index 96d3930c7d..7a9891e1f2 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs @@ -304,7 +304,7 @@ data WhereToTakeSnapshot = TakeAtImmutableTip | TakeAtVolatileTip deriving Eq data TestInternals m l blk = TestInternals { wipeLedgerDB :: m () , takeSnapshotNOW :: WhereToTakeSnapshot -> Maybe String -> m () - , push :: ExtLedgerState blk DiffMK -> m () + , push :: l DiffMK -> m () -- ^ Push a ledger state, and prune the 'LedgerDB' to its immutable tip. -- -- This does not modify the set of previously applied points. @@ -728,12 +728,6 @@ data TraceReplayProgressEvent blk Updating ledger tables -------------------------------------------------------------------------------} -type LedgerSupportsInMemoryLedgerDB l = - (CanUpgradeLedgerTables l, SerializeTablesWithHint l) - -type LedgerSupportsV1LedgerDB l = - (LedgerSupportsInMemoryLedgerDB l, LedgerSupportsLMDBLedgerDB l) - -- | When pushing differences on InMemory Ledger DBs, we will sometimes need to -- update ledger tables to the latest era. For unary blocks this is a no-op, but -- for the Cardano block, we will need to upgrade all TxOuts in memory. @@ -768,20 +762,26 @@ instance LedgerTables (ValuesMK (Map.map absurd mk)) {------------------------------------------------------------------------------- - Supporting On-Disk backing stores + LedgerDB constraints -------------------------------------------------------------------------------} +type LedgerSupportsInMemoryLedgerDB l = + (CanUpgradeLedgerTables l, SerializeTablesWithHint l) + type LedgerSupportsLMDBLedgerDB l = (IndexedMemPack (l EmptyMK) (TxOut l), MemPackIdx l EmptyMK ~ l EmptyMK) +type LedgerSupportsV1LedgerDB l = + (LedgerSupportsInMemoryLedgerDB l, LedgerSupportsLMDBLedgerDB l) + type LedgerSupportsV2LedgerDB l = - (LedgerSupportsInMemoryLedgerDB l) + (LedgerSupportsInMemoryLedgerDB l, MemPack (TxIn l)) type LedgerSupportsLedgerDB blk = LedgerSupportsLedgerDB' (LedgerState blk) blk type LedgerSupportsLedgerDB' l blk = - ( LedgerSupportsLMDBLedgerDB l - , LedgerSupportsInMemoryLedgerDB l + ( LedgerSupportsV1LedgerDB l + , LedgerSupportsV2LedgerDB l , LedgerDbSerialiseConstraints blk ) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Args.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Args.hs index 63935c89fa..0516b16c85 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Args.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Args.hs @@ -18,6 +18,7 @@ -- | Arguments for LedgerDB initialization. module Ouroboros.Consensus.Storage.LedgerDB.Args ( LedgerDbArgs (..) + , LedgerDbBackendArgs (..) , LedgerDbFlavorArgs (..) , QueryBatchSize (..) , defaultArgs @@ -40,6 +41,14 @@ import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.Args as V2 import Ouroboros.Consensus.Util.Args import System.FS.API +data LedgerDbBackendArgs m + = V1LMDB (Complete V1.LedgerDbFlavorArgs m) + | V2InMemory + | V2LSM + -- | The filepath **relative to the fast storage device** in which we will + -- open/create the LSM-tree database. + FilePath + {------------------------------------------------------------------------------- Arguments -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Snapshots.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Snapshots.hs index 76b4a29267..4a859b3746 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Snapshots.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Snapshots.hs @@ -184,17 +184,20 @@ instance FromJSON SnapshotMetadata where data SnapshotBackend = UTxOHDMemSnapshot | UTxOHDLMDBSnapshot + | UTxOHDLSMSnapshot deriving (Eq, Show) instance ToJSON SnapshotBackend where toJSON = \case UTxOHDMemSnapshot -> "utxohd-mem" UTxOHDLMDBSnapshot -> "utxohd-lmdb" + UTxOHDLSMSnapshot -> "utxohd-lsm" instance FromJSON SnapshotBackend where parseJSON = Aeson.withText "SnapshotBackend" $ \case "utxohd-mem" -> pure UTxOHDMemSnapshot "utxohd-lmdb" -> pure UTxOHDLMDBSnapshot + "utxohd-lsm" -> pure UTxOHDLSMSnapshot _ -> fail "unknown SnapshotBackend" data MetadataErr diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/TraceEvent.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/TraceEvent.hs index 1072efa1fb..1bc193cd87 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/TraceEvent.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/TraceEvent.hs @@ -26,7 +26,7 @@ import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.Args as V2 data FlavorImplSpecificTrace = FlavorImplSpecificTraceV1 V1.FlavorImplSpecificTrace | FlavorImplSpecificTraceV2 V2.FlavorImplSpecificTrace - deriving (Show, Eq) + deriving Show data TraceEvent blk = LedgerDBSnapshotEvent !(TraceSnapshotEvent blk) @@ -38,6 +38,3 @@ data TraceEvent blk deriving instance (StandardHash blk, InspectLedger blk) => Show (TraceEvent blk) -deriving instance - (StandardHash blk, InspectLedger blk) => - Eq (TraceEvent blk) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/Args.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/Args.hs index 405f3d2581..841f79cda3 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/Args.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/Args.hs @@ -13,7 +13,6 @@ module Ouroboros.Consensus.Storage.LedgerDB.V1.Args ( BackingStoreArgs (..) , FlushFrequency (..) , LedgerDbFlavorArgs (..) - , defaultLedgerDbFlavorArgs , shouldFlush ) where @@ -55,9 +54,3 @@ data BackingStoreArgs f m class (MonadIO m, PrimState m ~ PrimState IO) => MonadIOPrim m instance (MonadIO m, PrimState m ~ PrimState IO) => MonadIOPrim m - -defaultLedgerDbFlavorArgs :: Incomplete LedgerDbFlavorArgs m -defaultLedgerDbFlavorArgs = V1Args DefaultFlushFrequency defaultBackingStoreArgs - -defaultBackingStoreArgs :: Incomplete BackingStoreArgs m -defaultBackingStoreArgs = InMemoryBackingStoreArgs diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs index ef6ba64882..251f2f5f13 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs @@ -47,6 +47,7 @@ import Ouroboros.Consensus.HeaderValidation import Ouroboros.Consensus.Ledger.Abstract import Ouroboros.Consensus.Ledger.Extended import Ouroboros.Consensus.Ledger.SupportsProtocol +import Ouroboros.Consensus.Ledger.Tables.Utils import Ouroboros.Consensus.Storage.ChainDB.Impl.BlockCache import Ouroboros.Consensus.Storage.LedgerDB.API import Ouroboros.Consensus.Storage.LedgerDB.Args @@ -55,6 +56,8 @@ import Ouroboros.Consensus.Storage.LedgerDB.TraceEvent import Ouroboros.Consensus.Storage.LedgerDB.V2.Args as V2 import Ouroboros.Consensus.Storage.LedgerDB.V2.Forker import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.InMemory as InMemory +import Ouroboros.Consensus.Storage.LedgerDB.V2.LSM (snapshotToStatePath) +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.LSM as LSM import Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq import Ouroboros.Consensus.Util (whenJust) import Ouroboros.Consensus.Util.Args @@ -88,8 +91,6 @@ mkInitDb args bss getBlock snapManager = loadSnapshot (configCodec . getExtLedgerCfg . ledgerDbCfg $ lgrConfig) lgrHasFS , abortLedgerDbInit = \ls -> do closeLedgerSeq ls - flip whenJust releaseLedgerDBResources $ case bss of - InMemoryHandleEnv -> Nothing , initReapplyBlock = \a b c -> do (x, y) <- reapplyThenPush lgrRegistry a b c x @@ -116,6 +117,7 @@ mkInitDb args bss getBlock snapManager = , ldbOpenHandlesLock = lock , ldbResourceKeys = case bss of InMemoryHandleEnv -> Nothing + LSMHandleEnv lsmRes -> Just $ LedgerDBResourceKeys (sessionKey lsmRes) (blockIOKey lsmRes) } h <- LDBHandle <$> newTVarIO (LedgerDBOpen env) pure $ implMkLedgerDb h snapManager @@ -140,6 +142,11 @@ mkInitDb args bss getBlock snapManager = emptyF st = empty' st $ case bss of InMemoryHandleEnv -> InMemory.newInMemoryLedgerTablesHandle v2Tracer lgrHasFS + LSMHandleEnv lsmRes -> + \values -> do + table <- + LSM.tableFromValuesMK v2Tracer lgrRegistry (sessionResource lsmRes) (forgetLedgerTables st) values + LSM.newLSMLedgerTablesHandle v2Tracer lgrRegistry table loadSnapshot :: CodecConfig blk -> @@ -148,6 +155,7 @@ mkInitDb args bss getBlock snapManager = m (Either (SnapshotFailure blk) (LedgerSeq' m blk, RealPoint blk)) loadSnapshot ccfg fs ds = case bss of InMemoryHandleEnv -> runExceptT $ InMemory.loadSnapshot v2Tracer lgrRegistry ccfg fs ds + LSMHandleEnv lsmRes -> runExceptT $ LSM.loadSnapshot v2Tracer lgrRegistry ccfg fs (sessionResource lsmRes) ds implMkLedgerDb :: forall m l blk. @@ -246,7 +254,7 @@ mkInternals h snapManager = implIntTruncateSnapshots :: MonadThrow m => SnapshotManager m m blk st -> SomeHasFS m -> m () implIntTruncateSnapshots snapManager (SomeHasFS fs) = do snapshotsMapM_ snapManager $ - \pre -> withFile fs (InMemory.snapshotToStatePath pre) (AppendMode AllowExisting) $ + \pre -> withFile fs (snapshotToStatePath pre) (AppendMode AllowExisting) $ \h -> hTruncate fs h 0 implGetVolatileTip :: diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Args.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Args.hs index 068d64d529..00a2f693bd 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Args.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Args.hs @@ -6,10 +6,17 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2.Args , HandleArgs (..) , HandleEnv (..) , LedgerDbFlavorArgs (..) + , SomeHasFSAndBlockIO (..) , LSMHandleArgs (..) + , LSMResources (..) ) where -import Data.Void +import Control.ResourceRegistry +import Data.Typeable +import Database.LSMTree (LSMTreeTrace (..), Salt, Session) +import Ouroboros.Consensus.Util.Args +import System.FS.API +import System.FS.BlockIO.API data LedgerDbFlavorArgs f m = V2Args (HandleArgs f m) @@ -19,14 +26,29 @@ data HandleArgs f m = InMemoryHandleArgs | LSMHandleArgs (LSMHandleArgs f m) -data LSMHandleArgs f m = LSMArgs Void +data LSMHandleArgs f m = LSMArgs + { lsmFilePath :: HKD f FsPath + -- ^ The file path relative to the fast storage directory in which the LSM + -- trees database will be located. + , lsmSalt :: HKD f Salt + , lsmMkFS :: HKD f (ResourceRegistry m -> m (ResourceKey m, SomeHasFSAndBlockIO m)) + } + +data SomeHasFSAndBlockIO m where + SomeHasFSAndBlockIO :: (Eq h, Typeable h) => HasFS m h -> HasBlockIO m h -> SomeHasFSAndBlockIO m -- | The environment used to create new handles data HandleEnv m = InMemoryHandleEnv | -- | The environment for creating LSM handles. It carries the 'Session' -- together with its resource key and the resource key of the 'HasBlockIO'. - LSMHandleEnv !Void + LSMHandleEnv !(LSMResources m) + +data LSMResources m = LSMResources + { sessionKey :: !(ResourceKey m) + , sessionResource :: !(Session m) + , blockIOKey :: !(ResourceKey m) + } data FlavorImplSpecificTrace = -- | Created a new 'LedgerTablesHandle', potentially by duplicating an @@ -34,4 +56,5 @@ data FlavorImplSpecificTrace TraceLedgerTablesHandleCreate | -- | Closed a 'LedgerTablesHandle'. TraceLedgerTablesHandleClose - deriving (Show, Eq) + | LSMTrace LSMTreeTrace + deriving Show diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Forker.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Forker.hs index 53a7fb8142..4f0ee4b41c 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Forker.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Forker.hs @@ -73,7 +73,7 @@ implForkerReadTables env ks = do traceWith (foeTracer env) ForkerReadTablesStart lseq <- readTVarIO (foeLedgerSeq env) let stateRef = currentHandle lseq - tbs <- read (tables stateRef) ks + tbs <- read (tables stateRef) (state stateRef) ks traceWith (foeTracer env) ForkerReadTablesEnd pure tbs @@ -89,10 +89,10 @@ implForkerRangeReadTables qbs env rq0 = do let n = fromIntegral $ defaultQueryBatchSize qbs stateRef = currentHandle ldb case rq0 of - NoPreviousQuery -> readRange (tables $ currentHandle ldb) (Nothing, n) + NoPreviousQuery -> readRange (tables stateRef) (state stateRef) (Nothing, n) PreviousQueryWasFinal -> pure (LedgerTables emptyMK, Nothing) PreviousQueryWasUpTo k -> do - tbs <- readRange (tables stateRef) (Just k, n) + tbs <- readRange (tables stateRef) (state stateRef) (Just k, n) traceWith (foeTracer env) ForkerRangeReadTablesEnd pure tbs diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/InMemory.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/InMemory.hs index 6e23ac6d23..7fe11e86a6 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/InMemory.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/InMemory.hs @@ -24,7 +24,6 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2.InMemory -- * Snapshots , loadSnapshot , snapshotManager - , snapshotToStatePath -- * snapshot-converter , implTakeSnapshot @@ -110,12 +109,12 @@ newInMemoryLedgerTablesHandle tracer someFS@(SomeHasFS hasFS) l = do hs <- readTVarIO tv !x <- guardClosed hs $ newInMemoryLedgerTablesHandle tracer someFS pure x - , read = \keys -> do + , read = \_ keys -> do hs <- readTVarIO tv guardClosed hs (pure . flip (ltliftA2 (\(ValuesMK v) (KeysMK k) -> ValuesMK $ v `Map.restrictKeys` k)) keys) - , readRange = \(f, t) -> do + , readRange = \_ (f, t) -> do hs <- readTVarIO tv guardClosed hs @@ -123,7 +122,7 @@ newInMemoryLedgerTablesHandle tracer someFS@(SomeHasFS hasFS) l = do let m' = Map.take t . (maybe id (\g -> snd . Map.split g) f) $ m in pure (LedgerTables (ValuesMK m'), fst <$> Map.lookupMax m') ) - , readAll = do + , readAll = \_ -> do hs <- readTVarIO tv guardClosed hs pure , pushDiffs = \st0 !diffs -> diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs new file mode 100644 index 0000000000..da41ab1890 --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs @@ -0,0 +1,493 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE ConstraintKinds #-} +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DerivingVia #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE UndecidableInstances #-} +{-# OPTIONS_GHC -Wno-orphans #-} + +-- | Implementation of the 'LedgerTablesHandle' interface with LSM trees. +module Ouroboros.Consensus.Storage.LedgerDB.V2.LSM + ( -- * LedgerTablesHandle + newLSMLedgerTablesHandle + , tableFromValuesMK + + -- * Snapshots + , loadSnapshot + , snapshotToStatePath + , snapshotManager + + -- * Re-exports + , LSM.Entry (..) + , LSM.RawBytes (..) + , LSM.Salt + , Session + , LSM.openSession + , LSM.closeSession + , stdMkBlockIOFS + + -- * snapshot-converter + , implTakeSnapshot + ) where + +import Cardano.Binary as CBOR +import Codec.Serialise (decode) +import qualified Control.Monad as Monad +import Control.Monad.Trans (lift) +import Control.Monad.Trans.Except +import Control.Monad.Trans.Fail +import Control.ResourceRegistry +import Control.Tracer +import Data.Bifunctor (first) +import qualified Data.Foldable as Foldable +import Data.Functor.Contravariant ((>$<)) +import qualified Data.List as List +import qualified Data.Map.Strict as Map +import Data.Maybe +import Data.MemPack +import Data.MemPack.Buffer +import Data.MemPack.Error +import qualified Data.Primitive.ByteArray as PBA +import qualified Data.Set as Set +import qualified Data.Text as Text +import qualified Data.Vector as V +import qualified Data.Vector.Mutable as VM +import qualified Data.Vector.Primitive as VP +import Data.Void +import Database.LSMTree (Session, Table) +import qualified Database.LSMTree as LSM +import GHC.Exts +import GHC.Stack +import NoThunks.Class +import Ouroboros.Consensus.Block +import Ouroboros.Consensus.Config +import Ouroboros.Consensus.Ledger.Abstract +import Ouroboros.Consensus.Ledger.Extended +import Ouroboros.Consensus.Ledger.SupportsProtocol +import qualified Ouroboros.Consensus.Ledger.Tables.Diff as Diff +import Ouroboros.Consensus.Ledger.Tables.Utils +import Ouroboros.Consensus.Storage.LedgerDB.API +import Ouroboros.Consensus.Storage.LedgerDB.Args +import Ouroboros.Consensus.Storage.LedgerDB.Snapshots +import Ouroboros.Consensus.Storage.LedgerDB.TraceEvent +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.Args as V2 +import Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq +import Ouroboros.Consensus.Util (chunks) +import Ouroboros.Consensus.Util.Args +import Ouroboros.Consensus.Util.CRC +import Ouroboros.Consensus.Util.Enclose +import Ouroboros.Consensus.Util.IOLike +import Ouroboros.Consensus.Util.IndexedMemPack +import System.FS.API +import qualified System.FS.BlockIO.API as BIO +import System.FS.BlockIO.IO +import Prelude hiding (read) + +-- | Type alias for convenience +type UTxOTable m = Table m TxInBytes TxOutBytes Void + +instance NoThunks (Table m txin txout Void) where + showTypeOf _ = "Table" + wNoThunks _ _ = pure Nothing + +data LSMClosedExn = LSMClosedExn + deriving (Show, Exception) + +instance Buffer (VP.Vector a) where + bufferByteCount (VP.Vector _ l _) = l + buffer (VP.Vector _ _ (PBA.ByteArray barr)) f _ = f barr + +{------------------------------------------------------------------------------- + TxOuts +-------------------------------------------------------------------------------} + +-- | Vendored version of unpack which starts from a given initial offset. Could +-- be upstreamed to @mempack@. +unpackWithOffset :: + forall a b. (MemPack a, Buffer b, HasCallStack) => b -> Int -> Either SomeError a +unpackWithOffset b off = first fromMultipleErrors . runFailAgg $ do + let len = bufferByteCount b + (a, consumedBytes) <- do + res@(_, consumedBytes) <- runStateT (runUnpack unpackM b) off + Monad.when (consumedBytes > len) $ + error $ + "Potential buffer overflow. Some bug in 'unpackM' was detected while unpacking " <> (typeName @a) + ++ ". Consumed " <> showBytes (consumedBytes - len) <> " more than allowed from a buffer of length " + ++ show len + pure res + Monad.when (consumedBytes /= len) $ + failT $ + toSomeError $ + NotFullyConsumedError + { notFullyConsumedRead = consumedBytes + , notFullyConsumedAvailable = len + , notFullyConsumedTypeName = (typeName @a) + } + pure a +{-# INLINEABLE unpackWithOffset #-} + +newtype TxOutBytes = TxOutBytes {unTxOutBytes :: LSM.RawBytes} + +toTxOutBytes :: IndexedMemPack (l EmptyMK) (TxOut l) => l EmptyMK -> TxOut l -> TxOutBytes +toTxOutBytes st txout = + let barr = indexedPackByteArray True st txout + in TxOutBytes $ LSM.RawBytes (VP.Vector 0 (PBA.sizeofByteArray barr) barr) + +fromTxOutBytes :: IndexedMemPack (l EmptyMK) (TxOut l) => l EmptyMK -> TxOutBytes -> TxOut l +fromTxOutBytes st (TxOutBytes (LSM.RawBytes vec@(VP.Vector off _ _))) = + case indexedUnpackWithOffset st vec off of + Left err -> + error $ + unlines + [ "There was an error deserializing a TxOut from the LSM backend." + , "This will likely result in a restart-crash loop." + , "The error: " <> show err + ] + Right v -> v + +instance LSM.SerialiseValue TxOutBytes where + serialiseValue = unTxOutBytes + deserialiseValue = TxOutBytes + +deriving via LSM.ResolveAsFirst TxOutBytes instance LSM.ResolveValue TxOutBytes + +{------------------------------------------------------------------------------- + TxIns +-------------------------------------------------------------------------------} + +newtype TxInBytes = TxInBytes {unTxInBytes :: LSM.RawBytes} + +toTxInBytes :: MemPack (TxIn l) => Proxy l -> TxIn l -> TxInBytes +toTxInBytes _ txin = + let barr = packByteArray True txin + in TxInBytes $ LSM.RawBytes (VP.Vector 0 (PBA.sizeofByteArray barr) barr) + +fromTxInBytes :: MemPack (TxIn l) => Proxy l -> TxInBytes -> TxIn l +fromTxInBytes _ (TxInBytes (LSM.RawBytes vec@(VP.Vector off _ _))) = + case unpackWithOffset vec off of + Left err -> + error $ + unlines + [ "There was an error deserializing a TxIn from the LSM backend." + , "This will likely result in a restart-crash loop." + , "The error: " <> show err + ] + Right v -> v + +instance LSM.SerialiseKey TxInBytes where + serialiseKey = unTxInBytes + deserialiseKey = TxInBytes + +{------------------------------------------------------------------------------- + Implementation +-------------------------------------------------------------------------------} + +-- | Create the initial LSM table from values, which should happen only at the +-- Genesis. +tableFromValuesMK :: + forall m l. + (IOLike m, IndexedMemPack (l EmptyMK) (TxOut l), MemPack (TxIn l)) => + Tracer m V2.FlavorImplSpecificTrace -> + ResourceRegistry m -> + Session m -> + l EmptyMK -> + LedgerTables l ValuesMK -> + m (ResourceKey m, UTxOTable m) +tableFromValuesMK tracer rr session st (LedgerTables (ValuesMK values)) = do + res@(_, table) <- + allocate + rr + (\_ -> LSM.newTable session) + ( \tb -> do + traceWith tracer V2.TraceLedgerTablesHandleClose + LSM.closeTable tb + ) + mapM_ (go table) $ chunks 1000 $ Map.toList values + pure res + where + go table items = + LSM.inserts table $ + V.fromListN (length items) $ + map (\(k, v) -> (toTxInBytes (Proxy @l) k, toTxOutBytes st v, Nothing)) items + +snapshotManager :: + ( IOLike m + , LedgerDbSerialiseConstraints blk + , LedgerSupportsProtocol blk + ) => + Session m -> + Complete LedgerDbArgs m blk -> + SnapshotManager m m blk (StateRef m (ExtLedgerState blk)) +snapshotManager session args = + snapshotManager' + session + (configCodec . getExtLedgerCfg . ledgerDbCfg $ lgrConfig args) + (LedgerDBSnapshotEvent >$< lgrTracer args) + (lgrHasFS args) + +snapshotManager' :: + ( IOLike m + , LedgerDbSerialiseConstraints blk + , LedgerSupportsProtocol blk + ) => + Session m -> + CodecConfig blk -> + Tracer m (TraceSnapshotEvent blk) -> + SomeHasFS m -> + SnapshotManager m m blk (StateRef m (ExtLedgerState blk)) +snapshotManager' session ccfg tracer fs = + SnapshotManager + { listSnapshots = defaultListSnapshots fs + , deleteSnapshot = implDeleteSnapshot session fs tracer + , takeSnapshot = implTakeSnapshot ccfg tracer fs + } + +newLSMLedgerTablesHandle :: + forall m l. + ( IOLike m + , HasLedgerTables l + , IndexedMemPack (l EmptyMK) (TxOut l) + ) => + Tracer m V2.FlavorImplSpecificTrace -> + ResourceRegistry m -> + (ResourceKey m, UTxOTable m) -> + m (LedgerTablesHandle m l) +newLSMLedgerTablesHandle tracer rr (resKey, t) = do + traceWith tracer V2.TraceLedgerTablesHandleCreate + pure + LedgerTablesHandle + { close = do + Monad.void $ release resKey + , duplicate = do + table <- + allocate + rr + (\_ -> LSM.duplicate t) + ( \t' -> do + traceWith tracer V2.TraceLedgerTablesHandleClose + LSM.closeTable t' + ) + newLSMLedgerTablesHandle tracer rr table + , read = \st (LedgerTables (KeysMK keys)) -> do + let vec' = V.create $ do + vec <- VM.new (Set.size keys) + Monad.foldM_ + (\i x -> VM.write vec i (toTxInBytes (Proxy @l) x) >> pure (i + 1)) + 0 + keys + pure vec + res <- LSM.lookups t vec' + pure + . LedgerTables + . ValuesMK + . Foldable.foldl' + ( \m (k, item) -> + case item of + LSM.Found v -> Map.insert (fromTxInBytes (Proxy @l) k) (fromTxOutBytes st v) m + LSM.NotFound -> m + LSM.FoundWithBlob{} -> m + ) + Map.empty + $ V.zip vec' res + , readRange = implReadRange t + , readAll = \st -> + let readAll' m = do + (v, n) <- implReadRange t st (m, 100000) + maybe (pure v) (fmap (ltliftA2 unionValues v) . readAll' . Just) n + in readAll' Nothing + , pushDiffs = const (implPushDiffs t) + , takeHandleSnapshot = \_ snapshotName -> do + LSM.saveSnapshot + (fromString snapshotName) + (LSM.SnapshotLabel $ Text.pack $ "UTxO table") + t + pure Nothing + , tablesSize = pure Nothing + } + +implReadRange :: + forall m l. + (IOLike m, IndexedMemPack (l EmptyMK) (TxOut l)) => + HasLedgerTables l => + UTxOTable m -> + l EmptyMK -> + (Maybe (TxIn l), Int) -> + m (LedgerTables l ValuesMK, Maybe (TxIn l)) +implReadRange table st (mPrev, num) = do + entries <- maybe cursorFromStart cursorFromKey mPrev + pure + ( LedgerTables + . ValuesMK + . V.foldl' + ( \m -> \case + LSM.Entry k v -> Map.insert (fromTxInBytes (Proxy @l) k) (fromTxOutBytes st v) m + LSM.EntryWithBlob{} -> m + ) + Map.empty + $ entries + , case snd <$> V.unsnoc entries of + Nothing -> Nothing + Just (LSM.Entry k _) -> Just (fromTxInBytes (Proxy @l) k) + Just (LSM.EntryWithBlob k _ _) -> Just (fromTxInBytes (Proxy @l) k) + ) + where + cursorFromStart = LSM.withCursor table (LSM.take num) + -- Here we ask for one value more and we drop one value because the + -- cursor returns also the key at which it was opened. + cursorFromKey k = fmap (V.drop 1) $ LSM.withCursorAtOffset table (toTxInBytes (Proxy @l) k) (LSM.take $ num + 1) + +implPushDiffs :: + forall m l. + ( IOLike m + , HasLedgerTables l + , IndexedMemPack (l EmptyMK) (TxOut l) + ) => + UTxOTable m -> l DiffMK -> m () +implPushDiffs t !st1 = do + let LedgerTables (DiffMK (Diff.Diff diffs)) = projectLedgerTables st1 + let vec = V.create $ do + vec' <- VM.new (Map.size diffs) + Monad.foldM_ + (\idx (k, item) -> VM.write vec' idx (toTxInBytes (Proxy @l) k, (f item)) >> pure (idx + 1)) + 0 + $ Map.toList diffs + pure vec' + LSM.updates t vec + where + f (Diff.Insert v) = LSM.Insert (toTxOutBytes (forgetLedgerTables st1) v) Nothing + f Diff.Delete = LSM.Delete + +-- | The path within the LedgerDB's filesystem to the file that contains the +-- snapshot's serialized ledger state +snapshotToStatePath :: DiskSnapshot -> FsPath +snapshotToStatePath = mkFsPath . (\x -> [x, "state"]) . snapshotToDirName + +implTakeSnapshot :: + ( IOLike m + , LedgerDbSerialiseConstraints blk + , LedgerSupportsProtocol blk + ) => + CodecConfig blk -> + Tracer m (TraceSnapshotEvent blk) -> + SomeHasFS m -> + Maybe String -> + StateRef m (ExtLedgerState blk) -> + m (Maybe (DiskSnapshot, RealPoint blk)) +implTakeSnapshot ccfg tracer hasFS suffix st = case pointToWithOriginRealPoint (castPoint (getTip $ state st)) of + Origin -> return Nothing + NotOrigin t -> do + let number = unSlotNo (realPointSlot t) + snapshot = DiskSnapshot number suffix + diskSnapshots <- defaultListSnapshots hasFS + if List.any (== DiskSnapshot number suffix) diskSnapshots + then + return Nothing + else do + encloseTimedWith (TookSnapshot snapshot t >$< tracer) $ + writeSnapshot hasFS (encodeDiskExtLedgerState ccfg) snapshot st + return $ Just (snapshot, t) + +writeSnapshot :: + MonadThrow m => + SomeHasFS m -> + (ExtLedgerState blk EmptyMK -> Encoding) -> + DiskSnapshot -> + StateRef m (ExtLedgerState blk) -> + m () +writeSnapshot fs@(SomeHasFS hasFs) encLedger ds st = do + createDirectoryIfMissing hasFs True $ snapshotToDirPath ds + crc1 <- writeExtLedgerState fs encLedger (snapshotToStatePath ds) $ state st + crc2 <- takeHandleSnapshot (tables st) (state st) $ snapshotToDirName ds + writeSnapshotMetadata fs ds $ + SnapshotMetadata + { snapshotBackend = UTxOHDLSMSnapshot + , snapshotChecksum = maybe crc1 (crcOfConcat crc1) crc2 + } + +-- | Delete snapshot from disk and also from the LSM tree database. +implDeleteSnapshot :: + IOLike m => Session m -> SomeHasFS m -> Tracer m (TraceSnapshotEvent blk) -> DiskSnapshot -> m () +implDeleteSnapshot session (SomeHasFS HasFS{doesDirectoryExist, removeDirectoryRecursive}) tracer ss = do + deleteState `finally` deleteLsmTable + traceWith tracer (DeletedSnapshot ss) + where + deleteState = do + let p = snapshotToDirPath ss + exists <- doesDirectoryExist p + Monad.when exists (removeDirectoryRecursive p) + + deleteLsmTable = + LSM.deleteSnapshot + session + (fromString $ show (dsNumber ss) <> maybe "" ("_" <>) (dsSuffix ss)) + +-- | Read snapshot from disk. +-- +-- Fail on data corruption, i.e. when the checksum of the read data differs +-- from the one tracked by @'DiskSnapshot'@. +loadSnapshot :: + forall blk m. + ( LedgerDbSerialiseConstraints blk + , LedgerSupportsProtocol blk + , IOLike m + ) => + Tracer m V2.FlavorImplSpecificTrace -> + ResourceRegistry m -> + CodecConfig blk -> + SomeHasFS m -> + Session m -> + DiskSnapshot -> + ExceptT (SnapshotFailure blk) m (LedgerSeq' m blk, RealPoint blk) +loadSnapshot tracer rr ccfg fs session ds = do + snapshotMeta <- + withExceptT (InitFailureRead . ReadMetadataError (snapshotToMetadataPath ds)) $ + loadSnapshotMetadata fs ds + Monad.when (snapshotBackend snapshotMeta /= UTxOHDLSMSnapshot) $ + throwE $ + InitFailureRead $ + ReadMetadataError (snapshotToMetadataPath ds) MetadataBackendMismatch + (extLedgerSt, checksumAsRead) <- + withExceptT + (InitFailureRead . ReadSnapshotFailed) + $ readExtLedgerState fs (decodeDiskExtLedgerState ccfg) decode (snapshotToStatePath ds) + case pointToWithOriginRealPoint (castPoint (getTip extLedgerSt)) of + Origin -> throwE InitFailureGenesis + NotOrigin pt -> do + values <- + lift $ + allocate + rr + ( \_ -> + LSM.openTableFromSnapshot + session + (fromString $ snapshotToDirName ds) + (LSM.SnapshotLabel $ Text.pack $ "UTxO table") + ) + ( \t -> do + traceWith tracer V2.TraceLedgerTablesHandleClose + LSM.closeTable t + ) + Monad.when (checksumAsRead /= snapshotChecksum snapshotMeta) $ + throwE $ + InitFailureRead ReadSnapshotDataCorruption + (,pt) <$> lift (empty extLedgerSt values (newLSMLedgerTablesHandle tracer rr)) + +stdMkBlockIOFS :: + FilePath -> ResourceRegistry IO -> IO (ResourceKey IO, V2.SomeHasFSAndBlockIO IO) +stdMkBlockIOFS fastStoragePath rr = do + (rk1, bio) <- + allocate + rr + (\_ -> ioHasBlockIO (MountPoint fastStoragePath) defaultIOCtxParams) + (BIO.close . snd) + pure (rk1, uncurry V2.SomeHasFSAndBlockIO bio) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs index a92244b64e..141a610b4e 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs @@ -80,22 +80,49 @@ import Prelude hiding (read) LedgerTablesHandles -------------------------------------------------------------------------------} +-- | The interface fulfilled by handles on both the InMemory and LSM handles. data LedgerTablesHandle m l = LedgerTablesHandle { close :: !(m ()) , duplicate :: !(m (LedgerTablesHandle m l)) - -- ^ It is expected that this operation takes constant time. - , read :: !(LedgerTables l KeysMK -> m (LedgerTables l ValuesMK)) - , readRange :: !((Maybe (TxIn l), Int) -> m (LedgerTables l ValuesMK, Maybe (TxIn l))) - , readAll :: !(m (LedgerTables l ValuesMK)) + -- ^ Create a copy of the handle. + -- + -- When applying diffs to a table, we will first duplicate the handle, then + -- apply the diffs in the copy. It is expected that duplicating the handle + -- takes constant time. + , read :: !(l EmptyMK -> LedgerTables l KeysMK -> m (LedgerTables l ValuesMK)) + -- ^ Read values for the given keys from the tables, and deserialize them as + -- if they were from the same era as the given ledger state. + , readRange :: !(l EmptyMK -> (Maybe (TxIn l), Int) -> m (LedgerTables l ValuesMK, Maybe (TxIn l))) + -- ^ Read the requested number of values, possibly starting from the given + -- key, from the tables, and deserialize them as if they were from the same + -- era as the given ledger state. + -- + -- The returned value contains both the read values as well as the last key + -- retrieved. This is necessary because the LSM backend uses an alternative + -- serialization format and the last key in the returned Map might not be the + -- last key read. + -- + -- The last key retrieved is part of the map too. It is intended to be fed + -- back into the next iteration of the range read. If the function returns + -- Nothing, it means the read returned no results, or in other words, we + -- reached the end of the ledger tables. + , readAll :: !(l EmptyMK -> m (LedgerTables l ValuesMK)) -- ^ Costly read all operation, not to be used in Consensus but only in - -- snapshot-converter executable. + -- snapshot-converter executable. The values will be read as if they were from + -- the same era as the given ledger state. , pushDiffs :: !(forall mk. l mk -> l DiffMK -> m ()) -- ^ Push some diffs into the ledger tables handle. -- -- The first argument has to be the ledger state before applying -- the block, the second argument should be the ledger state after -- applying a block. See 'CanUpgradeLedgerTables'. + -- + -- Note 'CanUpgradeLedgerTables' is only used in the InMemory backend. , takeHandleSnapshot :: !(l EmptyMK -> String -> m (Maybe CRC)) + -- ^ Take a snapshot of a handle. The given ledger state is used to decide the + -- encoding of the values based on the current era. + -- + -- It returns a CRC only on backends that support it, as the InMemory backend. , tablesSize :: !(m (Maybe Int)) -- ^ Consult the size of the ledger tables in the database. This will return -- 'Nothing' in backends that do not support this operation. @@ -167,8 +194,8 @@ empty :: , IOLike m ) => l EmptyMK -> - LedgerTables l ValuesMK -> - (LedgerTables l ValuesMK -> m (LedgerTablesHandle m l)) -> + init -> + (init -> m (LedgerTablesHandle m l)) -> m (LedgerSeq m l) empty st tbs new = LedgerSeq . AS.Empty . StateRef st <$> new tbs @@ -221,7 +248,7 @@ reapplyBlock evs cfg b _rr db = do let ks = getBlockKeySets b StateRef st tbs = currentHandle db newtbs <- duplicate tbs - vals <- read newtbs ks + vals <- read newtbs st ks let st' = tickThenReapply evs cfg b (st `withLedgerTables` vals) newst = forgetLedgerTables st' @@ -527,7 +554,7 @@ volatileStatesBimap f g = -- >>> instance LedgerTablesAreTrivial LS where convertMapKind (LS p) = LS p -- >>> s = [LS (Point Origin), LS (Point (At (Block 0 0))), LS (Point (At (Block 1 1))), LS (Point (At (Block 2 2))), LS (Point (At (Block 3 3)))] -- >>> [l0s, l1s, l2s, l3s, l4s] = s --- >>> emptyHandle = LedgerTablesHandle (pure ()) (pure emptyHandle) (\_ -> undefined) (\_ -> undefined) (pure trivialLedgerTables) (\_ _ _ -> undefined) (\_ -> undefined) (pure Nothing) +-- >>> emptyHandle = LedgerTablesHandle (pure ()) (pure emptyHandle) (\_ -> undefined) (\_ -> undefined) (\_ -> pure trivialLedgerTables) (\_ _ _ -> undefined) (\_ -> undefined) (pure Nothing) -- >>> [l0, l1, l2, l3, l4] = map (flip StateRef emptyHandle) s -- >>> instance GetTip LS where getTip (LS p) = p -- >>> instance Eq (LS EmptyMK) where LS p1 == LS p2 = p1 == p2 diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/IndexedMemPack.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/IndexedMemPack.hs index 22939e57c7..402ccf2aa8 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/IndexedMemPack.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/IndexedMemPack.hs @@ -14,12 +14,16 @@ module Ouroboros.Consensus.Util.IndexedMemPack ( IndexedMemPack (..) , MemPack (..) , indexedPackByteString + , indexedPackByteArray , indexedUnpackError + , indexedUnpack + , indexedUnpackWithOffset ) where import qualified Control.Monad as Monad -import Control.Monad.Trans.Fail (Fail, errorFail, failT) +import Control.Monad.Trans.Fail (Fail, errorFail, failT, runFailAgg) import Data.Array.Byte (ByteArray (..)) +import Data.Bifunctor (first) import Data.ByteString import Data.MemPack import Data.MemPack.Buffer @@ -53,6 +57,12 @@ indexedPackByteArray isPinned idx a = (indexedPackM idx a) {-# INLINE indexedPackByteArray #-} +indexedUnpack :: + forall idx a b. + (Buffer b, IndexedMemPack idx a, HasCallStack) => idx -> b -> Either SomeError a +indexedUnpack idx = first fromMultipleErrors . runFailAgg . indexedUnpackFail idx +{-# INLINEABLE indexedUnpack #-} + indexedUnpackError :: forall idx a b. (Buffer b, IndexedMemPack idx a, HasCallStack) => idx -> b -> a indexedUnpackError idx = errorFail . indexedUnpackFail idx @@ -96,3 +106,23 @@ unpackFailNotFullyConsumed name consumedBytes len = , notFullyConsumedTypeName = name } {-# NOINLINE unpackFailNotFullyConsumed #-} + +indexedUnpackWithOffset :: + forall a b idx. + (IndexedMemPack idx a, Buffer b, HasCallStack) => idx -> b -> Int -> Either SomeError a +indexedUnpackWithOffset idx b off = first fromMultipleErrors . runFailAgg $ do + let len = bufferByteCount b + (a, consumedBytes) <- do + res@(_, consumedBytes) <- runStateT (runUnpack (indexedUnpackM idx) b) off + Monad.when (consumedBytes > len) $ errorLeftOver (indexedTypeName @idx @a idx) consumedBytes len + pure res + Monad.when (consumedBytes /= len) $ + failT $ + toSomeError $ + NotFullyConsumedError + { notFullyConsumedRead = consumedBytes + , notFullyConsumedAvailable = len + , notFullyConsumedTypeName = indexedTypeName @idx @a idx + } + pure a +{-# INLINEABLE indexedUnpackWithOffset #-} diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs index 786951c7ea..7eb87aecc9 100644 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs @@ -12,7 +12,6 @@ {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE PackageImports #-} {-# LANGUAGE PolyKinds #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -44,11 +43,11 @@ import Control.Monad.Except import Control.Monad.State hiding (state) import Control.ResourceRegistry import Control.Tracer (Tracer (..)) +import Data.Functor.Contravariant ((>$<)) import qualified Data.List as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import qualified Data.SOP.Dict as Dict -import Data.Void import Data.Word import Ouroboros.Consensus.Block import Ouroboros.Consensus.Config @@ -64,13 +63,14 @@ import Ouroboros.Consensus.Storage.LedgerDB.V1 as V1 import Ouroboros.Consensus.Storage.LedgerDB.V1.Args hiding ( LedgerDbFlavorArgs ) -import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.Snapshots as V1 +import Ouroboros.Consensus.Storage.LedgerDB.V1.Snapshots as V1 import Ouroboros.Consensus.Storage.LedgerDB.V2 as V2 import Ouroboros.Consensus.Storage.LedgerDB.V2.Args hiding ( LedgerDbFlavorArgs ) import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.Args as V2 import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.InMemory as InMemory +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.LSM as LSM import Ouroboros.Consensus.Util hiding (Some) import Ouroboros.Consensus.Util.Args import Ouroboros.Consensus.Util.IOLike @@ -95,7 +95,6 @@ import Test.Util.TestBlock hiding , TestBlockCodecConfig , TestBlockStorageConfig ) -import "quickcheck-dynamic" Test.QuickCheck.Extras tests :: TestTree tests = @@ -106,23 +105,45 @@ tests = , testProperty "InMemV2" $ prop_sequential 100000 inMemV2TestArguments noFilePath simulatedFS , testProperty "LMDB" $ - prop_sequential 1000 lmdbTestArguments realFilePath realFS + prop_sequential 1000 lmdbTestArguments (realFilePath "lmdb") realFS + , testProperty "LSM" $ + prop_sequential 1000 lsmTestArguments (realFilePath "lsm") realFS ] prop_sequential :: Int -> - (SecurityParam -> FilePath -> TestArguments IO) -> + (SecurityParam -> LSM.Salt -> FilePath -> TestArguments IO) -> IO (FilePath, IO ()) -> IO (SomeHasFS IO, IO ()) -> Actions Model -> QC.Property -prop_sequential maxSuccess mkTestArguments getLmdbDir fsOps as = QC.withMaxSuccess maxSuccess $ - QC.monadicIO $ do - ref <- lift $ initialEnvironment fsOps getLmdbDir mkTestArguments =<< initChainDB - (_, env@(Environment _ testInternals _ _ _ _ clean)) <- runPropertyStateT (runActions as) ref - checkNoLeakedHandles env - QC.run $ closeLedgerDB testInternals >> clean - QC.assert True +prop_sequential maxSuccess mkTestArguments getDiskDir fsOps actions = + QC.withMaxSuccess maxSuccess $ + QC.monadic runner $ + Monad.void $ + runActions $ + actions + where + setup :: IO Environment + setup = do + cdb <- initChainDB + rr <- unsafeNewRegistry + initialEnvironment fsOps getDiskDir mkTestArguments cdb rr + + cleanup :: Environment -> IO () + cleanup (Environment _ testInternals _ _ _ _ clean registry) = do + closeRegistry registry + closeLedgerDB testInternals + clean + + runner :: StateT Environment IO Property -> Property + runner mprop = + ioProperty $ + bracket setup cleanup $ + \env0 -> do + (prop1, env1) <- runStateT mprop env0 + p2 <- checkNoLeakedHandles env1 + pure $ prop1 .&&. p2 -- | The initial environment is mostly undefined because it will be initialized -- by the @Init@ command. We are forced to provide this dummy implementation @@ -132,21 +153,23 @@ prop_sequential maxSuccess mkTestArguments getLmdbDir fsOps as = QC.withMaxSucce initialEnvironment :: IO (SomeHasFS IO, IO ()) -> IO (FilePath, IO ()) -> - (SecurityParam -> FilePath -> TestArguments IO) -> + (SecurityParam -> LSM.Salt -> FilePath -> TestArguments IO) -> ChainDB IO -> + ResourceRegistry IO -> IO Environment -initialEnvironment fsOps getLmdbDir mkTestArguments cdb = do +initialEnvironment fsOps getDiskDir mkTestArguments cdb rr = do (sfs, cleanupFS) <- fsOps - (lmdbDir, cleanupLMDB) <- getLmdbDir + (diskDir, cleanupDisk) <- getDiskDir pure $ Environment undefined (TestInternals undefined undefined undefined undefined undefined (pure ()) (pure 0)) cdb - (flip mkTestArguments lmdbDir) + (\sp st -> mkTestArguments sp st diskDir) sfs (pure $ NumOpenHandles 0) - (cleanupFS >> cleanupLMDB) + (cleanupFS >> cleanupDisk) + rr {------------------------------------------------------------------------------- Arguments @@ -160,9 +183,10 @@ data TestArguments m = TestArguments noFilePath :: IO (FilePath, IO ()) noFilePath = pure ("Bogus", pure ()) -realFilePath :: IO (FilePath, IO ()) -realFilePath = liftIO $ do - tmpdir <- (FilePath. "test_lmdb") <$> Dir.getTemporaryDirectory +realFilePath :: String -> IO (FilePath, IO ()) +realFilePath l = liftIO $ do + tmpdir <- (FilePath. ("test_" <> l)) <$> Dir.getTemporaryDirectory + Dir.createDirectoryIfMissing False tmpdir pure ( tmpdir , do @@ -183,9 +207,10 @@ realFS = liftIO $ do inMemV1TestArguments :: SecurityParam -> + LSM.Salt -> FilePath -> TestArguments IO -inMemV1TestArguments secParam _ = +inMemV1TestArguments secParam _ _ = TestArguments { argFlavorArgs = LedgerDbFlavorArgsV1 $ V1Args DisableFlushing InMemoryBackingStoreArgs , argLedgerDbCfg = extLedgerDbConfig secParam @@ -193,19 +218,36 @@ inMemV1TestArguments secParam _ = inMemV2TestArguments :: SecurityParam -> + LSM.Salt -> FilePath -> TestArguments IO -inMemV2TestArguments secParam _ = +inMemV2TestArguments secParam _ _ = TestArguments { argFlavorArgs = LedgerDbFlavorArgsV2 $ V2Args InMemoryHandleArgs , argLedgerDbCfg = extLedgerDbConfig secParam } +lsmTestArguments :: + SecurityParam -> + LSM.Salt -> + FilePath -> + TestArguments IO +lsmTestArguments secParam salt fp = + TestArguments + { argFlavorArgs = + LedgerDbFlavorArgsV2 $ + V2Args $ + LSMHandleArgs $ + LSMArgs (mkFsPath $ FilePath.splitDirectories fp) salt (LSM.stdMkBlockIOFS fp) + , argLedgerDbCfg = extLedgerDbConfig secParam + } + lmdbTestArguments :: SecurityParam -> + LSM.Salt -> FilePath -> TestArguments IO -lmdbTestArguments secParam fp = +lmdbTestArguments secParam _ fp = TestArguments { argFlavorArgs = LedgerDbFlavorArgsV1 $ @@ -278,11 +320,11 @@ instance StateModel Model where data Action Model a where WipeLedgerDB :: Action Model () TruncateSnapshots :: Action Model () - DropAndRestore :: Word64 -> Action Model () + DropAndRestore :: Word64 -> LSM.Salt -> Action Model () ForceTakeSnapshot :: Action Model () GetState :: Action Model (ExtLedgerState TestBlock EmptyMK, ExtLedgerState TestBlock EmptyMK) - Init :: SecurityParam -> Action Model () + Init :: SecurityParam -> LSM.Salt -> Action Model () ValidateAndCommit :: Word64 -> [TestBlock] -> Action Model () -- \| This action is used only to observe the side effects of closing an -- uncommitted forker, to ensure all handles are properly deallocated. @@ -297,12 +339,12 @@ instance StateModel Model where actionName ValidateAndCommit{} = "ValidateAndCommit" actionName OpenAndCloseForker = "OpenAndCloseForker" - arbitraryAction _ UnInit = Some . Init <$> QC.arbitrary + arbitraryAction _ UnInit = Some <$> (Init <$> QC.arbitrary <*> QC.arbitrary) arbitraryAction _ model@(Model chain secParam) = frequency $ [ (2, pure $ Some GetState) , (2, pure $ Some ForceTakeSnapshot) - , (1, Some . DropAndRestore <$> QC.choose (0, fromIntegral $ AS.length chain)) + , (1, Some <$> (DropAndRestore <$> QC.choose (0, fromIntegral $ AS.length chain) <*> QC.arbitrary)) , ( 4 , Some <$> do @@ -335,7 +377,7 @@ instance StateModel Model where initialState = UnInit - nextState _ (Init secParam) _var = Model (AS.Empty genesis) secParam + nextState _ (Init secParam _) _var = Model (AS.Empty genesis) secParam nextState state GetState _var = state nextState state ForceTakeSnapshot _var = state nextState state@(Model _ secParam) (ValidateAndCommit n blks) _var = @@ -371,7 +413,7 @@ instance StateModel Model where mapM_ push blks nextState state WipeLedgerDB _var = state nextState state TruncateSnapshots _var = state - nextState state (DropAndRestore n) _var = modelRollback n state + nextState state (DropAndRestore n _) _var = modelRollback n state nextState state OpenAndCloseForker _var = state nextState UnInit _ _ = error "Uninitialized model created a command different than Init" @@ -489,12 +531,12 @@ openLedgerDB :: ChainDB IO -> LedgerDbCfg (ExtLedgerState TestBlock) -> SomeHasFS IO -> + ResourceRegistry IO -> IO (LedgerDB' IO TestBlock, TestInternals' IO TestBlock, IO NumOpenHandles) -openLedgerDB flavArgs env cfg fs = do +openLedgerDB flavArgs env cfg fs rr = do (stream, volBlocks) <- dbStreamAPI (ledgerDbCfgSecParam cfg) env let getBlock f = Map.findWithDefault (error blockNotFound) f <$> readTVarIO (dbBlocks env) replayGoal <- fmap (realPointToPoint . last . Map.keys) . atomically $ readTVar (dbBlocks env) - rr <- unsafeNewRegistry (tracer, getNumOpenHandles) <- mkTrackOpenHandles let args = LedgerDbArgs @@ -520,7 +562,24 @@ openLedgerDB flavArgs env cfg fs = do LedgerDbFlavorArgsV2 bss -> do (snapManager, bss') <- case bss of V2.V2Args V2.InMemoryHandleArgs -> pure (InMemory.snapshotManager args, V2.InMemoryHandleEnv) - V2.V2Args (V2.LSMHandleArgs (V2.LSMArgs x)) -> absurd x + V2.V2Args (V2.LSMHandleArgs (V2.LSMArgs path salt mkFS)) -> do + (rk1, V2.SomeHasFSAndBlockIO fs' blockio) <- mkFS (lgrRegistry args) + session <- + allocate + (lgrRegistry args) + ( \_ -> + LSM.openSession + (LedgerDBFlavorImplEvent . FlavorImplSpecificTraceV2 . V2.LSMTrace >$< lgrTracer args) + fs' + blockio + salt + path + ) + LSM.closeSession + pure + ( LSM.snapshotManager (snd session) args + , V2.LSMHandleEnv (V2.LSMResources (fst session) (snd session) rk1) + ) let initDb = V2.mkInitDb args @@ -547,36 +606,37 @@ data Environment (LedgerDB' IO TestBlock) (TestInternals' IO TestBlock) (ChainDB IO) - (SecurityParam -> TestArguments IO) + (SecurityParam -> LSM.Salt -> TestArguments IO) (SomeHasFS IO) - (IO NumOpenHandles) - (IO ()) + !(IO NumOpenHandles) + !(IO ()) + !(ResourceRegistry IO) data LedgerDBError = ErrorValidateExceededRollback instance RunModel Model (StateT Environment IO) where type Error Model (StateT Environment IO) = LedgerDBError - perform _ (Init secParam) _ = do - Environment _ _ chainDb mkArgs fs _ cleanup <- get + perform _ (Init secParam salt) _ = do + Environment _ _ chainDb mkArgs fs _ cleanup rr <- get (ldb, testInternals, getNumOpenHandles) <- lift $ do - let args = mkArgs secParam - openLedgerDB (argFlavorArgs args) chainDb (argLedgerDbCfg args) fs - put (Environment ldb testInternals chainDb mkArgs fs getNumOpenHandles cleanup) + let args = mkArgs secParam salt + openLedgerDB (argFlavorArgs args) chainDb (argLedgerDbCfg args) fs rr + put (Environment ldb testInternals chainDb mkArgs fs getNumOpenHandles cleanup rr) pure $ pure () perform _ WipeLedgerDB _ = do - Environment _ testInternals _ _ _ _ _ <- get + Environment _ testInternals _ _ _ _ _ _ <- get lift $ wipeLedgerDB testInternals pure $ pure () perform _ GetState _ = do - Environment ldb _ _ _ _ _ _ <- get + Environment ldb _ _ _ _ _ _ _ <- get lift $ fmap pure $ atomically $ (,) <$> getImmutableTip ldb <*> getVolatileTip ldb perform _ ForceTakeSnapshot _ = do - Environment _ testInternals _ _ _ _ _ <- get + Environment _ testInternals _ _ _ _ _ _ <- get lift $ takeSnapshotNOW testInternals TakeAtImmutableTip Nothing pure $ pure () perform _ (ValidateAndCommit n blks) _ = do - Environment ldb _ chainDb _ _ _ _ <- get + Environment ldb _ chainDb _ _ _ _ _ <- get lift $ do atomically $ modifyTVar (dbBlocks chainDb) $ @@ -592,15 +652,15 @@ instance RunModel Model (StateT Environment IO) where forkerClose forker pure $ pure () ValidateExceededRollBack{} -> pure $ Left ErrorValidateExceededRollback - ValidateLedgerError (AnnLedgerError forker _ _) -> forkerClose forker >> error "Unexpected ledger error" - perform state@(Model _ secParam) (DropAndRestore n) lk = do - Environment _ testInternals chainDb _ _ _ _ <- get + ValidateLedgerError (AnnLedgerError forker _ err) -> forkerClose forker >> error ("Unexpected ledger error" <> show err) + perform state@(Model _ secParam) (DropAndRestore n salt) lk = do + Environment _ testInternals chainDb _ _ _ _ _ <- get lift $ do atomically $ modifyTVar (dbChain chainDb) (drop (fromIntegral n)) closeLedgerDB testInternals - perform state (Init secParam) lk + perform state (Init secParam salt) lk perform _ OpenAndCloseForker _ = do - Environment ldb _ _ _ _ _ _ <- get + Environment ldb _ _ _ _ _ _ _ <- get lift $ withRegistry $ \rr -> do eFrk <- LedgerDB.getForkerAtTarget ldb rr VolatileTip case eFrk of @@ -608,7 +668,7 @@ instance RunModel Model (StateT Environment IO) where Right frk -> forkerClose frk pure $ pure () perform _ TruncateSnapshots _ = do - Environment _ testInternals _ _ _ _ _ <- get + Environment _ testInternals _ _ _ _ _ _ <- get lift $ truncateSnapshots testInternals pure $ pure () perform UnInit _ _ = error "Uninitialized model created a command different than Init" @@ -664,13 +724,16 @@ mkTrackOpenHandles = do atomically $ modifyTVar varOpen $ case ev of V2.TraceLedgerTablesHandleCreate -> succ V2.TraceLedgerTablesHandleClose -> pred + _ -> id _ -> pure () pure (tracer, readTVarIO varOpen) -- | Check that we didn't leak any 'LedgerTablesHandle's (with V2 only). -checkNoLeakedHandles :: Environment -> QC.PropertyM IO () -checkNoLeakedHandles (Environment _ testInternals _ _ _ getNumOpenHandles _) = do - expected <- liftIO $ NumOpenHandles <$> LedgerDB.getNumLedgerTablesHandles testInternals - actual <- liftIO getNumOpenHandles - QC.assertWith (actual == expected) $ - "leaked handles, expected " <> show expected <> ", but actual " <> show actual +checkNoLeakedHandles :: Environment -> IO Property +checkNoLeakedHandles (Environment _ testInternals _ _ _ getNumOpenHandles _ _) = do + expected <- NumOpenHandles <$> LedgerDB.getNumLedgerTablesHandles testInternals + actual <- getNumOpenHandles + pure $ + counterexample + ("leaked handles, expected " <> show expected <> ", but actual " <> show actual) + (actual == expected)