From 6963be2e678261ccaa4e87bd7e515aeb25b2084d Mon Sep 17 00:00:00 2001 From: Javier Sagredo Date: Fri, 29 Aug 2025 11:25:35 +0200 Subject: [PATCH 1/3] Implement streaming of Ledger Tables Co-authored-by: Alexander Esgen --- .../ouroboros-consensus-cardano.cabal | 8 + .../Cardano/StreamingLedgerTables.hs | 219 +++++++++++ ouroboros-consensus/ouroboros-consensus.cabal | 2 + .../Consensus/Storage/LedgerDB/V2/LSM.hs | 10 + .../Consensus/Util/StreamingLedgerTables.hs | 372 ++++++++++++++++++ 5 files changed, 611 insertions(+) create mode 100644 ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/StreamingLedgerTables.hs create mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs diff --git a/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal b/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal index eee812f8bf..58524d8aee 100644 --- a/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal +++ b/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal @@ -98,6 +98,7 @@ library Ouroboros.Consensus.Cardano.Block Ouroboros.Consensus.Cardano.CanHardFork Ouroboros.Consensus.Cardano.Condense + Ouroboros.Consensus.Cardano.StreamingLedgerTables Ouroboros.Consensus.Cardano.Ledger Ouroboros.Consensus.Cardano.Node Ouroboros.Consensus.Cardano.QueryHF @@ -137,10 +138,14 @@ library bytestring >=0.10 && <0.13, cardano-binary, cardano-crypto, + fs-api, + contra-tracer, + directory, cardano-crypto-class ^>=2.2, cardano-crypto-wrapper, cardano-ledger-allegra ^>=1.8, cardano-ledger-alonzo ^>=1.14, + random, cardano-ledger-api ^>=1.12, cardano-ledger-babbage ^>=1.12, cardano-ledger-binary ^>=1.7, @@ -149,6 +154,8 @@ library cardano-ledger-core ^>=1.18, cardano-ledger-dijkstra ^>=0.1, cardano-ledger-mary ^>=1.9, + temporary, + resource-registry, cardano-ledger-shelley ^>=1.17, cardano-prelude, cardano-protocol-tpraos ^>=1.4.1, @@ -158,6 +165,7 @@ library containers >=0.5 && <0.8, crypton, deepseq, + filepath, formatting >=6.3 && <7.3, measures, mempack, diff --git a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/StreamingLedgerTables.hs b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/StreamingLedgerTables.hs new file mode 100644 index 0000000000..e064bf4e04 --- /dev/null +++ b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/StreamingLedgerTables.hs @@ -0,0 +1,219 @@ +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE TypeOperators #-} + +module Ouroboros.Consensus.Cardano.StreamingLedgerTables + ( fromInMemory + , fromLSM + , fromLMDB + , toLMDB + , toLSM + , toInMemory + ) where + +import Cardano.Ledger.BaseTypes (WithOrigin (..)) +import Cardano.Ledger.Binary +import Cardano.Ledger.Core (ByronEra, Era, eraDecoder, toEraCBOR) +import qualified Cardano.Ledger.Shelley.API as SL +import qualified Cardano.Ledger.Shelley.LedgerState as SL +import qualified Cardano.Ledger.State as SL +import qualified Codec.CBOR.Encoding +import Control.ResourceRegistry +import Control.Tracer (nullTracer) +import Data.Proxy +import Data.SOP.BasicFunctors +import Data.SOP.Functors +import Data.SOP.Strict +import qualified Data.SOP.Telescope as Telescope +import qualified Data.Text as T +import Lens.Micro +import Ouroboros.Consensus.Byron.Ledger +import Ouroboros.Consensus.Cardano.Block +import Ouroboros.Consensus.Cardano.Ledger +import Ouroboros.Consensus.HardFork.Combinator +import Ouroboros.Consensus.HardFork.Combinator.State +import Ouroboros.Consensus.Ledger.Abstract +import Ouroboros.Consensus.Ledger.Tables.Utils (emptyLedgerTables) +import Ouroboros.Consensus.Shelley.Ledger +import Ouroboros.Consensus.Shelley.Ledger.SupportsProtocol () +import Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.API +import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.Impl.LMDB as LMDB +import Ouroboros.Consensus.Storage.LedgerDB.V2.Args +import Ouroboros.Consensus.Storage.LedgerDB.V2.LSM +import Ouroboros.Consensus.Util.StreamingLedgerTables +import System.Directory +import System.FS.API +import System.FS.IO +import System.FilePath as FilePath +import System.IO.Temp +import System.Random + +type L = LedgerState (CardanoBlock StandardCrypto) + +fromInMemory :: FilePath -> L EmptyMK -> ResourceRegistry IO -> IO (YieldArgs L IO) +fromInMemory fp (HardForkLedgerState (HardForkState idx)) _ = + let + np :: + NP + (Current (Flip LedgerState EmptyMK) -.-> K (Decoders L)) + (CardanoEras StandardCrypto) + np = + (Fn $ const $ K $ error "Byron") + :* (Fn $ K . fromEra ShelleyTxOut . unFlip . currentState) + :* (Fn $ K . fromEra AllegraTxOut . unFlip . currentState) + :* (Fn $ K . fromEra MaryTxOut . unFlip . currentState) + :* (Fn $ K . fromEra AlonzoTxOut . unFlip . currentState) + :* (Fn $ K . fromEra BabbageTxOut . unFlip . currentState) + :* (Fn $ K . fromEra ConwayTxOut . unFlip . currentState) + :* (Fn $ K . fromEra DijkstraTxOut . unFlip . currentState) + :* Nil + in + pure $ + YieldInMemory + (SomeHasFS . ioHasFS) + fp + (hcollapse $ hap np $ Telescope.tip idx) + where + fromEra :: + forall proto era. + ShelleyCompatible proto era => + (TxOut (LedgerState (ShelleyBlock proto era)) -> CardanoTxOut StandardCrypto) -> + LedgerState (ShelleyBlock proto era) EmptyMK -> + Decoders L + fromEra toCardanoTxOut st = + let certInterns = + internsFromMap $ + shelleyLedgerState st + ^. SL.nesEsL + . SL.esLStateL + . SL.lsCertStateL + . SL.certDStateL + . SL.accountsL + . SL.accountsMapL + in Decoders + (eraDecoder @era decodeMemPack) + (eraDecoder @era $ toCardanoTxOut <$> decShareCBOR certInterns) + +fromLMDB :: FilePath -> LMDB.LMDBLimits -> L EmptyMK -> ResourceRegistry IO -> IO (YieldArgs L IO) +fromLMDB fp limits hint reg = do + let (dbPath, snapName) = splitFileName fp + tempDir <- getCanonicalTemporaryDirectory + let lmdbTemp = tempDir FilePath. "lmdb_streaming_in" + removePathForcibly lmdbTemp + _ <- + allocate + reg + (\_ -> System.Directory.createDirectory lmdbTemp) + (\_ -> removePathForcibly lmdbTemp) + (_, bs) <- + allocate + reg + ( \_ -> do + LMDB.newLMDBBackingStore + nullTracer + limits + (LiveLMDBFS $ SomeHasFS $ ioHasFS $ MountPoint lmdbTemp) + (SnapshotsFS $ SomeHasFS $ ioHasFS $ MountPoint dbPath) + (InitFromCopy hint (mkFsPath [snapName])) + ) + bsClose + (_, bsvh) <- allocate reg (\_ -> bsValueHandle bs) bsvhClose + pure (YieldLMDB 1000 bsvh) + +fromLSM :: + FilePath -> + String -> + L EmptyMK -> + ResourceRegistry IO -> + IO (YieldArgs L IO) +fromLSM fp snapName _ reg = do + (_, SomeHasFSAndBlockIO hasFS blockIO) <- stdMkBlockIOFS fp reg + salt <- fst . genWord64 <$> newStdGen + (_, session) <- + allocate reg (\_ -> openSession nullTracer hasFS blockIO salt (mkFsPath [])) closeSession + tb <- + allocate + reg + ( \_ -> + openTableFromSnapshot + session + (toSnapshotName snapName) + (SnapshotLabel $ T.pack "UTxO table") + ) + closeTable + YieldLSM 1000 <$> newLSMLedgerTablesHandle nullTracer reg tb + +toLMDB :: + FilePath -> + LMDB.LMDBLimits -> + L EmptyMK -> + ResourceRegistry IO -> + IO (SinkArgs L IO) +toLMDB fp limits hint reg = do + let (snapDir, snapName) = splitFileName fp + tempDir <- getCanonicalTemporaryDirectory + let lmdbTemp = tempDir FilePath. "lmdb_streaming_out" + removePathForcibly lmdbTemp + _ <- + allocate reg (\_ -> System.Directory.createDirectory lmdbTemp) (\_ -> removePathForcibly lmdbTemp) + (_, bs) <- + allocate + reg + ( \_ -> + LMDB.newLMDBBackingStore + nullTracer + limits + (LiveLMDBFS $ SomeHasFS $ ioHasFS $ MountPoint lmdbTemp) + (SnapshotsFS $ SomeHasFS $ ioHasFS $ MountPoint snapDir) + (InitFromValues (At 0) hint emptyLedgerTables) + ) + bsClose + pure $ SinkLMDB 1000 (bsWrite bs) (\h -> bsCopy bs h (mkFsPath [snapName, "tables"])) + +toInMemory :: + FilePath -> + L EmptyMK -> + ResourceRegistry IO -> + IO (SinkArgs L IO) +toInMemory fp (HardForkLedgerState (HardForkState idx)) _ = do + currDir <- getCurrentDirectory + let + np = + (Fn $ const $ K $ encOne (Proxy @ByronEra)) + :* (Fn $ const $ K $ encOne (Proxy @ShelleyEra)) + :* (Fn $ const $ K $ encOne (Proxy @AllegraEra)) + :* (Fn $ const $ K $ encOne (Proxy @MaryEra)) + :* (Fn $ const $ K $ encOne (Proxy @AlonzoEra)) + :* (Fn $ const $ K $ encOne (Proxy @BabbageEra)) + :* (Fn $ const $ K $ encOne (Proxy @ConwayEra)) + :* (Fn $ const $ K $ encOne (Proxy @DijkstraEra)) + :* Nil + pure $ + uncurry + (SinkInMemory 1000) + (hcollapse $ hap np $ Telescope.tip idx) + (SomeHasFS $ ioHasFS $ MountPoint currDir) + fp + where + encOne :: + forall era. + Era era => + Proxy era -> + (TxIn L -> Codec.CBOR.Encoding.Encoding, TxOut L -> Codec.CBOR.Encoding.Encoding) + encOne _ = + (toEraCBOR @era . encodeMemPack, toEraCBOR @era . eliminateCardanoTxOut (const encodeMemPack)) + +toLSM :: + FilePath -> + String -> + L EmptyMK -> + ResourceRegistry IO -> + IO (SinkArgs L IO) +toLSM fp snapName _ reg = do + removePathForcibly fp + System.Directory.createDirectory fp + (_, SomeHasFSAndBlockIO hasFS blockIO) <- stdMkBlockIOFS fp reg + salt <- fst . genWord64 <$> newStdGen + (_, session) <- + allocate reg (\_ -> newSession nullTracer hasFS blockIO salt (mkFsPath [])) closeSession + pure (SinkLSM 1000 snapName session) diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 836705b2b4..5f91e83ccb 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -280,6 +280,7 @@ library Ouroboros.Consensus.Util.Args Ouroboros.Consensus.Util.Assert Ouroboros.Consensus.Util.CBOR + Ouroboros.Consensus.Util.StreamingLedgerTables Ouroboros.Consensus.Util.CRC Ouroboros.Consensus.Util.CallStack Ouroboros.Consensus.Util.Condense @@ -326,6 +327,7 @@ library deepseq, diff-containers >=1.2, filelock, + filepath, fingertree-rm >=1.0, fs-api ^>=0.4, hashable, diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs index 3b045c661a..8013cef224 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs @@ -22,6 +22,7 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2.LSM ( -- * LedgerTablesHandle newLSMLedgerTablesHandle , tableFromValuesMK + , UTxOTable -- * Snapshots , loadSnapshot @@ -39,6 +40,15 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2.LSM -- * snapshot-converter , implTakeSnapshot + , LSM.withNewSession + , toTxInBytes + , toTxOutBytes + , LSM.newSession + , LSM.toSnapshotName + , LSM.SnapshotLabel (LSM.SnapshotLabel) + , LSM.openTableFromSnapshot + , LSM.closeTable + , LSM.listSnapshots ) where import Cardano.Binary as CBOR diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs new file mode 100644 index 0000000000..805a41d2bc --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs @@ -0,0 +1,372 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE ConstraintKinds #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE ViewPatterns #-} +{-# OPTIONS_GHC -Wno-orphans #-} + +module Ouroboros.Consensus.Util.StreamingLedgerTables + ( stream + , yield + , sink + , YieldArgs (..) + , SinkArgs (..) + , Decoders (..) + ) where + +import Cardano.Slotting.Slot +import Codec.CBOR.Decoding (Decoder, decodeBreakOr, decodeListLen, decodeMapLenOrIndef) +import Codec.CBOR.Encoding (Encoding, encodeBreak, encodeListLen, encodeMapLenIndef) +import Codec.CBOR.Read +import Codec.CBOR.Write +import Control.Concurrent.Class.MonadMVar +import Control.Monad (replicateM_, unless) +import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadST +import Control.Monad.Class.MonadSTM +import Control.Monad.Class.MonadThrow +import Control.Monad.Except +import Control.Monad.State.Strict +import Control.ResourceRegistry +import Data.ByteString (ByteString) +import qualified Data.ByteString as BS +import Data.ByteString.Builder.Extra (defaultChunkSize) +import qualified Data.Map.Strict as Map +import Data.MemPack +import Data.Proxy +import qualified Data.Set as Set +import qualified Data.Text as T +import qualified Data.Vector as V +import Database.LSMTree +import Ouroboros.Consensus.Ledger.Abstract +import Ouroboros.Consensus.Ledger.Tables.Diff +import Ouroboros.Consensus.Storage.LedgerDB.API +import Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.API +import Ouroboros.Consensus.Storage.LedgerDB.V2.LSM +import Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq +import Ouroboros.Consensus.Util.IndexedMemPack +import Ouroboros.Network.Block +import Streaming +import qualified Streaming as S +import qualified Streaming.Prelude as S +import System.FS.API +import System.FS.CRC +import qualified System.FilePath as F + +data Decoders l + = Decoders + (forall s. Codec.CBOR.Decoding.Decoder s (TxIn l)) + (forall s. Codec.CBOR.Decoding.Decoder s (TxOut l)) + +stream :: + Constraints l m => + l EmptyMK -> + (l EmptyMK -> ResourceRegistry m -> m (YieldArgs l m)) -> + (l EmptyMK -> ResourceRegistry m -> m (SinkArgs l m)) -> + ExceptT DeserialiseFailure m (Maybe CRC, Maybe CRC) +stream st mYieldArgs mSinkArgs = + ExceptT $ + withRegistry $ \reg -> do + yArgs <- mYieldArgs st reg + sArgs <- mSinkArgs st reg + runExceptT $ yield yArgs st $ sink sArgs st + +type Yield l m = + l EmptyMK -> + ( ( Stream (Of (TxIn l, TxOut l)) (ExceptT DeserialiseFailure m) (Stream (Of ByteString) m (Maybe CRC)) -> + ExceptT DeserialiseFailure m (Stream (Of ByteString) m (Maybe CRC, Maybe CRC)) + ) + ) -> + ExceptT DeserialiseFailure m (Maybe CRC, Maybe CRC) + +type Sink l m r = + l EmptyMK -> + Stream (Of (TxIn l, TxOut l)) (ExceptT DeserialiseFailure m) (Stream (Of ByteString) m (Maybe CRC)) -> + ExceptT DeserialiseFailure m (Stream (Of ByteString) m (Maybe CRC, Maybe CRC)) + +instance MonadST m => MonadST (ExceptT e m) where + stToIO = lift . stToIO + +data YieldArgs l m + = -- | Yield an in-memory snapshot + YieldInMemory + -- | How to make a SomeHasFS for @m@ + (MountPoint -> SomeHasFS m) + -- | The file path at which the HasFS has to be opened + FilePath + (Decoders l) + | -- | Yield an LMDB snapshot + YieldLMDB + Int + (LedgerBackingStoreValueHandle m l) + | -- | Yield an LSM snapshot + YieldLSM + Int + (LedgerTablesHandle m l) + +yield :: Constraints l m => YieldArgs l m -> Yield l m +yield = \case + YieldInMemory mkFs fp (Decoders decK decV) -> yieldInMemoryS mkFs fp decK decV + YieldLMDB chunkSize valueHandle -> yieldLmdbS chunkSize valueHandle + YieldLSM chunkSize hdl -> yieldLsmS chunkSize hdl + +type Constraints l m = + ( LedgerSupportsV1LedgerDB l + , LedgerSupportsV2LedgerDB l + , HasLedgerTables l + , GetTip l + , IOLike m + ) + +sink :: + Constraints l m => + SinkArgs l m -> Sink l m r +sink = \case + SinkLMDB chunkSize write copy -> sinkLmdbS chunkSize write copy + SinkLSM chunkSize snapName session -> sinkLsmS chunkSize snapName session + SinkInMemory chunkSize encK encV shfs fp -> sinkInMemoryS chunkSize encK encV shfs fp + +data SinkArgs l m + = SinkInMemory + Int + (TxIn l -> Encoding) + (TxOut l -> Encoding) + (SomeHasFS m) + FilePath + | SinkLSM + -- | Chunk size + Int + -- | Snap name + String + (Session m) + | SinkLMDB + -- | Chunk size + Int + -- | bsWrite + (SlotNo -> (l EmptyMK, l EmptyMK) -> LedgerTables l DiffMK -> m ()) + (l EmptyMK -> m ()) + +{------------------------------------------------------------------------------- + Yielding InMemory +-------------------------------------------------------------------------------} + +streamingFile :: + forall m. + MonadThrow m => + SomeHasFS m -> + FsPath -> + ( Stream (Of ByteString) m (Maybe CRC) -> + ExceptT DeserialiseFailure m (Stream (Of ByteString) m (Maybe CRC, Maybe CRC)) + ) -> + ExceptT DeserialiseFailure m (Maybe CRC, Maybe CRC) +streamingFile (SomeHasFS fs') path cont = + ExceptT $ withFile fs' path ReadMode $ \hdl -> + runExceptT $ cont (getBS hdl initCRC) >>= noRemainingBytes + where + getBS h !crc = do + bs <- S.lift $ hGetSome fs' h (fromIntegral defaultChunkSize) + if BS.null bs + then pure (Just crc) + else do + S.yield bs + getBS h $! updateCRC bs crc + + noRemainingBytes s = + lift (S.uncons s) >>= \case + Nothing -> lift $ S.effects s + Just (BS.null -> True, s') -> noRemainingBytes s' + Just _ -> throwError $ DeserialiseFailure 0 "Remaining bytes" + +yieldCborMapS :: + forall m a b. + MonadST m => + (forall s. Decoder s a) -> + (forall s. Decoder s b) -> + Stream (Of ByteString) m (Maybe CRC) -> + Stream (Of (a, b)) (ExceptT DeserialiseFailure m) (Stream (Of ByteString) m (Maybe CRC)) +yieldCborMapS decK decV = execStateT $ do + hoist lift (decodeCbor decodeListLen >> decodeCbor decodeMapLenOrIndef) >>= \case + Nothing -> go + Just n -> replicateM_ n yieldKV + where + yieldKV = do + kv <- hoist lift $ decodeCbor $ (,) <$> decK <*> decV + lift $ S.yield kv + + go = do + doBreak <- hoist lift $ decodeCbor decodeBreakOr + unless doBreak $ yieldKV *> go + + decodeCbor dec = + StateT $ \s -> go' s =<< lift (stToIO (deserialiseIncremental dec)) + where + go' s = \case + Partial k -> + lift (S.next s) >>= \case + Right (bs, s') -> go' s' =<< lift (stToIO (k (Just bs))) + Left r -> go' (pure r) =<< lift (stToIO (k Nothing)) + Done bs _off a -> pure (a, S.yield bs *> s) + Fail _bs _off err -> throwError err + +yieldInMemoryS :: + (MonadThrow m, MonadST m) => + (MountPoint -> SomeHasFS m) -> + FilePath -> + (forall s. Decoder s (TxIn l)) -> + (forall s. Decoder s (TxOut l)) -> + Yield l m +yieldInMemoryS mkFs (F.splitFileName -> (fp, fn)) decK decV _ k = + streamingFile (mkFs $ MountPoint fp) (mkFsPath [fn]) $ \s -> do + k $ yieldCborMapS decK decV s + +{------------------------------------------------------------------------------- + Yielding OnDisk backends +-------------------------------------------------------------------------------} + +yieldLmdbS :: + Monad m => + Int -> + LedgerBackingStoreValueHandle m l -> + Yield l m +yieldLmdbS readChunkSize bsvh hint k = do + r <- k (go (RangeQuery Nothing readChunkSize)) + lift $ S.effects r + where + go p = do + (LedgerTables (ValuesMK values), mx) <- lift $ S.lift $ bsvhRangeRead bsvh hint p + case mx of + Nothing -> pure $ pure Nothing + Just x -> do + S.each $ Map.toList values + go (RangeQuery (Just . LedgerTables . KeysMK $ Set.singleton x) readChunkSize) + +yieldLsmS :: + Monad m => + Int -> + LedgerTablesHandle m l -> + Yield l m +yieldLsmS readChunkSize tb hint k = do + r <- k (go (Nothing, readChunkSize)) + lift $ S.effects r + where + go p = do + (LedgerTables (ValuesMK values), mx) <- lift $ S.lift $ readRange tb hint p + if Map.null values + then pure $ pure Nothing + else do + S.each $ Map.toList values + go (mx, readChunkSize) + +{------------------------------------------------------------------------------- + Sink +-------------------------------------------------------------------------------} + +sinkLmdbS :: + forall m l r. + (Ord (TxIn l), GetTip l, Monad m) => + Int -> + (SlotNo -> (l EmptyMK, l EmptyMK) -> LedgerTables l DiffMK -> m ()) -> + (l EmptyMK -> m ()) -> + Sink l m r +sinkLmdbS writeChunkSize bs copyTo hint s = do + r <- go writeChunkSize mempty s + lift $ copyTo hint + pure (fmap (,Nothing) r) + where + sl = withOrigin (error "unreachable") id $ pointSlot $ getTip hint + + go 0 m s' = do + lift $ bs sl (hint, hint) (LedgerTables $ DiffMK $ fromMapInserts m) + go writeChunkSize mempty s' + go n m s' = do + mbs <- S.uncons s' + case mbs of + Nothing -> do + lift $ bs sl (hint, hint) (LedgerTables $ DiffMK $ fromMapInserts m) + S.effects s' + Just ((k, v), s'') -> + go (n - 1) (Map.insert k v m) s'' + +sinkLsmS :: + forall l m r. + ( MonadAsync m + , MonadMVar m + , MonadThrow (STM m) + , MonadMask m + , MonadST m + , MonadEvaluate m + , MemPack (TxIn l) + , IndexedMemPack (l EmptyMK) (TxOut l) + ) => + Int -> + String -> + Session m -> + Sink l m r +sinkLsmS writeChunkSize snapName session st s = do + tb :: UTxOTable m <- lift $ newTable session + r <- go tb writeChunkSize mempty s + lift $ + saveSnapshot + (toSnapshotName snapName) + (SnapshotLabel $ T.pack "UTxO table") + tb + lift $ closeTable tb + pure (fmap (,Nothing) r) + where + go tb 0 m s' = do + lift $ + inserts tb $ + V.fromList [(toTxInBytes (Proxy @l) k, toTxOutBytes st v, Nothing) | (k, v) <- m] + go tb writeChunkSize mempty s' + go tb n m s' = do + mbs <- S.uncons s' + case mbs of + Nothing -> do + lift $ + inserts tb $ + V.fromList + [(toTxInBytes (Proxy @l) k, toTxOutBytes st v, Nothing) | (k, v) <- m] + S.effects s' + Just (item, s'') -> go tb (n - 1) (item : m) s'' + +sinkInMemoryS :: + forall m l r. + MonadThrow m => + Int -> + (TxIn l -> Encoding) -> + (TxOut l -> Encoding) -> + SomeHasFS m -> + FilePath -> + Sink l m r +sinkInMemoryS writeChunkSize encK encV (SomeHasFS fs) fp _ s = + ExceptT $ withFile fs (mkFsPath [fp]) (WriteMode MustBeNew) $ \hdl -> do + let bs = toStrictByteString (encodeListLen 1 <> encodeMapLenIndef) + let !crc0 = updateCRC bs initCRC + void $ hPutSome fs hdl bs + e <- runExceptT $ go hdl crc0 writeChunkSize mempty s + case e of + Left err -> pure $ Left err + Right (r, crc1) -> do + let bs1 = toStrictByteString encodeBreak + void $ hPutSome fs hdl bs1 + let !crc2 = updateCRC bs1 crc1 + pure $ Right (fmap (,Just crc2) r) + where + go tb !crc 0 m s' = do + let bs = toStrictByteString $ mconcat [encK k <> encV v | (k, v) <- reverse m] + lift $ void $ hPutSome fs tb bs + let !crc1 = updateCRC bs crc + go tb crc1 writeChunkSize mempty s' + go tb !crc n m s' = do + mbs <- S.uncons s' + case mbs of + Nothing -> do + let bs = toStrictByteString $ mconcat [encK k <> encV v | (k, v) <- reverse m] + lift $ void $ hPutSome fs tb bs + let !crc1 = updateCRC bs crc + (,crc1) <$> S.effects s' + Just (item, s'') -> go tb crc (n - 1) (item : m) s'' From 6273d72a3b26a5f511ced381a61551ad9fd2837e Mon Sep 17 00:00:00 2001 From: Javier Sagredo Date: Fri, 29 Aug 2025 11:26:47 +0200 Subject: [PATCH 2/3] Re-implement snapshot-converter --- .../app/snapshot-converter.hs | 682 ++++++++++++------ ...0250904_111240_javier.sagredo_lsm_sc_da.md | 25 + .../ouroboros-consensus-cardano.cabal | 22 +- ...0250904_111212_javier.sagredo_lsm_sc_da.md | 25 + ouroboros-consensus/ouroboros-consensus.cabal | 2 +- 5 files changed, 535 insertions(+), 221 deletions(-) create mode 100644 ouroboros-consensus-cardano/changelog.d/20250904_111240_javier.sagredo_lsm_sc_da.md create mode 100644 ouroboros-consensus/changelog.d/20250904_111212_javier.sagredo_lsm_sc_da.md diff --git a/ouroboros-consensus-cardano/app/snapshot-converter.hs b/ouroboros-consensus-cardano/app/snapshot-converter.hs index bf4354a20c..c1d98d01aa 100644 --- a/ouroboros-consensus-cardano/app/snapshot-converter.hs +++ b/ouroboros-consensus-cardano/app/snapshot-converter.hs @@ -1,9 +1,10 @@ -{-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeApplications #-} {-# LANGUAGE ViewPatterns #-} module Main (main) where @@ -13,262 +14,525 @@ import Cardano.Tools.DBAnalyser.HasAnalysis (mkProtocolInfo) import Codec.Serialise import qualified Control.Monad as Monad import Control.Monad.Except -import qualified Control.Monad.Trans as Trans (lift) -import Control.ResourceRegistry (ResourceRegistry) -import qualified Control.ResourceRegistry as RR -import Control.Tracer (nullTracer) +import Control.Monad.Trans (lift) +import Control.ResourceRegistry import DBAnalyser.Parsers import Data.Bifunctor -import qualified Data.ByteString.Builder as BS -import qualified Data.SOP.Dict as Dict +import Data.Char (toLower) +import qualified Data.Text.Lazy as T import Main.Utf8 import Options.Applicative +import Options.Applicative.Help (Doc, line) import Ouroboros.Consensus.Block +import Ouroboros.Consensus.Cardano.Block +import Ouroboros.Consensus.Cardano.StreamingLedgerTables import Ouroboros.Consensus.Config import Ouroboros.Consensus.Ledger.Basics import Ouroboros.Consensus.Ledger.Extended -import Ouroboros.Consensus.Ledger.SupportsProtocol -import Ouroboros.Consensus.Ledger.Tables.Utils import Ouroboros.Consensus.Node.ProtocolInfo -import Ouroboros.Consensus.Storage.LedgerDB import Ouroboros.Consensus.Storage.LedgerDB.Snapshots -import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.Args as V1 -import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore as V1 import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.Impl.LMDB as V1 -import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.DbChangelog as V1 -import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.Lock as V1 -import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.Snapshots as V1 -import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.InMemory as InMemory -import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.InMemory as V2 -import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq as V2 import Ouroboros.Consensus.Util.CRC import Ouroboros.Consensus.Util.IOLike +import Ouroboros.Consensus.Util.StreamingLedgerTables +import System.Console.ANSI +import qualified System.Directory as D +import System.Exit import System.FS.API -import System.FS.API.Lazy import System.FS.CRC import System.FS.IO -import System.FilePath (splitFileName) -import System.IO.Temp +import System.FilePath (splitDirectories) +import qualified System.FilePath as F +import System.IO +import System.ProgressBar data Format - = Legacy - | Mem - | LMDB + = Mem FilePath + | LMDB FilePath + | LSM FilePath FilePath deriving (Show, Read) data Config = Config { from :: Format -- ^ Which format the input snapshot is in - , inpath :: FilePath - -- ^ Path to the input snapshot , to :: Format -- ^ Which format the output snapshot must be in - , outpath :: FilePath - -- ^ Path to the output snapshot } getCommandLineConfig :: IO (Config, CardanoBlockArgs) getCommandLineConfig = execParser $ info - ((,) <$> parseConfig <*> parseCardanoArgs <**> helper) - (fullDesc <> progDesc "Utility for converting snapshots to and from UTxO-HD") - -parseConfig :: Parser Config -parseConfig = - Config - <$> argument - auto - ( mconcat - [ help "From format (Legacy, Mem or LMDB)" - , metavar "FORMAT-IN" - ] - ) - <*> strArgument - ( mconcat - [ help "Input dir/file. Use relative paths like ./100007913" - , metavar "PATH-IN" - ] + ((,) <$> (Config <$> parseConfig In <*> parseConfig Out) <*> parseCardanoArgs <**> helper) + ( fullDesc + <> header "Utility for converting snapshots among the different snapshot formats used by cardano-node." + <> progDescDoc programDescription ) - <*> argument - auto - ( mconcat - [ help "To format (Legacy, Mem or LMDB)" - , metavar "FORMAT-OUT" + +programDescription :: Maybe Doc +programDescription = + Just $ + mconcat + [ "The input snapshot must correspond to a snapshot that was produced by " + , "a cardano-node, and thus follows the naming convention used in the node." + , line + , "This means in particular that the filepath to the snapshot must have as " + , "the last fragment a directory named after the slot number of the ledger " + , "state snapshotted, plus an optional suffix, joined by an underscore." + , line + , line + , "For the output, the same convention is enforced, so that the produced " + , "snapshot can be loaded right away by a cardano-node." + , line + , line + , "Note that snapshots that have a suffix will be preserved by the node. " + , "If you produce a snapshot with a suffix and you start a node with it, " + , "the node will take as many more snapshots as it is configured to take, " + , "but it will never delete your snapshot, because it has a suffix on the name." + , line + , "Therefore, for the most common use case it is advisable to create a " + , "snapshot without a suffix, as in:" + , line + , line + , "```" + , line + , "$ mkdir out" + , line + , "$ snapshot-converter ---in / ---out out/ --config " + , line + , "```" + ] + +data InOut = In | Out + +inoutForGroup :: InOut -> String +inoutForGroup In = "Input arguments:" +inoutForGroup Out = "Output arguments:" + +inoutForHelp :: InOut -> String -> Bool -> String +inoutForHelp In s b = + mconcat $ + ("Input " <> s) + : if b + then + [ ". Must be a filepath where the last fragment is named after the " + , "slot of the snapshotted state plus an optional suffix. Example: `1645330287_suffix`." ] - ) - <*> strArgument - ( mconcat - [ help "Output dir/file Use relative paths like ./100007913" - , metavar "PATH-OUT" + else [] +inoutForHelp Out s b = + mconcat $ + ("Output " <> s) + : if b + then + [ ". Must be a filepath where the last fragment is named after the " + , "slot of the snapshotted state plus an optional suffix. Example: `1645330287_suffix`." ] - ) + else [] --- Helpers +inoutForCommand :: InOut -> String -> String +inoutForCommand In = (++ "-in") +inoutForCommand Out = (++ "-out") -pathToDiskSnapshot :: FilePath -> Maybe (SomeHasFS IO, FsPath, DiskSnapshot) -pathToDiskSnapshot path = (SomeHasFS $ ioHasFS $ MountPoint dir,mkFsPath [file],) <$> snapshotFromPath file - where - (dir, file) = splitFileName path +parseConfig :: InOut -> Parser Format +parseConfig io = + ( Mem + <$> parserOptionGroup + (inoutForGroup io) + (parsePath (inoutForCommand io "mem") (inoutForHelp io "snapshot dir" True)) + ) + <|> ( LMDB + <$> parserOptionGroup + (inoutForGroup io) + (parsePath (inoutForCommand io "lmdb") (inoutForHelp io "snapshot dir" True)) + ) + <|> ( LSM + <$> parserOptionGroup + (inoutForGroup io) + (parsePath (inoutForCommand io "lsm-snapshot") (inoutForHelp io "snapshot dir" True)) + <*> parserOptionGroup + (inoutForGroup io) + (parsePath (inoutForCommand io "lsm-database") (inoutForHelp io "LSM database" False)) + ) -defaultLMDBLimits :: V1.LMDBLimits -defaultLMDBLimits = - V1.LMDBLimits - { V1.lmdbMapSize = 16 * 1024 * 1024 * 1024 - , V1.lmdbMaxDatabases = 10 - , V1.lmdbMaxReaders = 16 - } +parsePath :: String -> String -> Parser FilePath +parsePath optName strHelp = + strOption + ( mconcat + [ long optName + , help strHelp + , metavar "PATH" + ] + ) data Error blk = SnapshotError (SnapshotFailure blk) - | TablesCantDeserializeError DeserialiseFailure - | TablesTrailingBytes - | SnapshotFormatMismatch Format String - | ReadSnapshotCRCError FsPath CRCError + | BadDirectoryName FilePath + | WrongSlotDirectoryName FilePath SlotNo + | InvalidMetadata String + | BackendMismatch SnapshotBackend SnapshotBackend + | CRCMismatch CRC CRC + | ReadTablesError DeserialiseFailure + | Cancelled deriving Exception instance StandardHash blk => Show (Error blk) where show (SnapshotError err) = "Couldn't deserialize the snapshot. Are you running the same node version that created the snapshot? " <> show err - show (TablesCantDeserializeError err) = "Couldn't deserialize the tables: " <> show err - show TablesTrailingBytes = "Malformed tables, there are trailing bytes!" - show (SnapshotFormatMismatch expected err) = - "The input snapshot does not seem to correspond to the input format:\n\t" - <> show expected - <> "\n\tThe provided path " - <> err - show (ReadSnapshotCRCError fp err) = "An error occurred while reading the snapshot checksum at " <> show fp <> ": \n\t" <> show err - -checkSnapshotFileStructure :: Format -> FsPath -> SomeHasFS IO -> ExceptT (Error blk) IO () -checkSnapshotFileStructure m p (SomeHasFS fs) = case m of - Legacy -> want (doesFileExist fs) p "is NOT a file" - Mem -> newFormatCheck "tvar" - LMDB -> newFormatCheck "data.mdb" - where - want :: (FsPath -> IO Bool) -> FsPath -> String -> ExceptT (Error blk) IO () - want fileType path err = do - exists <- Trans.lift $ fileType path - Monad.unless exists $ throwError $ SnapshotFormatMismatch m err - - isDir = (doesDirectoryExist, [], "is NOT a directory") - hasTablesDir = (doesDirectoryExist, ["tables"], "DOES NOT contain a \"tables\" directory") - hasState = (doesFileExist, ["state"], "DOES NOT contain a \"state\" file") - hasTables tb = (doesFileExist, ["tables", tb], "DOES NOT contain a \"tables/" <> tb <> "\" file") - - newFormatCheck tb = - mapM_ - (\(doCheck, extra, err) -> want (doCheck fs) (p mkFsPath extra) err) - [ isDir - , hasTablesDir - , hasState - , hasTables tb + show (BadDirectoryName fp) = + mconcat + [ "Filepath " + , fp + , " is not an snapshot. The last fragment on the path should be" + , " named after the slot number of the state it contains and an" + , " optional suffix, such as `163470034` or `163470034_my-suffix`." ] + show (InvalidMetadata s) = "Metadata is invalid: " <> s + show (BackendMismatch b1 b2) = + mconcat + [ "Mismatched backend in snapshot. Reading as " + , show b1 + , " but snapshot is " + , show b2 + ] + show (WrongSlotDirectoryName fp sl) = + mconcat + [ "The name of the snapshot (\"" + , fp + , "\") does not correspond to the slot number of the state (" + , (show . unSlotNo $ sl) + , ")." + ] + show (CRCMismatch c1 c2) = + mconcat + [ "The input snapshot seems corrupted. Metadata has CRC " + , show c1 + , " but reading it gives CRC " + , show c2 + ] + show (ReadTablesError df) = + mconcat + ["Error when reading entries in the UTxO tables: ", show df] + show Cancelled = "Cancelled" + +data InEnv = InEnv + { inState :: LedgerState (CardanoBlock StandardCrypto) EmptyMK + , inFilePath :: FilePath + , inStream :: + LedgerState (CardanoBlock StandardCrypto) EmptyMK -> + ResourceRegistry IO -> + IO (YieldArgs (LedgerState (CardanoBlock StandardCrypto)) IO) + , inProgressMsg :: String + , inCRC :: CRC + , inSnapReadCRC :: Maybe CRC + } -load :: - forall blk. - ( CanStowLedgerTables (LedgerState blk) - , LedgerSupportsProtocol blk - , LedgerSupportsLedgerDB blk - ) => - Config -> - ResourceRegistry IO -> - CodecConfig blk -> - FilePath -> - ExceptT (Error blk) IO (ExtLedgerState blk EmptyMK, LedgerTables (ExtLedgerState blk) ValuesMK) -load config@Config{inpath = pathToDiskSnapshot -> Just (fs@(SomeHasFS hasFS), path, ds)} rr ccfg tempFP = - case from config of - Legacy -> do - checkSnapshotFileStructure Legacy path fs - (st, checksumAsRead) <- - first unstowLedgerTables - <$> withExceptT - (SnapshotError . InitFailureRead . ReadSnapshotFailed) - (readExtLedgerState fs (decodeDiskExtLedgerState ccfg) decode path) - let crcPath = path <.> "checksum" - crcFileExists <- Trans.lift $ doesFileExist hasFS crcPath - Monad.when crcFileExists $ do - snapshotCRC <- - withExceptT (ReadSnapshotCRCError crcPath) $ - readCRC hasFS crcPath - Monad.when (checksumAsRead /= snapshotCRC) $ - throwError $ - SnapshotError $ - InitFailureRead ReadSnapshotDataCorruption - pure (forgetLedgerTables st, projectLedgerTables st) - Mem -> do - checkSnapshotFileStructure Mem path fs - (ls, _) <- withExceptT SnapshotError $ V2.loadSnapshot nullTracer rr ccfg fs ds - let h = V2.currentHandle ls - (V2.state h,) <$> Trans.lift (V2.readAll (V2.tables h) (V2.state h)) - LMDB -> do - checkSnapshotFileStructure LMDB path fs - ((dbch, k, bstore), _) <- - withExceptT SnapshotError $ - V1.loadSnapshot - nullTracer - (V1.LMDBBackingStoreArgs tempFP defaultLMDBLimits Dict.Dict) - ccfg - (V1.SnapshotsFS fs) - rr - ds - values <- Trans.lift (V1.bsReadAll bstore (V1.changelogLastFlushedState dbch)) - _ <- Trans.lift $ RR.release k - pure (V1.current dbch, values) -load _ _ _ _ = error "Malformed input path!" - -store :: - ( CanStowLedgerTables (LedgerState blk) - , LedgerSupportsProtocol blk - , LedgerSupportsLedgerDB blk - ) => - Config -> - CodecConfig blk -> - (ExtLedgerState blk EmptyMK, LedgerTables (ExtLedgerState blk) ValuesMK) -> - SomeHasFS IO -> - IO () -store config@Config{outpath = pathToDiskSnapshot -> Just (fs@(SomeHasFS hasFS), path, DiskSnapshot _ suffix)} ccfg (state, tbs) tempFS = - case to config of - Legacy -> do - crc <- - writeExtLedgerState - fs - (encodeDiskExtLedgerState ccfg) - path - (stowLedgerTables $ state `withLedgerTables` tbs) - withFile hasFS (path <.> "checksum") (WriteMode MustBeNew) $ \h -> - Monad.void $ hPutAll hasFS h . BS.toLazyByteString . BS.word32HexFixed $ getCRC crc - Mem -> do - lseq <- V2.empty state tbs $ V2.newInMemoryLedgerTablesHandle nullTracer fs - let h = V2.currentHandle lseq - Monad.void $ InMemory.implTakeSnapshot ccfg nullTracer fs suffix h - LMDB -> do - chlog <- newTVarIO (V1.empty state) - lock <- V1.mkLedgerDBLock - bs <- - V1.newLMDBBackingStore - nullTracer - defaultLMDBLimits - (V1.LiveLMDBFS tempFS) - (V1.SnapshotsFS fs) - (V1.InitFromValues (pointSlot $ getTip state) state tbs) - Monad.void $ V1.withReadLock lock $ do - V1.implTakeSnapshot chlog ccfg nullTracer (V1.SnapshotsFS fs) bs suffix -store _ _ _ _ = error "Malformed output path!" +data OutEnv = OutEnv + { outFilePath :: FilePath + , outStream :: + LedgerState (CardanoBlock StandardCrypto) EmptyMK -> + ResourceRegistry IO -> + IO (SinkArgs (LedgerState (CardanoBlock StandardCrypto)) IO) + , outCreateExtra :: Maybe FilePath + , outDeleteExtra :: Maybe FilePath + , outProgressMsg :: String + , outBackend :: SnapshotBackend + } main :: IO () main = withStdTerminalHandles $ do - cryptoInit - uncurry run =<< getCommandLineConfig + eRes <- runExceptT main' + case eRes of + Left err -> do + putStrLn $ show err + exitFailure + Right () -> exitSuccess where - run conf args = do - ccfg <- configCodec . pInfoConfig <$> mkProtocolInfo args - withSystemTempDirectory "lmdb" $ \dir -> do - let tempFS = SomeHasFS $ ioHasFS $ MountPoint dir - RR.withRegistry $ \rr -> do - putStrLn "Loading snapshot..." - state <- either throwIO pure =<< runExceptT (load conf rr ccfg dir) - putStrLn "Loaded snapshot" - putStrLn "Writing snapshot..." - store conf ccfg state tempFS - putStrLn "Written snapshot" + main' = do + lift $ cryptoInit + (conf, args) <- lift $ getCommandLineConfig + ccfg <- lift $ configCodec . pInfoConfig <$> mkProtocolInfo args + + InEnv{..} <- getInEnv ccfg (from conf) + + o@OutEnv{..} <- getOutEnv inState (to conf) + + wipeOutputPaths o + + lift $ putStr "Copying state file..." >> hFlush stdout + lift $ D.copyFile (inFilePath F. "state") (outFilePath F. "state") + lift $ putColored Green True "Done" + + lift $ putStr "Streaming ledger tables..." >> hFlush stdout >> saveCursor + + tid <- lift $ niceAnimatedProgressBar inProgressMsg outProgressMsg + + eRes <- lift $ runExceptT (stream inState inStream outStream) + + case eRes of + Left err -> throwError $ ReadTablesError err + Right (mCRCIn, mCRCOut) -> do + lift $ maybe (pure ()) cancel tid + lift $ clearLine >> restoreCursor >> cursorUp 1 >> putColored Green True "Done" + let crcIn = maybe inCRC (crcOfConcat inCRC) mCRCIn + maybe + ( lift $ + putColored Yellow True "The metadata file is missing, the snapshot is not guaranteed to be correct!" + ) + ( \cs -> + Monad.when (cs /= crcIn) $ throwError $ CRCMismatch cs crcIn + ) + inSnapReadCRC + + let crcOut = maybe inCRC (crcOfConcat inCRC) mCRCOut + + lift $ putStr "Generating new metadata file..." >> hFlush stdout + putMetadata outFilePath (SnapshotMetadata outBackend crcOut) + + lift $ putColored Green True "Done" + + wipeOutputPaths OutEnv{..} = do + wipePath outFilePath + lift $ maybe (pure ()) (D.createDirectory . (outFilePath F.)) outCreateExtra + maybe + (pure ()) + wipePath + outDeleteExtra + + getState ccfg fp@(pathToHasFS -> fs) = do + eState <- lift $ do + putStr $ "Reading ledger state from " <> (fp F. "state") <> "..." + hFlush stdout + runExceptT (readExtLedgerState fs (decodeDiskExtLedgerState ccfg) decode (mkFsPath ["state"])) + case eState of + Left err -> + throwError . SnapshotError . InitFailureRead @(CardanoBlock StandardCrypto) . ReadSnapshotFailed $ + err + Right st -> lift $ do + putColored Green True " Done" + pure . first ledgerState $ st + + getMetadata fp bknd = do + (fs, ds) <- toDiskSnapshot fp + mtd <- + lift $ runExceptT $ loadSnapshotMetadata fs ds + (,ds) + <$> either + ( \case + MetadataFileDoesNotExist -> pure Nothing + MetadataInvalid s -> throwError $ InvalidMetadata s + MetadataBackendMismatch -> error "impossible" + ) + ( \mtd' -> do + if bknd /= snapshotBackend mtd' + then throwError $ BackendMismatch bknd (snapshotBackend mtd') + else pure $ Just $ snapshotChecksum mtd' + ) + mtd + + putMetadata fp bknd = do + (fs, ds) <- toDiskSnapshot fp + lift $ writeSnapshotMetadata fs ds bknd + + getInEnv ccfg = \case + Mem fp -> do + (mtd, ds) <- getMetadata fp UTxOHDMemSnapshot + (st, c) <- getState ccfg fp + Monad.when + ((unSlotNo <$> pointSlot (getTip st)) /= NotOrigin (dsNumber ds)) + ( throwError $ + WrongSlotDirectoryName + (snapshotToDirName ds) + ( withOrigin + ( error + "Impossible! the snapshot seems to be at Genesis but cardano-node would never create such an snapshot!" + ) + id + $ pointSlot (getTip st) + ) + ) + + pure $ + InEnv + st + fp + (fromInMemory (fp F. "tables" F. "tvar")) + ("InMemory@[" <> fp <> "]") + c + mtd + LMDB fp -> do + (mtd, ds) <- getMetadata fp UTxOHDLMDBSnapshot + (st, c) <- getState ccfg fp + Monad.when + ((unSlotNo <$> pointSlot (getTip st)) /= NotOrigin (dsNumber ds)) + ( throwError $ + WrongSlotDirectoryName + (snapshotToDirName ds) + (withOrigin undefined id $ pointSlot (getTip st)) + ) + + pure $ + InEnv + st + fp + (fromLMDB (fp F. "tables") defaultLMDBLimits) + ("LMDB@[" <> fp <> "]") + c + mtd + LSM fp lsmDbPath -> do + (mtd, ds) <- getMetadata fp UTxOHDLSMSnapshot + (st, c) <- getState ccfg fp + Monad.when + ((unSlotNo <$> pointSlot (getTip st)) /= NotOrigin (dsNumber ds)) + ( throwError $ + WrongSlotDirectoryName + (snapshotToDirName ds) + (withOrigin undefined id $ pointSlot (getTip st)) + ) + + pure $ + InEnv + st + fp + (fromLSM lsmDbPath (last $ splitDirectories fp)) + ("LSM@[" <> lsmDbPath <> "]") + c + mtd + + getOutEnv st = \case + Mem fp -> do + (_, ds) <- toDiskSnapshot fp + Monad.when + ((unSlotNo <$> pointSlot (getTip st)) /= NotOrigin (dsNumber ds)) + ( throwError $ + WrongSlotDirectoryName + (snapshotToDirName ds) + (withOrigin undefined id $ pointSlot (getTip st)) + ) + pure $ + OutEnv + fp + (toInMemory (fp F. "tables" F. "tvar")) + (Just "tables") + Nothing + ("InMemory@[" <> fp <> "]") + UTxOHDMemSnapshot + LMDB fp -> do + (_, ds) <- toDiskSnapshot fp + Monad.when + ((unSlotNo <$> pointSlot (getTip st)) /= NotOrigin (dsNumber ds)) + ( throwError $ + WrongSlotDirectoryName + (snapshotToDirName ds) + (withOrigin undefined id $ pointSlot (getTip st)) + ) + pure $ + OutEnv + fp + (toLMDB fp defaultLMDBLimits) + Nothing + Nothing + ("LMDB@[" <> fp <> "]") + UTxOHDLMDBSnapshot + LSM fp lsmDbPath -> do + (_, ds) <- toDiskSnapshot fp + Monad.when + ((unSlotNo <$> pointSlot (getTip st)) /= NotOrigin (dsNumber ds)) + ( throwError $ + WrongSlotDirectoryName + (snapshotToDirName ds) + (withOrigin undefined id $ pointSlot (getTip st)) + ) + pure $ + OutEnv + fp + (toLSM lsmDbPath (last $ splitDirectories fp)) + Nothing + (Just lsmDbPath) + ("LSM@[" <> lsmDbPath <> "]") + UTxOHDLSMSnapshot + +-- Helpers + +-- UI +niceAnimatedProgressBar :: String -> String -> IO (Maybe (Async IO ())) +niceAnimatedProgressBar inMsg outMsg = do + stdoutSupportsANSI <- hNowSupportsANSI stdout + if stdoutSupportsANSI + then do + putStrLn "" + pb <- + newProgressBar + defStyle{stylePrefix = msg (T.pack inMsg), stylePostfix = msg (T.pack outMsg)} + 10 + (Progress 1 100 ()) + + fmap Just $ + async $ + let loop = do + threadDelay 0.2 + updateProgress pb (\prg -> prg{progressDone = (progressDone prg + 4) `mod` 100}) + in Monad.forever loop + else pure Nothing + +putColored :: Color -> Bool -> String -> IO () +putColored c b s = do + stdoutSupportsANSI <- hNowSupportsANSI stdout + Monad.when stdoutSupportsANSI $ setSGR [SetColor Foreground Vivid c] + if b + then + putStrLn s + else + putStr s + Monad.when stdoutSupportsANSI $ setSGR [Reset] + hFlush stdout + +askForConfirmation :: + ExceptT (Error (CardanoBlock StandardCrypto)) IO a -> + String -> + ExceptT (Error (CardanoBlock StandardCrypto)) IO a +askForConfirmation act infoMsg = do + lift $ putColored Yellow False $ "I'm going to " <> infoMsg <> ". Continue? (Y/n) " + answer <- lift $ getLine + case map toLower answer of + "y" -> act + _ -> throwError Cancelled + +-- | Ask before deleting +wipePath :: FilePath -> ExceptT (Error (CardanoBlock StandardCrypto)) IO () +wipePath fp = do + exists <- lift $ D.doesDirectoryExist fp + ( if exists + then flip askForConfirmation ("wipe the path " <> fp) + else id + ) + (lift $ D.removePathForcibly fp >> D.createDirectoryIfMissing True fp) + +toDiskSnapshot :: + FilePath -> ExceptT (Error (CardanoBlock StandardCrypto)) IO (SomeHasFS IO, DiskSnapshot) +toDiskSnapshot fp@(F.splitFileName . maybeRemoveTrailingSlash -> (snapPath, snapName)) = + maybe + (throwError $ BadDirectoryName fp) + (pure . (pathToHasFS snapPath,)) + $ snapshotFromPath snapName + +-- | Given a filepath pointing to a snapshot (with or without a trailing slash), produce: +-- +-- * A HasFS at the snapshot directory +pathToHasFS :: FilePath -> SomeHasFS IO +pathToHasFS (maybeRemoveTrailingSlash -> path) = + SomeHasFS $ ioHasFS $ MountPoint path + +maybeRemoveTrailingSlash :: String -> String +maybeRemoveTrailingSlash s = case last s of + '/' -> init s + '\\' -> init s + _ -> s + +defaultLMDBLimits :: V1.LMDBLimits +defaultLMDBLimits = + V1.LMDBLimits + { V1.lmdbMapSize = 16 * 1024 * 1024 * 1024 + , V1.lmdbMaxDatabases = 10 + , V1.lmdbMaxReaders = 16 + } diff --git a/ouroboros-consensus-cardano/changelog.d/20250904_111240_javier.sagredo_lsm_sc_da.md b/ouroboros-consensus-cardano/changelog.d/20250904_111240_javier.sagredo_lsm_sc_da.md new file mode 100644 index 0000000000..41b1fa620d --- /dev/null +++ b/ouroboros-consensus-cardano/changelog.d/20250904_111240_javier.sagredo_lsm_sc_da.md @@ -0,0 +1,25 @@ + + + + + diff --git a/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal b/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal index 58524d8aee..98286f47d7 100644 --- a/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal +++ b/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal @@ -98,10 +98,10 @@ library Ouroboros.Consensus.Cardano.Block Ouroboros.Consensus.Cardano.CanHardFork Ouroboros.Consensus.Cardano.Condense - Ouroboros.Consensus.Cardano.StreamingLedgerTables Ouroboros.Consensus.Cardano.Ledger Ouroboros.Consensus.Cardano.Node Ouroboros.Consensus.Cardano.QueryHF + Ouroboros.Consensus.Cardano.StreamingLedgerTables Ouroboros.Consensus.Shelley.Crypto Ouroboros.Consensus.Shelley.Eras Ouroboros.Consensus.Shelley.HFEras @@ -138,14 +138,10 @@ library bytestring >=0.10 && <0.13, cardano-binary, cardano-crypto, - fs-api, - contra-tracer, - directory, cardano-crypto-class ^>=2.2, cardano-crypto-wrapper, cardano-ledger-allegra ^>=1.8, cardano-ledger-alonzo ^>=1.14, - random, cardano-ledger-api ^>=1.12, cardano-ledger-babbage ^>=1.12, cardano-ledger-binary ^>=1.7, @@ -154,8 +150,6 @@ library cardano-ledger-core ^>=1.18, cardano-ledger-dijkstra ^>=0.1, cardano-ledger-mary ^>=1.9, - temporary, - resource-registry, cardano-ledger-shelley ^>=1.17, cardano-prelude, cardano-protocol-tpraos ^>=1.4.1, @@ -163,10 +157,13 @@ library cardano-strict-containers, cborg ^>=0.2.2, containers >=0.5 && <0.8, + contra-tracer, crypton, deepseq, + directory, filepath, formatting >=6.3 && <7.3, + fs-api, measures, mempack, microlens, @@ -175,12 +172,15 @@ library ouroboros-consensus ^>=0.27, ouroboros-consensus-protocol ^>=0.12, ouroboros-network-api ^>=0.16, + random, + resource-registry, serialise ^>=0.2, singletons ^>=3.0, small-steps, sop-core ^>=0.5, sop-extras ^>=0.4, strict-sop-core ^>=0.1, + temporary, text, these ^>=1.2, validation, @@ -698,10 +698,10 @@ executable snapshot-converter hs-source-dirs: app main-is: snapshot-converter.hs build-depends: + ansi-terminal, base, - bytestring, cardano-crypto-class, - contra-tracer, + directory, filepath, fs-api, mtl, @@ -711,8 +711,8 @@ executable snapshot-converter ouroboros-consensus-cardano:unstable-cardano-tools, resource-registry, serialise, - sop-core, - temporary, + terminal-progress-bar, + text, with-utf8, other-modules: diff --git a/ouroboros-consensus/changelog.d/20250904_111212_javier.sagredo_lsm_sc_da.md b/ouroboros-consensus/changelog.d/20250904_111212_javier.sagredo_lsm_sc_da.md new file mode 100644 index 0000000000..41b1fa620d --- /dev/null +++ b/ouroboros-consensus/changelog.d/20250904_111212_javier.sagredo_lsm_sc_da.md @@ -0,0 +1,25 @@ + + + + + diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 5f91e83ccb..ac70f1cec8 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -280,7 +280,6 @@ library Ouroboros.Consensus.Util.Args Ouroboros.Consensus.Util.Assert Ouroboros.Consensus.Util.CBOR - Ouroboros.Consensus.Util.StreamingLedgerTables Ouroboros.Consensus.Util.CRC Ouroboros.Consensus.Util.CallStack Ouroboros.Consensus.Util.Condense @@ -300,6 +299,7 @@ library Ouroboros.Consensus.Util.Orphans Ouroboros.Consensus.Util.RedundantConstraints Ouroboros.Consensus.Util.STM + Ouroboros.Consensus.Util.StreamingLedgerTables Ouroboros.Consensus.Util.Time Ouroboros.Consensus.Util.Versioned From dceb55c08e6a57ad73ce8244b4d04d2e84d4a4fa Mon Sep 17 00:00:00 2001 From: Javier Sagredo Date: Thu, 4 Sep 2025 15:12:15 +0200 Subject: [PATCH 3/3] Delete `readAll` functionality in LedgerDB backends --- ...0250904_111212_javier.sagredo_lsm_sc_da.md | 6 ++-- .../Storage/LedgerDB/V1/BackingStore/API.hs | 13 --------- .../LedgerDB/V1/BackingStore/Impl/InMemory.hs | 5 ---- .../LedgerDB/V1/BackingStore/Impl/LMDB.hs | 29 ------------------- .../Consensus/Storage/LedgerDB/V2/InMemory.hs | 3 -- .../Consensus/Storage/LedgerDB/V2/LSM.hs | 5 ---- .../Storage/LedgerDB/V2/LedgerSeq.hs | 4 --- 7 files changed, 2 insertions(+), 63 deletions(-) diff --git a/ouroboros-consensus/changelog.d/20250904_111212_javier.sagredo_lsm_sc_da.md b/ouroboros-consensus/changelog.d/20250904_111212_javier.sagredo_lsm_sc_da.md index 41b1fa620d..bc4d2f1162 100644 --- a/ouroboros-consensus/changelog.d/20250904_111212_javier.sagredo_lsm_sc_da.md +++ b/ouroboros-consensus/changelog.d/20250904_111212_javier.sagredo_lsm_sc_da.md @@ -17,9 +17,7 @@ For top level release notes, leave all the headers commented out. - A bullet item for the Non-Breaking category. --> - +- Removed the `readAll` functionality from the LedgerDB backends now that it became unnecessary as tables conversion is now streamed. diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/BackingStore/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/BackingStore/API.hs index 1109be6a3c..098c26dd3d 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/BackingStore/API.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/BackingStore/API.hs @@ -54,7 +54,6 @@ module Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.API -- * 🧪 Testing , bsRead - , bsReadAll ) where import Cardano.Slotting.Slot (SlotNo, WithOrigin (..)) @@ -169,9 +168,6 @@ data BackingStoreValueHandle m keys key values = BackingStoreValueHandle -- itself is idempotent. , bsvhRangeRead :: !(ReadHint values -> RangeQuery keys -> m (values, Maybe key)) -- ^ See 'RangeQuery' - , bsvhReadAll :: !(ReadHint values -> m values) - -- ^ Costly read all operation, not to be used in Consensus but only in - -- snapshot-converter executable. , bsvhRead :: !(ReadHint values -> keys -> m values) -- ^ Read the given keys from the handle -- @@ -206,7 +202,6 @@ castBackingStoreValueHandle f g h bsvh = BackingStoreValueHandle { bsvhAtSlot , bsvhClose - , bsvhReadAll = \rhint -> f <$> bsvhReadAll rhint , bsvhRangeRead = \rhint (RangeQuery prev count) -> fmap (second (fmap h) . first f) . bsvhRangeRead rhint $ RangeQuery (fmap g prev) count , bsvhRead = \rhint -> fmap f . bsvhRead rhint . g @@ -215,7 +210,6 @@ castBackingStoreValueHandle f g h bsvh = where BackingStoreValueHandle { bsvhClose - , bsvhReadAll , bsvhAtSlot , bsvhRangeRead , bsvhRead @@ -233,13 +227,6 @@ bsRead store rhint keys = withBsValueHandle store $ \vh -> do values <- bsvhRead vh rhint keys pure (bsvhAtSlot vh, values) -bsReadAll :: - MonadThrow m => - BackingStore m keys key values diff -> - ReadHint values -> - m values -bsReadAll store rhint = withBsValueHandle store $ \vh -> bsvhReadAll vh rhint - -- | A 'IOLike.bracket'ed 'bsValueHandle' withBsValueHandle :: MonadThrow m => diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/BackingStore/Impl/InMemory.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/BackingStore/Impl/InMemory.hs index 81231c0243..29324bfb7b 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/BackingStore/Impl/InMemory.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/BackingStore/Impl/InMemory.hs @@ -184,11 +184,6 @@ newInMemoryBackingStore tracer (SnapshotsFS (SomeHasFS fs)) initialization = do pure $ rangeRead rq values traceWith tracer $ BSValueHandleTrace Nothing BSVHRangeRead pure r - , bsvhReadAll = \_ -> - atomically $ do - guardClosed ref - guardHandleClosed refHandleClosed - pure values , bsvhRead = \_ keys -> do traceWith tracer $ BSValueHandleTrace Nothing BSVHReading r <- atomically $ do diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/BackingStore/Impl/LMDB.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/BackingStore/Impl/LMDB.hs index 919db97859..e66ca00d27 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/BackingStore/Impl/LMDB.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/BackingStore/Impl/LMDB.hs @@ -9,7 +9,6 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeOperators #-} -- | A 'BackingStore' implementation based on [LMDB](http://www.lmdb.tech/doc/). @@ -42,7 +41,6 @@ import Data.Functor.Contravariant ((>$<)) import Data.Map (Map) import qualified Data.Map.Strict as Map import Data.MemPack -import Data.Proxy import qualified Data.Set as Set import qualified Data.Text as Strict import qualified Database.LMDB.Simple as LMDB @@ -158,19 +156,6 @@ getDb :: LMDB.Transaction mode (LMDBMK k v) getDb (K2 name) = LMDBMK name <$> LMDB.getDatabase (Just name) -readAll :: - (Ord (TxIn l), MemPack (TxIn l), IndexedMemPack idx (TxOut l)) => - Proxy l -> - idx -> - LMDBMK (TxIn l) (TxOut l) -> - LMDB.Transaction mode (ValuesMK (TxIn l) (TxOut l)) -readAll _ st (LMDBMK _ dbMK) = - ValuesMK - <$> Bridge.runCursorAsTransaction' - st - LMDB.Cursor.cgetAll - dbMK - -- | @'rangeRead' rq dbMK@ performs a range read of @rqCount rq@ -- values from database @dbMK@, starting from some key depending on @rqPrev rq@. -- @@ -659,25 +644,11 @@ mkLMDBBackingStoreValueHandle db = do Trace.traceWith tracer API.BSVHStatted pure res - bsvhReadAll :: l EmptyMK -> m (LedgerTables l ValuesMK) - bsvhReadAll st = - Status.withReadAccess dbStatusLock (throwIO LMDBErrClosed) $ do - Status.withReadAccess vhStatusLock (throwIO (LMDBErrNoValueHandle vhId)) $ do - Trace.traceWith tracer API.BSVHRangeReading - res <- - liftIO $ - TrH.submitReadOnly trh $ - let dbMK = getLedgerTables dbBackingTables - in LedgerTables <$> readAll (Proxy @l) st dbMK - Trace.traceWith tracer API.BSVHRangeRead - pure res - bsvh = API.BackingStoreValueHandle { API.bsvhAtSlot = initSlot , API.bsvhClose = bsvhClose , API.bsvhRead = bsvhRead - , API.bsvhReadAll = bsvhReadAll , API.bsvhRangeRead = bsvhRangeRead , API.bsvhStat = bsvhStat } diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/InMemory.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/InMemory.hs index 7fe11e86a6..a7000e4bb0 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/InMemory.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/InMemory.hs @@ -122,9 +122,6 @@ newInMemoryLedgerTablesHandle tracer someFS@(SomeHasFS hasFS) l = do let m' = Map.take t . (maybe id (\g -> snd . Map.split g) f) $ m in pure (LedgerTables (ValuesMK m'), fst <$> Map.lookupMax m') ) - , readAll = \_ -> do - hs <- readTVarIO tv - guardClosed hs pure , pushDiffs = \st0 !diffs -> atomically $ modifyTVar diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs index 8013cef224..af0bec8f87 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs @@ -283,11 +283,6 @@ newLSMLedgerTablesHandle tracer rr (resKey, t) = do Map.empty $ V.zip vec' res , readRange = implReadRange t - , readAll = \st -> - let readAll' m = do - (v, n) <- implReadRange t st (m, 100000) - maybe (pure v) (fmap (ltliftA2 unionValues v) . readAll' . Just) n - in readAll' Nothing , pushDiffs = const (implPushDiffs t) , takeHandleSnapshot = \_ snapshotName -> do LSM.saveSnapshot diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs index 141a610b4e..34fa1f2c8b 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs @@ -106,10 +106,6 @@ data LedgerTablesHandle m l = LedgerTablesHandle -- back into the next iteration of the range read. If the function returns -- Nothing, it means the read returned no results, or in other words, we -- reached the end of the ledger tables. - , readAll :: !(l EmptyMK -> m (LedgerTables l ValuesMK)) - -- ^ Costly read all operation, not to be used in Consensus but only in - -- snapshot-converter executable. The values will be read as if they were from - -- the same era as the given ledger state. , pushDiffs :: !(forall mk. l mk -> l DiffMK -> m ()) -- ^ Push some diffs into the ledger tables handle. --