Skip to content

Commit e99258a

Browse files
jasagredoamesgen
andcommitted
Implement streaming of Ledger Tables
Co-authored-by: Alexander Esgen <[email protected]>
1 parent 7dbfc68 commit e99258a

File tree

5 files changed

+611
-0
lines changed

5 files changed

+611
-0
lines changed

ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ library
9898
Ouroboros.Consensus.Cardano.Block
9999
Ouroboros.Consensus.Cardano.CanHardFork
100100
Ouroboros.Consensus.Cardano.Condense
101+
Ouroboros.Consensus.Cardano.StreamingLedgerTables
101102
Ouroboros.Consensus.Cardano.Ledger
102103
Ouroboros.Consensus.Cardano.Node
103104
Ouroboros.Consensus.Cardano.QueryHF
@@ -137,10 +138,14 @@ library
137138
bytestring >=0.10 && <0.13,
138139
cardano-binary,
139140
cardano-crypto,
141+
fs-api,
142+
contra-tracer,
143+
directory,
140144
cardano-crypto-class ^>=2.2,
141145
cardano-crypto-wrapper,
142146
cardano-ledger-allegra ^>=1.8,
143147
cardano-ledger-alonzo ^>=1.14,
148+
random,
144149
cardano-ledger-api ^>=1.12,
145150
cardano-ledger-babbage ^>=1.12,
146151
cardano-ledger-binary ^>=1.7,
@@ -149,6 +154,8 @@ library
149154
cardano-ledger-core ^>=1.18,
150155
cardano-ledger-dijkstra ^>=0.1,
151156
cardano-ledger-mary ^>=1.9,
157+
temporary,
158+
resource-registry,
152159
cardano-ledger-shelley ^>=1.17,
153160
cardano-prelude,
154161
cardano-protocol-tpraos ^>=1.4.1,
@@ -158,6 +165,7 @@ library
158165
containers >=0.5 && <0.8,
159166
crypton,
160167
deepseq,
168+
filepath,
161169
formatting >=6.3 && <7.3,
162170
measures,
163171
mempack,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
{-# LANGUAGE ScopedTypeVariables #-}
2+
{-# LANGUAGE TypeApplications #-}
3+
{-# LANGUAGE TypeOperators #-}
4+
5+
module Ouroboros.Consensus.Cardano.StreamingLedgerTables
6+
( fromInMemory
7+
, fromLSM
8+
, fromLMDB
9+
, toLMDB
10+
, toLSM
11+
, toInMemory
12+
) where
13+
14+
import Cardano.Ledger.BaseTypes (WithOrigin (..))
15+
import Cardano.Ledger.Binary
16+
import Cardano.Ledger.Core (ByronEra, Era, eraDecoder, toEraCBOR)
17+
import qualified Cardano.Ledger.Shelley.API as SL
18+
import qualified Cardano.Ledger.Shelley.LedgerState as SL
19+
import qualified Cardano.Ledger.State as SL
20+
import qualified Codec.CBOR.Encoding
21+
import Control.ResourceRegistry
22+
import Control.Tracer (nullTracer)
23+
import Data.Proxy
24+
import Data.SOP.BasicFunctors
25+
import Data.SOP.Functors
26+
import Data.SOP.Strict
27+
import qualified Data.SOP.Telescope as Telescope
28+
import qualified Data.Text as T
29+
import Lens.Micro
30+
import Ouroboros.Consensus.Byron.Ledger
31+
import Ouroboros.Consensus.Cardano.Block
32+
import Ouroboros.Consensus.Cardano.Ledger
33+
import Ouroboros.Consensus.HardFork.Combinator
34+
import Ouroboros.Consensus.HardFork.Combinator.State
35+
import Ouroboros.Consensus.Ledger.Abstract
36+
import Ouroboros.Consensus.Ledger.Tables.Utils (emptyLedgerTables)
37+
import Ouroboros.Consensus.Shelley.Ledger
38+
import Ouroboros.Consensus.Shelley.Ledger.SupportsProtocol ()
39+
import Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.API
40+
import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.Impl.LMDB as LMDB
41+
import Ouroboros.Consensus.Storage.LedgerDB.V2.Args
42+
import Ouroboros.Consensus.Storage.LedgerDB.V2.LSM
43+
import Ouroboros.Consensus.Util.StreamingLedgerTables
44+
import System.Directory
45+
import System.FS.API
46+
import System.FS.IO
47+
import System.FilePath as FilePath
48+
import System.IO.Temp
49+
import System.Random
50+
51+
type L = LedgerState (CardanoBlock StandardCrypto)
52+
53+
fromInMemory :: FilePath -> L EmptyMK -> ResourceRegistry IO -> IO (YieldArgs L IO)
54+
fromInMemory fp (HardForkLedgerState (HardForkState idx)) _ =
55+
let
56+
np ::
57+
NP
58+
(Current (Flip LedgerState EmptyMK) -.-> K (Decoders L))
59+
(CardanoEras StandardCrypto)
60+
np =
61+
(Fn $ const $ K $ error "Byron")
62+
:* (Fn $ K . fromEra ShelleyTxOut . unFlip . currentState)
63+
:* (Fn $ K . fromEra AllegraTxOut . unFlip . currentState)
64+
:* (Fn $ K . fromEra MaryTxOut . unFlip . currentState)
65+
:* (Fn $ K . fromEra AlonzoTxOut . unFlip . currentState)
66+
:* (Fn $ K . fromEra BabbageTxOut . unFlip . currentState)
67+
:* (Fn $ K . fromEra ConwayTxOut . unFlip . currentState)
68+
:* (Fn $ K . fromEra DijkstraTxOut . unFlip . currentState)
69+
:* Nil
70+
in
71+
pure $
72+
YieldInMemory
73+
(SomeHasFS . ioHasFS)
74+
fp
75+
(hcollapse $ hap np $ Telescope.tip idx)
76+
where
77+
fromEra ::
78+
forall proto era.
79+
ShelleyCompatible proto era =>
80+
(TxOut (LedgerState (ShelleyBlock proto era)) -> CardanoTxOut StandardCrypto) ->
81+
LedgerState (ShelleyBlock proto era) EmptyMK ->
82+
Decoders L
83+
fromEra toCardanoTxOut st =
84+
let certInterns =
85+
internsFromMap $
86+
shelleyLedgerState st
87+
^. SL.nesEsL
88+
. SL.esLStateL
89+
. SL.lsCertStateL
90+
. SL.certDStateL
91+
. SL.accountsL
92+
. SL.accountsMapL
93+
in Decoders
94+
(eraDecoder @era decodeMemPack)
95+
(eraDecoder @era $ toCardanoTxOut <$> decShareCBOR certInterns)
96+
97+
fromLMDB :: FilePath -> LMDB.LMDBLimits -> L EmptyMK -> ResourceRegistry IO -> IO (YieldArgs L IO)
98+
fromLMDB fp limits hint reg = do
99+
let (dbPath, snapName) = splitFileName fp
100+
tempDir <- getCanonicalTemporaryDirectory
101+
let lmdbTemp = tempDir FilePath.</> "lmdb_streaming_in"
102+
removePathForcibly lmdbTemp
103+
_ <-
104+
allocate
105+
reg
106+
(\_ -> System.Directory.createDirectory lmdbTemp)
107+
(\_ -> removePathForcibly lmdbTemp)
108+
(_, bs) <-
109+
allocate
110+
reg
111+
( \_ -> do
112+
LMDB.newLMDBBackingStore
113+
nullTracer
114+
limits
115+
(LiveLMDBFS $ SomeHasFS $ ioHasFS $ MountPoint lmdbTemp)
116+
(SnapshotsFS $ SomeHasFS $ ioHasFS $ MountPoint dbPath)
117+
(InitFromCopy hint (mkFsPath [snapName]))
118+
)
119+
bsClose
120+
(_, bsvh) <- allocate reg (\_ -> bsValueHandle bs) bsvhClose
121+
pure (YieldLMDB 1000 bsvh)
122+
123+
fromLSM ::
124+
FilePath ->
125+
String ->
126+
L EmptyMK ->
127+
ResourceRegistry IO ->
128+
IO (YieldArgs L IO)
129+
fromLSM fp snapName _ reg = do
130+
(_, SomeHasFSAndBlockIO hasFS blockIO) <- stdMkBlockIOFS fp reg
131+
salt <- fst . genWord64 <$> newStdGen
132+
(_, session) <-
133+
allocate reg (\_ -> openSession nullTracer hasFS blockIO salt (mkFsPath [])) closeSession
134+
tb <-
135+
allocate
136+
reg
137+
( \_ ->
138+
openTableFromSnapshot
139+
session
140+
(toSnapshotName snapName)
141+
(SnapshotLabel $ T.pack "UTxO table")
142+
)
143+
closeTable
144+
YieldLSM 1000 <$> newLSMLedgerTablesHandle nullTracer reg tb
145+
146+
toLMDB ::
147+
FilePath ->
148+
LMDB.LMDBLimits ->
149+
L EmptyMK ->
150+
ResourceRegistry IO ->
151+
IO (SinkArgs L IO)
152+
toLMDB fp limits hint reg = do
153+
let (snapDir, snapName) = splitFileName fp
154+
tempDir <- getCanonicalTemporaryDirectory
155+
let lmdbTemp = tempDir FilePath.</> "lmdb_streaming_out"
156+
removePathForcibly lmdbTemp
157+
_ <-
158+
allocate reg (\_ -> System.Directory.createDirectory lmdbTemp) (\_ -> removePathForcibly lmdbTemp)
159+
(_, bs) <-
160+
allocate
161+
reg
162+
( \_ ->
163+
LMDB.newLMDBBackingStore
164+
nullTracer
165+
limits
166+
(LiveLMDBFS $ SomeHasFS $ ioHasFS $ MountPoint lmdbTemp)
167+
(SnapshotsFS $ SomeHasFS $ ioHasFS $ MountPoint snapDir)
168+
(InitFromValues (At 0) hint emptyLedgerTables)
169+
)
170+
bsClose
171+
pure $ SinkLMDB 1000 (bsWrite bs) (\h -> bsCopy bs h (mkFsPath [snapName, "tables"]))
172+
173+
toInMemory ::
174+
FilePath ->
175+
L EmptyMK ->
176+
ResourceRegistry IO ->
177+
IO (SinkArgs L IO)
178+
toInMemory fp (HardForkLedgerState (HardForkState idx)) _ = do
179+
currDir <- getCurrentDirectory
180+
let
181+
np =
182+
(Fn $ const $ K $ encOne (Proxy @ByronEra))
183+
:* (Fn $ const $ K $ encOne (Proxy @ShelleyEra))
184+
:* (Fn $ const $ K $ encOne (Proxy @AllegraEra))
185+
:* (Fn $ const $ K $ encOne (Proxy @MaryEra))
186+
:* (Fn $ const $ K $ encOne (Proxy @AlonzoEra))
187+
:* (Fn $ const $ K $ encOne (Proxy @BabbageEra))
188+
:* (Fn $ const $ K $ encOne (Proxy @ConwayEra))
189+
:* (Fn $ const $ K $ encOne (Proxy @DijkstraEra))
190+
:* Nil
191+
pure $
192+
uncurry
193+
(SinkInMemory 1000)
194+
(hcollapse $ hap np $ Telescope.tip idx)
195+
(SomeHasFS $ ioHasFS $ MountPoint currDir)
196+
fp
197+
where
198+
encOne ::
199+
forall era.
200+
Era era =>
201+
Proxy era ->
202+
(TxIn L -> Codec.CBOR.Encoding.Encoding, TxOut L -> Codec.CBOR.Encoding.Encoding)
203+
encOne _ =
204+
(toEraCBOR @era . encodeMemPack, toEraCBOR @era . eliminateCardanoTxOut (const encodeMemPack))
205+
206+
toLSM ::
207+
FilePath ->
208+
String ->
209+
L EmptyMK ->
210+
ResourceRegistry IO ->
211+
IO (SinkArgs L IO)
212+
toLSM fp snapName _ reg = do
213+
removePathForcibly fp
214+
System.Directory.createDirectory fp
215+
(_, SomeHasFSAndBlockIO hasFS blockIO) <- stdMkBlockIOFS fp reg
216+
salt <- fst . genWord64 <$> newStdGen
217+
(_, session) <-
218+
allocate reg (\_ -> newSession nullTracer hasFS blockIO salt (mkFsPath [])) closeSession
219+
pure (SinkLSM 1000 snapName session)

ouroboros-consensus/ouroboros-consensus.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ library
272272
Ouroboros.Consensus.Util.Args
273273
Ouroboros.Consensus.Util.Assert
274274
Ouroboros.Consensus.Util.CBOR
275+
Ouroboros.Consensus.Util.StreamingLedgerTables
275276
Ouroboros.Consensus.Util.CRC
276277
Ouroboros.Consensus.Util.CallStack
277278
Ouroboros.Consensus.Util.Condense
@@ -317,6 +318,7 @@ library
317318
deepseq,
318319
diff-containers >=1.2,
319320
filelock,
321+
filepath,
320322
fingertree-rm >=1.0,
321323
fs-api ^>=0.4,
322324
hashable,

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2.LSM
2222
( -- * LedgerTablesHandle
2323
newLSMLedgerTablesHandle
2424
, tableFromValuesMK
25+
, UTxOTable
2526

2627
-- * Snapshots
2728
, loadSnapshot
@@ -39,6 +40,15 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2.LSM
3940

4041
-- * snapshot-converter
4142
, implTakeSnapshot
43+
, LSM.withNewSession
44+
, toTxInBytes
45+
, toTxOutBytes
46+
, LSM.newSession
47+
, LSM.toSnapshotName
48+
, LSM.SnapshotLabel (LSM.SnapshotLabel)
49+
, LSM.openTableFromSnapshot
50+
, LSM.closeTable
51+
, LSM.listSnapshots
4252
) where
4353

4454
import Cardano.Binary as CBOR

0 commit comments

Comments
 (0)