Skip to content

Commit 86704e0

Browse files
authored
Merge pull request #609 from IntersectMBO/mheinzel/merging-tree-lookups
Lookups in merging tree
2 parents 034f718 + b944764 commit 86704e0

File tree

8 files changed

+481
-54
lines changed

8 files changed

+481
-54
lines changed

lsm-tree.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ library
141141
Database.LSMTree.Internal.MergeSchedule
142142
Database.LSMTree.Internal.MergingRun
143143
Database.LSMTree.Internal.MergingTree
144+
Database.LSMTree.Internal.MergingTree.Lookup
144145
Database.LSMTree.Internal.Page
145146
Database.LSMTree.Internal.PageAcc
146147
Database.LSMTree.Internal.PageAcc1

prototypes/ScheduledMerges.hs

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -935,16 +935,17 @@ checkedUnionDebt tree debtRef = do
935935

936936
type LookupAcc = Maybe Op
937937

938-
mergeAcc :: [LookupAcc] -> LookupAcc
939-
mergeAcc = foldl (updateAcc combine) Nothing . catMaybes
940-
941-
unionAcc :: [LookupAcc] -> LookupAcc
942-
unionAcc = foldl (updateAcc combineUnion) Nothing . catMaybes
943-
944938
updateAcc :: (Op -> Op -> Op) -> LookupAcc -> Op -> LookupAcc
945939
updateAcc _ Nothing old = Just old
946940
updateAcc f (Just new_) old = Just (f new_ old) -- acc has more recent Op
947941

942+
mergeAcc :: TreeMergeType -> [LookupAcc] -> LookupAcc
943+
mergeAcc mt = foldl (updateAcc com) Nothing . catMaybes
944+
where
945+
com = case mt of
946+
MergeLevel -> combine
947+
MergeUnion -> combineUnion
948+
948949
-- | We handle lookups by accumulating results by going through the runs from
949950
-- most recent to least recent, starting with the write buffer.
950951
--
@@ -957,8 +958,12 @@ doLookup wb runs ul k = do
957958
NoUnion ->
958959
return (convertAcc acc0)
959960
Union tree _ -> do
960-
accTree <- lookupsTree k tree
961-
return (convertAcc (mergeAcc [acc0, accTree]))
961+
treeBatches <- buildLookupTree tree
962+
let treeResults = lookupBatch Nothing k <$> treeBatches
963+
return $ convertAcc $ foldLookupTree $
964+
if null wb && null runs
965+
then treeResults
966+
else LookupNode MergeLevel [LookupBatch acc0, treeResults ]
962967
where
963968
convertAcc :: LookupAcc -> LookupResult Value Blob
964969
convertAcc = \case
@@ -976,6 +981,10 @@ lookupBatch acc k rs =
976981
let ops = [op | r <- rs, Just op <- [Map.lookup k r]]
977982
in foldl (updateAcc combine) acc ops
978983

984+
data LookupTree a = LookupBatch a
985+
| LookupNode TreeMergeType [LookupTree a]
986+
deriving stock Functor
987+
979988
-- | Do lookups on runs at the leaves and recursively combine the resulting
980989
-- 'LookupAcc's via 'mergeAcc', using 'combine' or 'combineUnion' per merge
981990
-- type.
@@ -989,32 +998,38 @@ lookupBatch acc k rs =
989998
-- have a union level) and then do lookups, two batches of lookups have to be
990999
-- performed (plus a batch for the table's regular levels if it has been updated
9911000
-- after the union).
992-
lookupsTree :: Key -> MergingTree s -> ST s LookupAcc
993-
lookupsTree k = go
1001+
--
1002+
-- TODO: we can still improve the batching, for example combining the child of
1003+
-- PendingLevelMerge with the pre-existing runs when it is already completed.
1004+
buildLookupTree :: MergingTree s -> ST s (LookupTree [Run])
1005+
buildLookupTree = go
9941006
where
995-
go :: MergingTree s -> ST s LookupAcc
1007+
go :: MergingTree s -> ST s (LookupTree [Run])
9961008
go (MergingTree treeState) = readSTRef treeState >>= \case
9971009
CompletedTreeMerge r ->
998-
return $ lookupBatch' [r]
1010+
return $ LookupBatch [r]
9991011
OngoingTreeMerge (MergingRun mt _ mergeState) ->
10001012
readSTRef mergeState >>= \case
10011013
CompletedMerge r ->
1002-
return $ lookupBatch' [r]
1014+
return $ LookupBatch [r]
10031015
OngoingMerge _ rs _ -> case mt of
1004-
MergeLevel -> return $ lookupBatch' rs -- combine into batch
1005-
MergeUnion -> return $ unionAcc (map (\r -> lookupBatch' [r]) rs)
1006-
PendingTreeMerge (PendingUnionMerge trees) -> do
1007-
unionAcc <$> traverse go trees
1016+
MergeLevel -> return $ LookupBatch rs -- combine into batch
1017+
MergeUnion -> return $ LookupNode MergeUnion $ map (\r -> LookupBatch [r]) rs
10081018
PendingTreeMerge (PendingLevelMerge prs tree) -> do
1009-
runs <- concat <$> traverse flattenPreExistingRun prs -- combine into batch
1010-
let acc0 = lookupBatch' runs
1019+
preExisting <- LookupBatch . concat <$>
1020+
traverse flattenPreExistingRun prs -- combine into batch
10111021
case tree of
1012-
Nothing -> return acc0 -- only runs and merging level runs, done
1022+
Nothing -> return preExisting
10131023
Just t -> do
1014-
accTree <- go t
1015-
return (mergeAcc [acc0, accTree])
1024+
lTree <- go t
1025+
return (LookupNode MergeLevel [preExisting, lTree])
1026+
PendingTreeMerge (PendingUnionMerge trees) -> do
1027+
LookupNode MergeUnion <$> traverse go trees
10161028

1017-
lookupBatch' = lookupBatch Nothing k
1029+
foldLookupTree :: LookupTree LookupAcc -> LookupAcc
1030+
foldLookupTree = \case
1031+
LookupBatch acc -> acc
1032+
LookupNode mt children -> mergeAcc mt (map foldLookupTree children)
10181033

10191034
-------------------------------------------------------------------------------
10201035
-- Nominal credits

src/Database/LSMTree/Common.hs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ module Database.LSMTree.Common (
4949
) where
5050

5151
import Control.Concurrent.Class.MonadMVar.Strict
52-
import Control.Concurrent.Class.MonadSTM (MonadSTM, STM)
52+
import Control.Concurrent.Class.MonadSTM (STM)
53+
import Control.Monad.Class.MonadAsync
5354
import Control.Monad.Class.MonadST
5455
import Control.Monad.Class.MonadThrow
5556
import Control.Monad.Primitive (PrimMonad)
@@ -74,8 +75,8 @@ import System.FS.IO (HandleIO)
7475
-------------------------------------------------------------------------------}
7576

7677
-- | Utility class for grouping @io-classes@ constraints.
77-
class ( MonadMVar m, MonadSTM m, MonadThrow (STM m), MonadThrow m, MonadCatch m
78-
, MonadMask m, PrimMonad m, MonadST m
78+
class ( MonadAsync m, MonadMVar m, MonadThrow (STM m), MonadThrow m
79+
, MonadCatch m , MonadMask m, PrimMonad m, MonadST m
7980
) => IOLike m
8081

8182
instance IOLike IO

src/Database/LSMTree/Internal.hs

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ import Control.Concurrent.Class.MonadSTM.RWVar (RWVar)
8282
import qualified Control.Concurrent.Class.MonadSTM.RWVar as RW
8383
import Control.DeepSeq
8484
import Control.Monad (forM, unless, void)
85+
import Control.Monad.Class.MonadAsync as Async
8586
import Control.Monad.Class.MonadST (MonadST (..))
8687
import Control.Monad.Class.MonadThrow
8788
import Control.Monad.Primitive
@@ -105,9 +106,12 @@ import Database.LSMTree.Internal.Config
105106
import qualified Database.LSMTree.Internal.Cursor as Cursor
106107
import Database.LSMTree.Internal.Entry (Entry)
107108
import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
108-
ResolveSerialisedValue, lookupsIO)
109+
ResolveSerialisedValue, lookupsIO,
110+
lookupsIOWithoutWriteBuffer)
109111
import Database.LSMTree.Internal.MergeSchedule
112+
import qualified Database.LSMTree.Internal.MergingRun as MR
110113
import Database.LSMTree.Internal.MergingTree
114+
import qualified Database.LSMTree.Internal.MergingTree.Lookup as MT
111115
import Database.LSMTree.Internal.Paths (SessionRoot (..),
112116
SnapshotMetaDataChecksumFile (..),
113117
SnapshotMetaDataFile (..), SnapshotName)
@@ -123,6 +127,7 @@ import Database.LSMTree.Internal.Serialise (SerialisedBlob (..),
123127
import Database.LSMTree.Internal.Snapshot
124128
import Database.LSMTree.Internal.Snapshot.Codec
125129
import Database.LSMTree.Internal.UniqCounter
130+
import qualified Database.LSMTree.Internal.Vector as V
126131
import qualified Database.LSMTree.Internal.WriteBuffer as WB
127132
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
128133
import qualified System.FS.API as FS
@@ -799,16 +804,48 @@ close t = do
799804
-> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO h)))) #-}
800805
-- | See 'Database.LSMTree.Normal.lookups'.
801806
lookups ::
802-
(MonadST m, MonadSTM m, MonadThrow m)
807+
(MonadAsync m, MonadMask m, MonadMVar m, MonadST m)
803808
=> ResolveSerialisedValue
804809
-> V.Vector SerialisedKey
805810
-> Table m h
806811
-> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
807812
lookups resolve ks t = do
808813
traceWith (tableTracer t) $ TraceLookups (V.length ks)
809814
withOpenTable t $ \tEnv ->
810-
RW.withReadAccess (tableContent tEnv) $ \tableContent ->
811-
let !cache = tableCache tableContent in
815+
RW.withReadAccess (tableContent tEnv) $ \tableContent -> do
816+
case tableUnionLevel tableContent of
817+
NoUnion -> regularLevelLookups tEnv tableContent
818+
Union tree -> do
819+
isStructurallyEmpty tree >>= \case
820+
True -> regularLevelLookups tEnv tableContent
821+
False ->
822+
-- TODO: the blob refs returned from the tree can be invalidated
823+
-- by supplyUnionCredits or other operations on any table that
824+
-- shares merging runs or trees. We need to keep the runs open!
825+
-- This could be achieved by storing the LookupTree and only
826+
-- calling MT.releaseLookupTree later, when we are okay with
827+
-- invalidating the blob refs (similar to the LevelsCache).
828+
-- Lookups then use the cached tree, but when should we rebuild
829+
-- the tree? On each call to supplyUnionCredits?
830+
withActionRegistry $ \reg -> do
831+
regularResult <-
832+
-- asynchronously, so tree lookup batches can already be
833+
-- submitted without waiting for the result.
834+
Async.async $ regularLevelLookups tEnv tableContent
835+
treeBatches <- MT.buildLookupTree reg tree
836+
treeResults <- forM treeBatches $ \runs ->
837+
Async.async $ treeBatchLookups tEnv runs
838+
-- TODO: if regular levels are empty, don't add them to tree
839+
res <- MT.foldLookupTree resolve $
840+
MT.mkLookupNode MR.MergeLevel $ V.fromList
841+
[ MT.LookupBatch regularResult
842+
, treeResults
843+
]
844+
MT.releaseLookupTree reg treeBatches
845+
return res
846+
where
847+
regularLevelLookups tEnv tableContent = do
848+
let !cache = tableCache tableContent
812849
lookupsIO
813850
(tableHasBlockIO tEnv)
814851
(tableArenaManager t)
@@ -821,6 +858,17 @@ lookups resolve ks t = do
821858
(cachedKOpsFiles cache)
822859
ks
823860

861+
treeBatchLookups tEnv runs =
862+
lookupsIOWithoutWriteBuffer
863+
(tableHasBlockIO tEnv)
864+
(tableArenaManager t)
865+
resolve
866+
runs
867+
(V.mapStrict (\(DeRef r) -> Run.runFilter r) runs)
868+
(V.mapStrict (\(DeRef r) -> Run.runIndex r) runs)
869+
(V.mapStrict (\(DeRef r) -> Run.runKOpsFile r) runs)
870+
ks
871+
824872
{-# SPECIALISE rangeLookup ::
825873
ResolveSerialisedValue
826874
-> Range SerialisedKey

0 commit comments

Comments
 (0)