Skip to content

Commit 3e6bd8d

Browse files
authored
Merge pull request #536 from IntersectMBO/mheinzel/merge-union
Implement MergeUnion merges
2 parents fea1896 + 0ce66aa commit 3e6bd8d

File tree

5 files changed

+175
-22
lines changed

5 files changed

+175
-22
lines changed

bench/micro/Bench/Database/LSMTree/Internal/Merge.hs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,23 +84,39 @@ benchmarks = bgroup "Bench.Database.LSMTree.Internal.Merge" [
8484
randomWord64OutOf (totalEntries `div` 2)
8585
, mergeMappend = Just (onDeserialisedValues ((+) @Word64))
8686
}
87+
-- different merge types
8788
, benchMerge configWord64
88-
{ name = "word64-mix-x4"
89+
{ name = "word64-mix-collisions-x4-midlevel"
8990
, nentries = totalEntries `splitInto` 4
9091
, finserts = 1
9192
, fdeletes = 1
9293
, fmupserts = 1
94+
, randomKey = -- each run uses half of the possible keys
95+
randomWord64OutOf (totalEntries `div` 2)
96+
, mergeMappend = Just (onDeserialisedValues ((+) @Word64))
97+
, mergeType = MergeMidLevel
98+
}
99+
, benchMerge configWord64
100+
{ name = "word64-mix-collisions-x4-lastlevel"
101+
, nentries = totalEntries `splitInto` 4
102+
, finserts = 1
103+
, fdeletes = 1
104+
, fmupserts = 1
105+
, randomKey = -- each run uses half of the possible keys
106+
randomWord64OutOf (totalEntries `div` 2)
93107
, mergeMappend = Just (onDeserialisedValues ((+) @Word64))
108+
, mergeType = MergeLastLevel
94109
}
95110
, benchMerge configWord64
96-
{ name = "word64-mix-collisions-x4"
111+
{ name = "word64-mix-collisions-x4-union"
97112
, nentries = totalEntries `splitInto` 4
98113
, finserts = 1
99114
, fdeletes = 1
100115
, fmupserts = 1
101116
, randomKey = -- each run uses half of the possible keys
102117
randomWord64OutOf (totalEntries `div` 2)
103118
, mergeMappend = Just (onDeserialisedValues ((+) @Word64))
119+
, mergeType = MergeUnion
104120
}
105121
-- not writing anything at all
106122
, benchMerge configWord64
@@ -174,6 +190,20 @@ benchmarks = bgroup "Bench.Database.LSMTree.Internal.Merge" [
174190
, fdeletes = 1
175191
, mergeType = MergeLastLevel
176192
}
193+
, benchMerge configUTxOStaking
194+
{ name = "utxo-x2-tree-union" -- binary union merge
195+
, nentries = totalEntries `distributed` [4, 1]
196+
, mergeType = MergeUnion
197+
}
198+
, benchMerge configUTxOStaking
199+
{ name = "utxo-x10-tree-level" -- merge a whole table (for union)
200+
, nentries = totalEntries `distributed` [ 1, 1, 1
201+
, 4, 4, 4
202+
, 16, 16, 16
203+
, 100 -- last level
204+
]
205+
, mergeType = MergeLastLevel
206+
}
177207
]
178208
where
179209
totalEntries = 50_000
@@ -311,6 +341,14 @@ configUTxO = defaultConfig {
311341
, randomValue = first serialiseValue . uniform @_ @UTxOValue
312342
}
313343

344+
configUTxOStaking :: Config
345+
configUTxOStaking = defaultConfig {
346+
fmupserts = 1
347+
, randomKey = first serialiseKey . uniform @_ @UTxOKey
348+
, randomValue = first serialiseValue . uniform @_ @Word64
349+
, mergeMappend = Just (onDeserialisedValues ((+) @Word64))
350+
}
351+
314352
mergeEnv ::
315353
Config
316354
-> IO ( FilePath -- ^ Temporary directory

src-extras/Database/LSMTree/Extras/Generators.hs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -556,8 +556,7 @@ instance Arbitrary BlobSpan where
556556
-------------------------------------------------------------------------------}
557557

558558
instance Arbitrary MergeType where
559-
arbitrary = QC.elements [MergeMidLevel, MergeLastLevel]
560-
-- TODO: add MergeUnion once it is supported.
559+
arbitrary = QC.elements [MergeMidLevel, MergeLastLevel, MergeUnion]
561560
shrink MergeMidLevel = []
562561
shrink MergeLastLevel = [MergeMidLevel]
563562
shrink MergeUnion = [MergeLastLevel]

src/Database/LSMTree/Internal/Entry.hs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ module Database.LSMTree.Internal.Entry (
77
, unNumEntries
88
-- * Value resolution/merging
99
, combine
10+
, combineUnion
1011
, combineMaybe
1112
) where
1213

@@ -105,6 +106,24 @@ combine f (Mupdate u) (Insert v) = Insert (f u v)
105106
combine f (Mupdate u) (InsertWithBlob v blob) = InsertWithBlob (f u v) blob
106107
combine f (Mupdate u) (Mupdate v) = Mupdate (f u v)
107108

109+
-- | Combine two entries of runs that have been 'union'ed together. If any one
110+
-- has a value, the result should have a value (represented by 'Insert'). If
111+
-- both have a value, these values get combined monoidally.
112+
combineUnion :: (v -> v -> v) -> Entry v b -> Entry v b -> Entry v b
113+
combineUnion f = go
114+
where
115+
go Delete e = e
116+
go e Delete = e
117+
go (Insert u) (Insert v) = Insert (f u v)
118+
go (Insert u) (InsertWithBlob v b) = InsertWithBlob (f u v) b
119+
go (Insert u) (Mupdate v) = Insert (f u v)
120+
go (InsertWithBlob u b) (Insert v) = InsertWithBlob (f u v) b
121+
go (InsertWithBlob u b) (InsertWithBlob v _) = InsertWithBlob (f u v) b
122+
go (InsertWithBlob u b) (Mupdate v) = InsertWithBlob (f u v) b
123+
go (Mupdate u) (Insert v) = Insert (f u v)
124+
go (Mupdate u) (InsertWithBlob v b) = InsertWithBlob (f u v) b
125+
go (Mupdate u) (Mupdate v) = Insert (f u v)
126+
108127
combineMaybe :: (v -> v -> v) -> Maybe (Entry v b) -> Maybe (Entry v b) -> Maybe (Entry v b)
109128
combineMaybe _ e1 Nothing = e1
110129
combineMaybe _ Nothing e2 = e2

src/Database/LSMTree/Internal/Merge.hs

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,22 +252,35 @@ steps ::
252252
=> Merge m h
253253
-> Int -- ^ How many input entries to consume (at least)
254254
-> m (Int, StepResult)
255-
steps Merge {..} requestedSteps = assertStepsInvariant <$> do
255+
steps m@Merge {..} requestedSteps = assertStepsInvariant <$> do
256256
-- TODO: ideally, we would not check whether the merge was already done on
257257
-- every call to @steps@. It is important for correctness, however, that we
258258
-- do not call @steps@ on a merge when it was already done. It is not yet
259259
-- clear whether our (upcoming) implementation of scheduled merges is going
260260
-- to satisfy this precondition when it calls @steps@, so for now we do the
261261
-- check.
262262
readMutVar mergeState >>= \case
263-
Merging -> go 0
263+
Merging -> case mergeType of
264+
MergeMidLevel -> doStepsLevel m requestedSteps
265+
MergeLastLevel -> doStepsLevel m requestedSteps
266+
MergeUnion -> doStepsUnion m requestedSteps
264267
MergingDone -> pure (0, MergeDone)
265268
Completed -> error "steps: Merge is completed"
266269
Closed -> error "steps: Merge is closed"
267270
where
268271
assertStepsInvariant res = assert (stepsInvariant requestedSteps res) res
269272

270-
go :: Int -> m (Int, StepResult)
273+
{-# SPECIALISE doStepsLevel ::
274+
Merge IO h
275+
-> Int
276+
-> IO (Int, StepResult) #-}
277+
doStepsLevel ::
278+
(MonadMask m, MonadSTM m, MonadST m)
279+
=> Merge m h
280+
-> Int -- ^ How many input entries to consume (at least)
281+
-> m (Int, StepResult)
282+
doStepsLevel Merge {..} requestedSteps = go 0
283+
where
271284
go !n
272285
| n >= requestedSteps =
273286
return (n, MergeInProgress)
@@ -329,6 +342,48 @@ steps Merge {..} requestedSteps = assertStepsInvariant <$> do
329342
writeMutVar mergeState $! MergingDone
330343
pure (n + dropped, MergeDone)
331344

345+
{-# SPECIALISE doStepsUnion ::
346+
Merge IO h
347+
-> Int
348+
-> IO (Int, StepResult) #-}
349+
doStepsUnion ::
350+
(MonadMask m, MonadSTM m, MonadST m)
351+
=> Merge m h
352+
-> Int -- ^ How many input entries to consume (at least)
353+
-> m (Int, StepResult)
354+
doStepsUnion Merge {..} requestedSteps = go 0
355+
where
356+
go !n
357+
| n >= requestedSteps =
358+
return (n, MergeInProgress)
359+
| otherwise = do
360+
(key, entry, hasMore) <- Readers.pop mergeReaders
361+
handleEntry (n + 1) key entry hasMore
362+
363+
-- Similar to 'handleMupdate' in 'stepsLevel', but here we have to combine
364+
-- all entries monoidally, so there are no obsolete/overwritten entries
365+
-- that we could skip.
366+
handleEntry !n !key !entry Readers.Drained = do
367+
-- no future entries, no previous entry to resolve, just write!
368+
writeReaderEntry mergeType mergeBuilder key entry
369+
writeMutVar mergeState $! MergingDone
370+
pure (n, MergeDone)
371+
372+
handleEntry !n !key !entry Readers.HasMore = do
373+
nextKey <- Readers.peekKey mergeReaders
374+
if nextKey /= key
375+
then do
376+
-- resolved all entries for this key, write it
377+
writeReaderEntry mergeType mergeBuilder key entry
378+
go n
379+
else do
380+
(_, nextEntry, hasMore) <- Readers.pop mergeReaders
381+
-- for resolution, we need the full second value to be present
382+
let resolved = combineUnion mergeMappend
383+
(Reader.toFullEntry entry)
384+
(Reader.toFullEntry nextEntry)
385+
handleEntry (n + 1) key (Reader.Entry resolved) hasMore
386+
332387
{-# SPECIALISE writeReaderEntry ::
333388
MergeType
334389
-> RunBuilder IO h

test/Test/Database/LSMTree/Internal/Merge.hs

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
{-# LANGUAGE LambdaCase #-}
2-
31
module Test.Database.LSMTree.Internal.Merge (tests) where
42

53
import Control.Exception (evaluate)
@@ -9,7 +7,7 @@ import qualified Data.BloomFilter as Bloom
97
import Data.Foldable (traverse_)
108
import Data.Map.Strict (Map)
119
import qualified Data.Map.Strict as Map
12-
import Data.Maybe (isJust)
10+
import Data.Maybe (isJust, mapMaybe)
1311
import qualified Data.Vector as V
1412
import Database.LSMTree.Extras
1513
import Database.LSMTree.Extras.Generators (KeyForIndexCompact)
@@ -38,12 +36,15 @@ import Test.Tasty.QuickCheck
3836

3937
tests :: TestTree
4038
tests = testGroup "Test.Database.LSMTree.Internal.Merge"
41-
[ testProperty "prop_MergeDistributes" $ \level stepSize wbs ->
39+
[ testProperty "prop_MergeDistributes" $ \mergeType stepSize rds ->
40+
ioPropertyWithMockFS $ \fs hbio ->
41+
prop_MergeDistributes fs hbio mergeType stepSize rds
42+
, testProperty "prop_MergeUnion" $ \stepSize rds ->
4243
ioPropertyWithMockFS $ \fs hbio ->
43-
prop_MergeDistributes fs hbio level stepSize wbs
44-
, testProperty "prop_AbortMerge" $ \level stepSize wbs ->
44+
prop_MergeUnion fs hbio stepSize rds
45+
, testProperty "prop_AbortMerge" $ \level stepSize rds ->
4546
ioPropertyWithMockFS $ \fs hbio ->
46-
prop_AbortMerge fs hbio level stepSize wbs
47+
prop_AbortMerge fs hbio level stepSize rds
4748
]
4849
where
4950
ioPropertyWithMockFS ::
@@ -70,12 +71,12 @@ prop_MergeDistributes ::
7071
StepSize ->
7172
SmallList (RunData KeyForIndexCompact SerialisedValue SerialisedBlob) ->
7273
IO Property
73-
prop_MergeDistributes fs hbio level stepSize (SmallList rds) =
74+
prop_MergeDistributes fs hbio mergeType stepSize (SmallList rds) =
7475
withRuns fs hbio (V.fromList (zip (simplePaths [10..]) rds')) $ \runs -> do
7576
let stepsNeeded = sum (map (Map.size . unRunData) rds)
76-
(stepsDone, lhs) <- mergeRuns fs hbio level (RunNumber 0) runs stepSize
77-
withRun fs hbio (simplePath 1)
78-
(RunData $ mergeWriteBuffers level $ fmap unRunData rds') $ \rhs -> do
77+
(stepsDone, lhs) <- mergeRuns fs hbio mergeType (RunNumber 0) runs stepSize
78+
let runData = RunData $ mergeWriteBuffers mergeType $ fmap unRunData rds'
79+
withRun fs hbio (simplePath 1) runData $ \rhs -> do
7980

8081
(lhsSize, lhsFilter, lhsIndex, lhsKOps,
8182
lhsKOpsFileContent, lhsBlobFileContent) <- getRunContent lhs
@@ -133,6 +134,45 @@ prop_MergeDistributes fs hbio level stepSize (SmallList rds) =
133134
, blobFileContent
134135
)
135136

137+
-- | Union-merging multiple runs behaves like 'Map.unionsWith' on their values
138+
-- and blobs.
139+
prop_MergeUnion ::
140+
FS.HasFS IO h ->
141+
FS.HasBlockIO IO h ->
142+
StepSize ->
143+
SmallList (RunData KeyForIndexCompact SerialisedValue SerialisedBlob) ->
144+
IO Property
145+
prop_MergeUnion fs hbio stepSize (SmallList rds) =
146+
withRuns fs hbio (V.fromList (zip (simplePaths [10..]) rds')) $ \runs -> do
147+
(_, run) <- mergeRuns fs hbio MergeUnion (RunNumber 0) runs stepSize
148+
149+
lhsKOps <- readKOps Nothing run
150+
let lhs = Map.fromList (mapMaybe (traverse getValueAndBlob) lhsKOps)
151+
152+
-- cleanup
153+
releaseRef run
154+
155+
return $
156+
lhs === rhs
157+
.&&. counterexample ("Deletes in " <> show lhs)
158+
(all ((/= Entry.Delete) . snd) lhsKOps)
159+
where
160+
rds' = fmap serialiseRunData rds
161+
162+
rhs :: Map SerialisedKey (SerialisedValue, Maybe SerialisedBlob)
163+
rhs = Map.unionsWith resolveValueAndBlob
164+
(map (Map.mapMaybe getValueAndBlob . unRunData) rds')
165+
166+
getValueAndBlob :: Entry.Entry v b -> Maybe (v, Maybe b)
167+
getValueAndBlob = \case
168+
Entry.Insert v -> Just (v, Nothing)
169+
Entry.InsertWithBlob v b -> Just (v, Just b)
170+
Entry.Mupdate v -> Just (v, Nothing)
171+
Entry.Delete -> Nothing
172+
173+
resolveValueAndBlob (v', Nothing) (v, b) = (mappendValues v' v, b)
174+
resolveValueAndBlob (v', Just b) (v, _) = (mappendValues v' v, Just b)
175+
136176
-- | After merging for a few steps, we can prematurely abort the merge, which
137177
-- should clean up properly.
138178
prop_AbortMerge ::
@@ -196,10 +236,12 @@ type SerialisedEntry = Entry.Entry SerialisedValue SerialisedBlob
196236
mergeWriteBuffers :: MergeType
197237
-> [Map SerialisedKey SerialisedEntry]
198238
-> Map SerialisedKey SerialisedEntry
199-
mergeWriteBuffers mergeType =
200-
--TODO: review and update this to support MergeUnion
201-
(if mergeType == MergeMidLevel then id else Map.filter (not . isDelete))
202-
. Map.unionsWith (Entry.combine mappendValues)
239+
mergeWriteBuffers = \case
240+
MergeMidLevel -> Map.unionsWith (Entry.combine mappendValues)
241+
MergeLastLevel -> Map.filter (not . isDelete)
242+
. Map.unionsWith (Entry.combine mappendValues)
243+
MergeUnion -> Map.filter (not . isDelete)
244+
. Map.unionsWith (Entry.combineUnion mappendValues)
203245
where
204246
isDelete Entry.Delete = True
205247
isDelete _ = False

0 commit comments

Comments
 (0)