Skip to content

Commit 2db5274

Browse files
recursion-ninja, dcoutts, and jorisdral
committed
Add MergingTree to the snapshot representation and functions
The union level contains an optional MergingTree. This is now included in the snapshot representation. Update codecs and tests. This changes the snapshot file format. The golden test files are updated in the next patch. Co-authored-by: Duncan Coutts <[email protected]> Co-authored-by: Joris Dral <[email protected]>
1 parent 8a39751 commit 2db5274

File tree

6 files changed

+480
-27
lines changed

6 files changed

+480
-27
lines changed

src/Database/LSMTree/Internal.hs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ module Database.LSMTree.Internal (
6464
, openSnapshot
6565
, deleteSnapshot
6666
, listSnapshots
67-
-- * Mutiple writable tables
67+
-- * Multiple writable tables
6868
, duplicate
6969
-- * Table union
7070
, unions
@@ -323,7 +323,7 @@ data SessionState m h =
323323
| SessionClosed
324324

325325
data SessionEnv m h = SessionEnv {
326-
-- | The path to the directory in which this sesion is live. This is a path
326+
-- | The path to the directory in which this session is live. This is a path
327327
-- relative to root of the 'HasFS' instance.
328328
--
329329
-- INVARIANT: the session root is never changed during the lifetime of a
@@ -1242,10 +1242,22 @@ createSnapshot snap label tableType t = do
12421242
-- Hard link runs into the named snapshot directory
12431243
snapLevels' <- traverse (snapshotRun hfs hbio snapUc reg snapDir) snapLevels
12441244

1245-
-- Release the table content
1245+
-- If a merging tree exists, do the same hard-linking for the runs within
1246+
mTreeOpt <- case tableUnionLevel content of
1247+
NoUnion -> pure Nothing
1248+
Union mTreeRef -> do
1249+
mTree <- toSnapMergingTree mTreeRef
1250+
Just <$> traverse (snapshotRun hfs hbio snapUc reg snapDir) mTree
1251+
12461252
releaseTableContent reg content
12471253

1248-
let snapMetaData = SnapshotMetaData label tableType (tableConfig t) snapWriteBufferNumber snapLevels'
1254+
let snapMetaData = SnapshotMetaData
1255+
label
1256+
tableType
1257+
(tableConfig t)
1258+
snapWriteBufferNumber
1259+
snapLevels'
1260+
mTreeOpt
12491261
SnapshotMetaDataFile contentPath = Paths.snapshotMetaDataFile snapDir
12501262
SnapshotMetaDataChecksumFile checksumPath = Paths.snapshotMetaDataChecksumFile snapDir
12511263
writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData
@@ -1290,7 +1302,7 @@ openSnapshot sesh label tableType override snap resolve = do
12901302
Left e -> throwIO (ErrSnapshotDeserialiseFailure e snap)
12911303
Right x -> pure x
12921304

1293-
let SnapshotMetaData label' tableType' conf snapWriteBuffer snapLevels = snapMetaData
1305+
let SnapshotMetaData label' tableType' conf snapWriteBuffer snapLevels mTreeOpt = snapMetaData
12941306

12951307
unless (tableType == tableType') $
12961308
throwIO (ErrSnapshotWrongTableType snap tableType tableType')
@@ -1309,18 +1321,24 @@ openSnapshot sesh label tableType override snap resolve = do
13091321

13101322
-- Hard link runs into the active directory,
13111323
snapLevels' <- traverse (openRun hfs hbio uc reg snapDir activeDir) snapLevels
1324+
unionLevel <- case mTreeOpt of
1325+
Nothing -> pure NoUnion
1326+
Just mTree -> do
1327+
snapTree <- traverse (openRun hfs hbio uc reg snapDir activeDir) mTree
1328+
Union <$> fromSnapMergingTree reg hfs hbio conf uc resolve activeDir snapTree
13121329

13131330
-- Convert from the snapshot format, restoring merge progress in the process
13141331
tableLevels <- fromSnapLevels hfs hbio uc conf resolve reg activeDir snapLevels'
13151332
traverse_ (delayedCommit reg . releaseRef) snapLevels'
1333+
-- TODO: also delayedCommit unionLevel
13161334

13171335
tableCache <- mkLevelsCache reg tableLevels
13181336
newWith reg sesh seshEnv conf' am $! TableContent {
13191337
tableWriteBuffer
13201338
, tableWriteBufferBlobs
13211339
, tableLevels
13221340
, tableCache
1323-
, tableUnionLevel = NoUnion -- TODO: at some point also load union level from snapshot
1341+
, tableUnionLevel = unionLevel
13241342
}
13251343

13261344
{-# SPECIALISE deleteSnapshot ::
@@ -1370,7 +1388,7 @@ listSnapshots sesh = do
13701388
else pure $ Nothing
13711389

13721390
{-------------------------------------------------------------------------------
1373-
Mutiple writable tables
1391+
Multiple writable tables
13741392
-------------------------------------------------------------------------------}
13751393

13761394
{-# SPECIALISE duplicate :: Table IO h -> IO (Table IO h) #-}

src/Database/LSMTree/Internal/MergingTree.hs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
module Database.LSMTree.Internal.MergingTree (
33
-- $mergingtrees
44
MergingTree (..)
5+
, newOngoingMerge
56
, PreExistingRun (..)
67
, newPendingLevelMerge
78
, newPendingUnionMerge
@@ -102,6 +103,15 @@ data PreExistingRun m h =
102103
PreExistingRun !(Ref (Run m h))
103104
| PreExistingMergingRun !(Ref (MergingRun MR.LevelMergeType m h))
104105

106+
-- | Create a new 'MergingTree' that wraps an existing merging run as an ongoing tree merge.
107+
-- The usage of this function is primarily to facilitate the reloading of an
108+
-- ongoing merge from a persistent snapshot.
109+
newOngoingMerge ::
110+
(MonadMVar m, PrimMonad m, MonadMask m)
111+
=> Ref (MergingRun MR.TreeMergeType m h)
112+
-> m (Ref (MergingTree m h))
113+
newOngoingMerge = mkMergingTree . OngoingTreeMerge
114+
105115
-- | Create a new 'MergingTree' representing the merge of a sequence of
106116
-- pre-existing runs (completed or ongoing, plus an optional final tree).
107117
-- This is for merging the entire contents of a table down to a single run
@@ -183,7 +193,7 @@ newPendingUnionMerge ::
183193
-> m (Ref (MergingTree m h))
184194
newPendingUnionMerge mts = do
185195
mts' <- V.filterM (fmap not . isStructurallyEmpty) (V.fromList mts)
186-
-- isStructurallyEmpty is interruptable even with async exceptions masked,
196+
-- isStructurallyEmpty is interruptible even with async exceptions masked,
187197
-- but we use it before allocating new references.
188198
mts'' <- V.mapM dupRef mts'
189199
case V.uncons mts'' of

src/Database/LSMTree/Internal/Snapshot.hs

Lines changed: 164 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,27 @@ module Database.LSMTree.Internal.Snapshot (
88
, SnapLevel (..)
99
, SnapIncomingRun (..)
1010
, SnapMergingRunState (..)
11+
-- * MergingTree snapshot format
12+
, SnapMergingTree(..)
13+
, SnapMergingTreeState(..)
14+
, SnapPendingMerge(..)
15+
, SnapPreExistingRun(..)
1116
-- * Conversion to levels snapshot format
1217
, toSnapLevels
18+
-- * Conversion to merging tree snapshot format
19+
, toSnapMergingTree
1320
-- * Write buffer
1421
, snapshotWriteBuffer
1522
, openWriteBuffer
1623
-- * Run
1724
, SnapshotRun (..)
1825
, snapshotRun
1926
, openRun
20-
-- * Opening from levels snapshot format
27+
-- * Opening snapshot formats
28+
-- ** Levels format
2129
, fromSnapLevels
30+
-- ** Merging Tree format
31+
, fromSnapMergingTree
2232
-- * Hard links
2333
, hardLinkRunFiles
2434
) where
@@ -43,6 +53,7 @@ import qualified Database.LSMTree.Internal.Merge as Merge
4353
import Database.LSMTree.Internal.MergeSchedule
4454
import Database.LSMTree.Internal.MergingRun (NumRuns (..))
4555
import qualified Database.LSMTree.Internal.MergingRun as MR
56+
import qualified Database.LSMTree.Internal.MergingTree as MT
4657
import Database.LSMTree.Internal.Paths (ActiveDir (..), ForBlob (..),
4758
ForKOps (..), NamedSnapshotDir (..), RunFsPaths (..),
4859
WriteBufferFsPaths (..),
@@ -93,7 +104,7 @@ data SnapshotMetaData = SnapshotMetaData {
93104
--
94105
-- One could argue that the 'SnapshotName' could be used to hold this
95106
-- type information, but the file name of snapshot metadata is not guarded
96-
-- by a checksum, wherease the contents of the file are. Therefore using the
107+
-- by a checksum, whereas the contents of the file are. Therefore using the
97108
-- 'SnapshotLabel' is safer.
98109
snapMetaLabel :: !SnapshotLabel
99110
-- | Whether a table is normal or monoidal.
@@ -110,11 +121,15 @@ data SnapshotMetaData = SnapshotMetaData {
110121
, snapWriteBuffer :: !RunNumber
111122
-- | The shape of the levels of the LSM tree.
112123
, snapMetaLevels :: !(SnapLevels SnapshotRun)
124+
-- | The state of tree merging of the LSM tree.
125+
, snapMergingTree :: !(Maybe (SnapMergingTree SnapshotRun))
113126
}
114127
deriving stock Eq
115128

116129
instance NFData SnapshotMetaData where
117-
rnf (SnapshotMetaData a b c d e) = rnf a `seq` rnf b `seq` rnf c `seq` rnf d `seq` rnf e
130+
rnf (SnapshotMetaData a b c d e f) =
131+
rnf a `seq` rnf b `seq` rnf c `seq`
132+
rnf d `seq` rnf e `seq` rnf f
118133

119134
{-------------------------------------------------------------------------------
120135
Levels snapshot format
@@ -182,6 +197,151 @@ instance (NFData t, NFData r) => NFData (SnapMergingRunState t r) where
182197
rnf (SnapCompletedMerge a b c) = rnf a `seq` rnf b `seq` rnf c
183198
rnf (SnapOngoingMerge a b c d) = rnf a `seq` rnf b `seq` rnf c `seq` rnf d
184199

200+
{-------------------------------------------------------------------------------
201+
Snapshot MergingTree
202+
-------------------------------------------------------------------------------}
203+
204+
newtype SnapMergingTree r = SnapMergingTree (SnapMergingTreeState r)
205+
deriving stock (Eq, Functor, Foldable, Traversable)
206+
deriving newtype NFData
207+
208+
data SnapMergingTreeState r =
209+
SnapCompletedTreeMerge !r
210+
| SnapPendingTreeMerge !(SnapPendingMerge r)
211+
| SnapOngoingTreeMerge
212+
!(SnapMergingRunState MR.TreeMergeType r)
213+
deriving stock (Eq, Functor, Foldable, Traversable)
214+
215+
instance NFData r => NFData (SnapMergingTreeState r) where
216+
rnf (SnapCompletedTreeMerge a) = rnf a
217+
rnf (SnapPendingTreeMerge a) = rnf a
218+
rnf (SnapOngoingTreeMerge a) = rnf a
219+
220+
data SnapPendingMerge r =
221+
SnapPendingLevelMerge
222+
![SnapPreExistingRun r]
223+
!(Maybe (SnapMergingTree r))
224+
| SnapPendingUnionMerge
225+
![SnapMergingTree r]
226+
deriving stock (Eq, Functor, Foldable, Traversable)
227+
228+
instance NFData r => NFData (SnapPendingMerge r) where
229+
rnf (SnapPendingLevelMerge a b) = rnf a `seq` rnf b
230+
rnf (SnapPendingUnionMerge a) = rnf a
231+
232+
data SnapPreExistingRun r =
233+
SnapPreExistingRun !r
234+
| SnapPreExistingMergingRun
235+
!(SnapMergingRunState MR.LevelMergeType r)
236+
deriving stock (Eq, Functor, Foldable, Traversable)
237+
238+
instance NFData r => NFData (SnapPreExistingRun r) where
239+
rnf (SnapPreExistingRun a) = rnf a
240+
rnf (SnapPreExistingMergingRun a) = rnf a
241+
242+
{-------------------------------------------------------------------------------
243+
Opening from merging tree snapshot format
244+
-------------------------------------------------------------------------------}
245+
246+
{-# SPECIALISE fromSnapMergingTree ::
247+
ActionRegistry IO
248+
-> HasFS IO h
249+
-> HasBlockIO IO h
250+
-> TableConfig
251+
-> UniqCounter IO
252+
-> ResolveSerialisedValue
253+
-> ActiveDir
254+
-> SnapMergingTree (Ref (Run IO h))
255+
-> IO (Ref (MT.MergingTree IO h))
256+
#-}
257+
-- | Duplicates runs and re-creates merging runs.
258+
fromSnapMergingTree ::
259+
forall m h. (MonadMask m, MonadMVar m, MonadSTM m, MonadST m)
260+
=> ActionRegistry m
261+
-> HasFS m h
262+
-> HasBlockIO m h
263+
-> TableConfig
264+
-> UniqCounter m
265+
-> ResolveSerialisedValue
266+
-> ActiveDir
267+
-> SnapMergingTree (Ref (Run m h))
268+
-> m (Ref (MT.MergingTree m h))
269+
fromSnapMergingTree reg hfs hbio conf uc resolve dir (SnapMergingTree snapTreeState) =
270+
fromSnapTreeState snapTreeState
271+
where
272+
-- Partially applied functions for convenience
273+
recurrence :: SnapMergingTree (Ref (Run m h)) -> m (Ref (MT.MergingTree m h))
274+
recurrence = fromSnapMergingTree reg hfs hbio conf uc resolve dir
275+
276+
getSnapMergingRunState
277+
:: forall t.
278+
MR.IsMergeType t
279+
=> SnapMergingRunState t (Ref (Run m h))
280+
-> m (Ref (MR.MergingRun t m h))
281+
getSnapMergingRunState = fromSnapMergingRunState hfs hbio uc resolve dir
282+
283+
-- Conversion definitions
284+
fromSnapTreeState :: SnapMergingTreeState (Ref (Run m h)) -> m (Ref (MT.MergingTree m h))
285+
fromSnapTreeState (SnapCompletedTreeMerge run) =
286+
MT.newPendingLevelMerge [MT.PreExistingRun run] Nothing
287+
fromSnapTreeState (SnapPendingTreeMerge pMerge) = case pMerge of
288+
SnapPendingLevelMerge peRuns maybeMergeTree -> do
289+
peRuns' <- traverse fromSnapPreExistingRun peRuns
290+
maybeMergeTree' <- traverse recurrence maybeMergeTree
291+
MT.newPendingLevelMerge peRuns' maybeMergeTree'
292+
SnapPendingUnionMerge mergeTrees ->
293+
MT.newPendingUnionMerge =<< traverse recurrence mergeTrees
294+
fromSnapTreeState (SnapOngoingTreeMerge smrs) =
295+
MT.newOngoingMerge =<< getSnapMergingRunState smrs
296+
297+
fromSnapPreExistingRun :: SnapPreExistingRun (Ref (Run m h)) -> m (MT.PreExistingRun m h)
298+
fromSnapPreExistingRun (SnapPreExistingRun run) = pure $ MT.PreExistingRun run
299+
fromSnapPreExistingRun (SnapPreExistingMergingRun smrs) =
300+
MT.PreExistingMergingRun <$> getSnapMergingRunState smrs
301+
302+
{-------------------------------------------------------------------------------
303+
Conversion to merging tree snapshot format
304+
-------------------------------------------------------------------------------}
305+
306+
{-# SPECIALISE toSnapMergingTree :: Ref (MT.MergingTree IO h) -> IO (SnapMergingTree (Ref (Run IO h))) #-}
307+
toSnapMergingTree ::
308+
(PrimMonad m, MonadMVar m)
309+
=> Ref (MT.MergingTree m h)
310+
-> m (SnapMergingTree (Ref (Run m h)))
311+
toSnapMergingTree (DeRef (MT.MergingTree mStateVar _mCounter)) =
312+
withMVar mStateVar $ \mState -> SnapMergingTree <$> toSnapMergingTreeState mState
313+
314+
{-# SPECIALISE toSnapMergingTreeState :: MT.MergingTreeState IO h -> IO (SnapMergingTreeState (Ref (Run IO h))) #-}
315+
toSnapMergingTreeState ::
316+
(PrimMonad m, MonadMVar m)
317+
=> MT.MergingTreeState m h
318+
-> m (SnapMergingTreeState (Ref (Run m h)))
319+
toSnapMergingTreeState (MT.CompletedTreeMerge r) = pure $ SnapCompletedTreeMerge r
320+
toSnapMergingTreeState (MT.PendingTreeMerge p) = SnapPendingTreeMerge <$> toSnapPendingMerge p
321+
toSnapMergingTreeState (MT.OngoingTreeMerge mergingRun) =
322+
SnapOngoingTreeMerge <$> toSnapMergingRunState mergingRun
323+
324+
{-# SPECIALISE toSnapPendingMerge :: MT.PendingMerge IO h -> IO (SnapPendingMerge (Ref (Run IO h))) #-}
325+
toSnapPendingMerge ::
326+
(PrimMonad m, MonadMVar m)
327+
=> MT.PendingMerge m h
328+
-> m (SnapPendingMerge (Ref (Run m h)))
329+
toSnapPendingMerge (MT.PendingUnionMerge mts) =
330+
SnapPendingUnionMerge <$> traverse toSnapMergingTree (V.toList mts)
331+
toSnapPendingMerge (MT.PendingLevelMerge pes mmt) = do
332+
pes' <- traverse toSnapPreExistingRun pes
333+
mmt' <- traverse toSnapMergingTree mmt
334+
pure $ SnapPendingLevelMerge (V.toList pes') mmt'
335+
336+
{-# SPECIALISE toSnapPreExistingRun :: MT.PreExistingRun IO h -> IO (SnapPreExistingRun (Ref (Run IO h))) #-}
337+
toSnapPreExistingRun ::
338+
(PrimMonad m, MonadMVar m)
339+
=> MT.PreExistingRun m h
340+
-> m (SnapPreExistingRun (Ref (Run m h)))
341+
toSnapPreExistingRun (MT.PreExistingRun run) = pure $ SnapPreExistingRun run
342+
toSnapPreExistingRun (MT.PreExistingMergingRun peMergingRun) =
343+
SnapPreExistingMergingRun <$> toSnapMergingRunState peMergingRun
344+
185345
{-------------------------------------------------------------------------------
186346
Conversion to levels snapshot format
187347
-------------------------------------------------------------------------------}
@@ -235,7 +395,7 @@ toSnapMergingRunState ::
235395
-> m (SnapMergingRunState t (Ref (Run m h)))
236396
toSnapMergingRunState !mr = do
237397
-- TODO: MR.snapshot needs to return duplicated run references, and we
238-
-- need to arrange to release them when the snapshoting is done.
398+
-- need to arrange to release them when the snapshotting is done.
239399
(numRuns, mergeDebt, mergeCredits, state) <- MR.snapshot mr
240400
case state of
241401
MR.CompletedMerge r ->

0 commit comments

Comments
 (0)