Skip to content

Commit 1b3158b

Browse files
committed
Add incremental functionality for the ordinary index
1 parent 861c42d commit 1b3158b

File tree

3 files changed

+150
-7
lines changed

3 files changed

+150
-7
lines changed

lsm-tree.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ library
119119
Database.LSMTree.Internal.IndexCompact
120120
Database.LSMTree.Internal.IndexCompactAcc
121121
Database.LSMTree.Internal.IndexOrdinary
122+
Database.LSMTree.Internal.IndexOrdinaryAcc
122123
Database.LSMTree.Internal.Lookup
123124
Database.LSMTree.Internal.Managed
124125
Database.LSMTree.Internal.Merge

src/Database/LSMTree/Internal/Chunk.hs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import Prelude hiding (length)
1818

1919
import Control.Exception (assert)
2020
import Control.Monad.ST.Strict (ST)
21+
import Data.List (scanl')
2122
import Data.STRef.Strict (STRef, newSTRef, readSTRef, writeSTRef)
2223
import Data.Vector.Primitive (Vector, length, unsafeCopy,
2324
unsafeFreeze)
@@ -54,34 +55,51 @@ createBaler minChunkSize = assert (minChunkSize > 0) $
5455
newSTRef 0
5556

5657
{-|
57-
Feeds a baler a block of bytes.
58+
Feeds a baler blocks of bytes.
5859
5960
Bytes received by a baler are generally queued for later output, but if
6061
feeding new bytes makes the accumulated content exceed the minimum chunk
6162
size then a chunk containing all the accumulated content is output.
6263
-}
63-
feedBaler :: Vector Word8 -> Baler s -> ST s (Maybe Chunk)
64-
feedBaler block (Baler buffer remnantSizeRef) = do
64+
feedBaler :: forall s . [Vector Word8] -> Baler s -> ST s (Maybe Chunk)
65+
feedBaler blocks (Baler buffer remnantSizeRef) = do
6566
remnantSize <- readSTRef remnantSizeRef
6667
let
6768

69+
inputSize :: Int
70+
!inputSize = sum (map length blocks)
71+
6872
totalSize :: Int
69-
!totalSize = remnantSize + length block
73+
!totalSize = remnantSize + inputSize
7074

7175
if totalSize <= Mutable.length buffer
7276
then do
73-
unsafeCopy (Mutable.slice remnantSize (length block) buffer)
74-
block
77+
unsafeCopyBlocks (Mutable.drop remnantSize buffer)
7578
writeSTRef remnantSizeRef totalSize
7679
return Nothing
7780
else do
7881
protoChunk <- Mutable.unsafeNew totalSize
7982
Mutable.unsafeCopy (Mutable.take remnantSize protoChunk)
8083
(Mutable.take remnantSize buffer)
81-
unsafeCopy (Mutable.drop remnantSize protoChunk) block
84+
unsafeCopyBlocks (Mutable.drop remnantSize protoChunk)
8285
writeSTRef remnantSizeRef 0
8386
chunk <- Chunk <$> unsafeFreeze protoChunk
8487
return (Just chunk)
88+
where
89+
90+
unsafeCopyBlocks :: MVector s Word8 -> ST s ()
91+
unsafeCopyBlocks vec
92+
= sequence_ $
93+
zipWith3 (\ start size block -> unsafeCopy
94+
(Mutable.slice start size vec)
95+
block)
96+
(scanl' (+) 0 blockSizes)
97+
blockSizes
98+
blocks
99+
where
100+
101+
blockSizes :: [Int]
102+
blockSizes = map length blocks
85103

86104
{-|
87105
Returns the bytes still queued in a baler, if any, thereby invalidating the
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
{- HLINT ignore "Avoid restricted alias" -}
2+
3+
{-|
4+
Incremental construction functionality for the general-purpose fence pointer
5+
index.
6+
-}
7+
module Database.LSMTree.Internal.IndexOrdinaryAcc
8+
(
9+
IndexOrdinaryAcc,
10+
new,
11+
append,
12+
unsafeEnd
13+
)
14+
where
15+
16+
import Prelude hiding (take)
17+
18+
import Control.Exception (assert)
19+
import Control.Monad.ST.Strict (ST, runST)
20+
import Data.Primitive.ByteArray (newByteArray, unsafeFreezeByteArray,
21+
writeByteArray)
22+
import Data.STRef.Strict (STRef, newSTRef, readSTRef, writeSTRef)
23+
import Data.Vector (force, take, unsafeFreeze)
24+
import Data.Vector.Mutable (MVector)
25+
import qualified Data.Vector.Mutable as Mutable (unsafeNew, write)
26+
import qualified Data.Vector.Primitive as Primitive (Vector, length)
27+
import Data.Word (Word16, Word8)
28+
import Database.LSMTree.Internal.Chunk (Baler, Chunk, createBaler,
29+
feedBaler, unsafeEndBaler)
30+
import Database.LSMTree.Internal.IndexCompactAcc
31+
(Append (AppendMultiPage, AppendSinglePage))
32+
import Database.LSMTree.Internal.IndexOrdinary
33+
(IndexOrdinary (IndexOrdinary))
34+
import Database.LSMTree.Internal.Serialise
35+
(SerialisedKey (SerialisedKey'))
36+
import Database.LSMTree.Internal.Vector (mkPrimVector)
37+
38+
{-|
39+
A general-purpose fence pointer index under incremental construction.
40+
41+
A value @IndexOrdinaryAcc buffer keyCountRef baler@ denotes a partially
42+
constructed index with the following properties:
43+
44+
* The keys that the index assigns to pages are stored as a prefix of the
45+
mutable vector @buffer@.
46+
* The reference @keyCountRef@ points to the number of those keys.
47+
* The @baler@ object is used by the index for incremental output of the
48+
serialised key list.
49+
-}
50+
data IndexOrdinaryAcc s = IndexOrdinaryAcc
51+
!(MVector s SerialisedKey)
52+
!(STRef s Int)
53+
!(Baler s)
54+
55+
-- | Creates a new, initially empty, index.
56+
new :: Int -- ^ Maximum number of keys
57+
-> Int -- ^ Minimum chunk size in bytes
58+
-> ST s (IndexOrdinaryAcc s) -- ^ Construction of the index
59+
new maxKeyCount minChunkSize = assert (maxKeyCount >= 0) $
60+
IndexOrdinaryAcc <$>
61+
Mutable.unsafeNew maxKeyCount <*>
62+
newSTRef 0 <*>
63+
createBaler minChunkSize
64+
65+
{-|
66+
Appends keys to the key list of an index and outputs newly available chunks
67+
of the serialised key list.
68+
69+
__Warning:__ Appending keys whose length cannot be represented by a 16-bit
70+
word may result in a corrupted serialised key list.
71+
-}
72+
append :: Append -> IndexOrdinaryAcc s -> ST s (Maybe Chunk)
73+
append instruction (IndexOrdinaryAcc buffer keyCountRef baler)
74+
= case instruction of
75+
AppendSinglePage _ key -> do
76+
keyCount <- readSTRef keyCountRef
77+
Mutable.write buffer keyCount key
78+
writeSTRef keyCountRef $! succ keyCount
79+
feedBaler (keyListElem key) baler
80+
AppendMultiPage key overflowPageCount -> do
81+
keyCount <- readSTRef keyCountRef
82+
let
83+
84+
pageCount :: Int
85+
!pageCount = succ (fromIntegral overflowPageCount)
86+
87+
keyCount' :: Int
88+
!keyCount' = keyCount + pageCount
89+
90+
mapM_ (flip (Mutable.write buffer) key)
91+
[keyCount .. pred keyCount']
92+
writeSTRef keyCountRef $! keyCount'
93+
feedBaler (concat (replicate pageCount (keyListElem key))) baler
94+
where
95+
96+
keyListElem :: SerialisedKey -> [Primitive.Vector Word8]
97+
keyListElem (SerialisedKey' keyBytes) = [keySizeBytes, keyBytes] where
98+
99+
keySize :: Int
100+
!keySize = Primitive.length keyBytes
101+
102+
keySizeAsWord16 :: Word16
103+
!keySizeAsWord16 = assert (keySize <= fromIntegral (maxBound :: Word16)) $
104+
fromIntegral keySize
105+
106+
keySizeBytes :: Primitive.Vector Word8
107+
!keySizeBytes = mkPrimVector 0 2 $
108+
runST $ do
109+
rep <- newByteArray 2
110+
writeByteArray rep 0 keySizeAsWord16
111+
unsafeFreezeByteArray rep
112+
113+
{-|
114+
Returns the constructed index, along with a final chunk in case the
115+
serialised key list has not been fully output yet, thereby invalidating the
116+
index under construction. Executing @unsafeEnd index@ is only safe when
117+
@index@ is not used afterwards.
118+
-}
119+
unsafeEnd :: IndexOrdinaryAcc s -> ST s (Maybe Chunk, IndexOrdinary)
120+
unsafeEnd (IndexOrdinaryAcc buffer keyCountRef baler) = do
121+
keyCount <- readSTRef keyCountRef
122+
keys <- force <$> take keyCount <$> unsafeFreeze buffer
123+
remnant <- unsafeEndBaler baler
124+
return (remnant, IndexOrdinary keys)

0 commit comments

Comments
 (0)