Skip to content

Commit b944764

Browse files
committed
implement lookups on the union level
1 parent 9915ae3 commit b944764

File tree

7 files changed

+443
-31
lines changed

7 files changed

+443
-31
lines changed

lsm-tree.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ library
141141
Database.LSMTree.Internal.MergeSchedule
142142
Database.LSMTree.Internal.MergingRun
143143
Database.LSMTree.Internal.MergingTree
144+
Database.LSMTree.Internal.MergingTree.Lookup
144145
Database.LSMTree.Internal.Page
145146
Database.LSMTree.Internal.PageAcc
146147
Database.LSMTree.Internal.PageAcc1

src/Database/LSMTree/Common.hs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ module Database.LSMTree.Common (
4949
) where
5050

5151
import Control.Concurrent.Class.MonadMVar.Strict
52-
import Control.Concurrent.Class.MonadSTM (MonadSTM, STM)
52+
import Control.Concurrent.Class.MonadSTM (STM)
53+
import Control.Monad.Class.MonadAsync
5354
import Control.Monad.Class.MonadST
5455
import Control.Monad.Class.MonadThrow
5556
import Control.Monad.Primitive (PrimMonad)
@@ -74,8 +75,8 @@ import System.FS.IO (HandleIO)
7475
-------------------------------------------------------------------------------}
7576

7677
-- | Utility class for grouping @io-classes@ constraints.
77-
class ( MonadMVar m, MonadSTM m, MonadThrow (STM m), MonadThrow m, MonadCatch m
78-
, MonadMask m, PrimMonad m, MonadST m
78+
class ( MonadAsync m, MonadMVar m, MonadThrow (STM m), MonadThrow m
79+
, MonadCatch m , MonadMask m, PrimMonad m, MonadST m
7980
) => IOLike m
8081

8182
instance IOLike IO

src/Database/LSMTree/Internal.hs

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ import Control.Concurrent.Class.MonadSTM.RWVar (RWVar)
8282
import qualified Control.Concurrent.Class.MonadSTM.RWVar as RW
8383
import Control.DeepSeq
8484
import Control.Monad (forM, unless, void)
85+
import Control.Monad.Class.MonadAsync as Async
8586
import Control.Monad.Class.MonadST (MonadST (..))
8687
import Control.Monad.Class.MonadThrow
8788
import Control.Monad.Primitive
@@ -105,9 +106,12 @@ import Database.LSMTree.Internal.Config
105106
import qualified Database.LSMTree.Internal.Cursor as Cursor
106107
import Database.LSMTree.Internal.Entry (Entry)
107108
import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
108-
ResolveSerialisedValue, lookupsIO)
109+
ResolveSerialisedValue, lookupsIO,
110+
lookupsIOWithoutWriteBuffer)
109111
import Database.LSMTree.Internal.MergeSchedule
112+
import qualified Database.LSMTree.Internal.MergingRun as MR
110113
import Database.LSMTree.Internal.MergingTree
114+
import qualified Database.LSMTree.Internal.MergingTree.Lookup as MT
111115
import Database.LSMTree.Internal.Paths (SessionRoot (..),
112116
SnapshotMetaDataChecksumFile (..),
113117
SnapshotMetaDataFile (..), SnapshotName)
@@ -123,6 +127,7 @@ import Database.LSMTree.Internal.Serialise (SerialisedBlob (..),
123127
import Database.LSMTree.Internal.Snapshot
124128
import Database.LSMTree.Internal.Snapshot.Codec
125129
import Database.LSMTree.Internal.UniqCounter
130+
import qualified Database.LSMTree.Internal.Vector as V
126131
import qualified Database.LSMTree.Internal.WriteBuffer as WB
127132
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
128133
import qualified System.FS.API as FS
@@ -799,16 +804,48 @@ close t = do
799804
-> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO h)))) #-}
800805
-- | See 'Database.LSMTree.Normal.lookups'.
801806
lookups ::
802-
(MonadST m, MonadSTM m, MonadThrow m)
807+
(MonadAsync m, MonadMask m, MonadMVar m, MonadST m)
803808
=> ResolveSerialisedValue
804809
-> V.Vector SerialisedKey
805810
-> Table m h
806811
-> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
807812
lookups resolve ks t = do
808813
traceWith (tableTracer t) $ TraceLookups (V.length ks)
809814
withOpenTable t $ \tEnv ->
810-
RW.withReadAccess (tableContent tEnv) $ \tableContent ->
811-
let !cache = tableCache tableContent in
815+
RW.withReadAccess (tableContent tEnv) $ \tableContent -> do
816+
case tableUnionLevel tableContent of
817+
NoUnion -> regularLevelLookups tEnv tableContent
818+
Union tree -> do
819+
isStructurallyEmpty tree >>= \case
820+
True -> regularLevelLookups tEnv tableContent
821+
False ->
822+
-- TODO: the blob refs returned from the tree can be invalidated
823+
-- by supplyUnionCredits or other operations on any table that
824+
-- shares merging runs or trees. We need to keep open the runs!
825+
-- This could be achieved by storing the LookupTree and only
826+
-- calling MT.releaseLookupTree later, when we are okay with
827+
-- invalidating the blob refs (similar to the LevelsCache).
828+
-- Lookups then use the cached tree, but when should we rebuild
829+
-- the tree? On each call to supplyUnionCredits?
830+
withActionRegistry $ \reg -> do
831+
regularResult <-
832+
-- asynchronously, so tree lookup batches can already be
833+
-- submitted without waiting for the result.
834+
Async.async $ regularLevelLookups tEnv tableContent
835+
treeBatches <- MT.buildLookupTree reg tree
836+
treeResults <- forM treeBatches $ \runs ->
837+
Async.async $ treeBatchLookups tEnv runs
838+
-- TODO: if regular levels are empty, don't add them to tree
839+
res <- MT.foldLookupTree resolve $
840+
MT.mkLookupNode MR.MergeLevel $ V.fromList
841+
[ MT.LookupBatch regularResult
842+
, treeResults
843+
]
844+
MT.releaseLookupTree reg treeBatches
845+
return res
846+
where
847+
regularLevelLookups tEnv tableContent = do
848+
let !cache = tableCache tableContent
812849
lookupsIO
813850
(tableHasBlockIO tEnv)
814851
(tableArenaManager t)
@@ -821,6 +858,17 @@ lookups resolve ks t = do
821858
(cachedKOpsFiles cache)
822859
ks
823860

861+
treeBatchLookups tEnv runs =
862+
lookupsIOWithoutWriteBuffer
863+
(tableHasBlockIO tEnv)
864+
(tableArenaManager t)
865+
resolve
866+
runs
867+
(V.mapStrict (\(DeRef r) -> Run.runFilter r) runs)
868+
(V.mapStrict (\(DeRef r) -> Run.runIndex r) runs)
869+
(V.mapStrict (\(DeRef r) -> Run.runKOpsFile r) runs)
870+
ks
871+
824872
{-# SPECIALISE rangeLookup ::
825873
ResolveSerialisedValue
826874
-> Range SerialisedKey

src/Database/LSMTree/Internal/Lookup.hs

Lines changed: 107 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,11 @@
1-
{-# LANGUAGE BangPatterns #-}
2-
{-# LANGUAGE CPP #-}
3-
{-# LANGUAGE DeriveAnyClass #-}
4-
{-# LANGUAGE DeriveFunctor #-}
5-
{-# LANGUAGE DerivingStrategies #-}
6-
{-# LANGUAGE FlexibleContexts #-}
7-
{-# LANGUAGE NamedFieldPuns #-}
8-
{-# LANGUAGE ScopedTypeVariables #-}
9-
{-# LANGUAGE TupleSections #-}
1+
{-# LANGUAGE CPP #-}
102

113
module Database.LSMTree.Internal.Lookup (
124
ResolveSerialisedValue
135
, ByteCountDiscrepancy (..)
6+
, LookupAcc
147
, lookupsIO
8+
, lookupsIOWithoutWriteBuffer
159
-- * Internal: exposed for tests and benchmarks
1610
, RunIx
1711
, KeyIx
@@ -33,7 +27,7 @@ import qualified Data.Vector.Unboxed as VU
3327

3428
import Control.Exception (Exception, assert)
3529
import Control.Monad
36-
import Control.Monad.Class.MonadST as Class
30+
import Control.Monad.Class.MonadST as ST
3731
import Control.Monad.Class.MonadThrow (MonadThrow (..))
3832
import Control.Monad.Primitive
3933
import Control.Monad.ST.Strict
@@ -154,6 +148,8 @@ data ByteCountDiscrepancy = ByteCountDiscrepancy {
154148
deriving stock (Show, Eq)
155149
deriving anyclass (Exception)
156150

151+
type LookupAcc m h = V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
152+
157153
{-# SPECIALIZE lookupsIO ::
158154
HasBlockIO IO h
159155
-> ArenaManager RealWorld
@@ -165,7 +161,7 @@ data ByteCountDiscrepancy = ByteCountDiscrepancy {
165161
-> V.Vector Index
166162
-> V.Vector (Handle h)
167163
-> V.Vector SerialisedKey
168-
-> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO h))))
164+
-> IO (LookupAcc IO h)
169165
#-}
170166
-- | Batched lookups in I\/O.
171167
--
@@ -185,11 +181,11 @@ lookupsIO ::
185181
-> V.Vector Index -- ^ The indexes inside @rs@
186182
-> V.Vector (Handle h) -- ^ The file handles to the key\/value files inside @rs@
187183
-> V.Vector SerialisedKey
188-
-> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
184+
-> m (LookupAcc m h)
189185
lookupsIO !hbio !mgr !resolveV !wb !wbblobs !rs !blooms !indexes !kopsFiles !ks =
190186
assert precondition $
191187
withArena mgr $ \arena -> do
192-
(rkixs, ioops) <- Class.stToIO $ prepLookups arena blooms indexes kopsFiles ks
188+
(rkixs, ioops) <- ST.stToIO $ prepLookups arena blooms indexes kopsFiles ks
193189
ioress <- submitIO hbio ioops
194190
intraPageLookups resolveV wb wbblobs rs ks rkixs ioops ioress
195191
where
@@ -201,6 +197,49 @@ lookupsIO !hbio !mgr !resolveV !wb !wbblobs !rs !blooms !indexes !kopsFiles !ks
201197
assert (V.length rs == V.length kopsFiles) $
202198
True
203199

200+
{-# SPECIALIZE lookupsIOWithoutWriteBuffer ::
201+
HasBlockIO IO h
202+
-> ArenaManager RealWorld
203+
-> ResolveSerialisedValue
204+
-> V.Vector (Ref (Run IO h))
205+
-> V.Vector (Bloom SerialisedKey)
206+
-> V.Vector Index
207+
-> V.Vector (Handle h)
208+
-> V.Vector SerialisedKey
209+
-> IO (LookupAcc IO h)
210+
#-}
211+
-- | Batched lookups in I\/O over a vector of runs only, without consulting a
-- write buffer: the result accumulator starts out as 'Nothing' for every key
-- and is then combined with the lookup results from the runs.
212+
--
213+
-- See Note [Batched lookups, buffer strategy and restrictions]
214+
--
215+
-- PRECONDITION: the vectors of bloom filters, indexes and file handles
216+
-- should pointwise match with the vectors of runs.
217+
lookupsIOWithoutWriteBuffer ::
218+
forall m h. (MonadThrow m, MonadST m)
219+
=> HasBlockIO m h
220+
-> ArenaManager (PrimState m)
221+
-> ResolveSerialisedValue
222+
-> V.Vector (Ref (Run m h)) -- ^ Runs @rs@
223+
-> V.Vector (Bloom SerialisedKey) -- ^ The bloom filters inside @rs@
224+
-> V.Vector Index -- ^ The indexes inside @rs@
225+
-> V.Vector (Handle h) -- ^ The file handles to the key\/value files inside @rs@
226+
-> V.Vector SerialisedKey -- ^ The keys to look up
227+
-> m (LookupAcc m h)
228+
lookupsIOWithoutWriteBuffer !hbio !mgr !resolveV !rs !blooms !indexes !kopsFiles !ks =
229+
assert precondition $
230+
withArena mgr $ \arena -> do
231+
(rkixs, ioops) <- ST.stToIO $ prepLookups arena blooms indexes kopsFiles ks
232+
ioress <- submitIO hbio ioops
233+
-- No write buffer to consult: the initial accumulator is a fresh vector
-- holding 'Nothing' for every key in @ks@.
intraPageLookupsOn resolveV (V.map (const Nothing) ks) rs ks rkixs ioops ioress
234+
where
235+
-- we check only that the lengths match, because checking the contents is
236+
-- too expensive.
237+
precondition =
238+
assert (V.length rs == V.length blooms) $
239+
assert (V.length rs == V.length indexes) $
240+
assert (V.length rs == V.length kopsFiles) $
241+
True
242+
204243
{-# SPECIALIZE intraPageLookups ::
205244
ResolveSerialisedValue
206245
-> WB.WriteBuffer
@@ -210,7 +249,7 @@ lookupsIO !hbio !mgr !resolveV !wb !wbblobs !rs !blooms !indexes !kopsFiles !ks
210249
-> VP.Vector RunIxKeyIx
211250
-> V.Vector (IOOp RealWorld h)
212251
-> VU.Vector IOResult
213-
-> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO h))))
252+
-> IO (LookupAcc IO h)
214253
#-}
215254
-- | Intra-page lookups, and combining lookup results from multiple runs and
216255
-- the write buffer.
@@ -229,7 +268,7 @@ intraPageLookups ::
229268
-> VP.Vector RunIxKeyIx
230269
-> V.Vector (IOOp (PrimState m) h)
231270
-> VU.Vector IOResult
232-
-> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
271+
-> m (LookupAcc m h)
233272
intraPageLookups !resolveV !wb !wbblobs !rs !ks !rkixs !ioops !ioress = do
234273
-- We accumulate results into the 'res' vector. When there are several
235274
-- lookup hits for the same key then we combine the results. The combining
@@ -244,12 +283,59 @@ intraPageLookups !resolveV !wb !wbblobs !rs !ks !rkixs !ioops !ioress = do
244283
-- the surface API so that all the conversions can be done in one pass
245284
-- without intermediate allocations.
246285
--
247-
res <- VM.generateM (V.length ks) $ \ki ->
248-
case WB.lookup wb (V.unsafeIndex ks ki) of
249-
Nothing -> pure Nothing
250-
Just e -> pure $! Just $! fmap (WBB.mkWeakBlobRef wbblobs) e
251-
-- TODO: ^^ we should be able to avoid this allocation by
252-
-- combining the conversion with other later conversions.
286+
287+
acc0 <-
288+
V.generateM (V.length ks) $ \ki ->
289+
case WB.lookup wb (V.unsafeIndex ks ki) of
290+
Nothing -> pure Nothing
291+
Just e -> pure $! Just $! fmap (WBB.mkWeakBlobRef wbblobs) e
292+
-- TODO: ^^ we should be able to avoid this allocation by
293+
-- combining the conversion with other later conversions.
294+
intraPageLookupsOn resolveV acc0 rs ks rkixs ioops ioress
295+
296+
{-# SPECIALIZE intraPageLookupsOn ::
297+
ResolveSerialisedValue
298+
-> LookupAcc IO h
299+
-> V.Vector (Ref (Run IO h))
300+
-> V.Vector SerialisedKey
301+
-> VP.Vector RunIxKeyIx
302+
-> V.Vector (IOOp RealWorld h)
303+
-> VU.Vector IOResult
304+
-> IO (LookupAcc IO h)
305+
#-}
306+
-- | Intra-page lookups, and combining lookup results from multiple runs with
307+
-- the initial accumulator @acc0@ (for example, write buffer lookup results).
308+
--
309+
-- This function assumes that @rkixs@ is ordered such that newer runs are
310+
-- handled first. The order matters for resolving cases where we find the same
311+
-- key in multiple runs.
312+
--
313+
intraPageLookupsOn ::
314+
forall m h. (PrimMonad m, MonadThrow m)
315+
=> ResolveSerialisedValue
316+
-> LookupAcc m h
317+
-> V.Vector (Ref (Run m h))
318+
-> V.Vector SerialisedKey
319+
-> VP.Vector RunIxKeyIx
320+
-> V.Vector (IOOp (PrimState m) h)
321+
-> VU.Vector IOResult
322+
-> m (LookupAcc m h)
323+
intraPageLookupsOn !resolveV !acc0 !rs !ks !rkixs !ioops !ioress =
324+
assert (V.length acc0 == V.length ks) $ do
325+
-- We accumulate results into the 'res' vector. When there are several
326+
-- lookup hits for the same key then we combine the results. The combining
327+
-- operator is associative but not commutative, so we must do this in the
328+
-- right order. We start with the initial results in @acc0@ and then go
329+
-- through the run lookup results in rkixs, which must be ordered by run.
330+
--
331+
-- TODO: reassess the representation of the result vector to try to reduce
332+
-- intermediate allocations. For example use a less convenient
333+
-- representation with several vectors (e.g. separate blob info) and
334+
-- convert to the final convenient representation in a single pass near
335+
-- the surface API so that all the conversions can be done in one pass
336+
-- without intermediate allocations.
337+
--
338+
res <- V.unsafeThaw acc0
253339
loop res 0
254340
V.unsafeFreeze res
255341
where

src/Database/LSMTree/Internal/MergingRun.hs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ module Database.LSMTree.Internal.MergingRun (
1515
, snapshot
1616
, numRuns
1717
, totalMergeDebt
18+
, mergeType
1819

1920
-- * Merge types
2021
, IsMergeType (..)
@@ -35,6 +36,7 @@ module Database.LSMTree.Internal.MergingRun (
3536

3637
-- * Internal state
3738
, pattern MergingRun
39+
, mergeState
3840
, MergingRunState (..)
3941
, MergeKnownCompleted (..)
4042
, CreditsVar (..)
@@ -156,12 +158,12 @@ new ::
156158
-> RunFsPaths
157159
-> V.Vector (Ref (Run m h))
158160
-> m (Ref (MergingRun t m h))
159-
new hfs hbio resolve caching alloc indexType mergeType runPaths inputRuns =
161+
new hfs hbio resolve caching alloc indexType ty runPaths inputRuns =
160162
-- If creating the Merge fails, we must release the references again.
161163
withActionRegistry $ \reg -> do
162164
runs <- V.mapM (\r -> withRollback reg (dupRef r) releaseRef) inputRuns
163165
merge <- fromMaybe (error "newMerge: merges can not be empty")
164-
<$> Merge.new hfs hbio caching alloc indexType mergeType resolve runPaths runs
166+
<$> Merge.new hfs hbio caching alloc indexType ty resolve runPaths runs
165167
let numInputRuns = NumRuns $ V.length runs
166168
let mergeDebt = numEntriesToMergeDebt (V.foldMap' Run.size runs)
167169
unsafeNew
@@ -243,7 +245,8 @@ unsafeNew mergeNumRuns mergeDebt (SpentCredits spentCredits)
243245

244246
-- | Create references to the runs that should be queried for lookups.
245247
-- In particular, if the merge is not complete, these are the input runs.
246-
{-# SPECIALISE duplicateRuns :: Ref (MergingRun t IO h) -> IO (V.Vector (Ref (Run IO h))) #-}
248+
{-# SPECIALISE duplicateRuns ::
249+
Ref (MergingRun t IO h) -> IO (V.Vector (Ref (Run IO h))) #-}
247250
duplicateRuns ::
248251
(PrimMonad m, MonadMVar m, MonadMask m)
249252
=> Ref (MergingRun t m h)
@@ -291,6 +294,14 @@ numRuns (DeRef MergingRun {mergeNumRuns}) = mergeNumRuns
291294
totalMergeDebt :: Ref (MergingRun t m h) -> MergeDebt
292295
totalMergeDebt (DeRef MergingRun {mergeDebt}) = mergeDebt
293296

297+
-- | Read the current merge state and report the type of the merge: 'Nothing'
-- if the state is 'CompletedMerge', otherwise 'Just' the merge type of the
-- ongoing merge.
{-# INLINE mergeType #-}
298+
mergeType :: MonadMVar m => Ref (MergingRun t m h) -> m (Maybe t)
299+
mergeType (DeRef mr) = do
300+
s <- readMVar (mergeState mr)
301+
return $ case s of
302+
CompletedMerge _ -> Nothing
303+
OngoingMerge _ m -> Just (Merge.mergeType m)
304+
294305
{-------------------------------------------------------------------------------
295306
Credits
296307
-------------------------------------------------------------------------------}

0 commit comments

Comments
 (0)