Skip to content

Commit 131e12b

Browse files
committed
Simplify yieldCborMapS
1 parent ee1d264 commit 131e12b

File tree

1 file changed

+29
-112
lines changed

1 file changed

+29
-112
lines changed

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/StreamingLedgerTables.hs

Lines changed: 29 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -8,35 +8,34 @@
88
module Ouroboros.Consensus.Util.StreamingLedgerTables where
99

1010
import Cardano.Slotting.Slot
11-
import Codec.CBOR.Decoding (Decoder)
11+
import Codec.CBOR.Decoding (Decoder, decodeBreakOr, decodeMapLenIndef)
1212
import Codec.CBOR.Encoding (Encoding, encodeBreak, encodeMapLenIndef)
13-
import Codec.CBOR.FlatTerm
1413
import Codec.CBOR.Read
1514
import Codec.CBOR.Write
1615
import Control.Concurrent.Class.MonadMVar
16+
import Control.Monad (unless)
1717
import Control.Monad.Class.MonadAsync
1818
import Control.Monad.Class.MonadST
1919
import Control.Monad.Class.MonadSTM
2020
import Control.Monad.Class.MonadThrow
21+
import Control.Monad.Except
22+
import Control.Monad.State.Strict
2123
import Data.ByteString (ByteString)
2224
import qualified Data.ByteString as BS
2325
import Data.ByteString.Builder.Extra (defaultChunkSize)
2426
import qualified Data.Map.Strict as Map
25-
import Data.MemPack
2627
import Data.Proxy
2728
import qualified Data.Set as Set
2829
import qualified Data.Vector as V
2930
import Database.LSMTree
3031
import Ouroboros.Consensus.Ledger.Abstract
31-
import Ouroboros.Consensus.Ledger.Tables
3232
import Ouroboros.Consensus.Ledger.Tables.Diff
3333
import Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.API
3434
import Ouroboros.Consensus.Storage.LedgerDB.V2.LSM
3535
import Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq
3636
import Ouroboros.Network.Block
3737
import Streaming
3838
import qualified Streaming as S
39-
import qualified Streaming.Internal as SI
4039
import qualified Streaming.Prelude as S
4140
import System.FS.API
4241

@@ -65,120 +64,38 @@ noRemainingBytes s =
6564
Just (BS.null -> True, s') -> noRemainingBytes s'
6665
Just _ -> error "Remaining bytes!"
6766

67+
decodeCbor ::
68+
(MonadST m, MonadError DeserialiseFailure m) =>
69+
(forall s. Decoder s a) ->
70+
StateT (Stream (Of ByteString) m r) m a
71+
decodeCbor dec =
72+
StateT $ \s -> go s =<< stToIO (deserialiseIncremental dec)
73+
where
74+
go s = \case
75+
Partial k ->
76+
S.next s >>= \case
77+
Right (bs, s') -> go s' =<< stToIO (k (Just bs))
78+
Left r -> go (pure r) =<< stToIO (k Nothing)
79+
Done bs _off a -> pure (a, S.yield bs *> s)
80+
Fail _bs _off err -> throwError err
81+
6882
yieldCborMapS ::
6983
forall m a b.
70-
MonadST m =>
84+
(MonadST m, MonadError DeserialiseFailure m) =>
7185
(forall s. Decoder s a) ->
7286
(forall s. Decoder s b) ->
7387
Stream (Of ByteString) m () ->
7488
Stream (Of (a, b)) m (Stream (Of ByteString) m ())
75-
yieldCborMapS decK decV s = do
76-
k <- S.lift $ stToIO (deserialiseIncremental decK)
77-
mbs <- S.lift (S.uncons s)
78-
case mbs of
79-
Nothing -> error "Empty stream of bytes"
80-
Just (bs, s') ->
81-
case deserialiseFromBytes decodeTermToken (BS.fromStrict bs) of
82-
Left err -> error $ show err
83-
Right (bs', TkMapLen n) -> go (Just n) (Left k) $ Right (BS.toStrict bs', s')
84-
Right (bs', TkMapBegin) -> go Nothing (Left k) $ Right (BS.toStrict bs', s')
85-
_ -> error "Not a map!"
89+
yieldCborMapS decK decV = execStateT $ do
90+
hoist lift $ decodeCbor decodeMapLenIndef
91+
go
8692
where
87-
go (Just 0) k mbs = case mbs of
88-
Left s' -> pure s'
89-
Right (bs, s') -> pure (S.yield bs *> s')
90-
go remainingItems k mbs = case (k, mbs) of
91-
-- We have a partial decoding, awaiting for a bytestring
92-
93-
-- We have read a bytestring from the stream
94-
(Left (Partial kont), Right (bs, s')) -> do
95-
k' <- S.lift $ stToIO $ kont $ Just bs
96-
case k' of
97-
-- after running the kontinuation, we still require more input,
98-
-- then read again from the stream
99-
Partial{} -> go remainingItems (Left k') . maybeToEither s' =<< S.lift (S.uncons s')
100-
-- We were done with the previous bytestring, so let's
101-
-- recurse without reading more.
102-
_ -> go remainingItems (Left k') (Left s')
103-
104-
-- We are in a partial reading, but we were unable to read more
105-
-- input, so we call `kont` with `Nothing` which will fail.
106-
(Left (Partial kont), Left s') -> do
107-
k' <- S.lift $ stToIO $ kont Nothing
108-
go remainingItems (Left k') (Left s')
109-
110-
-- We have read a bytestring from the stream
111-
(Right (valK, Partial kont), Right (bs, s')) -> do
112-
k' <- S.lift $ stToIO $ kont $ Just bs
113-
case k' of
114-
-- after running the kontinuation, we still require more input,
115-
-- then read again from the stream
116-
Partial{} -> go remainingItems (Right (valK, k')) . maybeToEither s' =<< S.lift (S.uncons s')
117-
-- We were done with the previous bytestring, so let's
118-
-- recurse without reading more.
119-
_ -> go remainingItems (Right (valK, k')) (Left s')
120-
121-
-- We are in a partial reading, but we were unable to read more
122-
-- input, so we call `kont` with `Nothing` which will fail.
123-
(Right (valK, Partial kont), Left s') -> do
124-
k' <- S.lift $ stToIO $ kont Nothing
125-
go remainingItems (Right (valK, k')) (Left s')
126-
127-
-- We completed a read
128-
(Left (Done unused _offset val), Left s') -> do
129-
if BS.null unused
130-
then
131-
-- We have no unused bytes, so read another chunk
132-
S.lift (S.uncons s') >>= \case
133-
-- If there is no more input, fail because we were expecting a value!
134-
Nothing -> error "No value!"
135-
-- Recurse if there is more input
136-
Just mbs' -> do
137-
k' <- S.lift $ stToIO (deserialiseIncremental decV)
138-
go remainingItems (Right (val, k')) $ Right mbs'
139-
else do
140-
-- We still have unused bytes, so use those before reading
141-
-- again.
142-
k' <- S.lift $ stToIO (deserialiseIncremental decV)
143-
go remainingItems (Right (val, k')) (Right (unused, s'))
144-
145-
-- We completed a read
146-
(Right (valK, Done unused _offset val), Left s') -> do
147-
-- yield the pair
148-
S.yield (valK, val)
149-
case remainingItems of
150-
Just 1 -> pure (S.yield unused *> s')
151-
_ -> do
152-
k' <- S.lift $ stToIO (deserialiseIncremental decK)
153-
if BS.null unused
154-
then
155-
-- We have no unused bytes, so read another chunk
156-
S.lift (S.uncons s') >>= \case
157-
-- If there is no more input, then we are done!
158-
Nothing ->
159-
case remainingItems of
160-
Just n -> error $ "Missing " ++ show (n - 1) ++ " items!"
161-
Nothing -> error "Missing a break!"
162-
-- Recurse if there is more input
163-
Just mbs' -> do
164-
go ((\x -> x - 1) <$> remainingItems) (Left k') $ Right mbs'
165-
else do
166-
-- We still have unused bytes, so use those before reading
167-
-- again.
168-
go ((\x -> x - 1) <$> remainingItems) (Left k') (Right (unused, s'))
169-
(Left (Done _ _ _), Right _) -> error "unreachable!"
170-
(Right (_, Done _ _ _), Right _) -> error "unreachable!"
171-
(Left Fail{}, Right{}) -> error "unreachable!"
172-
(Right (_, Fail{}), Right{}) -> error "unreachable!"
173-
(Left (Fail bs _ err), Left s') ->
174-
case remainingItems of
175-
Nothing -> case deserialiseFromBytes decodeTermToken (BS.fromStrict bs) of
176-
Right (bs', TkBreak) -> pure (S.yield (BS.toStrict bs') *> s')
177-
_ -> error "Break not found!"
178-
_ ->
179-
error $ show err
180-
(Right (_, Fail bs _ err), _) ->
181-
error $ show err
93+
go = do
94+
doBreak <- hoist lift $ decodeCbor decodeBreakOr
95+
unless doBreak $ do
96+
kv <- hoist lift $ decodeCbor $ (,) <$> decK <*> decV
97+
lift $ S.yield kv
98+
go
18299

183100
maybeToEither :: a -> Maybe b -> Either a b
184101
maybeToEither _ (Just b) = Right b

0 commit comments

Comments
 (0)