Skip to content

Commit 749bbe9

Browse files
committed
Streaming tables WIP
1 parent 37c56c3 commit 749bbe9

File tree

3 files changed

+295
-1
lines changed

3 files changed

+295
-1
lines changed

ouroboros-consensus/ouroboros-consensus.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ library
272272
Ouroboros.Consensus.Util.Args
273273
Ouroboros.Consensus.Util.Assert
274274
Ouroboros.Consensus.Util.CBOR
275+
Ouroboros.Consensus.Util.StreamingLedgerTables
275276
Ouroboros.Consensus.Util.CRC
276277
Ouroboros.Consensus.Util.CallStack
277278
Ouroboros.Consensus.Util.Condense

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2.LSM
2121
( -- * LedgerTablesHandle
2222
newLSMLedgerTablesHandle
2323
, tableFromValuesMK
24-
24+
, UTxOTable
25+
2526
-- * LSM TxOuts
2627
, LSMTxOut
2728
, HasLSMTxOut (..)
Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
{-# LANGUAGE TypeApplications #-}
2+
{-# LANGUAGE ScopedTypeVariables #-}
3+
{-# LANGUAGE FlexibleContexts #-}
4+
{-# LANGUAGE RankNTypes #-}
5+
{-# LANGUAGE ViewPatterns #-}
6+
{-# LANGUAGE LambdaCase #-}
7+
-- |
8+
9+
module Ouroboros.Consensus.Util.StreamingLedgerTables where
10+
11+
import Data.Proxy
12+
import Streaming
13+
import qualified Streaming as S
14+
import System.FS.API
15+
import qualified Data.ByteString as BS
16+
import Data.ByteString (ByteString)
17+
import Codec.CBOR.Encoding (Encoding, encodeMapLenIndef, encodeBreak)
18+
import Codec.CBOR.Decoding (Decoder)
19+
import Cardano.Slotting.Slot
20+
import Ouroboros.Consensus.Ledger.Abstract
21+
import Codec.CBOR.Read
22+
import Codec.CBOR.Write
23+
import Ouroboros.Network.Block
24+
import Codec.CBOR.FlatTerm
25+
import Control.Monad.Class.MonadST
26+
import Control.Monad.Class.MonadAsync
27+
import Control.Monad.Class.MonadSTM
28+
import Control.Concurrent.Class.MonadMVar
29+
import qualified Streaming.Prelude as S
30+
import Control.Monad.Class.MonadThrow
31+
import qualified Streaming.Internal as SI
32+
import Data.ByteString.Builder.Extra (defaultChunkSize)
33+
import Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.API
34+
import Ouroboros.Consensus.Storage.LedgerDB.V2.LSM
35+
import Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq
36+
import Ouroboros.Consensus.Ledger.Tables
37+
import Ouroboros.Consensus.Ledger.Tables.Diff
38+
import qualified Data.Map.Strict as Map
39+
import qualified Data.Set as Set
40+
import Database.LSMTree
41+
import qualified Data.Vector as V
42+
import Data.MemPack
43+
44+
streamingFile ::
45+
MonadThrow m =>
46+
SomeHasFS m ->
47+
FsPath ->
48+
(Stream (Of ByteString) m () -> m (Stream (Of ByteString) m r)) ->
49+
m r
50+
streamingFile fs@(SomeHasFS fs') path cont =
51+
withFile fs' path ReadMode $ \hdl ->
52+
cont (foo hdl) >>= noRemainingBytes
53+
where
54+
foo h = do
55+
bs <- S.lift $ hGetSome fs' h (fromIntegral defaultChunkSize)
56+
if BS.null bs
57+
then pure ()
58+
else do S.yield bs
59+
foo h
60+
61+
noRemainingBytes :: Monad m => Stream (Of ByteString) m r -> m r
62+
noRemainingBytes s = S.uncons s >>= \case
63+
Nothing -> S.effects s
64+
Just (BS.null -> True, s') -> noRemainingBytes s'
65+
Just _ -> error "Remaining bytes!"
66+
67+
yieldCborMapS ::
68+
forall m a b. MonadST m =>
69+
(forall s. Decoder s a) ->
70+
(forall s. Decoder s b) ->
71+
Stream (Of ByteString) m () ->
72+
Stream (Of (a, b)) m (Stream (Of ByteString) m ())
73+
yieldCborMapS decK decV s = do
74+
k <- S.lift $ stToIO (deserialiseIncremental decK)
75+
mbs <- S.lift (S.uncons s)
76+
case mbs of
77+
Nothing -> error "Empty stream of bytes"
78+
Just (bs, s') ->
79+
case deserialiseFromBytes decodeTermToken (BS.fromStrict bs) of
80+
Left err -> error $ show err
81+
Right (bs', TkMapLen n) -> go (Just n) (Left k) $ Right (BS.toStrict bs', s')
82+
Right (bs', TkMapBegin) -> go Nothing (Left k) $ Right (BS.toStrict bs', s')
83+
_ -> error "Not a map!"
84+
where
85+
go (Just 0) k mbs = case mbs of
86+
Left s' -> pure s'
87+
Right (bs, s') -> pure (S.yield bs *> s')
88+
go remainingItems k mbs = case (k, mbs) of
89+
-- We have a partial decoding, awaiting for a bytestring
90+
91+
-- We have read a bytestring from the stream
92+
(Left (Partial kont), Right (bs, s')) -> do
93+
k' <- S.lift $ stToIO $ kont $ Just bs
94+
case k' of
95+
-- after running the kontinuation, we still require more input,
96+
-- then read again from the stream
97+
Partial{} -> go remainingItems (Left k') . maybeToEither s' =<< S.lift (S.uncons s')
98+
-- We were done with the previous bytestring, so let's
99+
-- recurse without reading more.
100+
_ -> go remainingItems (Left k') (Left s')
101+
102+
-- We are in a partial reading, but we were unable to read more
103+
-- input, so we call `kont` with `Nothing` which will fail.
104+
(Left (Partial kont), Left s') -> do
105+
k' <- S.lift $ stToIO $ kont Nothing
106+
go remainingItems (Left k') (Left s')
107+
108+
-- We have read a bytestring from the stream
109+
(Right (valK, Partial kont), Right (bs, s')) -> do
110+
k' <- S.lift $ stToIO $ kont $ Just bs
111+
case k' of
112+
-- after running the kontinuation, we still require more input,
113+
-- then read again from the stream
114+
Partial{} -> go remainingItems (Right (valK, k')) . maybeToEither s' =<< S.lift (S.uncons s')
115+
-- We were done with the previous bytestring, so let's
116+
-- recurse without reading more.
117+
_ -> go remainingItems (Right (valK, k')) (Left s')
118+
119+
-- We are in a partial reading, but we were unable to read more
120+
-- input, so we call `kont` with `Nothing` which will fail.
121+
(Right (valK, Partial kont), Left s') -> do
122+
k' <- S.lift $ stToIO $ kont Nothing
123+
go remainingItems (Right (valK, k')) (Left s')
124+
125+
-- We completed a read
126+
(Left (Done unused _offset val), Left s') -> do
127+
if BS.null unused
128+
then
129+
-- We have no unused bytes, so read another chunk
130+
S.lift (S.uncons s') >>= \case
131+
-- If there is no more input, fail because we were expecting a value!
132+
Nothing -> error "No value!"
133+
134+
-- Recurse if there is more input
135+
Just mbs' -> do
136+
k' <- S.lift $ stToIO (deserialiseIncremental decV)
137+
go remainingItems (Right (val, k')) $ Right mbs'
138+
else do
139+
-- We still have unused bytes, so use those before reading
140+
-- again.
141+
k' <- S.lift $ stToIO (deserialiseIncremental decV)
142+
go remainingItems (Right (val, k')) (Right (unused, s'))
143+
144+
-- We completed a read
145+
(Right (valK, Done unused _offset val), Left s') -> do
146+
-- yield the pair
147+
S.yield (valK, val)
148+
case remainingItems of
149+
Just 1 -> pure (S.yield unused *> s')
150+
_ -> do
151+
k' <- S.lift $ stToIO (deserialiseIncremental decK)
152+
if BS.null unused
153+
then
154+
-- We have no unused bytes, so read another chunk
155+
S.lift (S.uncons s') >>= \case
156+
-- If there is no more input, then we are done!
157+
Nothing ->
158+
case remainingItems of
159+
Just n -> error $ "Missing " ++ show (n - 1) ++ " items!"
160+
Nothing -> error "Missing a break!"
161+
162+
-- Recurse if there is more input
163+
Just mbs' -> do
164+
go ((\x -> x - 1) <$> remainingItems) (Left k') $ Right mbs'
165+
else do
166+
-- We still have unused bytes, so use those before reading
167+
-- again.
168+
go ((\x -> x - 1) <$> remainingItems) (Left k') (Right (unused, s'))
169+
170+
171+
(Left (Done _ _ _), Right _) -> error "unreachable!"
172+
(Right (_, Done _ _ _), Right _) -> error "unreachable!"
173+
(Left Fail{}, Right{}) -> error "unreachable!"
174+
(Right (_, Fail{}), Right{}) -> error "unreachable!"
175+
176+
(Left (Fail bs _ err), Left s') ->
177+
case remainingItems of
178+
Nothing -> case deserialiseFromBytes decodeTermToken (BS.fromStrict bs) of
179+
Right (bs', TkBreak) -> pure (S.yield (BS.toStrict bs') *> s')
180+
_ -> error "Break not found!"
181+
_ ->
182+
error $ show err
183+
184+
(Right (_, Fail bs _ err), _) ->
185+
error $ show err
186+
187+
maybeToEither :: a -> Maybe b -> Either a b
188+
maybeToEither _ (Just b) = Right b
189+
maybeToEither a Nothing = Left a
190+
191+
yieldLmdbS ::
192+
Monad m =>
193+
l EmptyMK ->
194+
LedgerBackingStoreValueHandle m l ->
195+
Stream (Of (TxIn l, TxOut l)) m ()
196+
yieldLmdbS hint bsvh =
197+
go (RangeQuery Nothing 100000)
198+
where
199+
go p = do
200+
LedgerTables (ValuesMK values) <- S.lift $ bsvhRangeRead bsvh hint p
201+
if Map.null values
202+
then pure ()
203+
else do
204+
S.each $ Map.toList values
205+
go (RangeQuery (LedgerTables . KeysMK . Set.singleton . fst <$> Map.lookupMax values) 100000)
206+
207+
yieldLSM_S ::
208+
Monad m =>
209+
l EmptyMK ->
210+
LedgerTablesHandle m l ->
211+
Stream (Of (TxIn l, TxOut l)) m ()
212+
yieldLSM_S hint tb = do
213+
go (Nothing, 100000)
214+
where
215+
go p = do
216+
(LedgerTables (ValuesMK values), mx) <- S.lift $ readRange tb hint p
217+
if Map.null values
218+
then pure ()
219+
else do
220+
S.each $ Map.toList values
221+
go (mx, 100000)
222+
223+
sinkLmdbS ::
224+
(Ord (TxIn l), GetTip l, Monad m) =>
225+
l EmptyMK ->
226+
LedgerBackingStore m l ->
227+
Stream (Of (TxIn l, TxOut l)) m () ->
228+
m ()
229+
sinkLmdbS hint bs s = do
230+
go 1000 mempty s
231+
where
232+
sl = withOrigin (error "unreachable") id $ pointSlot $ getTip hint
233+
234+
go 0 m s' = do
235+
bsWrite bs sl (hint, hint) (LedgerTables $ DiffMK $ fromMapInserts m)
236+
go 1000 mempty s'
237+
go n m s' = do
238+
mbs <- S.uncons s'
239+
case mbs of
240+
Nothing -> do
241+
bsWrite bs sl (hint, hint) (LedgerTables $ DiffMK $ fromMapInserts m)
242+
pure ()
243+
Just ((k, v), s'') -> go (n - 1) (Map.insert k v m) s''
244+
245+
sinkLsmS ::
246+
forall l m. (SerialiseKey (TxIn l), ResolveValue (LSMTxOut l), SerialiseValue (LSMTxOut l), MonadAsync m, MonadMVar m, MonadThrow (STM m), MonadMask m, MonadST m, MonadEvaluate m, HasLSMTxOut l) =>
247+
Proxy l ->
248+
Session m ->
249+
Stream (Of (TxIn l, TxOut l)) m () ->
250+
m ()
251+
sinkLsmS p session s =
252+
withTable session $ \(tb :: UTxOTable m l) -> go tb 1000 mempty s
253+
where
254+
go tb 0 m s' = do
255+
inserts tb $ V.fromList [ (k, toLSMTxOut p v, Nothing) | (k, v) <- m ]
256+
go tb 1000 mempty s'
257+
go tb n m s' = do
258+
mbs <- S.uncons s'
259+
case mbs of
260+
Nothing -> do
261+
inserts tb $ V.fromList [ (k, toLSMTxOut p v, Nothing) | (k, v) <- m ]
262+
pure ()
263+
Just (item, s'') -> go tb (n - 1) (item : m) s''
264+
265+
sinkInMemoryS ::
266+
(Monad m, MonadThrow m) =>
267+
Proxy l ->
268+
(TxIn l -> Encoding) ->
269+
(TxOut l -> Encoding) ->
270+
SomeHasFS m ->
271+
FilePath ->
272+
Stream (Of (TxIn l, TxOut l)) m () ->
273+
m ()
274+
sinkInMemoryS _ encK encV (SomeHasFS fs) fp s = do
275+
withFile fs (mkFsPath [fp]) (WriteMode MustBeNew) $ \hdl -> do
276+
hPutSome fs hdl $ toStrictByteString encodeMapLenIndef
277+
go hdl 1000 mempty s
278+
hPutSome fs hdl $ toStrictByteString encodeBreak
279+
pure ()
280+
where
281+
go tb 0 m s' = do
282+
hPutSome fs tb $ toStrictByteString $ foldl (<>) mempty [ encK k <> encV v | (k, v) <- m ]
283+
go tb 1000 mempty s'
284+
go tb n m s' = do
285+
mbs <- S.uncons s'
286+
case mbs of
287+
Nothing -> do
288+
hPutSome fs tb $ toStrictByteString $ foldl (<>) mempty [ encK k <> encV v | (k, v) <- m ]
289+
pure ()
290+
Just (item, s'') -> go tb (n - 1) (item : m) s''
291+
292+

0 commit comments

Comments
 (0)