Skip to content

Commit 8f3ca3c

Browse files
authored
Merge pull request #592 from IntersectMBO/mheinzel/merging-tree-data
Introduce MergingRunData and MergingTreeData
2 parents c8e3af6 + d78f451 commit 8f3ca3c

File tree

15 files changed

+983
-137
lines changed

15 files changed

+983
-137
lines changed

.hlint.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
- ignore: {name: "Redundant $!"}
2727
- ignore: {name: "Use shows"}
2828
- ignore: {name: "Use fmap"}
29+
- ignore: {name: "Use <=<"}
2930

3031
# Specify additional command line arguments
3132
#

bench/micro/Bench/Database/LSMTree/Internal/Merge.hs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
module Bench.Database.LSMTree.Internal.Merge (benchmarks) where
22

3-
import Control.Monad (zipWithM)
43
import Control.RefCount
54
import Criterion.Main (Benchmark, bench, bgroup)
65
import qualified Criterion.Main as Cr
@@ -27,6 +26,7 @@ import qualified Database.LSMTree.Internal.Run as Run
2726
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
2827
import Database.LSMTree.Internal.RunNumber
2928
import Database.LSMTree.Internal.Serialise
29+
import Database.LSMTree.Internal.UniqCounter
3030
import Prelude hiding (getContents)
3131
import System.Directory (removeDirectoryRecursive)
3232
import qualified System.FS.API as FS
@@ -268,11 +268,14 @@ merge fs hbio Config {..} targetPaths runs = do
268268
mergeType f targetPaths runs
269269
Merge.stepsToCompletion m stepSize
270270

271+
fsPath :: FS.FsPath
272+
fsPath = FS.mkFsPath []
273+
271274
outputRunPaths :: Run.RunFsPaths
272-
outputRunPaths = RunFsPaths (FS.mkFsPath []) (RunNumber 0)
275+
outputRunPaths = RunFsPaths fsPath (RunNumber 0)
273276

274-
inputRunPaths :: [Run.RunFsPaths]
275-
inputRunPaths = RunFsPaths (FS.mkFsPath []) . RunNumber <$> [1..]
277+
inputRunPathsCounter :: IO (UniqCounter IO)
278+
inputRunPathsCounter = newUniqCounter 1 -- 0 is for output
276279

277280
type InputRuns = V.Vector (Ref (Run IO FS.HandleIO))
278281

@@ -384,17 +387,14 @@ randomRuns ::
384387
-> Config
385388
-> StdGen
386389
-> IO InputRuns
387-
randomRuns hasFS hasBlockIO config@Config {..} rng0 =
388-
V.fromList <$>
389-
zipWithM (unsafeFlushAsWriteBuffer hasFS hasBlockIO Index.Compact)
390-
inputRunPaths runsData
391-
where
392-
runsData :: [SerialisedRunData]
393-
runsData =
394-
zipWith
395-
(randomRunData config)
396-
nentries
397-
(List.unfoldr (Just . R.split) rng0)
390+
randomRuns hasFS hasBlockIO config@Config {..} rng0 = do
391+
counter <- inputRunPathsCounter
392+
fmap V.fromList $
393+
mapM (unsafeCreateRun hasFS hasBlockIO Index.Compact fsPath counter) $
394+
zipWith
395+
(randomRunData config)
396+
nentries
397+
(List.unfoldr (Just . R.split) rng0)
398398

399399
-- | Generate keys and entries to insert into the write buffer.
400400
-- They are already serialised to exclude the cost from the benchmark.

lsm-tree.cabal

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,8 @@ library extras
309309
Database.LSMTree.Extras
310310
Database.LSMTree.Extras.Generators
311311
Database.LSMTree.Extras.Index
312+
Database.LSMTree.Extras.MergingRunData
313+
Database.LSMTree.Extras.MergingTreeData
312314
Database.LSMTree.Extras.NoThunks
313315
Database.LSMTree.Extras.Orphans
314316
Database.LSMTree.Extras.Random
@@ -333,6 +335,7 @@ library extras
333335
, lsm-tree:control
334336
, lsm-tree:kmerge
335337
, lsm-tree:prototypes
338+
, nonempty-containers
336339
, nothunks
337340
, primitive
338341
, QuickCheck
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
-- | Utilities for generating 'MergingRun's. Tests and benchmarks should
2+
-- preferably use these utilities instead of (re-)defining their own.
3+
module Database.LSMTree.Extras.MergingRunData (
4+
-- * Create merging runs
5+
withMergingRun
6+
, unsafeCreateMergingRun
7+
-- * MergingRunData
8+
, MergingRunData (..)
9+
, mergingRunDataMergeType
10+
, mergingRunDataInvariant
11+
, mapMergingRunData
12+
, SerialisedMergingRunData
13+
, serialiseMergingRunData
14+
-- * QuickCheck
15+
, labelMergingRunData
16+
, genMergingRunData
17+
, shrinkMergingRunData
18+
) where
19+
20+
import Control.Exception (bracket)
21+
import Control.RefCount
22+
import qualified Data.Vector as V
23+
import Database.LSMTree.Extras (showPowersOf)
24+
import Database.LSMTree.Extras.Generators ()
25+
import Database.LSMTree.Extras.RunData
26+
import Database.LSMTree.Internal.Index (IndexType)
27+
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
28+
import Database.LSMTree.Internal.MergingRun (MergingRun)
29+
import qualified Database.LSMTree.Internal.MergingRun as MR
30+
import Database.LSMTree.Internal.Paths
31+
import Database.LSMTree.Internal.Run (RunDataCaching (..))
32+
import qualified Database.LSMTree.Internal.Run as Run
33+
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
34+
import Database.LSMTree.Internal.RunNumber
35+
import Database.LSMTree.Internal.Serialise
36+
import Database.LSMTree.Internal.UniqCounter
37+
import qualified System.FS.API as FS
38+
import System.FS.API (HasFS)
39+
import System.FS.BlockIO.API (HasBlockIO)
40+
import Test.QuickCheck as QC
41+
42+
{-------------------------------------------------------------------------------
43+
Create merging runs
44+
-------------------------------------------------------------------------------}
45+
46+
-- | Create a temporary 'MergingRun' using 'unsafeCreateMergingRun'.
47+
withMergingRun ::
48+
MR.IsMergeType t
49+
=> HasFS IO h
50+
-> HasBlockIO IO h
51+
-> ResolveSerialisedValue
52+
-> IndexType
53+
-> FS.FsPath
54+
-> UniqCounter IO
55+
-> SerialisedMergingRunData t
56+
-> (Ref (MergingRun t IO h) -> IO a)
57+
-> IO a
58+
withMergingRun hfs hbio resolve indexType path counter mrd = do
59+
bracket
60+
(unsafeCreateMergingRun hfs hbio resolve indexType path counter mrd)
61+
releaseRef
62+
63+
-- | Flush serialised merging run data to disk.
64+
--
65+
-- This might leak resources if not run with asynchronous exceptions masked.
66+
-- Consider using 'withMergingRun' instead.
67+
--
68+
-- Use of this function should be paired with a 'releaseRef'.
69+
unsafeCreateMergingRun ::
70+
MR.IsMergeType t
71+
=> HasFS IO h
72+
-> HasBlockIO IO h
73+
-> ResolveSerialisedValue
74+
-> IndexType
75+
-> FS.FsPath
76+
-> UniqCounter IO
77+
-> SerialisedMergingRunData t
78+
-> IO (Ref (MergingRun t IO h))
79+
unsafeCreateMergingRun hfs hbio resolve indexType path counter = \case
80+
CompletedMergeData _ numRuns rd -> do
81+
withRun hfs hbio indexType path counter rd $ \run -> do
82+
-- slightly hacky, generally it's larger
83+
let totalDebt = MR.numEntriesToMergeDebt (Run.size run)
84+
MR.newCompleted numRuns totalDebt run
85+
86+
OngoingMergeData mergeType rds -> do
87+
withRuns hfs hbio indexType path counter (toRunData <$> rds)
88+
$ \runs -> do
89+
n <- incrUniqCounter counter
90+
let fsPaths = RunFsPaths path (RunNumber (uniqueToInt n))
91+
MR.new hfs hbio resolve CacheRunData (RunAllocFixed 10) indexType
92+
mergeType fsPaths (V.fromList runs)
93+
94+
{-------------------------------------------------------------------------------
95+
MergingRunData
96+
-------------------------------------------------------------------------------}
97+
98+
-- | A data structure suitable for creating arbitrary 'MergingRun's.
99+
--
100+
-- Note: 'b ~ Void' should rule out blobs.
101+
--
102+
-- Currently, ongoing merges are always \"fresh\", i.e. there is no merge work
103+
-- already performed.
104+
--
105+
-- TODO: Generate merge credits and supply them in 'unsafeCreateMergingRun',
106+
-- similarly to how @ScheduledMergesTest@ does it.
107+
data MergingRunData t k v b =
108+
CompletedMergeData t MR.NumRuns (RunData k v b)
109+
| OngoingMergeData t [NonEmptyRunData k v b] -- ^ at least 2 inputs
110+
deriving stock (Show, Eq)
111+
112+
mergingRunDataMergeType :: MergingRunData t k v b -> t
113+
mergingRunDataMergeType = \case
114+
CompletedMergeData mt _ _ -> mt
115+
OngoingMergeData mt _ -> mt
116+
117+
-- | See @mergeInvariant@ in the prototype.
118+
mergingRunDataInvariant :: MergingRunData t k v b -> Either String ()
119+
mergingRunDataInvariant = \case
120+
CompletedMergeData _ (MR.NumRuns n) _ ->
121+
assertI "completed merges are non-trivial (at least two inputs)" $
122+
n >= 2
123+
OngoingMergeData _ rds -> do
124+
assertI "ongoing merges are non-trivial (at least two inputs)" $
125+
length rds >= 2
126+
where
127+
assertI msg False = Left msg
128+
assertI _ True = Right ()
129+
130+
mapMergingRunData ::
131+
Ord k'
132+
=> (k -> k') -> (v -> v') -> (b -> b')
133+
-> MergingRunData t k v b -> MergingRunData t k' v' b'
134+
mapMergingRunData f g h = \case
135+
CompletedMergeData t n r ->
136+
CompletedMergeData t n $ mapRunData f g h r
137+
OngoingMergeData t rs ->
138+
OngoingMergeData t $ map (mapNonEmptyRunData f g h) rs
139+
140+
type SerialisedMergingRunData t =
141+
MergingRunData t SerialisedKey SerialisedValue SerialisedBlob
142+
143+
serialiseMergingRunData ::
144+
(SerialiseKey k, SerialiseValue v, SerialiseValue b)
145+
=> MergingRunData t k v b -> SerialisedMergingRunData t
146+
serialiseMergingRunData =
147+
mapMergingRunData serialiseKey serialiseValue serialiseBlob
148+
149+
{-------------------------------------------------------------------------------
150+
QuickCheck
151+
-------------------------------------------------------------------------------}
152+
153+
labelMergingRunData ::
154+
Show t => SerialisedMergingRunData t -> Property -> Property
155+
labelMergingRunData (CompletedMergeData mt _ rd) =
156+
tabulate "merging run state" ["CompletedMerge"]
157+
. tabulate "merge type" [show mt]
158+
. labelRunData rd
159+
labelMergingRunData (OngoingMergeData mt rds) =
160+
tabulate "merging run state" ["OngoingMerge"]
161+
. tabulate "merge type" [show mt]
162+
. tabulate "merging run inputs" [showPowersOf 2 (length rds)]
163+
. foldr ((.) . labelNonEmptyRunData) id rds
164+
165+
instance ( Arbitrary t, Ord k, Arbitrary k, Arbitrary v, Arbitrary b
166+
) => Arbitrary (MergingRunData t k v b) where
167+
arbitrary = genMergingRunData arbitrary arbitrary arbitrary arbitrary
168+
shrink = shrinkMergingRunData shrink shrink shrink
169+
170+
genMergingRunData ::
171+
Ord k
172+
=> Gen t
173+
-> Gen k
174+
-> Gen v
175+
-> Gen b
176+
-> Gen (MergingRunData t k v b)
177+
genMergingRunData genMergeType genKey genVal genBlob =
178+
QC.oneof
179+
[ do
180+
mt <- genMergeType
181+
numRuns <- MR.NumRuns <$> QC.chooseInt (2, 8)
182+
rd <- genRunData genKey genVal genBlob
183+
pure (CompletedMergeData mt numRuns rd)
184+
, do
185+
s <- QC.getSize
186+
mt <- genMergeType
187+
n <- QC.chooseInt (2, max 2 (s * 8 `div` 100)) -- 2 to 8
188+
rs <- QC.vectorOf n $
189+
-- Scaled, so overall number of entries is similar to a completed
190+
-- merge. However, the entries themselves should not be smaller.
191+
QC.scale (`div` n) $
192+
genNonEmptyRunData
193+
(resize s genKey)
194+
(resize s genVal)
195+
(resize s genBlob)
196+
pure (OngoingMergeData mt rs)
197+
]
198+
199+
shrinkMergingRunData ::
200+
Ord k
201+
=> (k -> [k])
202+
-> (v -> [v])
203+
-> (b -> [b])
204+
-> MergingRunData t k v b
205+
-> [MergingRunData t k v b]
206+
shrinkMergingRunData shrinkKey shrinkVal shrinkBlob = \case
207+
CompletedMergeData mt numRuns rd ->
208+
[ CompletedMergeData mt numRuns' rd'
209+
| (numRuns', rd') <-
210+
liftShrink2
211+
(fmap MR.NumRuns . filter (>= 2) . shrink . MR.unNumRuns)
212+
(shrinkRunData shrinkKey shrinkVal shrinkBlob)
213+
(numRuns, rd)
214+
]
215+
OngoingMergeData mt rds ->
216+
[ OngoingMergeData mt rds'
217+
| rds' <-
218+
liftShrink
219+
(shrinkNonEmptyRunData shrinkKey shrinkVal shrinkBlob)
220+
rds
221+
, length rds' >= 2
222+
]

0 commit comments

Comments
 (0)