Skip to content

Commit 4f48704

Browse files
committed
move Cursor read logic into separate module
1 parent 23939c1 commit 4f48704

File tree

3 files changed

+146
-116
lines changed

3 files changed

+146
-116
lines changed

lsm-tree.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ library
126126
Database.LSMTree.Internal.Chunk
127127
Database.LSMTree.Internal.Config
128128
Database.LSMTree.Internal.CRC32C
129+
Database.LSMTree.Internal.Cursor
129130
Database.LSMTree.Internal.Entry
130131
Database.LSMTree.Internal.IndexCompact
131132
Database.LSMTree.Internal.IndexCompactAcc

src/Database/LSMTree/Internal.hs

Lines changed: 13 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
{-# LANGUAGE DataKinds #-}
22

3+
-- | This module brings together the internal parts to provide an API in terms
4+
-- of untyped serialised keys, values and blobs. It makes no distinction between
5+
-- normal and monoidal tables, accepting both blobs and mupserts.
6+
-- The typed [normal]("Database.LSMTree.Normal") and
7+
-- [monoidal]("Database.LSMTree.Monoidal") APIs then provide more type-safe
8+
-- wrappers and handle serialisation.
9+
--
10+
-- Apart from defining the API, this module mainly deals with concurrency- and
11+
-- exception-safe opening and closing of resources. Any other non-trivial logic
12+
-- should live somewhere else.
13+
--
314
module Database.LSMTree.Internal (
415
-- * Existentials
516
Session' (..)
@@ -84,8 +95,8 @@ import Data.Word (Word64)
8495
import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
8596
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
8697
import Database.LSMTree.Internal.Config
98+
import qualified Database.LSMTree.Internal.Cursor as Cursor
8799
import Database.LSMTree.Internal.Entry (Entry)
88-
import qualified Database.LSMTree.Internal.Entry as Entry
89100
import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
90101
ResolveSerialisedValue, lookupsIO)
91102
import Database.LSMTree.Internal.MergeSchedule
@@ -96,14 +107,12 @@ import Database.LSMTree.Internal.Range (Range (..))
96107
import qualified Database.LSMTree.Internal.RawBytes as RB
97108
import Database.LSMTree.Internal.Run (Run)
98109
import qualified Database.LSMTree.Internal.Run as Run
99-
import qualified Database.LSMTree.Internal.RunReader as Reader
100110
import Database.LSMTree.Internal.RunReaders (OffsetKey (..))
101111
import qualified Database.LSMTree.Internal.RunReaders as Readers
102112
import Database.LSMTree.Internal.Serialise (SerialisedBlob (..),
103113
SerialisedKey, SerialisedValue)
104114
import Database.LSMTree.Internal.Snapshot
105115
import Database.LSMTree.Internal.UniqCounter
106-
import qualified Database.LSMTree.Internal.Vector as V
107116
import qualified Database.LSMTree.Internal.WriteBuffer as WB
108117
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
109118
import qualified System.FS.API as FS
@@ -855,8 +864,6 @@ retrieveBlobs sesh wrefs =
855864
Cursors
856865
-------------------------------------------------------------------------------}
857866

858-
-- TODO: Move to a separate Cursors module
859-
860867
-- | A read-only view into the table state at the time of cursor creation.
861868
--
862869
-- For more information, see 'Database.LSMTree.Normal.Cursor'.
@@ -1059,123 +1066,13 @@ readCursorWhile resolve keyIsWanted n Cursor {..} fromEntry = do
10591066
-- a drained cursor will just return an empty vector
10601067
return (state, V.empty)
10611068
Just readers -> do
1062-
(vec, hasMore) <- readCursorEntriesWhile resolve keyIsWanted fromEntry readers n
1069+
(vec, hasMore) <- Cursor.readEntriesWhile resolve keyIsWanted fromEntry readers n
10631070
-- if we drained the readers, remove them from the state
10641071
let !state' = case hasMore of
10651072
Readers.HasMore -> state
10661073
Readers.Drained -> CursorOpen (cursorEnv {cursorReaders = Nothing})
10671074
return (state', vec)
10681075

1069-
{-# INLINE readCursorEntriesWhile #-}
1070-
{-# SPECIALISE readCursorEntriesWhile :: forall h res.
1071-
ResolveSerialisedValue
1072-
-> (SerialisedKey -> Bool)
1073-
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res)
1074-
-> Readers.Readers IO h
1075-
-> Int
1076-
-> IO (V.Vector res, Readers.HasMore) #-}
1077-
-- | General notes on the code below:
1078-
-- * it is quite similar to the one in Internal.Merge, but different enough
1079-
-- that it's probably easier to keep them separate
1080-
-- * any function that doesn't take a 'hasMore' argument assumes that the
1081-
-- readers have not been drained yet, so we must check before calling them
1082-
-- * there is probably opportunity for optimisations
1083-
readCursorEntriesWhile :: forall h m res.
1084-
(MonadFix m, MonadMask m, MonadST m, MonadSTM m)
1085-
=> ResolveSerialisedValue
1086-
-> (SerialisedKey -> Bool)
1087-
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res)
1088-
-> Readers.Readers m h
1089-
-> Int
1090-
-> m (V.Vector res, Readers.HasMore)
1091-
readCursorEntriesWhile resolve keyIsWanted fromEntry readers n =
1092-
flip (V.unfoldrNM' n) Readers.HasMore $ \case
1093-
Readers.Drained -> return (Nothing, Readers.Drained)
1094-
Readers.HasMore -> readEntryIfWanted
1095-
where
1096-
-- Produces a result unless the readers have been drained or 'keyIsWanted'
1097-
-- returned False.
1098-
readEntryIfWanted :: m (Maybe res, Readers.HasMore)
1099-
readEntryIfWanted = do
1100-
key <- Readers.peekKey readers
1101-
if keyIsWanted key then readEntry
1102-
else return (Nothing, Readers.HasMore)
1103-
1104-
readEntry :: m (Maybe res, Readers.HasMore)
1105-
readEntry = do
1106-
(key, readerEntry, hasMore) <- Readers.pop readers
1107-
let !entry = Reader.toFullEntry readerEntry
1108-
case hasMore of
1109-
Readers.Drained -> do
1110-
handleResolved key entry Readers.Drained
1111-
Readers.HasMore -> do
1112-
case entry of
1113-
Entry.Mupdate v ->
1114-
handleMupdate key v
1115-
_ -> do
1116-
-- Anything but Mupdate supersedes all previous entries of
1117-
-- the same key, so we can simply drop them and are done.
1118-
hasMore' <- dropRemaining key
1119-
handleResolved key entry hasMore'
1120-
1121-
dropRemaining :: SerialisedKey -> m Readers.HasMore
1122-
dropRemaining key = do
1123-
(_, hasMore) <- Readers.dropWhileKey readers key
1124-
return hasMore
1125-
1126-
-- Resolve a 'Mupsert' value with the other entries of the same key.
1127-
handleMupdate :: SerialisedKey
1128-
-> SerialisedValue
1129-
-> m (Maybe res, Readers.HasMore)
1130-
handleMupdate key v = do
1131-
nextKey <- Readers.peekKey readers
1132-
if nextKey /= key
1133-
then
1134-
-- No more entries for same key, done.
1135-
handleResolved key (Entry.Mupdate v) Readers.HasMore
1136-
else do
1137-
(_, nextEntry, hasMore) <- Readers.pop readers
1138-
let resolved = Entry.combine resolve (Entry.Mupdate v)
1139-
(Reader.toFullEntry nextEntry)
1140-
case hasMore of
1141-
Readers.HasMore -> case resolved of
1142-
Entry.Mupdate v' ->
1143-
-- Still a mupsert, keep resolving!
1144-
handleMupdate key v'
1145-
_ -> do
1146-
-- Done with this key, remaining entries are obsolete.
1147-
hasMore' <- dropRemaining key
1148-
handleResolved key resolved hasMore'
1149-
Readers.Drained -> do
1150-
handleResolved key resolved Readers.Drained
1151-
1152-
-- Once we have a resolved entry, we still have to make sure it's not
1153-
-- a 'Delete', since we only want to write values to the result vector.
1154-
handleResolved :: SerialisedKey
1155-
-> Entry SerialisedValue (BlobRef.BlobRef m (Handle h))
1156-
-> Readers.HasMore
1157-
-> m (Maybe res, Readers.HasMore)
1158-
handleResolved key entry hasMore =
1159-
case toResult key entry of
1160-
Just !res ->
1161-
-- Found one resolved value, done.
1162-
return (Just res, hasMore)
1163-
Nothing ->
1164-
-- Resolved value was a Delete, which we don't want to include.
1165-
-- So look for another one (unless there are no more entries!).
1166-
case hasMore of
1167-
Readers.HasMore -> readEntryIfWanted
1168-
Readers.Drained -> return (Nothing, Readers.Drained)
1169-
1170-
toResult :: SerialisedKey
1171-
-> Entry SerialisedValue (BlobRef.BlobRef m (Handle h))
1172-
-> Maybe res
1173-
toResult key = \case
1174-
Entry.Insert v -> Just $ fromEntry key v Nothing
1175-
Entry.InsertWithBlob v b -> Just $ fromEntry key v (Just (WeakBlobRef b))
1176-
Entry.Mupdate v -> Just $ fromEntry key v Nothing
1177-
Entry.Delete -> Nothing
1178-
11791076
{-------------------------------------------------------------------------------
11801077
Snapshots
11811078
-------------------------------------------------------------------------------}
src/Database/LSMTree/Internal/Cursor.hs

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
{-# LANGUAGE DataKinds #-}
2+
3+
module Database.LSMTree.Internal.Cursor (
4+
readEntriesWhile
5+
) where
6+
7+
import Control.Concurrent.Class.MonadSTM (MonadSTM (..))
8+
import Control.Monad.Class.MonadST (MonadST (..))
9+
import Control.Monad.Class.MonadThrow
10+
import Control.Monad.Fix (MonadFix)
11+
import qualified Data.Vector as V
12+
import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
13+
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
14+
import Database.LSMTree.Internal.Entry (Entry)
15+
import qualified Database.LSMTree.Internal.Entry as Entry
16+
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
17+
import qualified Database.LSMTree.Internal.RunReader as Reader
18+
import qualified Database.LSMTree.Internal.RunReaders as Readers
19+
import Database.LSMTree.Internal.Serialise (SerialisedKey,
20+
SerialisedValue)
21+
import qualified Database.LSMTree.Internal.Vector as V
22+
import System.FS.API (Handle)
23+
24+
{-# INLINE readEntriesWhile #-}
25+
{-# SPECIALISE readEntriesWhile :: forall h res.
26+
ResolveSerialisedValue
27+
-> (SerialisedKey -> Bool)
28+
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res)
29+
-> Readers.Readers IO h
30+
-> Int
31+
-> IO (V.Vector res, Readers.HasMore) #-}
32+
-- | General notes on the code below:
33+
-- * it is quite similar to the code in Internal.Merge, but different enough
34+
-- that it's probably easier to keep them separate
35+
-- * any function that doesn't take a 'hasMore' argument assumes that the
36+
-- readers have not been drained yet, so we must check before calling them
37+
-- * there is probably opportunity for optimisations
38+
readEntriesWhile :: forall h m res.
39+
(MonadFix m, MonadMask m, MonadST m, MonadSTM m)
40+
=> ResolveSerialisedValue
41+
-> (SerialisedKey -> Bool)
42+
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res)
43+
-> Readers.Readers m h
44+
-> Int
45+
-> m (V.Vector res, Readers.HasMore)
46+
readEntriesWhile resolve keyIsWanted fromEntry readers n =
47+
flip (V.unfoldrNM' n) Readers.HasMore $ \case
48+
Readers.Drained -> return (Nothing, Readers.Drained)
49+
Readers.HasMore -> readEntryIfWanted
50+
where
51+
-- Produces a result unless the readers have been drained or 'keyIsWanted'
52+
-- returned False.
53+
readEntryIfWanted :: m (Maybe res, Readers.HasMore)
54+
readEntryIfWanted = do
55+
key <- Readers.peekKey readers
56+
if keyIsWanted key then readEntry
57+
else return (Nothing, Readers.HasMore)
58+
59+
readEntry :: m (Maybe res, Readers.HasMore)
60+
readEntry = do
61+
(key, readerEntry, hasMore) <- Readers.pop readers
62+
let !entry = Reader.toFullEntry readerEntry
63+
case hasMore of
64+
Readers.Drained -> do
65+
handleResolved key entry Readers.Drained
66+
Readers.HasMore -> do
67+
case entry of
68+
Entry.Mupdate v ->
69+
handleMupdate key v
70+
_ -> do
71+
-- Anything but Mupdate supersedes all previous entries of
72+
-- the same key, so we can simply drop them and are done.
73+
hasMore' <- dropRemaining key
74+
handleResolved key entry hasMore'
75+
76+
dropRemaining :: SerialisedKey -> m Readers.HasMore
77+
dropRemaining key = do
78+
(_, hasMore) <- Readers.dropWhileKey readers key
79+
return hasMore
80+
81+
-- Resolve a 'Mupdate' value with the other entries of the same key.
82+
handleMupdate :: SerialisedKey
83+
-> SerialisedValue
84+
-> m (Maybe res, Readers.HasMore)
85+
handleMupdate key v = do
86+
nextKey <- Readers.peekKey readers
87+
if nextKey /= key
88+
then
89+
-- No more entries for same key, done.
90+
handleResolved key (Entry.Mupdate v) Readers.HasMore
91+
else do
92+
(_, nextEntry, hasMore) <- Readers.pop readers
93+
let resolved = Entry.combine resolve (Entry.Mupdate v)
94+
(Reader.toFullEntry nextEntry)
95+
case hasMore of
96+
Readers.HasMore -> case resolved of
97+
Entry.Mupdate v' ->
98+
-- Still a mupsert, keep resolving!
99+
handleMupdate key v'
100+
_ -> do
101+
-- Done with this key, remaining entries are obsolete.
102+
hasMore' <- dropRemaining key
103+
handleResolved key resolved hasMore'
104+
Readers.Drained -> do
105+
handleResolved key resolved Readers.Drained
106+
107+
-- Once we have a resolved entry, we still have to make sure it's not
108+
-- a 'Delete', since we only want to write values to the result vector.
109+
handleResolved :: SerialisedKey
110+
-> Entry SerialisedValue (BlobRef.BlobRef m (Handle h))
111+
-> Readers.HasMore
112+
-> m (Maybe res, Readers.HasMore)
113+
handleResolved key entry hasMore =
114+
case toResult key entry of
115+
Just !res ->
116+
-- Found one resolved value, done.
117+
return (Just res, hasMore)
118+
Nothing ->
119+
-- Resolved value was a Delete, which we don't want to include.
120+
-- So look for another one (unless there are no more entries!).
121+
case hasMore of
122+
Readers.HasMore -> readEntryIfWanted
123+
Readers.Drained -> return (Nothing, Readers.Drained)
124+
125+
toResult :: SerialisedKey
126+
-> Entry SerialisedValue (BlobRef.BlobRef m (Handle h))
127+
-> Maybe res
128+
toResult key = \case
129+
Entry.Insert v -> Just $ fromEntry key v Nothing
130+
Entry.InsertWithBlob v b -> Just $ fromEntry key v (Just (WeakBlobRef b))
131+
Entry.Mupdate v -> Just $ fromEntry key v Nothing
132+
Entry.Delete -> Nothing

0 commit comments

Comments
 (0)