Skip to content

Commit 53cd2b7

Browse files
authored
Merge pull request #464 from IntersectMBO/mheinzel/prototype-preparation
Prototype improvements, support mupsert
2 parents 10e6811 + 1b3d3d2 commit 53cd2b7

File tree

5 files changed

+377
-266
lines changed

5 files changed

+377
-266
lines changed

lsm-tree.cabal

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,7 @@ library prototypes
689689
exposed-modules:
690690
FormatPage
691691
ScheduledMerges
692+
ScheduledMergesTest
692693
ScheduledMergesTestQLS
693694

694695
build-depends:
@@ -698,7 +699,6 @@ library prototypes
698699
, constraints
699700
, containers
700701
, contra-tracer
701-
, lsm-tree
702702
, QuickCheck
703703
, quickcheck-dynamic
704704
, quickcheck-lockstep

prototypes/ScheduledMerges.hs

Lines changed: 72 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919
module ScheduledMerges (
2020
-- * Main API
2121
LSM,
22-
Key, Value, Blob,
22+
Key (K), Value (V), resolveValue, Blob (B),
2323
new,
2424
LookupResult (..),
2525
lookup, lookups,
2626
Update (..),
2727
update, updates,
2828
insert, inserts,
2929
delete, deletes,
30+
mupsert, mupserts,
3031
supply,
3132
duplicate,
3233

@@ -52,8 +53,6 @@ import Control.Monad.ST
5253
import Control.Tracer (Tracer, contramap, traceWith)
5354
import GHC.Stack (HasCallStack, callStack)
5455

55-
import Database.LSMTree.Normal (LookupResult (..), Update (..))
56-
5756

5857
data LSM s = LSMHandle !(STRef s Counter)
5958
!(STRef s (LSMContent s))
@@ -122,12 +121,20 @@ runSize = Map.size
122121
bufferSize :: Buffer -> Int
123122
bufferSize = Map.size
124123

125-
type Op = Update Value Blob
124+
type Op = Update Value Blob
125+
126+
newtype Key = K Int
127+
deriving stock (Eq, Ord, Show)
128+
deriving newtype Enum
129+
130+
newtype Value = V Int
131+
deriving stock (Eq, Show)
126132

127-
type Key = Int
128-
type Value = Int
129-
type Blob = Int
133+
resolveValue :: Value -> Value -> Value
134+
resolveValue (V x) (V y) = V (x + y)
130135

136+
newtype Blob = B Int
137+
deriving stock (Eq, Show)
131138

132139
-- | The size of the 4 tiering runs at each level are allowed to be:
133140
-- @4^(level-1) < size <= 4^level@
@@ -321,13 +328,23 @@ newMerge tr level mergepolicy mergelast rs = do
321328
MergeLastLevel -> lastLevelMerge (mergek rs)
322329

323330
mergek :: [Run] -> Run
324-
mergek = Map.unions
331+
mergek = Map.unionsWith combine
332+
333+
combine :: Op -> Op -> Op
334+
combine x y = case x of
335+
Insert{} -> x
336+
Delete{} -> x
337+
Mupsert v -> case y of
338+
Insert v' mb -> Insert (resolveValue v v') mb
339+
Delete -> Insert v Nothing
340+
Mupsert v' -> Mupsert (resolveValue v v')
325341

326342
lastLevelMerge :: Run -> Run
327-
lastLevelMerge = Map.filter isInsert
343+
lastLevelMerge = Map.filter (not . isDelete)
328344
where
329-
isInsert Insert{} = True
330-
isInsert Delete = False
345+
isDelete Delete = True
346+
isDelete Insert{} = False
347+
isDelete Mupsert{} = False
331348

332349
expectCompletedMerge :: HasCallStack
333350
=> Tracer (ST s) EventDetail
@@ -421,18 +438,29 @@ new = do
421438
lsm <- newSTRef (LSMContent Map.empty [])
422439
return (LSMHandle c lsm)
423440

441+
inserts :: Tracer (ST s) Event -> LSM s -> [(Key, Value, Maybe Blob)] -> ST s ()
442+
inserts tr lsm kvbs = updates tr lsm [ (k, Insert v b) | (k, v, b) <- kvbs ]
424443

425-
inserts :: Tracer (ST s) Event -> LSM s -> [(Key, Value)] -> ST s ()
426-
inserts tr lsm kvs = updates tr lsm [ (k, Insert v Nothing) | (k,v) <- kvs ]
444+
insert :: Tracer (ST s) Event -> LSM s -> Key -> Value -> Maybe Blob -> ST s ()
445+
insert tr lsm k v b = update tr lsm k (Insert v b)
427446

428-
insert :: Tracer (ST s) Event -> LSM s -> Key -> Value -> ST s ()
429-
insert tr lsm k v = update tr lsm k (Insert v Nothing)
447+
deletes :: Tracer (ST s) Event -> LSM s -> [Key] -> ST s ()
448+
deletes tr lsm ks = updates tr lsm [ (k, Delete) | k <- ks ]
430449

431450
delete :: Tracer (ST s) Event -> LSM s -> Key -> ST s ()
432451
delete tr lsm k = update tr lsm k Delete
433452

434-
deletes :: Tracer (ST s) Event -> LSM s -> [Key] -> ST s ()
435-
deletes tr lsm ks = updates tr lsm [ (k, Delete) | k <- ks ]
453+
mupserts :: Tracer (ST s) Event -> LSM s -> [(Key, Value)] -> ST s ()
454+
mupserts tr lsm kvbs = updates tr lsm [ (k, Mupsert v) | (k, v) <- kvbs ]
455+
456+
mupsert :: Tracer (ST s) Event -> LSM s -> Key -> Value -> ST s ()
457+
mupsert tr lsm k v = update tr lsm k (Mupsert v)
458+
459+
data Update v b =
460+
Insert !v !(Maybe b)
461+
| Mupsert !v
462+
| Delete
463+
deriving stock (Eq, Show)
436464

437465
updates :: Tracer (ST s) Event -> LSM s -> [(Key, Op)] -> ST s ()
438466
updates tr lsm = mapM_ (uncurry (update tr lsm))
@@ -444,7 +472,7 @@ update tr (LSMHandle scr lsmr) k op = do
444472
modifySTRef' scr (+1)
445473
supplyCredits 1 ls
446474
invariant ls
447-
let wb' = Map.insert k op wb
475+
let wb' = Map.insertWith combine k op wb
448476
if bufferSize wb' >= maxBufferSize
449477
then do
450478
ls' <- increment tr sc (bufferToRun wb') ls
@@ -460,21 +488,32 @@ supply (LSMHandle scr lsmr) credits = do
460488
supplyCredits credits ls
461489
invariant ls
462490

491+
data LookupResult v b =
492+
NotFound
493+
| Found !v !(Maybe b)
494+
deriving stock (Eq, Show)
495+
463496
lookups :: LSM s -> [Key] -> ST s [(Key, LookupResult Value Blob)]
464-
lookups lsm = mapM (\k -> (k,) <$> lookup lsm k)
497+
lookups lsm ks = do
498+
runs <- concat <$> allLayers lsm
499+
return $ map (\k -> (k, doLookup k runs)) ks
465500

466501
lookup :: LSM s -> Key -> ST s (LookupResult Value Blob)
467502
lookup lsm k = do
468-
rss <- allLayers lsm
469-
return $!
470-
foldr (\lookures continue ->
471-
case lookures of
472-
Nothing -> continue
473-
Just (Insert v Nothing) -> Found v
474-
Just (Insert v (Just b)) -> FoundWithBlob v b
475-
Just Delete -> NotFound)
476-
NotFound
477-
[ Map.lookup k r | rs <- rss, r <- rs ]
503+
runs <- concat <$> allLayers lsm
504+
return $ doLookup k runs
505+
506+
doLookup :: Key -> [Run] -> LookupResult Value Blob
507+
doLookup k =
508+
foldr (\run continue ->
509+
case Map.lookup k run of
510+
Nothing -> continue
511+
Just (Insert v mb) -> Found v mb
512+
Just Delete -> NotFound
513+
Just (Mupsert v) -> case continue of
514+
NotFound -> Found v Nothing
515+
Found v' mb -> Found (resolveValue v v') mb)
516+
NotFound
478517

479518
bufferToRun :: Buffer -> Run
480519
bufferToRun = id
@@ -615,7 +654,6 @@ duplicate (LSMHandle _scr lsmr) = do
615654
-- it's that simple here, because we share all the pure value and all the
616655
-- STRefs and there's no ref counting to be done
617656

618-
619657
-------------------------------------------------------------------------------
620658
-- Measurements
621659
--
@@ -640,12 +678,13 @@ flattenIncomingRun (Merging (MergingRun _ _ mr)) = do
640678
CompletedMerge r -> return [r]
641679
OngoingMerge _ rs _ -> return rs
642680

643-
logicalValue :: LSM s -> ST s (Map Key Value)
644-
logicalValue = fmap (Map.mapMaybe justInsert . Map.unions . concat)
681+
logicalValue :: LSM s -> ST s (Map Key (Value, Maybe Blob))
682+
logicalValue = fmap (Map.mapMaybe justInsert . Map.unionsWith combine . concat)
645683
. allLayers
646684
where
647-
justInsert (Insert v _) = Just v
685+
justInsert (Insert v b) = Just (v, b)
648686
justInsert Delete = Nothing
687+
justInsert (Mupsert v) = Just (v, Nothing)
649688

650689
dumpRepresentation :: LSM s
651690
-> ST s [(Maybe (MergePolicy, MergeLastLevel, MergingRunState), [Run])]

prototypes/ScheduledMergesTest.hs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
module ScheduledMergesTest (tests) where
2+
3+
import Control.Exception
4+
import Control.Monad (replicateM_, when)
5+
import Control.Monad.ST
6+
import Control.Tracer (Tracer (Tracer))
7+
import qualified Control.Tracer as Tracer
8+
import Data.Foldable (traverse_)
9+
import Data.STRef
10+
11+
import ScheduledMerges as LSM
12+
13+
import Test.Tasty
14+
import Test.Tasty.HUnit (HasCallStack, testCase)
15+
16+
tests :: TestTree
17+
tests = testGroup "Unit tests"
18+
[ testCase "regression_empty_run" test_regression_empty_run
19+
, testCase "merge_again_with_incoming" test_merge_again_with_incoming
20+
]
21+
22+
-- | Results in an empty run on level 2.
23+
test_regression_empty_run :: IO ()
24+
test_regression_empty_run =
25+
runWithTracer $ \tracer -> do
26+
stToIO $ do
27+
lsm <- LSM.new
28+
let ins k = LSM.insert tracer lsm (K k) (V 0) Nothing
29+
let del k = LSM.delete tracer lsm (K k)
30+
-- run 1
31+
ins 0
32+
ins 1
33+
ins 2
34+
ins 3
35+
-- run 2
36+
ins 0
37+
ins 1
38+
ins 2
39+
ins 3
40+
-- run 3
41+
ins 0
42+
ins 1
43+
ins 2
44+
ins 3
45+
-- run 4, deletes all previous elements
46+
del 0
47+
del 1
48+
del 2
49+
del 3
50+
51+
expectShape lsm
52+
[ ([], [4,4,4,4])
53+
]
54+
55+
-- run 5, results in last level merge of run 1-4
56+
ins 0
57+
ins 1
58+
ins 2
59+
ins 3
60+
61+
expectShape lsm
62+
[ ([], [4])
63+
, ([4,4,4,4], [])
64+
]
65+
66+
-- finish merge
67+
LSM.supply lsm 16
68+
69+
expectShape lsm
70+
[ ([], [4])
71+
, ([], [0])
72+
]
73+
74+
-- | Covers the case where a run ends up too small for a level, so it gets
75+
-- merged again with the next incoming runs.
76+
-- That 5-way merge gets completed by supplying credits and then becomes part
77+
-- of another merge.
78+
test_merge_again_with_incoming :: IO ()
79+
test_merge_again_with_incoming =
80+
runWithTracer $ \tracer -> do
81+
stToIO $ do
82+
lsm <- LSM.new
83+
let ins k = LSM.insert tracer lsm (K k) (V 0) Nothing
84+
-- get something to 3rd level (so 2nd level is not levelling)
85+
-- (needs 5 runs to go to level 2 so the resulting run becomes too big)
86+
traverse_ ins [101..100+(5*16)]
87+
88+
expectShape lsm -- not yet arrived at level 3, but will soon
89+
[ ([], [4,4,4,4])
90+
, ([16,16,16,16], [])
91+
]
92+
93+
-- get a very small run (4 elements) to 2nd level
94+
replicateM_ 4 $
95+
traverse_ ins [201..200+4]
96+
97+
expectShape lsm
98+
[ ([], [4,4,4,4]) -- these runs share the same keys
99+
, ([4,4,4,4,64], [])
100+
]
101+
102+
-- get another run to 2nd level, which the small run can be merged with
103+
traverse_ ins [301..300+16]
104+
105+
expectShape lsm
106+
[ ([], [4,4,4,4])
107+
, ([4,4,4,4], [])
108+
, ([], [80])
109+
]
110+
111+
-- add just one more run so the 5-way merge on 2nd level gets created
112+
traverse_ ins [401..400+4]
113+
114+
expectShape lsm
115+
[ ([], [4])
116+
, ([4,4,4,4,4], [])
117+
, ([], [80])
118+
]
119+
120+
-- complete the merge (20 entries, but credits get scaled up by 1.25)
121+
LSM.supply lsm 16
122+
123+
expectShape lsm
124+
[ ([], [4])
125+
, ([], [20])
126+
, ([], [80])
127+
]
128+
129+
-- get 3 more runs to 2nd level, so the 5-way merge completes
130+
-- and becomes part of a new merge.
131+
-- (actually 4, as runs only move once a fifth run arrives...)
132+
traverse_ ins [501..500+(4*16)]
133+
134+
expectShape lsm
135+
[ ([], [4])
136+
, ([4,4,4,4], [])
137+
, ([16,16,16,20,80], [])
138+
]
139+
140+
-------------------------------------------------------------------------------
141+
-- tracing and expectations on LSM shape
142+
--
143+
144+
-- | Provides a tracer and will add the log of traced events to the reported
-- failure.
--
-- Every traced 'Event' is prepended to a mutable log. If the action throws,
-- the exception is re-thrown wrapped in 'Traced' together with the events in
-- the order they were emitted.
runWithTracer :: (Tracer (ST RealWorld) Event -> IO a) -> IO a
runWithTracer action = do
    events <- stToIO $ newSTRef []
    -- Strict modify: avoids piling up a chain of unevaluated @(e :)@ thunks
    -- inside the reference while the test is running.
    let tracer = Tracer $ Tracer.emit $ \e -> modifySTRef' events (e :)
    action tracer `catch` \e -> do
      -- The log is built newest-first, so reverse it for chronological order.
      ev <- reverse <$> stToIO (readSTRef events)
      throwIO (Traced e ev)
153+
154+
data TracedException = Traced SomeException [Event]
155+
deriving stock (Show)
156+
157+
instance Exception TracedException where
158+
displayException (Traced e ev) =
159+
displayException e <> "\ntrace:\n" <> unlines (map show ev)
160+
161+
-- | Check that the current shape of the LSM matches the expected one, and
-- abort with an 'error' showing both shapes if it does not.
--
-- The shape is obtained by applying 'representationShape' to the result of
-- 'dumpRepresentation'.
-- NOTE(review): each pair presumably describes one level (sizes of the runs
-- feeding an ongoing merge, then sizes of the resident runs) — confirm against
-- 'representationShape'.
expectShape :: HasCallStack => LSM s -> [([Int], [Int])] -> ST s ()
expectShape lsm expected = do
    shape <- representationShape <$> dumpRepresentation lsm
    -- Fail only on a mismatch; a matching shape means the expectation holds.
    when (shape /= expected) $
      error $ unlines
        [ "expected shape: " <> show expected
        , "actual shape: " <> show shape
        ]

0 commit comments

Comments
 (0)