Skip to content

Commit ac640be

Browse files
committed
WIP
1 parent 4d9cf29 commit ac640be

File tree

3 files changed

+160
-47
lines changed

3 files changed

+160
-47
lines changed

ouroboros-consensus-cardano/ouroboros-consensus-cardano.cabal

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ library
141141
fs-api,
142142
streaming,
143143
contra-tracer,
144+
directory,
144145
cardano-crypto-class ^>=2.2,
145146
cardano-crypto-wrapper,
146147
cardano-ledger-allegra ^>=1.8,
@@ -150,18 +151,22 @@ library
150151
cardano-ledger-binary ^>=1.7,
151152
cardano-ledger-byron ^>=1.2,
152153
cardano-ledger-conway ^>=1.20,
154+
cardano-ledger-conway:testlib,
153155
cardano-ledger-core ^>=1.18,
154156
cardano-ledger-dijkstra ^>=0.1,
155157
cardano-ledger-mary ^>=1.9,
158+
temporary,
156159
cardano-ledger-shelley ^>=1.17,
157160
cardano-prelude,
158161
cardano-protocol-tpraos ^>=1.4.1,
159162
cardano-slotting,
160163
cardano-strict-containers,
161-
cborg ^>=0.2.2,
164+
cborg ^>=0.2.2,
165+
cardano-protocol-tpraos:testlib,
162166
containers >=0.5 && <0.8,
163167
crypton,
164168
deepseq,
169+
filepath,
165170
formatting >=6.3 && <7.3,
166171
measures,
167172
mempack,

ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/StreamingLedgerTables.hs

Lines changed: 138 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,29 @@
44
{-# LANGUAGE ScopedTypeVariables #-}
55
{-# LANGUAGE TypeApplications #-}
66
{-# LANGUAGE TypeOperators #-}
7+
{-# OPTIONS_GHC -Wno-unused-imports -Wno-unused-top-binds -Wno-missing-export-lists #-}
78

89
module Ouroboros.Consensus.Cardano.StreamingLedgerTables where
910

10-
import Cardano.Ledger.BaseTypes (WithOrigin (..))
11+
import Cardano.Ledger.BaseTypes (BlockNo (..), EpochNo (..), SlotNo (..), WithOrigin (..))
1112
import Cardano.Ledger.Binary
12-
import Cardano.Ledger.Core (eraDecoder)
13+
import Cardano.Ledger.Core (ByronEra, Era, eraDecoder, toEraCBOR)
1314
import qualified Cardano.Ledger.Shelley.API as SL
1415
import qualified Cardano.Ledger.Shelley.LedgerState as SL
1516
import qualified Cardano.Ledger.State as SL
17+
import qualified Cardano.Protocol.TPraos.BHeader as SL
18+
import Cardano.Slotting.Time
1619
import Control.Monad.Except
1720
import Control.Tracer (nullTracer)
1821
import Data.ByteString (ByteString)
1922
import qualified Data.Map.Strict as Map
23+
import Data.MemPack
24+
import Data.Proxy
2025
import Data.SOP.BasicFunctors
2126
import Data.SOP.Functors
2227
import Data.SOP.Strict
2328
import qualified Data.SOP.Telescope as Telescope
29+
import qualified Debug.Trace as Debug
2430
import Lens.Micro
2531
import Ouroboros.Consensus.Byron.Ledger
2632
import Ouroboros.Consensus.Cardano.Block
@@ -29,6 +35,7 @@ import Ouroboros.Consensus.Cardano.Ledger
2935
import Ouroboros.Consensus.HardFork.Combinator
3036
import Ouroboros.Consensus.HardFork.Combinator.Basics (LedgerState (..))
3137
import Ouroboros.Consensus.HardFork.Combinator.State
38+
import Ouroboros.Consensus.HardFork.History.Summary
3239
import Ouroboros.Consensus.Ledger.Abstract
3340
import Ouroboros.Consensus.Ledger.Tables.Utils (emptyLedgerTables)
3441
import Ouroboros.Consensus.Shelley.Ledger
@@ -38,17 +45,24 @@ import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.Impl.LMDB
3845
import Ouroboros.Consensus.Util.IOLike (bracket)
3946
import Ouroboros.Consensus.Util.StreamingLedgerTables
4047
import Streaming
48+
import System.Directory
4149
import System.FS.API
4250
import System.FS.IO
51+
import System.FilePath as FilePath
52+
import System.IO.Temp
53+
import qualified Test.Cardano.Ledger.Conway.Examples as Conway
54+
import Test.Cardano.Protocol.TPraos.Examples
55+
56+
type L = LedgerState (CardanoBlock StandardCrypto)
4357

4458
fromInMemory ::
4559
SomeHasFS IO ->
4660
FsPath ->
4761
LedgerState (CardanoBlock StandardCrypto) EmptyMK ->
4862
( Stream
4963
( Of
50-
( TxIn (LedgerState (CardanoBlock StandardCrypto))
51-
, TxOut (LedgerState (CardanoBlock StandardCrypto))
64+
( TxIn L
65+
, TxOut L
5266
)
5367
)
5468
(ExceptT DeserialiseFailure IO)
@@ -63,27 +77,25 @@ fromInMemory shfs fp (HardForkLedgerState (HardForkState idx)) k =
6377
(Current (Flip LedgerState EmptyMK) -.-> K (ExceptT DeserialiseFailure IO ()))
6478
(CardanoEras StandardCrypto)
6579
np =
66-
( Fn $ undefined ::
67-
(Current (Flip LedgerState EmptyMK) -.-> K (ExceptT DeserialiseFailure IO ())) ByronBlock
68-
)
69-
:* (Fn $ K . foo ShelleyTxOut . unFlip . currentState)
70-
:* (Fn $ K . foo AllegraTxOut . unFlip . currentState)
71-
:* (Fn $ K . foo MaryTxOut . unFlip . currentState)
72-
:* (Fn $ K . foo AlonzoTxOut . unFlip . currentState)
73-
:* (Fn $ K . foo BabbageTxOut . unFlip . currentState)
74-
:* (Fn $ K . foo ConwayTxOut . unFlip . currentState)
75-
:* (Fn $ K . foo DijkstraTxOut . unFlip . currentState)
80+
(Fn $ const $ K $ pure ())
81+
:* (Fn $ K . fromEra ShelleyTxOut . unFlip . currentState)
82+
:* (Fn $ K . fromEra AllegraTxOut . unFlip . currentState)
83+
:* (Fn $ K . fromEra MaryTxOut . unFlip . currentState)
84+
:* (Fn $ K . fromEra AlonzoTxOut . unFlip . currentState)
85+
:* (Fn $ K . fromEra BabbageTxOut . unFlip . currentState)
86+
:* (Fn $ K . fromEra ConwayTxOut . unFlip . currentState)
87+
:* (Fn $ K . fromEra DijkstraTxOut . unFlip . currentState)
7688
:* Nil
7789
in
7890
hcollapse $ hap np $ Telescope.tip idx
7991
where
80-
foo ::
92+
fromEra ::
8193
forall proto era.
8294
ShelleyCompatible proto era =>
8395
(TxOut (LedgerState (ShelleyBlock proto era)) -> CardanoTxOut StandardCrypto) ->
8496
LedgerState (ShelleyBlock proto era) EmptyMK ->
8597
ExceptT DeserialiseFailure IO ()
86-
foo toCardanoTxOut st =
98+
fromEra toCardanoTxOut st =
8799
let certInterns =
88100
internsFromMap $
89101
shelleyLedgerState st
@@ -105,28 +117,95 @@ toLMDB ::
105117
FilePath ->
106118
Stream
107119
( Of
108-
( TxIn (LedgerState (CardanoBlock StandardCrypto))
109-
, TxOut (LedgerState (CardanoBlock StandardCrypto))
120+
( TxIn L
121+
, TxOut L
110122
)
111123
)
112124
(ExceptT DeserialiseFailure IO)
113125
(Stream (Of ByteString) IO ()) ->
114126
ExceptT DeserialiseFailure IO (Stream (Of ByteString) IO ())
115127
toLMDB hint fp s = do
128+
tempDir <- lift $ getCanonicalTemporaryDirectory
129+
let lmdbTemp = tempDir FilePath.</> "lmdb_streaming"
130+
lift $ removePathForcibly lmdbTemp
131+
currDir <- lift $ getCurrentDirectory
132+
lift $ System.Directory.createDirectory lmdbTemp
116133
bs <-
117134
lift $
118135
LMDB.newLMDBBackingStore
119136
nullTracer
120137
limits
121-
(LiveLMDBFS $ SomeHasFS $ ioHasFS $ MountPoint fp)
122-
(SnapshotsFS $ SomeHasFS $ ioHasFS $ MountPoint fp)
138+
(LiveLMDBFS $ SomeHasFS $ ioHasFS $ MountPoint lmdbTemp)
139+
(SnapshotsFS $ SomeHasFS $ ioHasFS $ MountPoint currDir)
123140
(InitFromValues (At 0) hint emptyLedgerTables)
124141
r <- sinkLmdbS @(ExceptT DeserialiseFailure IO) 1000 hint (\s' h d -> lift $ bsWrite bs s' h d) s
142+
lift $ bsCopy bs hint (mkFsPath (splitDirectories fp))
125143
lift $ bsClose bs
126144
pure r
127145

146+
fromLMDB ::
147+
LedgerState (CardanoBlock StandardCrypto) EmptyMK ->
148+
FilePath ->
149+
Stream (Of (TxIn L, TxOut L)) (ExceptT DeserialiseFailure IO) ()
150+
fromLMDB hint fp = do
151+
tempDir <- lift $ lift $ getCanonicalTemporaryDirectory
152+
let lmdbTemp = tempDir FilePath.</> "lmdb_streaming"
153+
lift $ lift $ removePathForcibly lmdbTemp
154+
Debug.traceM "Deleted directory"
155+
currDir <- lift $ lift $ getCurrentDirectory
156+
lift $ lift $ System.Directory.createDirectory lmdbTemp
157+
bs <-
158+
lift $
159+
lift $
160+
LMDB.newLMDBBackingStore
161+
nullTracer
162+
limits
163+
(LiveLMDBFS $ SomeHasFS $ ioHasFS $ MountPoint lmdbTemp)
164+
(SnapshotsFS $ SomeHasFS $ ioHasFS $ MountPoint currDir)
165+
(InitFromCopy hint (mkFsPath (splitDirectories fp)))
166+
Debug.traceM "Opened LMDB"
167+
bsvh <- lift $ lift $ bsValueHandle bs
168+
Debug.traceM "Opened value handle"
169+
yieldLmdbS 1000 hint bsvh
170+
171+
toInMemory ::
172+
L EmptyMK ->
173+
FilePath ->
174+
Stream (Of (TxIn L, TxOut L)) (ExceptT DeserialiseFailure IO) () ->
175+
ExceptT DeserialiseFailure IO ()
176+
toInMemory (HardForkLedgerState (HardForkState idx)) fp s = do
177+
currDir <- lift $ getCurrentDirectory
178+
let
179+
np =
180+
(Fn $ const $ K $ encOne (Proxy @ByronEra) currDir)
181+
:* (Fn $ const $ K $ encOne (Proxy @ShelleyEra) currDir)
182+
:* (Fn $ const $ K $ encOne (Proxy @AllegraEra) currDir)
183+
:* (Fn $ const $ K $ encOne (Proxy @MaryEra) currDir)
184+
:* (Fn $ const $ K $ encOne (Proxy @AlonzoEra) currDir)
185+
:* (Fn $ const $ K $ encOne (Proxy @BabbageEra) currDir)
186+
:* (Fn $ const $ K $ encOne (Proxy @ConwayEra) currDir)
187+
:* (Fn $ const $ K $ encOne (Proxy @DijkstraEra) currDir)
188+
:* Nil
189+
hcollapse $ hap np $ Telescope.tip idx
190+
where
191+
encOne :: forall era. Era era => Proxy era -> FilePath -> ExceptT DeserialiseFailure IO ()
192+
encOne _ currDir =
193+
sinkInMemoryS
194+
(Proxy @L)
195+
1000
196+
(toEraCBOR @era . encodeMemPack)
197+
(toEraCBOR @era . eliminateCardanoTxOut (const encodeMemPack))
198+
(SomeHasFS $ ioHasFS $ MountPoint currDir)
199+
fp
200+
s
201+
128202
limits :: LMDB.LMDBLimits
129-
limits = undefined
203+
limits =
204+
LMDB.LMDBLimits
205+
{ LMDB.lmdbMapSize = 16 * 1024 * 1024 * 1024
206+
, LMDB.lmdbMaxDatabases = 10
207+
, LMDB.lmdbMaxReaders = 16
208+
}
130209

131210
foo ::
132211
SomeHasFS IO ->
@@ -135,3 +214,41 @@ foo ::
135214
LedgerState (CardanoBlock StandardCrypto) EmptyMK ->
136215
ExceptT DeserialiseFailure IO ()
137216
foo shfs fpFrom fpTo st = fromInMemory shfs fpFrom st (toLMDB st fpTo)
217+
218+
bar ::
219+
LedgerState (CardanoBlock StandardCrypto) EmptyMK ->
220+
FilePath ->
221+
FilePath ->
222+
ExceptT DeserialiseFailure IO ()
223+
bar st fpFrom fpTo = do
224+
let s = fromLMDB st fpFrom
225+
toInMemory st fpTo s
226+
227+
lstate :: L EmptyMK
228+
lstate =
229+
HardForkLedgerState
230+
$ HardForkState
231+
$ TS (K $ Past (Bound (RelativeTime 0) 0 (EpochNo 0)) (Bound (RelativeTime 0) 0 (EpochNo 0)))
232+
$ TS (K $ Past (Bound (RelativeTime 0) 0 (EpochNo 0)) (Bound (RelativeTime 0) 0 (EpochNo 0)))
233+
$ TS (K $ Past (Bound (RelativeTime 0) 0 (EpochNo 0)) (Bound (RelativeTime 0) 0 (EpochNo 0)))
234+
$ TS (K $ Past (Bound (RelativeTime 0) 0 (EpochNo 0)) (Bound (RelativeTime 0) 0 (EpochNo 0)))
235+
$ TS (K $ Past (Bound (RelativeTime 0) 0 (EpochNo 0)) (Bound (RelativeTime 0) 0 (EpochNo 0)))
236+
$ TS (K $ Past (Bound (RelativeTime 0) 0 (EpochNo 0)) (Bound (RelativeTime 0) 0 (EpochNo 0)))
237+
$ TZ
238+
$ Current
239+
(Bound (RelativeTime 0) 0 (EpochNo 0))
240+
$ Flip
241+
ShelleyLedgerState
242+
{ shelleyLedgerTip =
243+
At
244+
ShelleyTip
245+
{ shelleyTipSlotNo = SlotNo 9
246+
, shelleyTipBlockNo = BlockNo 3
247+
, shelleyTipHash =
248+
ShelleyHash $ SL.unHashHeader $ pleHashHeader $ ledgerExamplesTPraos Conway.ledgerExamples
249+
}
250+
, shelleyLedgerState =
251+
leNewEpochState $ pleLedgerExamples $ ledgerExamplesTPraos Conway.ledgerExamples
252+
, shelleyLedgerTransition = ShelleyTransitionInfo{shelleyAfterVoting = 0}
253+
, shelleyLedgerTables = emptyLedgerTables
254+
}

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import Data.Proxy
3939
import qualified Data.Set as Set
4040
import qualified Data.Vector as V
4141
import Database.LSMTree
42+
import Debug.Trace as Debug
4243
import Ouroboros.Consensus.Ledger.Abstract
4344
import Ouroboros.Consensus.Ledger.Tables.Diff
4445
import Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.API
@@ -50,18 +51,6 @@ import qualified Streaming as S
5051
import qualified Streaming.Prelude as S
5152
import System.FS.API
5253

53-
yieldInMemory ::
54-
SomeHasFS IO ->
55-
FsPath ->
56-
(forall s. Decoder s a) ->
57-
(forall s. Decoder s b) ->
58-
( Stream (Of (a, b)) (ExceptT DeserialiseFailure IO) (Stream (Of ByteString) IO ()) ->
59-
ExceptT DeserialiseFailure IO (Stream (Of ByteString) IO r)
60-
) ->
61-
ExceptT DeserialiseFailure IO r
62-
yieldInMemory shfs fp decK decV k =
63-
yieldInMemoryS shfs fp decK decV k
64-
6554
streamingFile ::
6655
forall m r.
6756
MonadThrow m =>
@@ -153,6 +142,7 @@ yieldLmdbS readChunkSize hint bsvh =
153142
where
154143
go p = do
155144
LedgerTables (ValuesMK values) <- S.lift $ bsvhRangeRead bsvh hint p
145+
Debug.traceM "Read some"
156146
if Map.null values
157147
then pure ()
158148
else do
@@ -203,16 +193,13 @@ sinkLmdbS writeChunkSize hint bs s = do
203193

204194
sinkLsmS ::
205195
forall l m.
206-
( SerialiseKey (TxIn l)
207-
, ResolveValue (LSMTxOut l)
208-
, SerialiseValue (LSMTxOut l)
209-
, MonadAsync m
196+
( MonadAsync m
210197
, MonadMVar m
211198
, MonadThrow (STM m)
212199
, MonadMask m
213200
, MonadST m
214201
, MonadEvaluate m
215-
, HasLSMTxOut l
202+
, LedgerSupportsLSMLedgerDB l
216203
) =>
217204
Proxy l ->
218205
Int ->
@@ -233,28 +220,32 @@ sinkLsmS p writeChunkSize session s =
233220
Just (item, s'') -> go tb (n - 1) (item : m) s''
234221

235222
sinkInMemoryS ::
223+
forall m l.
236224
MonadThrow m =>
237225
Proxy l ->
238226
Int ->
239227
(TxIn l -> Encoding) ->
240228
(TxOut l -> Encoding) ->
241229
SomeHasFS m ->
242230
FilePath ->
243-
Stream (Of (TxIn l, TxOut l)) m () ->
244-
m ()
231+
Stream (Of (TxIn l, TxOut l)) (ExceptT DeserialiseFailure m) () ->
232+
ExceptT DeserialiseFailure m ()
245233
sinkInMemoryS _ writeChunkSize encK encV (SomeHasFS fs) fp s =
246-
withFile fs (mkFsPath [fp]) (WriteMode MustBeNew) $ \hdl -> do
234+
ExceptT $ withFile fs (mkFsPath [fp]) (WriteMode MustBeNew) $ \hdl -> do
247235
void $ hPutSome fs hdl $ toStrictByteString encodeMapLenIndef
248-
go hdl writeChunkSize mempty s
249-
void $ hPutSome fs hdl $ toStrictByteString encodeBreak
250-
pure ()
236+
e <- runExceptT $ go hdl writeChunkSize mempty s
237+
case e of
238+
Left err -> pure $ Left err
239+
Right () -> do
240+
void $ hPutSome fs hdl $ toStrictByteString encodeBreak
241+
pure $ Right ()
251242
where
252243
go tb 0 m s' = do
253-
void $ hPutSome fs tb $ toStrictByteString $ mconcat [encK k <> encV v | (k, v) <- m]
244+
lift $ void $ hPutSome fs tb $ toStrictByteString $ mconcat [encK k <> encV v | (k, v) <- m]
254245
go tb writeChunkSize mempty s'
255246
go tb n m s' = do
256247
mbs <- S.uncons s'
257248
case mbs of
258249
Nothing ->
259-
void $ hPutSome fs tb $ toStrictByteString $ mconcat [encK k <> encV v | (k, v) <- m]
250+
lift $ void $ hPutSome fs tb $ toStrictByteString $ mconcat [encK k <> encV v | (k, v) <- m]
260251
Just (item, s'') -> go tb (n - 1) (item : m) s''

0 commit comments

Comments
 (0)