From d230b9ec01d12fde63bb78bab8ed22336bc3d750 Mon Sep 17 00:00:00 2001 From: Javier Sagredo Date: Fri, 29 Aug 2025 11:25:35 +0200 Subject: [PATCH 1/5] Implement streaming of Ledger Tables Co-authored-by: Alexander Esgen --- .../ouroboros-consensus-cardano.cabal | 8 + .../Cardano/StreamingLedgerTables.hs | 178 +++++++++ ouroboros-consensus/ouroboros-consensus.cabal | 2 + .../Consensus/Util/StreamingLedgerTables.hs | 358 ++++++++++++++++++ 4 files changed, 546 insertions(+) create mode 100644 ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/StreamingLedgerTables.hs create mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs diff --git a/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal b/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal index 7c70e19286..c119064246 100644 --- a/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal +++ b/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal @@ -98,6 +98,7 @@ library Ouroboros.Consensus.Cardano.Block Ouroboros.Consensus.Cardano.CanHardFork Ouroboros.Consensus.Cardano.Condense + Ouroboros.Consensus.Cardano.StreamingLedgerTables Ouroboros.Consensus.Cardano.Ledger Ouroboros.Consensus.Cardano.Node Ouroboros.Consensus.Cardano.QueryHF @@ -137,6 +138,9 @@ library bytestring >=0.10 && <0.13, cardano-binary, cardano-crypto, + fs-api, + contra-tracer, + directory, cardano-crypto-class ^>=2.2, cardano-crypto-wrapper, cardano-ledger-allegra ^>=1.8, @@ -149,6 +153,8 @@ library cardano-ledger-core ^>=1.18, cardano-ledger-dijkstra ^>=0.1, cardano-ledger-mary ^>=1.9, + temporary, + resource-registry, cardano-ledger-shelley ^>=1.17, cardano-prelude, cardano-protocol-tpraos ^>=1.4.1, @@ -159,6 +165,7 @@ library contra-tracer, crypton, deepseq, + filepath, formatting >=6.3 && <7.3, measures, mempack, @@ -168,6 +175,7 @@ library ouroboros-consensus ^>=0.28, ouroboros-consensus-protocol ^>=0.13, 
ouroboros-network-api ^>=0.16, + random, serialise ^>=0.2, singletons ^>=3.0, small-steps, diff --git a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/StreamingLedgerTables.hs b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/StreamingLedgerTables.hs new file mode 100644 index 0000000000..256a58b827 --- /dev/null +++ b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/StreamingLedgerTables.hs @@ -0,0 +1,178 @@ +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE TypeOperators #-} + +module Ouroboros.Consensus.Cardano.StreamingLedgerTables + ( fromInMemory + , fromLMDB + , toLMDB + , toInMemory + ) where + +import Cardano.Ledger.BaseTypes (WithOrigin (..)) +import Cardano.Ledger.Binary +import Cardano.Ledger.Core (ByronEra, Era, eraDecoder, toEraCBOR) +import qualified Cardano.Ledger.Shelley.API as SL +import qualified Cardano.Ledger.Shelley.LedgerState as SL +import qualified Cardano.Ledger.State as SL +import qualified Codec.CBOR.Encoding +import Control.ResourceRegistry +import Control.Tracer (nullTracer) +import Data.Proxy +import Data.SOP.BasicFunctors +import Data.SOP.Functors +import Data.SOP.Strict +import qualified Data.SOP.Telescope as Telescope +import qualified Data.Text as T +import Lens.Micro +import Ouroboros.Consensus.Byron.Ledger +import Ouroboros.Consensus.Cardano.Block +import Ouroboros.Consensus.Cardano.Ledger +import Ouroboros.Consensus.HardFork.Combinator +import Ouroboros.Consensus.HardFork.Combinator.State +import Ouroboros.Consensus.Ledger.Abstract +import Ouroboros.Consensus.Ledger.Tables.Utils (emptyLedgerTables) +import Ouroboros.Consensus.Shelley.Ledger +import Ouroboros.Consensus.Shelley.Ledger.SupportsProtocol () +import Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.API +import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.Impl.LMDB as LMDB +import 
Ouroboros.Consensus.Storage.LedgerDB.V2.Args +import Ouroboros.Consensus.Util.StreamingLedgerTables +import System.Directory +import System.FS.API +import System.FS.IO +import System.FilePath as FilePath +import System.IO.Temp +import System.Random + +type L = LedgerState (CardanoBlock StandardCrypto) + +fromInMemory :: FilePath -> L EmptyMK -> ResourceRegistry IO -> IO (YieldArgs L IO) +fromInMemory fp (HardForkLedgerState (HardForkState idx)) _ = + let + np :: + NP + (Current (Flip LedgerState EmptyMK) -.-> K (Decoders L)) + (CardanoEras StandardCrypto) + np = + (Fn $ const $ K $ error "Byron") + :* (Fn $ K . fromEra ShelleyTxOut . unFlip . currentState) + :* (Fn $ K . fromEra AllegraTxOut . unFlip . currentState) + :* (Fn $ K . fromEra MaryTxOut . unFlip . currentState) + :* (Fn $ K . fromEra AlonzoTxOut . unFlip . currentState) + :* (Fn $ K . fromEra BabbageTxOut . unFlip . currentState) + :* (Fn $ K . fromEra ConwayTxOut . unFlip . currentState) + :* (Fn $ K . fromEra DijkstraTxOut . unFlip . currentState) + :* Nil + in + pure $ + YieldInMemory + (SomeHasFS . ioHasFS) + fp + (hcollapse $ hap np $ Telescope.tip idx) + where + fromEra :: + forall proto era. + ShelleyCompatible proto era => + (TxOut (LedgerState (ShelleyBlock proto era)) -> CardanoTxOut StandardCrypto) -> + LedgerState (ShelleyBlock proto era) EmptyMK -> + Decoders L + fromEra toCardanoTxOut st = + let certInterns = + internsFromMap $ + shelleyLedgerState st + ^. SL.nesEsL + . SL.esLStateL + . SL.lsCertStateL + . SL.certDStateL + . SL.accountsL + . SL.accountsMapL + in Decoders + (eraDecoder @era decodeMemPack) + (eraDecoder @era $ toCardanoTxOut <$> decShareCBOR certInterns) + +fromLMDB :: FilePath -> LMDB.LMDBLimits -> L EmptyMK -> ResourceRegistry IO -> IO (YieldArgs L IO) +fromLMDB fp limits hint reg = do + let (dbPath, snapName) = splitFileName fp + tempDir <- getCanonicalTemporaryDirectory + let lmdbTemp = tempDir FilePath.</> 
"lmdb_streaming_in" + removePathForcibly lmdbTemp + _ <- + allocate + reg + (\_ -> System.Directory.createDirectory lmdbTemp) + (\_ -> removePathForcibly lmdbTemp) + (_, bs) <- + allocate + reg + ( \_ -> do + LMDB.newLMDBBackingStore + nullTracer + limits + (LiveLMDBFS $ SomeHasFS $ ioHasFS $ MountPoint lmdbTemp) + (SnapshotsFS $ SomeHasFS $ ioHasFS $ MountPoint dbPath) + (InitFromCopy hint (mkFsPath [snapName])) + ) + bsClose + (_, bsvh) <- allocate reg (\_ -> bsValueHandle bs) bsvhClose + pure (YieldLMDB 1000 bsvh) + +toLMDB :: + FilePath -> + LMDB.LMDBLimits -> + L EmptyMK -> + ResourceRegistry IO -> + IO (SinkArgs L IO) +toLMDB fp limits hint reg = do + let (snapDir, snapName) = splitFileName fp + tempDir <- getCanonicalTemporaryDirectory + let lmdbTemp = tempDir FilePath.</> "lmdb_streaming_out" + removePathForcibly lmdbTemp + _ <- + allocate reg (\_ -> System.Directory.createDirectory lmdbTemp) (\_ -> removePathForcibly lmdbTemp) + (_, bs) <- + allocate + reg + ( \_ -> + LMDB.newLMDBBackingStore + nullTracer + limits + (LiveLMDBFS $ SomeHasFS $ ioHasFS $ MountPoint lmdbTemp) + (SnapshotsFS $ SomeHasFS $ ioHasFS $ MountPoint snapDir) + (InitFromValues (At 0) hint emptyLedgerTables) + ) + bsClose + pure $ SinkLMDB 1000 (bsWrite bs) (\h -> bsCopy bs h (mkFsPath [snapName, "tables"])) + +toInMemory :: + FilePath -> + L EmptyMK -> + ResourceRegistry IO -> + IO (SinkArgs L IO) +toInMemory fp (HardForkLedgerState (HardForkState idx)) _ = do + currDir <- getCurrentDirectory + let + np = + (Fn $ const $ K $ encOne (Proxy @ByronEra)) + :* (Fn $ const $ K $ encOne (Proxy @ShelleyEra)) + :* (Fn $ const $ K $ encOne (Proxy @AllegraEra)) + :* (Fn $ const $ K $ encOne (Proxy @MaryEra)) + :* (Fn $ const $ K $ encOne (Proxy @AlonzoEra)) + :* (Fn $ const $ K $ encOne (Proxy @BabbageEra)) + :* (Fn $ const $ K $ encOne (Proxy @ConwayEra)) + :* (Fn $ const $ K $ encOne (Proxy @DijkstraEra)) + :* Nil + pure $ + uncurry + (SinkInMemory 1000) + (hcollapse $ hap np $ Telescope.tip idx) 
+ (SomeHasFS $ ioHasFS $ MountPoint currDir) + fp + where + encOne :: + forall era. + Era era => + Proxy era -> + (TxIn L -> Codec.CBOR.Encoding.Encoding, TxOut L -> Codec.CBOR.Encoding.Encoding) + encOne _ = + (toEraCBOR @era . encodeMemPack, toEraCBOR @era . eliminateCardanoTxOut (const encodeMemPack)) diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index e3799c3dac..519f30d6b4 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -281,6 +281,7 @@ library Ouroboros.Consensus.Util.Args Ouroboros.Consensus.Util.Assert Ouroboros.Consensus.Util.CBOR + Ouroboros.Consensus.Util.StreamingLedgerTables Ouroboros.Consensus.Util.CRC Ouroboros.Consensus.Util.CallStack Ouroboros.Consensus.Util.Condense @@ -326,6 +327,7 @@ library deepseq, diff-containers >=1.2, filelock, + filepath, fingertree-rm >=1.0, fs-api ^>=0.4, hashable, diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs new file mode 100644 index 0000000000..5f722deea7 --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs @@ -0,0 +1,358 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE ConstraintKinds #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE ViewPatterns #-} +{-# OPTIONS_GHC -Wno-orphans #-} + +module Ouroboros.Consensus.Util.StreamingLedgerTables + ( stream + , yield + , sink + , YieldArgs (..) + , SinkArgs (..) + , Decoders (..) 
+ ) where + +import Cardano.Slotting.Slot +import Codec.CBOR.Decoding (Decoder, decodeBreakOr, decodeListLen, decodeMapLenOrIndef) +import Codec.CBOR.Encoding (Encoding, encodeBreak, encodeListLen, encodeMapLenIndef) +import Codec.CBOR.Read +import Codec.CBOR.Write +import Control.Concurrent.Class.MonadMVar +import Control.Monad (replicateM_, unless) +import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadST +import Control.Monad.Class.MonadSTM +import Control.Monad.Class.MonadThrow +import Control.Monad.Except +import Control.Monad.State.Strict +import Control.ResourceRegistry +import Data.ByteString (ByteString) +import qualified Data.ByteString as BS +import Data.ByteString.Builder.Extra (defaultChunkSize) +import qualified Data.Map.Strict as Map +import Data.MemPack +import Data.Proxy +import qualified Data.Set as Set +import qualified Data.Text as T +import qualified Data.Vector as V +import Ouroboros.Consensus.Ledger.Abstract +import Ouroboros.Consensus.Ledger.Tables.Diff +import Ouroboros.Consensus.Storage.LedgerDB.API +import Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.API +import Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq +import Ouroboros.Consensus.Util.IndexedMemPack +import Ouroboros.Network.Block +import Streaming +import qualified Streaming as S +import qualified Streaming.Prelude as S +import System.FS.API +import System.FS.CRC +import qualified System.FilePath as F + +data Decoders l + = Decoders + (forall s. Codec.CBOR.Decoding.Decoder s (TxIn l)) + (forall s. 
Codec.CBOR.Decoding.Decoder s (TxOut l)) + +stream :: + Constraints l m => + l EmptyMK -> + (l EmptyMK -> ResourceRegistry m -> m (YieldArgs l m)) -> + (l EmptyMK -> ResourceRegistry m -> m (SinkArgs l m)) -> + ExceptT DeserialiseFailure m (Maybe CRC, Maybe CRC) +stream st mYieldArgs mSinkArgs = + ExceptT $ + withRegistry $ \reg -> do + yArgs <- mYieldArgs st reg + sArgs <- mSinkArgs st reg + runExceptT $ yield yArgs st $ sink sArgs st + +type Yield l m = + l EmptyMK -> + ( ( Stream (Of (TxIn l, TxOut l)) (ExceptT DeserialiseFailure m) (Stream (Of ByteString) m (Maybe CRC)) -> + ExceptT DeserialiseFailure m (Stream (Of ByteString) m (Maybe CRC, Maybe CRC)) + ) + ) -> + ExceptT DeserialiseFailure m (Maybe CRC, Maybe CRC) + +type Sink l m r = + l EmptyMK -> + Stream (Of (TxIn l, TxOut l)) (ExceptT DeserialiseFailure m) (Stream (Of ByteString) m (Maybe CRC)) -> + ExceptT DeserialiseFailure m (Stream (Of ByteString) m (Maybe CRC, Maybe CRC)) + +instance MonadST m => MonadST (ExceptT e m) where + stToIO = lift . 
stToIO + +data YieldArgs l m + = -- | Yield an in-memory snapshot + YieldInMemory + -- | How to make a SomeHasFS for @m@ + (MountPoint -> SomeHasFS m) + -- | The file path at which the HasFS has to be opened + FilePath + (Decoders l) + | -- | Yield an LMDB snapshot + YieldLMDB + Int + (LedgerBackingStoreValueHandle m l) + +yield :: Constraints l m => YieldArgs l m -> Yield l m +yield = \case + YieldInMemory mkFs fp (Decoders decK decV) -> yieldInMemoryS mkFs fp decK decV + YieldLMDB chunkSize valueHandle -> yieldLmdbS chunkSize valueHandle + +type Constraints l m = + ( LedgerSupportsV1LedgerDB l + , LedgerSupportsV2LedgerDB l + , HasLedgerTables l + , GetTip l + , IOLike m + ) + +sink :: + Constraints l m => + SinkArgs l m -> Sink l m r +sink = \case + SinkLMDB chunkSize write copy -> sinkLmdbS chunkSize write copy + SinkInMemory chunkSize encK encV shfs fp -> sinkInMemoryS chunkSize encK encV shfs fp + +data SinkArgs l m + = SinkInMemory + Int + (TxIn l -> Encoding) + (TxOut l -> Encoding) + (SomeHasFS m) + FilePath + | SinkLMDB + -- | Chunk size + Int + -- | bsWrite + (SlotNo -> (l EmptyMK, l EmptyMK) -> LedgerTables l DiffMK -> m ()) + (l EmptyMK -> m ()) + +{------------------------------------------------------------------------------- + Yielding InMemory +-------------------------------------------------------------------------------} + +streamingFile :: + forall m. + MonadThrow m => + SomeHasFS m -> + FsPath -> + ( Stream (Of ByteString) m (Maybe CRC) -> + ExceptT DeserialiseFailure m (Stream (Of ByteString) m (Maybe CRC, Maybe CRC)) + ) -> + ExceptT DeserialiseFailure m (Maybe CRC, Maybe CRC) +streamingFile (SomeHasFS fs') path cont = + ExceptT $ withFile fs' path ReadMode $ \hdl -> + runExceptT $ cont (getBS hdl initCRC) >>= noRemainingBytes + where + getBS h !crc = do + bs <- S.lift $ hGetSome fs' h (fromIntegral defaultChunkSize) + if BS.null bs + then pure (Just crc) + else do + S.yield bs + getBS h $! 
updateCRC bs crc + + noRemainingBytes s = + lift (S.uncons s) >>= \case + Nothing -> lift $ S.effects s + Just (BS.null -> True, s') -> noRemainingBytes s' + Just _ -> throwError $ DeserialiseFailure 0 "Remaining bytes" + +yieldCborMapS :: + forall m a b. + MonadST m => + (forall s. Decoder s a) -> + (forall s. Decoder s b) -> + Stream (Of ByteString) m (Maybe CRC) -> + Stream (Of (a, b)) (ExceptT DeserialiseFailure m) (Stream (Of ByteString) m (Maybe CRC)) +yieldCborMapS decK decV = execStateT $ do + hoist lift (decodeCbor decodeListLen >> decodeCbor decodeMapLenOrIndef) >>= \case + Nothing -> go + Just n -> replicateM_ n yieldKV + where + yieldKV = do + kv <- hoist lift $ decodeCbor $ (,) <$> decK <*> decV + lift $ S.yield kv + + go = do + doBreak <- hoist lift $ decodeCbor decodeBreakOr + unless doBreak $ yieldKV *> go + + decodeCbor dec = + StateT $ \s -> go' s =<< lift (stToIO (deserialiseIncremental dec)) + where + go' s = \case + Partial k -> + lift (S.next s) >>= \case + Right (bs, s') -> go' s' =<< lift (stToIO (k (Just bs))) + Left r -> go' (pure r) =<< lift (stToIO (k Nothing)) + Done bs _off a -> pure (a, S.yield bs *> s) + Fail _bs _off err -> throwError err + +yieldInMemoryS :: + (MonadThrow m, MonadST m) => + (MountPoint -> SomeHasFS m) -> + FilePath -> + (forall s. Decoder s (TxIn l)) -> + (forall s. 
Decoder s (TxOut l)) -> + Yield l m +yieldInMemoryS mkFs (F.splitFileName -> (fp, fn)) decK decV _ k = + streamingFile (mkFs $ MountPoint fp) (mkFsPath [fn]) $ \s -> do + k $ yieldCborMapS decK decV s + +{------------------------------------------------------------------------------- + Yielding OnDisk backends +-------------------------------------------------------------------------------} + +yieldLmdbS :: + Monad m => + Int -> + LedgerBackingStoreValueHandle m l -> + Yield l m +yieldLmdbS readChunkSize bsvh hint k = do + r <- k (go (RangeQuery Nothing readChunkSize)) + lift $ S.effects r + where + go p = do + (LedgerTables (ValuesMK values), mx) <- lift $ S.lift $ bsvhRangeRead bsvh hint p + case mx of + Nothing -> pure $ pure Nothing + Just x -> do + S.each $ Map.toList values + go (RangeQuery (Just . LedgerTables . KeysMK $ Set.singleton x) readChunkSize) + +yieldLsmS :: + Monad m => + Int -> + LedgerTablesHandle m l -> + Yield l m +yieldLsmS readChunkSize tb hint k = do + r <- k (go (Nothing, readChunkSize)) + lift $ S.effects r + where + go p = do + (LedgerTables (ValuesMK values), mx) <- lift $ S.lift $ readRange tb hint p + if Map.null values + then pure $ pure Nothing + else do + S.each $ Map.toList values + go (mx, readChunkSize) + +{------------------------------------------------------------------------------- + Sink +-------------------------------------------------------------------------------} + +sinkLmdbS :: + forall m l r. 
+ (Ord (TxIn l), GetTip l, Monad m) => + Int -> + (SlotNo -> (l EmptyMK, l EmptyMK) -> LedgerTables l DiffMK -> m ()) -> + (l EmptyMK -> m ()) -> + Sink l m r +sinkLmdbS writeChunkSize bs copyTo hint s = do + r <- go writeChunkSize mempty s + lift $ copyTo hint + pure (fmap (,Nothing) r) + where + sl = withOrigin (error "unreachable") id $ pointSlot $ getTip hint + + go 0 m s' = do + lift $ bs sl (hint, hint) (LedgerTables $ DiffMK $ fromMapInserts m) + go writeChunkSize mempty s' + go n m s' = do + mbs <- S.uncons s' + case mbs of + Nothing -> do + lift $ bs sl (hint, hint) (LedgerTables $ DiffMK $ fromMapInserts m) + S.effects s' + Just ((k, v), s'') -> + go (n - 1) (Map.insert k v m) s'' + +sinkLsmS :: + forall l m r. + ( MonadAsync m + , MonadMVar m + , MonadThrow (STM m) + , MonadMask m + , MonadST m + , MonadEvaluate m + , MemPack (TxIn l) + , IndexedMemPack (l EmptyMK) (TxOut l) + ) => + Int -> + String -> + Session m -> + Sink l m r +sinkLsmS writeChunkSize snapName session st s = do + tb :: UTxOTable m <- lift $ newTable session + r <- go tb writeChunkSize mempty s + lift $ + saveSnapshot + (toSnapshotName snapName) + (SnapshotLabel $ T.pack "UTxO table") + tb + lift $ closeTable tb + pure (fmap (,Nothing) r) + where + go tb 0 m s' = do + lift $ + inserts tb $ + V.fromList [(toTxInBytes (Proxy @l) k, toTxOutBytes st v, Nothing) | (k, v) <- m] + go tb writeChunkSize mempty s' + go tb n m s' = do + mbs <- S.uncons s' + case mbs of + Nothing -> do + lift $ + inserts tb $ + V.fromList + [(toTxInBytes (Proxy @l) k, toTxOutBytes st v, Nothing) | (k, v) <- m] + S.effects s' + Just (item, s'') -> go tb (n - 1) (item : m) s'' + +sinkInMemoryS :: + forall m l r. 
+ MonadThrow m => + Int -> + (TxIn l -> Encoding) -> + (TxOut l -> Encoding) -> + SomeHasFS m -> + FilePath -> + Sink l m r +sinkInMemoryS writeChunkSize encK encV (SomeHasFS fs) fp _ s = + ExceptT $ withFile fs (mkFsPath [fp]) (WriteMode MustBeNew) $ \hdl -> do + let bs = toStrictByteString (encodeListLen 1 <> encodeMapLenIndef) + let !crc0 = updateCRC bs initCRC + void $ hPutSome fs hdl bs + e <- runExceptT $ go hdl crc0 writeChunkSize mempty s + case e of + Left err -> pure $ Left err + Right (r, crc1) -> do + let bs1 = toStrictByteString encodeBreak + void $ hPutSome fs hdl bs1 + let !crc2 = updateCRC bs1 crc1 + pure $ Right (fmap (,Just crc2) r) + where + go tb !crc 0 m s' = do + let bs = toStrictByteString $ mconcat [encK k <> encV v | (k, v) <- reverse m] + lift $ void $ hPutSome fs tb bs + let !crc1 = updateCRC bs crc + go tb crc1 writeChunkSize mempty s' + go tb !crc n m s' = do + mbs <- S.uncons s' + case mbs of + Nothing -> do + let bs = toStrictByteString $ mconcat [encK k <> encV v | (k, v) <- reverse m] + lift $ void $ hPutSome fs tb bs + let !crc1 = updateCRC bs crc + (,crc1) <$> S.effects s' + Just (item, s'') -> go tb crc (n - 1) (item : m) s'' From 2e8777a64c6a9094f4df9d8736a599f1d0d732ea Mon Sep 17 00:00:00 2001 From: Javier Sagredo Date: Fri, 29 Aug 2025 11:26:47 +0200 Subject: [PATCH 2/5] Re-implement snapshot-converter --- .../app/snapshot-converter.hs | 636 ++++++++++++------ .../ouroboros-consensus-cardano.cabal | 19 +- ouroboros-consensus/ouroboros-consensus.cabal | 2 +- .../Consensus/Util/StreamingLedgerTables.hs | 44 +- 4 files changed, 439 insertions(+), 262 deletions(-) diff --git a/ouroboros-consensus-cardano/app/snapshot-converter.hs b/ouroboros-consensus-cardano/app/snapshot-converter.hs index 7c3eba6b4f..8aacc2a52a 100644 --- a/ouroboros-consensus-cardano/app/snapshot-converter.hs +++ b/ouroboros-consensus-cardano/app/snapshot-converter.hs @@ -1,9 +1,10 @@ -{-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE 
FlexibleContexts #-} +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeApplications #-} {-# LANGUAGE ViewPatterns #-} module Main (main) where @@ -13,261 +14,480 @@ import Cardano.Tools.DBAnalyser.HasAnalysis (mkProtocolInfo) import Codec.Serialise import qualified Control.Monad as Monad import Control.Monad.Except -import qualified Control.Monad.Trans as Trans (lift) -import Control.ResourceRegistry (ResourceRegistry) -import qualified Control.ResourceRegistry as RR -import Control.Tracer (nullTracer) +import Control.Monad.Trans (lift) +import Control.ResourceRegistry import DBAnalyser.Parsers import Data.Bifunctor -import qualified Data.ByteString.Builder as BS -import qualified Data.SOP.Dict as Dict +import Data.Char (toLower) +import qualified Data.Text.Lazy as T import Main.Utf8 import Options.Applicative +import Options.Applicative.Help (Doc, line) import Ouroboros.Consensus.Block +import Ouroboros.Consensus.Cardano.Block +import Ouroboros.Consensus.Cardano.StreamingLedgerTables import Ouroboros.Consensus.Config import Ouroboros.Consensus.Ledger.Basics import Ouroboros.Consensus.Ledger.Extended -import Ouroboros.Consensus.Ledger.SupportsProtocol -import Ouroboros.Consensus.Ledger.Tables.Utils import Ouroboros.Consensus.Node.ProtocolInfo -import Ouroboros.Consensus.Storage.LedgerDB import Ouroboros.Consensus.Storage.LedgerDB.Snapshots -import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.Args as V1 -import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore as V1 import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.Impl.LMDB as V1 -import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.DbChangelog as V1 -import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.Lock as V1 -import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.Snapshots as V1 -import qualified 
Ouroboros.Consensus.Storage.LedgerDB.V2.InMemory as V2 -import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq as V2 import Ouroboros.Consensus.Util.CRC import Ouroboros.Consensus.Util.IOLike +import Ouroboros.Consensus.Util.StreamingLedgerTables +import System.Console.ANSI +import qualified System.Directory as D +import System.Exit import System.FS.API -import System.FS.API.Lazy import System.FS.CRC import System.FS.IO -import System.FilePath (splitFileName) -import System.IO.Temp +import System.FilePath (splitDirectories) +import qualified System.FilePath as F +import System.IO +import System.ProgressBar data Format - = Legacy - | Mem - | LMDB + = Mem FilePath + | LMDB FilePath deriving (Show, Read) data Config = Config { from :: Format -- ^ Which format the input snapshot is in - , inpath :: FilePath - -- ^ Path to the input snapshot , to :: Format -- ^ Which format the output snapshot must be in - , outpath :: FilePath - -- ^ Path to the output snapshot } getCommandLineConfig :: IO (Config, CardanoBlockArgs) getCommandLineConfig = execParser $ info - ((,) <$> parseConfig <*> parseCardanoArgs <**> helper) - (fullDesc <> progDesc "Utility for converting snapshots to and from UTxO-HD") - -parseConfig :: Parser Config -parseConfig = - Config - <$> argument - auto - ( mconcat - [ help "From format (Legacy, Mem or LMDB)" - , metavar "FORMAT-IN" - ] - ) - <*> strArgument - ( mconcat - [ help "Input dir/file. Use relative paths like ./100007913" - , metavar "PATH-IN" - ] + ((,) <$> (Config <$> parseConfig In <*> parseConfig Out) <*> parseCardanoArgs <**> helper) + ( fullDesc + <> header "Utility for converting snapshots among the different snapshot formats used by cardano-node." 
+ <> progDescDoc programDescription ) - <*> argument - auto - ( mconcat - [ help "To format (Legacy, Mem or LMDB)" - , metavar "FORMAT-OUT" + +programDescription :: Maybe Doc +programDescription = + Just $ + mconcat + [ "The input snapshot must correspond to a snapshot that was produced by " + , "a cardano-node, and thus follows the naming convention used in the node." + , line + , "This means in particular that the filepath to the snapshot must have as " + , "the last fragment a directory named after the slot number of the ledger " + , "state snapshotted, plus an optional suffix, joined by an underscore." + , line + , line + , "For the output, the same convention is enforced, so that the produced " + , "snapshot can be loaded right away by a cardano-node." + , line + , line + , "Note that snapshots that have a suffix will be preserved by the node. " + , "If you produce a snapshot with a suffix and you start a node with it, " + , "the node will take as many more snapshots as it is configured to take, " + , "but it will never delete your snapshot, because it has a suffix on the name." + , line + , "Therefore, for the most common use case it is advisable to create a " + , "snapshot without a suffix, as in:" + , line + , line + , "```" + , line + , "$ mkdir out" + , line + , "$ snapshot-converter ---in / ---out out/ --config " + , line + , "```" + ] + +data InOut = In | Out + +inoutForGroup :: InOut -> String +inoutForGroup In = "Input arguments:" +inoutForGroup Out = "Output arguments:" + +inoutForHelp :: InOut -> String -> Bool -> String +inoutForHelp In s b = + mconcat $ + ("Input " <> s) + : if b + then + [ ". Must be a filepath where the last fragment is named after the " + , "slot of the snapshotted state plus an optional suffix. Example: `1645330287_suffix`." 
] - ) - <*> strArgument - ( mconcat - [ help "Output dir/file Use relative paths like ./100007913" - , metavar "PATH-OUT" + else [] +inoutForHelp Out s b = + mconcat $ + ("Output " <> s) + : if b + then + [ ". Must be a filepath where the last fragment is named after the " + , "slot of the snapshotted state plus an optional suffix. Example: `1645330287_suffix`." ] - ) + else [] --- Helpers +inoutForCommand :: InOut -> String -> String +inoutForCommand In = (++ "-in") +inoutForCommand Out = (++ "-out") -pathToDiskSnapshot :: FilePath -> Maybe (SomeHasFS IO, FsPath, DiskSnapshot) -pathToDiskSnapshot path = (SomeHasFS $ ioHasFS $ MountPoint dir,mkFsPath [file],) <$> snapshotFromPath file - where - (dir, file) = splitFileName path +parseConfig :: InOut -> Parser Format +parseConfig io = + ( Mem + <$> parserOptionGroup + (inoutForGroup io) + (parsePath (inoutForCommand io "mem") (inoutForHelp io "snapshot dir" True)) + ) + <|> ( LMDB + <$> parserOptionGroup + (inoutForGroup io) + (parsePath (inoutForCommand io "lmdb") (inoutForHelp io "snapshot dir" True)) + ) -defaultLMDBLimits :: V1.LMDBLimits -defaultLMDBLimits = - V1.LMDBLimits - { V1.lmdbMapSize = 16 * 1024 * 1024 * 1024 - , V1.lmdbMaxDatabases = 10 - , V1.lmdbMaxReaders = 16 - } +parsePath :: String -> String -> Parser FilePath +parsePath optName strHelp = + strOption + ( mconcat + [ long optName + , help strHelp + , metavar "PATH" + ] + ) data Error blk = SnapshotError (SnapshotFailure blk) - | TablesCantDeserializeError DeserialiseFailure - | TablesTrailingBytes - | SnapshotFormatMismatch Format String - | ReadSnapshotCRCError FsPath CRCError + | BadDirectoryName FilePath + | WrongSlotDirectoryName FilePath SlotNo + | InvalidMetadata String + | BackendMismatch SnapshotBackend SnapshotBackend + | CRCMismatch CRC CRC + | ReadTablesError DeserialiseFailure + | Cancelled deriving Exception instance StandardHash blk => Show (Error blk) where show (SnapshotError err) = "Couldn't deserialize the snapshot. 
Are you running the same node version that created the snapshot? " <> show err - show (TablesCantDeserializeError err) = "Couldn't deserialize the tables: " <> show err - show TablesTrailingBytes = "Malformed tables, there are trailing bytes!" - show (SnapshotFormatMismatch expected err) = - "The input snapshot does not seem to correspond to the input format:\n\t" - <> show expected - <> "\n\tThe provided path " - <> err - show (ReadSnapshotCRCError fp err) = "An error occurred while reading the snapshot checksum at " <> show fp <> ": \n\t" <> show err - -checkSnapshotFileStructure :: Format -> FsPath -> SomeHasFS IO -> ExceptT (Error blk) IO () -checkSnapshotFileStructure m p (SomeHasFS fs) = case m of - Legacy -> want (doesFileExist fs) p "is NOT a file" - Mem -> newFormatCheck "tvar" - LMDB -> newFormatCheck "data.mdb" - where - want :: (FsPath -> IO Bool) -> FsPath -> String -> ExceptT (Error blk) IO () - want fileType path err = do - exists <- Trans.lift $ fileType path - Monad.unless exists $ throwError $ SnapshotFormatMismatch m err - - isDir = (doesDirectoryExist, [], "is NOT a directory") - hasTablesDir = (doesDirectoryExist, ["tables"], "DOES NOT contain a \"tables\" directory") - hasState = (doesFileExist, ["state"], "DOES NOT contain a \"state\" file") - hasTables tb = (doesFileExist, ["tables", tb], "DOES NOT contain a \"tables/" <> tb <> "\" file") - - newFormatCheck tb = - mapM_ - (\(doCheck, extra, err) -> want (doCheck fs) (p </> mkFsPath extra) err) - [ isDir - , hasTablesDir - , hasState - , hasTables tb + show (BadDirectoryName fp) = + mconcat + [ "Filepath " + , fp + , " is not an snapshot. The last fragment on the path should be" + , " named after the slot number of the state it contains and an" + , " optional suffix, such as `163470034` or `163470034_my-suffix`." ] + show (InvalidMetadata s) = "Metadata is invalid: " <> s + show (BackendMismatch b1 b2) = + mconcat + [ "Mismatched backend in snapshot. 
Reading as " + , show b1 + , " but snapshot is " + , show b2 + ] + show (WrongSlotDirectoryName fp sl) = + mconcat + [ "The name of the snapshot (\"" + , fp + , "\") does not correspond to the slot number of the state (" + , (show . unSlotNo $ sl) + , ")." + ] + show (CRCMismatch c1 c2) = + mconcat + [ "The input snapshot seems corrupted. Metadata has CRC " + , show c1 + , " but reading it gives CRC " + , show c2 + ] + show (ReadTablesError df) = + mconcat + ["Error when reading entries in the UTxO tables: ", show df] + show Cancelled = "Cancelled" + +data InEnv = InEnv + { inState :: LedgerState (CardanoBlock StandardCrypto) EmptyMK + , inFilePath :: FilePath + , inStream :: + LedgerState (CardanoBlock StandardCrypto) EmptyMK -> + ResourceRegistry IO -> + IO (YieldArgs (LedgerState (CardanoBlock StandardCrypto)) IO) + , inProgressMsg :: String + , inCRC :: CRC + , inSnapReadCRC :: Maybe CRC + } -load :: - forall blk. - ( CanStowLedgerTables (LedgerState blk) - , LedgerSupportsProtocol blk - , LedgerSupportsLedgerDB blk - ) => - Config -> - ResourceRegistry IO -> - CodecConfig blk -> - FilePath -> - ExceptT (Error blk) IO (ExtLedgerState blk EmptyMK, LedgerTables (ExtLedgerState blk) ValuesMK) -load config@Config{inpath = pathToDiskSnapshot -> Just (fs@(SomeHasFS hasFS), path, ds)} rr ccfg tempFP = - case from config of - Legacy -> do - checkSnapshotFileStructure Legacy path fs - (st, checksumAsRead) <- - first unstowLedgerTables - <$> withExceptT - (SnapshotError . InitFailureRead . 
ReadSnapshotFailed) - (readExtLedgerState fs (decodeDiskExtLedgerState ccfg) decode path) - let crcPath = path <.> "checksum" - crcFileExists <- Trans.lift $ doesFileExist hasFS crcPath - Monad.when crcFileExists $ do - snapshotCRC <- - withExceptT (ReadSnapshotCRCError crcPath) $ - readCRC hasFS crcPath - Monad.when (checksumAsRead /= snapshotCRC) $ - throwError $ - SnapshotError $ - InitFailureRead ReadSnapshotDataCorruption - pure (forgetLedgerTables st, projectLedgerTables st) - Mem -> do - checkSnapshotFileStructure Mem path fs - (ls, _) <- withExceptT SnapshotError $ V2.loadSnapshot nullTracer rr ccfg fs ds - let h = V2.currentHandle ls - (V2.state h,) <$> Trans.lift (V2.readAll (V2.tables h)) - LMDB -> do - checkSnapshotFileStructure LMDB path fs - ((dbch, k, bstore), _) <- - withExceptT SnapshotError $ - V1.loadSnapshot - nullTracer - (V1.LMDBBackingStoreArgs tempFP defaultLMDBLimits Dict.Dict) - ccfg - (V1.SnapshotsFS fs) - rr - ds - values <- Trans.lift (V1.bsReadAll bstore (V1.changelogLastFlushedState dbch)) - _ <- Trans.lift $ RR.release k - pure (V1.current dbch, values) -load _ _ _ _ = error "Malformed input path!" - -store :: - ( CanStowLedgerTables (LedgerState blk) - , LedgerSupportsProtocol blk - , LedgerSupportsLedgerDB blk - ) => - Config -> - CodecConfig blk -> - (ExtLedgerState blk EmptyMK, LedgerTables (ExtLedgerState blk) ValuesMK) -> - SomeHasFS IO -> - IO () -store config@Config{outpath = pathToDiskSnapshot -> Just (fs@(SomeHasFS hasFS), path, DiskSnapshot _ suffix)} ccfg (state, tbs) tempFS = - case to config of - Legacy -> do - crc <- - writeExtLedgerState - fs - (encodeDiskExtLedgerState ccfg) - path - (stowLedgerTables $ state `withLedgerTables` tbs) - withFile hasFS (path <.> "checksum") (WriteMode MustBeNew) $ \h -> - Monad.void $ hPutAll hasFS h . BS.toLazyByteString . 
BS.word32HexFixed $ getCRC crc - Mem -> do - lseq <- V2.empty state tbs $ V2.newInMemoryLedgerTablesHandle nullTracer fs - let h = V2.currentHandle lseq - Monad.void $ V2.implTakeSnapshot ccfg nullTracer fs suffix h - LMDB -> do - chlog <- newTVarIO (V1.empty state) - lock <- V1.mkLedgerDBLock - bs <- - V1.newLMDBBackingStore - nullTracer - defaultLMDBLimits - (V1.LiveLMDBFS tempFS) - (V1.SnapshotsFS fs) - (V1.InitFromValues (pointSlot $ getTip state) state tbs) - Monad.void $ V1.withReadLock lock $ do - V1.implTakeSnapshot chlog ccfg nullTracer (V1.SnapshotsFS fs) bs suffix -store _ _ _ _ = error "Malformed output path!" +data OutEnv = OutEnv + { outFilePath :: FilePath + , outStream :: + LedgerState (CardanoBlock StandardCrypto) EmptyMK -> + ResourceRegistry IO -> + IO (SinkArgs (LedgerState (CardanoBlock StandardCrypto)) IO) + , outCreateExtra :: Maybe FilePath + , outDeleteExtra :: Maybe FilePath + , outProgressMsg :: String + , outBackend :: SnapshotBackend + } main :: IO () main = withStdTerminalHandles $ do - cryptoInit - uncurry run =<< getCommandLineConfig + eRes <- runExceptT main' + case eRes of + Left err -> do + putStrLn $ show err + exitFailure + Right () -> exitSuccess where - run conf args = do - ccfg <- configCodec . pInfoConfig <$> mkProtocolInfo args - withSystemTempDirectory "lmdb" $ \dir -> do - let tempFS = SomeHasFS $ ioHasFS $ MountPoint dir - RR.withRegistry $ \rr -> do - putStrLn "Loading snapshot..." - state <- either throwIO pure =<< runExceptT (load conf rr ccfg dir) - putStrLn "Loaded snapshot" - putStrLn "Writing snapshot..." - store conf ccfg state tempFS - putStrLn "Written snapshot" + main' = do + lift $ cryptoInit + (conf, args) <- lift $ getCommandLineConfig + ccfg <- lift $ configCodec . pInfoConfig <$> mkProtocolInfo args + + InEnv{..} <- getInEnv ccfg (from conf) + + o@OutEnv{..} <- getOutEnv inState (to conf) + + wipeOutputPaths o + + lift $ putStr "Copying state file..." >> hFlush stdout + lift $ D.copyFile (inFilePath F. 
"state") (outFilePath F. "state") + lift $ putColored Green True "Done" + + lift $ putStr "Streaming ledger tables..." >> hFlush stdout >> saveCursor + + tid <- lift $ niceAnimatedProgressBar inProgressMsg outProgressMsg + + eRes <- lift $ runExceptT (stream inState inStream outStream) + + case eRes of + Left err -> throwError $ ReadTablesError err + Right (mCRCIn, mCRCOut) -> do + lift $ maybe (pure ()) cancel tid + lift $ clearLine >> restoreCursor >> cursorUp 1 >> putColored Green True "Done" + let crcIn = maybe inCRC (crcOfConcat inCRC) mCRCIn + maybe + ( lift $ + putColored Yellow True "The metadata file is missing, the snapshot is not guaranteed to be correct!" + ) + ( \cs -> + Monad.when (cs /= crcIn) $ throwError $ CRCMismatch cs crcIn + ) + inSnapReadCRC + + let crcOut = maybe inCRC (crcOfConcat inCRC) mCRCOut + + lift $ putStr "Generating new metadata file..." >> hFlush stdout + putMetadata outFilePath (SnapshotMetadata outBackend crcOut) + + lift $ putColored Green True "Done" + + wipeOutputPaths OutEnv{..} = do + wipePath outFilePath + lift $ maybe (pure ()) (D.createDirectory . (outFilePath F.)) outCreateExtra + maybe + (pure ()) + wipePath + outDeleteExtra + + getState ccfg fp@(pathToHasFS -> fs) = do + eState <- lift $ do + putStr $ "Reading ledger state from " <> (fp F. "state") <> "..." + hFlush stdout + runExceptT (readExtLedgerState fs (decodeDiskExtLedgerState ccfg) decode (mkFsPath ["state"])) + case eState of + Left err -> + throwError . SnapshotError . InitFailureRead @(CardanoBlock StandardCrypto) . ReadSnapshotFailed $ + err + Right st -> lift $ do + putColored Green True " Done" + pure . 
first ledgerState $ st + + getMetadata fp bknd = do + (fs, ds) <- toDiskSnapshot fp + mtd <- + lift $ runExceptT $ loadSnapshotMetadata fs ds + (,ds) + <$> either + ( \case + MetadataFileDoesNotExist -> pure Nothing + MetadataInvalid s -> throwError $ InvalidMetadata s + MetadataBackendMismatch -> error "impossible" + ) + ( \mtd' -> do + if bknd /= snapshotBackend mtd' + then throwError $ BackendMismatch bknd (snapshotBackend mtd') + else pure $ Just $ snapshotChecksum mtd' + ) + mtd + + putMetadata fp bknd = do + (fs, ds) <- toDiskSnapshot fp + lift $ writeSnapshotMetadata fs ds bknd + + getInEnv ccfg = \case + Mem fp -> do + (mtd, ds) <- getMetadata fp UTxOHDMemSnapshot + (st, c) <- getState ccfg fp + Monad.when + ((unSlotNo <$> pointSlot (getTip st)) /= NotOrigin (dsNumber ds)) + ( throwError $ + WrongSlotDirectoryName + (snapshotToDirName ds) + ( withOrigin + ( error + "Impossible! the snapshot seems to be at Genesis but cardano-node would never create such an snapshot!" + ) + id + $ pointSlot (getTip st) + ) + ) + + pure $ + InEnv + st + fp + (fromInMemory (fp F. "tables" F. "tvar")) + ("InMemory@[" <> fp <> "]") + c + mtd + LMDB fp -> do + (mtd, ds) <- getMetadata fp UTxOHDLMDBSnapshot + (st, c) <- getState ccfg fp + Monad.when + ((unSlotNo <$> pointSlot (getTip st)) /= NotOrigin (dsNumber ds)) + ( throwError $ + WrongSlotDirectoryName + (snapshotToDirName ds) + (withOrigin undefined id $ pointSlot (getTip st)) + ) + + pure $ + InEnv + st + fp + (fromLMDB (fp F. "tables") defaultLMDBLimits) + ("LMDB@[" <> fp <> "]") + c + mtd + + getOutEnv st = \case + Mem fp -> do + (_, ds) <- toDiskSnapshot fp + Monad.when + ((unSlotNo <$> pointSlot (getTip st)) /= NotOrigin (dsNumber ds)) + ( throwError $ + WrongSlotDirectoryName + (snapshotToDirName ds) + (withOrigin undefined id $ pointSlot (getTip st)) + ) + pure $ + OutEnv + fp + (toInMemory (fp F. "tables" F. 
"tvar")) + (Just "tables") + (Nothing) + ("InMemory@[" <> fp <> "]") + UTxOHDMemSnapshot + LMDB fp -> do + (_, ds) <- toDiskSnapshot fp + Monad.when + ((unSlotNo <$> pointSlot (getTip st)) /= NotOrigin (dsNumber ds)) + ( throwError $ + WrongSlotDirectoryName + (snapshotToDirName ds) + (withOrigin undefined id $ pointSlot (getTip st)) + ) + pure $ + OutEnv + fp + (toLMDB fp defaultLMDBLimits) + Nothing + Nothing + ("LMDB@[" <> fp <> "]") + UTxOHDLMDBSnapshot + +-- Helpers + +-- UI +niceAnimatedProgressBar :: String -> String -> IO (Maybe (Async IO ())) +niceAnimatedProgressBar inMsg outMsg = do + stdoutSupportsANSI <- hNowSupportsANSI stdout + if stdoutSupportsANSI + then do + putStrLn "" + pb <- + newProgressBar + defStyle{stylePrefix = msg (T.pack inMsg), stylePostfix = msg (T.pack outMsg)} + 10 + (Progress 1 100 ()) + + fmap Just $ + async $ + let loop = do + threadDelay 0.2 + updateProgress pb (\prg -> prg{progressDone = (progressDone prg + 4) `mod` 100}) + in Monad.forever loop + else pure Nothing + +putColored :: Color -> Bool -> String -> IO () +putColored c b s = do + stdoutSupportsANSI <- hNowSupportsANSI stdout + Monad.when stdoutSupportsANSI $ setSGR [SetColor Foreground Vivid c] + if b + then + putStrLn s + else + putStr s + Monad.when stdoutSupportsANSI $ setSGR [Reset] + hFlush stdout + +askForConfirmation :: + ExceptT (Error (CardanoBlock StandardCrypto)) IO a -> + String -> + ExceptT (Error (CardanoBlock StandardCrypto)) IO a +askForConfirmation act infoMsg = do + lift $ putColored Yellow False $ "I'm going to " <> infoMsg <> ". Continue? 
(Y/n) " + answer <- lift $ getLine + case map toLower answer of + "y" -> act + _ -> throwError Cancelled + +-- | Ask before deleting +wipePath :: FilePath -> ExceptT (Error (CardanoBlock StandardCrypto)) IO () +wipePath fp = do + exists <- lift $ D.doesDirectoryExist fp + ( if exists + then flip askForConfirmation ("wipe the path " <> fp) + else id + ) + (lift $ D.removePathForcibly fp >> D.createDirectoryIfMissing True fp) + +toDiskSnapshot :: + FilePath -> ExceptT (Error (CardanoBlock StandardCrypto)) IO (SomeHasFS IO, DiskSnapshot) +toDiskSnapshot fp@(F.splitFileName . maybeRemoveTrailingSlash -> (snapPath, snapName)) = + maybe + (throwError $ BadDirectoryName fp) + (pure . (pathToHasFS snapPath,)) + $ snapshotFromPath snapName + +-- | Given a filepath pointing to a snapshot (with or without a trailing slash), produce: +-- +-- * A HasFS at the snapshot directory +pathToHasFS :: FilePath -> SomeHasFS IO +pathToHasFS (maybeRemoveTrailingSlash -> path) = + SomeHasFS $ ioHasFS $ MountPoint path + +maybeRemoveTrailingSlash :: String -> String +maybeRemoveTrailingSlash s = case last s of + '/' -> init s + '\\' -> init s + _ -> s + +defaultLMDBLimits :: V1.LMDBLimits +defaultLMDBLimits = + V1.LMDBLimits + { V1.lmdbMapSize = 16 * 1024 * 1024 * 1024 + , V1.lmdbMaxDatabases = 10 + , V1.lmdbMaxReaders = 16 + } diff --git a/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal b/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal index c119064246..10fe8ca145 100644 --- a/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal +++ b/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal @@ -98,10 +98,10 @@ library Ouroboros.Consensus.Cardano.Block Ouroboros.Consensus.Cardano.CanHardFork Ouroboros.Consensus.Cardano.Condense - Ouroboros.Consensus.Cardano.StreamingLedgerTables Ouroboros.Consensus.Cardano.Ledger Ouroboros.Consensus.Cardano.Node Ouroboros.Consensus.Cardano.QueryHF + Ouroboros.Consensus.Cardano.StreamingLedgerTables 
Ouroboros.Consensus.Shelley.Crypto Ouroboros.Consensus.Shelley.Eras Ouroboros.Consensus.Shelley.HFEras @@ -138,9 +138,6 @@ library bytestring >=0.10 && <0.13, cardano-binary, cardano-crypto, - fs-api, - contra-tracer, - directory, cardano-crypto-class ^>=2.2, cardano-crypto-wrapper, cardano-ledger-allegra ^>=1.8, @@ -153,8 +150,6 @@ library cardano-ledger-core ^>=1.18, cardano-ledger-dijkstra ^>=0.1, cardano-ledger-mary ^>=1.9, - temporary, - resource-registry, cardano-ledger-shelley ^>=1.17, cardano-prelude, cardano-protocol-tpraos ^>=1.4.1, @@ -165,8 +160,10 @@ library contra-tracer, crypton, deepseq, + directory, filepath, formatting >=6.3 && <7.3, + fs-api, measures, mempack, microlens, @@ -176,12 +173,14 @@ library ouroboros-consensus-protocol ^>=0.13, ouroboros-network-api ^>=0.16, random, + resource-registry, serialise ^>=0.2, singletons ^>=3.0, small-steps, sop-core ^>=0.5, sop-extras ^>=0.4, strict-sop-core ^>=0.1, + temporary, text, these ^>=1.2, validation, @@ -703,10 +702,10 @@ executable snapshot-converter hs-source-dirs: app main-is: snapshot-converter.hs build-depends: + ansi-terminal, base, - bytestring, cardano-crypto-class, - contra-tracer, + directory, filepath, fs-api, mtl, @@ -716,8 +715,8 @@ executable snapshot-converter ouroboros-consensus-cardano:unstable-cardano-tools, resource-registry, serialise, - sop-core, - temporary, + terminal-progress-bar, + text, with-utf8, other-modules: diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 519f30d6b4..fc1fec11e9 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -281,7 +281,6 @@ library Ouroboros.Consensus.Util.Args Ouroboros.Consensus.Util.Assert Ouroboros.Consensus.Util.CBOR - Ouroboros.Consensus.Util.StreamingLedgerTables Ouroboros.Consensus.Util.CRC Ouroboros.Consensus.Util.CallStack Ouroboros.Consensus.Util.Condense @@ -301,6 +300,7 @@ library 
Ouroboros.Consensus.Util.Orphans Ouroboros.Consensus.Util.RedundantConstraints Ouroboros.Consensus.Util.STM + Ouroboros.Consensus.Util.StreamingLedgerTables Ouroboros.Consensus.Util.Time Ouroboros.Consensus.Util.Versioned diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs index 5f722deea7..2f826f0f81 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs @@ -5,7 +5,6 @@ {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} -{-# LANGUAGE TypeApplications #-} {-# LANGUAGE ViewPatterns #-} {-# OPTIONS_GHC -Wno-orphans #-} @@ -46,6 +45,7 @@ import Ouroboros.Consensus.Ledger.Tables.Diff import Ouroboros.Consensus.Storage.LedgerDB.API import Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.API import Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq +import Ouroboros.Consensus.Util.IOLike (IOLike) import Ouroboros.Consensus.Util.IndexedMemPack import Ouroboros.Network.Block import Streaming @@ -277,48 +277,6 @@ sinkLmdbS writeChunkSize bs copyTo hint s = do Just ((k, v), s'') -> go (n - 1) (Map.insert k v m) s'' -sinkLsmS :: - forall l m r. 
- ( MonadAsync m - , MonadMVar m - , MonadThrow (STM m) - , MonadMask m - , MonadST m - , MonadEvaluate m - , MemPack (TxIn l) - , IndexedMemPack (l EmptyMK) (TxOut l) - ) => - Int -> - String -> - Session m -> - Sink l m r -sinkLsmS writeChunkSize snapName session st s = do - tb :: UTxOTable m <- lift $ newTable session - r <- go tb writeChunkSize mempty s - lift $ - saveSnapshot - (toSnapshotName snapName) - (SnapshotLabel $ T.pack "UTxO table") - tb - lift $ closeTable tb - pure (fmap (,Nothing) r) - where - go tb 0 m s' = do - lift $ - inserts tb $ - V.fromList [(toTxInBytes (Proxy @l) k, toTxOutBytes st v, Nothing) | (k, v) <- m] - go tb writeChunkSize mempty s' - go tb n m s' = do - mbs <- S.uncons s' - case mbs of - Nothing -> do - lift $ - inserts tb $ - V.fromList - [(toTxInBytes (Proxy @l) k, toTxOutBytes st v, Nothing) | (k, v) <- m] - S.effects s' - Just (item, s'') -> go tb (n - 1) (item : m) s'' - sinkInMemoryS :: forall m l r. MonadThrow m => From 5198b6d363d236f35eddeaa00114f4c2e85bccda Mon Sep 17 00:00:00 2001 From: Javier Sagredo Date: Thu, 25 Sep 2025 11:24:54 +0200 Subject: [PATCH 3/5] Clarify how should snapshots be upgraded in 10.4 -> 10.5 --- .../miscellaneous/utxo-hd/migrating.mdx | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/docs/website/contents/references/miscellaneous/utxo-hd/migrating.mdx b/docs/website/contents/references/miscellaneous/utxo-hd/migrating.mdx index 0cd568960c..bcfe52c3be 100644 --- a/docs/website/contents/references/miscellaneous/utxo-hd/migrating.mdx +++ b/docs/website/contents/references/miscellaneous/utxo-hd/migrating.mdx @@ -142,7 +142,7 @@ follows: -## Convert the existing Ledger snapshots with `snapshot-converter` +## Upgrading Ledger snapshots from 10.4.x to 10.5.x :::warning @@ -157,8 +157,29 @@ chain from Genesis, however this can take some hours.
We provide the `snapshot-converter` tool which can load a snapshot in the Legacy format and write it either in the in-memory or on-disk formats for -UTxO-HD. Supposing you have copied a legacy snapshot to -`/snapshots/` you can run the following command: +UTxO-HD. + +:::info + +In cardano-node 10.6 there was a change in serialization unrelated to the +UTxO-HD changes. This means that an upgrade from 10.4.x to 10.5.x can be done +with a quick conversion using `snapshot-converter` as explained here, but +this won't work when the target is 10.6 or newer. Such a format update can +only be done via a replay of the chain. + +The `snapshot-converter` tool still exists in the repository and will convert +between the different UTxO-HD formats but it will no longer be able to load a +snapshot older than 10.6, and therefore it won't be able to "upgrade" a Legacy +snapshot to a UTxO-HD snapshot anymore. + +For this reason, the process below is only useful for upgrading from 10.4.x to +10.5.x, and **it must be performed with the `snapshot-converter` version that +was released with nodes 10.5.x**. + +::: + +Supposing you have copied a legacy snapshot to `/snapshots/` you +can run the following command:
From 6cc4284b11c31e66a2d08a2cfe546260bf32325b Mon Sep 17 00:00:00 2001 From: Javier Sagredo Date: Thu, 25 Sep 2025 11:38:57 +0200 Subject: [PATCH 4/5] Mention `snapshot-converter` in `o-c-cardano` README --- ouroboros-consensus-cardano/README.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/ouroboros-consensus-cardano/README.md b/ouroboros-consensus-cardano/README.md index 5db3429acd..93af458a91 100644 --- a/ouroboros-consensus-cardano/README.md +++ b/ouroboros-consensus-cardano/README.md @@ -20,6 +20,8 @@ This package also contains a few executables: * `app/immdb-server.hs`: serve an immutable DB via ChainSync and BlockFetch. +* `app/snapshot-converter.hs`: converts snapshots among different storage formats. + ### Assertions Our top level `cabal.project` enables assertions in both our local packages @@ -378,3 +380,23 @@ To point a node to a running ImmDB server, use a topology file like ] } ``` + + +## snapshot-converter + +## About + +This tool converts snapshots among the different backends supported by the node. + +## Running the tool + +Invoking the tool always follows the same simple pattern: + +```sh +cabal run snapshot-converter -- IN OUT --config /path/to/cardano/config.json +``` + +The `IN` and `OUT` parameters depend on the input and output formats and take one of the following options: +- `--mem-in PATH`/`--mem-out PATH` for InMemory +- `--lmdb-in PATH`/`--lmdb-out PATH` for LMDB +- `--lsm-database-in DB_PATH --lsm-snapshot-in PATH`/`--lsm-database-out DB_PATH --lsm-snapshot-out PATH` for LSM-trees.
From 023e6b922399745b44e8a9819c84bcfffb705db0 Mon Sep 17 00:00:00 2001 From: Javier Sagredo Date: Wed, 22 Oct 2025 13:17:05 +0200 Subject: [PATCH 5/5] Cleanup backport --- .../app/snapshot-converter.hs | 1 - .../ouroboros-consensus-cardano.cabal | 1 - .../Cardano/StreamingLedgerTables.hs | 3 --- .../Consensus/Util/StreamingLedgerTables.hs | 26 ------------------- 4 files changed, 31 deletions(-) diff --git a/ouroboros-consensus-cardano/app/snapshot-converter.hs b/ouroboros-consensus-cardano/app/snapshot-converter.hs index 8aacc2a52a..13edc2c73f 100644 --- a/ouroboros-consensus-cardano/app/snapshot-converter.hs +++ b/ouroboros-consensus-cardano/app/snapshot-converter.hs @@ -41,7 +41,6 @@ import System.Exit import System.FS.API import System.FS.CRC import System.FS.IO -import System.FilePath (splitDirectories) import qualified System.FilePath as F import System.IO import System.ProgressBar diff --git a/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal b/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal index 10fe8ca145..68cd417726 100644 --- a/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal +++ b/ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal @@ -172,7 +172,6 @@ library ouroboros-consensus ^>=0.28, ouroboros-consensus-protocol ^>=0.13, ouroboros-network-api ^>=0.16, - random, resource-registry, serialise ^>=0.2, singletons ^>=3.0, diff --git a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/StreamingLedgerTables.hs b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/StreamingLedgerTables.hs index 256a58b827..bf8201dda8 100644 --- a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/StreamingLedgerTables.hs +++ b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/StreamingLedgerTables.hs @@ -23,7 +23,6 @@ import Data.SOP.BasicFunctors import Data.SOP.Functors import 
Data.SOP.Strict import qualified Data.SOP.Telescope as Telescope -import qualified Data.Text as T import Lens.Micro import Ouroboros.Consensus.Byron.Ledger import Ouroboros.Consensus.Cardano.Block @@ -36,14 +35,12 @@ import Ouroboros.Consensus.Shelley.Ledger import Ouroboros.Consensus.Shelley.Ledger.SupportsProtocol () import Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.API import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.Impl.LMDB as LMDB -import Ouroboros.Consensus.Storage.LedgerDB.V2.Args import Ouroboros.Consensus.Util.StreamingLedgerTables import System.Directory import System.FS.API import System.FS.IO import System.FilePath as FilePath import System.IO.Temp -import System.Random type L = LedgerState (CardanoBlock StandardCrypto) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs index 2f826f0f81..1b1c874afe 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs @@ -22,11 +22,8 @@ import Codec.CBOR.Decoding (Decoder, decodeBreakOr, decodeListLen, decodeMapLenO import Codec.CBOR.Encoding (Encoding, encodeBreak, encodeListLen, encodeMapLenIndef) import Codec.CBOR.Read import Codec.CBOR.Write -import Control.Concurrent.Class.MonadMVar import Control.Monad (replicateM_, unless) -import Control.Monad.Class.MonadAsync import Control.Monad.Class.MonadST -import Control.Monad.Class.MonadSTM import Control.Monad.Class.MonadThrow import Control.Monad.Except import Control.Monad.State.Strict @@ -35,18 +32,12 @@ import Data.ByteString (ByteString) import qualified Data.ByteString as BS import Data.ByteString.Builder.Extra (defaultChunkSize) import qualified Data.Map.Strict as Map -import Data.MemPack -import Data.Proxy import qualified Data.Set 
as Set -import qualified Data.Text as T -import qualified Data.Vector as V import Ouroboros.Consensus.Ledger.Abstract import Ouroboros.Consensus.Ledger.Tables.Diff import Ouroboros.Consensus.Storage.LedgerDB.API import Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.API -import Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq import Ouroboros.Consensus.Util.IOLike (IOLike) -import Ouroboros.Consensus.Util.IndexedMemPack import Ouroboros.Network.Block import Streaming import qualified Streaming as S @@ -230,23 +221,6 @@ yieldLmdbS readChunkSize bsvh hint k = do S.each $ Map.toList values go (RangeQuery (Just . LedgerTables . KeysMK $ Set.singleton x) readChunkSize) -yieldLsmS :: - Monad m => - Int -> - LedgerTablesHandle m l -> - Yield l m -yieldLsmS readChunkSize tb hint k = do - r <- k (go (Nothing, readChunkSize)) - lift $ S.effects r - where - go p = do - (LedgerTables (ValuesMK values), mx) <- lift $ S.lift $ readRange tb hint p - if Map.null values - then pure $ pure Nothing - else do - S.each $ Map.toList values - go (mx, readChunkSize) - {------------------------------------------------------------------------------- Sink -------------------------------------------------------------------------------}