Skip to content

Commit cff7a39

Browse files
authored
Merge pull request #608 from IntersectMBO/dcoutts/snapshot-refactoring
Implement snapshots for union tables
2 parents 585b257 + 2080353 commit cff7a39

File tree

275 files changed

+1708
-819
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

275 files changed

+1708
-819
lines changed

bench/macro/lsm-tree-bench-lookups.hs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import Database.LSMTree.Internal.Paths (RunFsPaths (RunFsPaths))
3232
import Database.LSMTree.Internal.Run (Run)
3333
import qualified Database.LSMTree.Internal.Run as Run
3434
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
35+
import Database.LSMTree.Internal.RunBuilder (RunParams (..))
3536
import qualified Database.LSMTree.Internal.RunBuilder as RunBuilder
3637
import Database.LSMTree.Internal.RunNumber
3738
import Database.LSMTree.Internal.Serialise (SerialisedKey,
@@ -349,10 +350,13 @@ lookupsEnv runSizes keyRng0 hfs hbio caching = do
349350
-- create the runs
350351
rbs <- sequence
351352
[ RunBuilder.new hfs hbio
353+
RunParams {
354+
runParamCaching = caching,
355+
runParamAlloc = RunAllocFixed benchmarkNumBitsPerEntry,
356+
runParamIndex = Index.Compact
357+
}
352358
(RunFsPaths (FS.mkFsPath []) (RunNumber i))
353359
(NumEntries numEntries)
354-
(RunAllocFixed benchmarkNumBitsPerEntry)
355-
Index.Compact
356360
| ((numEntries, _), i) <- zip runSizes [0..] ]
357361

358362
-- fill the runs
@@ -373,7 +377,7 @@ lookupsEnv runSizes keyRng0 hfs hbio caching = do
373377
putStr "DONE"
374378

375379
-- return runs
376-
runs <- V.fromList <$> mapM (Run.fromMutable caching) rbs
380+
runs <- V.fromList <$> mapM Run.fromMutable rbs
377381
let blooms = V.map (\(DeRef r) -> Run.runFilter r) runs
378382
indexes = V.map (\(DeRef r) -> Run.runIndex r) runs
379383
handles = V.map (\(DeRef r) -> Run.runKOpsFile r) runs

bench/micro/Bench/Database/LSMTree/Internal/Lookup.hs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,15 @@ import qualified Data.Vector as V
1919
import Database.LSMTree.Extras.Orphans ()
2020
import Database.LSMTree.Extras.Random (frequency, randomByteStringR,
2121
sampleUniformWithReplacement, uniformWithoutReplacement)
22+
import Database.LSMTree.Extras.RunData (defaultRunParams)
2223
import Database.LSMTree.Extras.UTxO
2324
import Database.LSMTree.Internal.Entry (Entry (..), NumEntries (..))
24-
import qualified Database.LSMTree.Internal.Index as Index (IndexType (Compact))
2525
import Database.LSMTree.Internal.Lookup (bloomQueries, indexSearches,
2626
intraPageLookups, lookupsIO, prepLookups)
2727
import Database.LSMTree.Internal.Page (getNumPages)
2828
import Database.LSMTree.Internal.Paths (RunFsPaths (..))
2929
import Database.LSMTree.Internal.Run (Run)
3030
import qualified Database.LSMTree.Internal.Run as Run
31-
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
3231
import Database.LSMTree.Internal.RunNumber
3332
import Database.LSMTree.Internal.Serialise
3433
import qualified Database.LSMTree.Internal.WriteBuffer as WB
@@ -192,7 +191,7 @@ lookupsInBatchesEnv Config {..} = do
192191
wbblobs <- WBB.new hasFS (FS.mkFsPath ["0.wbblobs"])
193192
wb <- WB.fromMap <$> traverse (traverse (WBB.addBlob hasFS wbblobs)) storedKeys
194193
let fsps = RunFsPaths (FS.mkFsPath []) (RunNumber 0)
195-
r <- Run.fromWriteBuffer hasFS hasBlockIO caching (RunAllocFixed 10) Index.Compact fsps wb wbblobs
194+
r <- Run.fromWriteBuffer hasFS hasBlockIO defaultRunParams fsps wb wbblobs
196195
let NumEntries nentriesReal = Run.size r
197196
assertEqual nentriesReal nentries $ pure ()
198197
-- 42 to 43 entries per page

bench/micro/Bench/Database/LSMTree/Internal/Merge.hs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import qualified Database.LSMTree.Internal.Merge as Merge
2323
import Database.LSMTree.Internal.Paths (RunFsPaths (..))
2424
import Database.LSMTree.Internal.Run (Run)
2525
import qualified Database.LSMTree.Internal.Run as Run
26-
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
2726
import Database.LSMTree.Internal.RunNumber
2827
import Database.LSMTree.Internal.Serialise
2928
import Database.LSMTree.Internal.UniqCounter
@@ -264,8 +263,7 @@ merge ::
264263
merge fs hbio Config {..} targetPaths runs = do
265264
let f = fromMaybe const mergeMappend
266265
m <- fromMaybe (error "empty inputs, no merge created") <$>
267-
Merge.new fs hbio Run.CacheRunData (RunAllocFixed 10) Index.Compact
268-
mergeType f targetPaths runs
266+
Merge.new fs hbio defaultRunParams mergeType f targetPaths runs
269267
Merge.stepsToCompletion m stepSize
270268

271269
fsPath :: FS.FsPath

lsm-tree.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ library
131131
Database.LSMTree.Internal.CRC32C
132132
Database.LSMTree.Internal.Cursor
133133
Database.LSMTree.Internal.Entry
134+
Database.LSMTree.Internal.IncomingRun
134135
Database.LSMTree.Internal.Index
135136
Database.LSMTree.Internal.Index.Compact
136137
Database.LSMTree.Internal.Index.CompactAcc

src-extras/Database/LSMTree/Extras/MergingRunData.hs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
2828
import Database.LSMTree.Internal.MergingRun (MergingRun)
2929
import qualified Database.LSMTree.Internal.MergingRun as MR
3030
import Database.LSMTree.Internal.Paths
31-
import Database.LSMTree.Internal.Run (RunDataCaching (..))
3231
import qualified Database.LSMTree.Internal.Run as Run
33-
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
3432
import Database.LSMTree.Internal.RunNumber
3533
import Database.LSMTree.Internal.Serialise
3634
import Database.LSMTree.Internal.UniqCounter
@@ -88,8 +86,8 @@ unsafeCreateMergingRun hfs hbio resolve indexType path counter = \case
8886
$ \runs -> do
8987
n <- incrUniqCounter counter
9088
let fsPaths = RunFsPaths path (RunNumber (uniqueToInt n))
91-
MR.new hfs hbio resolve CacheRunData (RunAllocFixed 10) indexType
92-
mergeType fsPaths (V.fromList runs)
89+
MR.new hfs hbio resolve defaultRunParams mergeType
90+
fsPaths (V.fromList runs)
9391

9492
{-------------------------------------------------------------------------------
9593
MergingRunData

src-extras/Database/LSMTree/Extras/MergingTreeData.hs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,17 @@ unsafeCreateMergingTree ::
7474
unsafeCreateMergingTree hfs hbio resolve indexType path counter = go
7575
where
7676
go = \case
77-
CompletedTreeMergeData rd -> do
77+
CompletedTreeMergeData rd ->
7878
withRun hfs hbio indexType path counter rd $ \run ->
79-
MT.mkMergingTree . MT.CompletedTreeMerge =<< dupRef run
80-
OngoingTreeMergeData mrd -> do
79+
MT.newCompletedMerge run
80+
OngoingTreeMergeData mrd ->
8181
withMergingRun hfs hbio resolve indexType path counter mrd $ \mr ->
82-
MT.mkMergingTree . MT.OngoingTreeMerge =<< dupRef mr
83-
PendingLevelMergeData prds mtd -> do
82+
MT.newOngoingMerge mr
83+
PendingLevelMergeData prds mtd ->
8484
withPreExistingRuns prds $ \prs ->
8585
withMaybeTree mtd $ \mt ->
8686
MT.newPendingLevelMerge prs mt
87-
PendingUnionMergeData mtds -> do
87+
PendingUnionMergeData mtds ->
8888
withTrees mtds $ \mts ->
8989
MT.newPendingUnionMerge mts
9090

src-extras/Database/LSMTree/Extras/NoThunks.hs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,18 @@ deriving stock instance Generic (Run m h)
214214
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
215215
=> NoThunks (Run m h)
216216

217+
deriving stock instance Generic RunParams
218+
deriving anyclass instance NoThunks RunParams
219+
220+
deriving stock instance Generic RunBloomFilterAlloc
221+
deriving anyclass instance NoThunks RunBloomFilterAlloc
222+
217223
deriving stock instance Generic RunDataCaching
218224
deriving anyclass instance NoThunks RunDataCaching
219225

226+
deriving stock instance Generic IndexType
227+
deriving anyclass instance NoThunks IndexType
228+
220229
{-------------------------------------------------------------------------------
221230
Paths
222231
-------------------------------------------------------------------------------}

src-extras/Database/LSMTree/Extras/RunData.hs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
-- from them. Tests and benchmarks should preferably use these utilities instead
33
-- of (re-)defining their own.
44
module Database.LSMTree.Extras.RunData (
5+
-- * RunParams
6+
defaultRunParams
57
-- * Create runs
6-
withRun
8+
, withRun
79
, withRunAt
810
, withRuns
911
, unsafeCreateRun
@@ -48,12 +50,13 @@ import qualified Data.Vector as V
4850
import Database.LSMTree.Extras (showPowersOf10)
4951
import Database.LSMTree.Extras.Generators ()
5052
import Database.LSMTree.Internal.Entry
51-
import Database.LSMTree.Internal.Index (IndexType)
53+
import Database.LSMTree.Internal.Index (IndexType (..))
5254
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
5355
import Database.LSMTree.Internal.MergeSchedule (addWriteBufferEntries)
5456
import Database.LSMTree.Internal.Paths
5557
import qualified Database.LSMTree.Internal.Paths as Paths
56-
import Database.LSMTree.Internal.Run (Run, RunDataCaching (..))
58+
import Database.LSMTree.Internal.Run (Run, RunDataCaching (..),
59+
RunParams (..))
5760
import qualified Database.LSMTree.Internal.Run as Run
5861
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..),
5962
entryWouldFitInPage)
@@ -69,6 +72,15 @@ import qualified System.FS.BlockIO.API as FS
6972
import System.FS.BlockIO.API (HasBlockIO)
7073
import Test.QuickCheck
7174

75+
76+
defaultRunParams :: RunParams
77+
defaultRunParams =
78+
RunParams {
79+
runParamCaching = CacheRunData,
80+
runParamAlloc = RunAllocFixed 10,
81+
runParamIndex = Compact
82+
}
83+
7284
{-------------------------------------------------------------------------------
7385
Create runs
7486
-------------------------------------------------------------------------------}
@@ -153,7 +165,8 @@ unsafeCreateRunAt fs hbio indexType fsPaths (RunData m) = do
153165
let blobpath = FS.addExtension (runBlobPath fsPaths) ".wb"
154166
bracket (WBB.new fs blobpath) releaseRef $ \wbblobs -> do
155167
wb <- WB.fromMap <$> traverse (traverse (WBB.addBlob fs wbblobs)) m
156-
Run.fromWriteBuffer fs hbio CacheRunData (RunAllocFixed 10) indexType
168+
Run.fromWriteBuffer fs hbio
169+
defaultRunParams { runParamIndex = indexType }
157170
fsPaths wb wbblobs
158171

159172
-- | Create a 'RunFsPaths' using an empty 'FsPath'. The empty path corresponds

src/Database/LSMTree/Internal.hs

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ module Database.LSMTree.Internal (
6464
, openSnapshot
6565
, deleteSnapshot
6666
, listSnapshots
67-
-- * Mutiple writable tables
67+
-- * Multiple writable tables
6868
, duplicate
6969
-- * Table union
7070
, unions
@@ -323,7 +323,7 @@ data SessionState m h =
323323
| SessionClosed
324324

325325
data SessionEnv m h = SessionEnv {
326-
-- | The path to the directory in which this sesion is live. This is a path
326+
-- | The path to the directory in which this session is live. This is a path
327327
-- relative to root of the 'HasFS' instance.
328328
--
329329
-- INVARIANT: the session root is never changed during the lifetime of a
@@ -1234,18 +1234,30 @@ createSnapshot snap label tableType t = do
12341234
let wb = tableWriteBuffer content
12351235
let wbb = tableWriteBufferBlobs content
12361236
snapWriteBufferNumber <- Paths.writeBufferNumber <$>
1237-
snapshotWriteBuffer reg hfs hbio activeUc snapUc activeDir snapDir wb wbb
1237+
snapshotWriteBuffer hfs hbio activeUc snapUc reg activeDir snapDir wb wbb
12381238

12391239
-- Convert to snapshot format
12401240
snapLevels <- toSnapLevels (tableLevels content)
12411241

12421242
-- Hard link runs into the named snapshot directory
1243-
snapLevels' <- snapshotRuns reg snapUc snapDir snapLevels
1243+
snapLevels' <- traverse (snapshotRun hfs hbio snapUc reg snapDir) snapLevels
1244+
1245+
-- If a merging tree exists, do the same hard-linking for the runs within
1246+
mTreeOpt <- case tableUnionLevel content of
1247+
NoUnion -> pure Nothing
1248+
Union mTreeRef -> do
1249+
mTree <- toSnapMergingTree mTreeRef
1250+
Just <$> traverse (snapshotRun hfs hbio snapUc reg snapDir) mTree
12441251

1245-
-- Release the table content
12461252
releaseTableContent reg content
12471253

1248-
let snapMetaData = SnapshotMetaData label tableType (tableConfig t) snapWriteBufferNumber snapLevels'
1254+
let snapMetaData = SnapshotMetaData
1255+
label
1256+
tableType
1257+
(tableConfig t)
1258+
snapWriteBufferNumber
1259+
snapLevels'
1260+
mTreeOpt
12491261
SnapshotMetaDataFile contentPath = Paths.snapshotMetaDataFile snapDir
12501262
SnapshotMetaDataChecksumFile checksumPath = Paths.snapshotMetaDataChecksumFile snapDir
12511263
writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData
@@ -1290,7 +1302,7 @@ openSnapshot sesh label tableType override snap resolve = do
12901302
Left e -> throwIO (ErrSnapshotDeserialiseFailure e snap)
12911303
Right x -> pure x
12921304

1293-
let SnapshotMetaData label' tableType' conf snapWriteBuffer snapLevels = snapMetaData
1305+
let SnapshotMetaData label' tableType' conf snapWriteBuffer snapLevels mTreeOpt = snapMetaData
12941306

12951307
unless (tableType == tableType') $
12961308
throwIO (ErrSnapshotWrongTableType snap tableType tableType')
@@ -1308,19 +1320,26 @@ openSnapshot sesh label tableType override snap resolve = do
13081320
(tableWriteBuffer, tableWriteBufferBlobs) <- openWriteBuffer reg resolve hfs hbio uc activeDir snapWriteBufferPaths
13091321

13101322
-- Hard link runs into the active directory,
1311-
snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir activeDir snapLevels
1323+
snapLevels' <- traverse (openRun hfs hbio uc reg snapDir activeDir) snapLevels
1324+
unionLevel <- case mTreeOpt of
1325+
Nothing -> pure NoUnion
1326+
Just mTree -> do
1327+
snapTree <- traverse (openRun hfs hbio uc reg snapDir activeDir) mTree
1328+
mt <- fromSnapMergingTree hfs hbio uc resolve activeDir reg snapTree
1329+
traverse_ (delayedCommit reg . releaseRef) snapTree
1330+
pure (Union mt)
13121331

13131332
-- Convert from the snapshot format, restoring merge progress in the process
1314-
tableLevels <- fromSnapLevels reg hfs hbio conf (sessionUniqCounter seshEnv) resolve activeDir snapLevels'
1315-
releaseRuns reg snapLevels'
1333+
tableLevels <- fromSnapLevels hfs hbio uc conf resolve reg activeDir snapLevels'
1334+
traverse_ (delayedCommit reg . releaseRef) snapLevels'
13161335

13171336
tableCache <- mkLevelsCache reg tableLevels
13181337
newWith reg sesh seshEnv conf' am $! TableContent {
13191338
tableWriteBuffer
13201339
, tableWriteBufferBlobs
13211340
, tableLevels
13221341
, tableCache
1323-
, tableUnionLevel = NoUnion -- TODO: at some point also load union level from snapshot
1342+
, tableUnionLevel = unionLevel
13241343
}
13251344

13261345
{-# SPECIALISE deleteSnapshot ::
@@ -1370,7 +1389,7 @@ listSnapshots sesh = do
13701389
else pure $ Nothing
13711390

13721391
{-------------------------------------------------------------------------------
1373-
Mutiple writable tables
1392+
Multiple writable tables
13741393
-------------------------------------------------------------------------------}
13751394

13761395
{-# SPECIALISE duplicate :: Table IO h -> IO (Table IO h) #-}
@@ -1534,28 +1553,19 @@ writeBufferToNewRun SessionEnv {
15341553
sessionHasBlockIO = hbio,
15351554
sessionUniqCounter = uc
15361555
}
1537-
conf@TableConfig {
1538-
confDiskCachePolicy,
1539-
confFencePointerIndex
1540-
}
1556+
conf
15411557
TableContent{
15421558
tableWriteBuffer,
15431559
tableWriteBufferBlobs
15441560
}
15451561
| WB.null tableWriteBuffer = pure Nothing
15461562
| otherwise = Just <$> do
1547-
!n <- incrUniqCounter uc
1548-
let !ln = LevelNo 1
1549-
!cache = diskCachePolicyForLevel confDiskCachePolicy ln
1550-
!alloc = bloomFilterAllocForLevel conf ln
1551-
!indexType = indexTypeForRun confFencePointerIndex
1552-
!path = Paths.runPath (Paths.activeDir root)
1553-
(uniqueToRunNumber n)
1554-
Run.fromWriteBuffer hfs hbio
1555-
cache
1556-
alloc
1557-
indexType
1558-
path
1563+
!uniq <- incrUniqCounter uc
1564+
let (!runParams, !runPaths) = mergingRunParamsForLevel
1565+
(Paths.activeDir root) conf uniq (LevelNo 1)
1566+
Run.fromWriteBuffer
1567+
hfs hbio
1568+
runParams runPaths
15591569
tableWriteBuffer
15601570
tableWriteBufferBlobs
15611571

src/Database/LSMTree/Internal/Config.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import qualified Monkey
4646

4747
newtype LevelNo = LevelNo Int
4848
deriving stock (Show, Eq)
49-
deriving newtype Enum
49+
deriving newtype (Enum, NFData)
5050

5151
{-------------------------------------------------------------------------------
5252
Table configuration

0 commit comments

Comments
 (0)