@@ -68,7 +68,6 @@ import Control.Monad.Primitive
6868import Control.TempRegistry
6969import Control.Tracer
7070import Data.Arena (ArenaManager , newArenaManager )
71- import Data.Bifunctor (Bifunctor (.. ))
7271import qualified Data.ByteString.Char8 as BSC
7372import Data.Char (isNumber )
7473import Data.Foldable
@@ -90,19 +89,19 @@ import qualified Database.LSMTree.Internal.Entry as Entry
9089import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy ,
9190 ResolveSerialisedValue , lookupsIO )
9291import Database.LSMTree.Internal.MergeSchedule
93- import Database.LSMTree.Internal.Paths (RunFsPaths (.. ),
94- SessionRoot ( .. ), SnapshotName )
92+ import Database.LSMTree.Internal.Paths (SessionRoot (.. ),
93+ SnapshotName )
9594import qualified Database.LSMTree.Internal.Paths as Paths
9695import Database.LSMTree.Internal.Range (Range (.. ))
9796import qualified Database.LSMTree.Internal.RawBytes as RB
9897import Database.LSMTree.Internal.Run (Run )
9998import qualified Database.LSMTree.Internal.Run as Run
100- import Database.LSMTree.Internal.RunNumber
10199import qualified Database.LSMTree.Internal.RunReader as Reader
102100import Database.LSMTree.Internal.RunReaders (OffsetKey (.. ))
103101import qualified Database.LSMTree.Internal.RunReaders as Readers
104102import Database.LSMTree.Internal.Serialise (SerialisedBlob (.. ),
105103 SerialisedKey , SerialisedValue )
104+ import Database.LSMTree.Internal.Snapshot
106105import Database.LSMTree.Internal.UniqCounter
107106import qualified Database.LSMTree.Internal.Vector as V
108107import qualified Database.LSMTree.Internal.WriteBuffer as WB
@@ -1201,14 +1200,18 @@ snapshot resolve snap label th = do
12011200 traceWith (tableTracer th) $ TraceSnapshot snap
12021201 let conf = tableConfig th
12031202 withOpenTable th $ \ thEnv -> do
1203+ let hfs = tableHasFS thEnv
1204+ let snapPath = Paths. snapshot (tableSessionRoot thEnv) snap
1205+ FS. doesFileExist (tableHasFS thEnv) snapPath >>= \ b ->
1206+ when b $ throwIO (ErrSnapshotExists snap)
1207+
12041208 -- For the temporary implementation it is okay to just flush the buffer
12051209 -- before taking the snapshot.
1206- let hfs = tableHasFS thEnv
12071210 content <- modifyWithTempRegistry
12081211 (RW. unsafeAcquireWriteAccess (tableContent thEnv))
12091212 (atomically . RW. unsafeReleaseWriteAccess (tableContent thEnv))
12101213 $ \ reg content -> do
1211- r <- flushWriteBuffer
1214+ content' <- flushWriteBuffer
12121215 (TraceMerge `contramap` tableTracer th)
12131216 conf
12141217 resolve
@@ -1218,29 +1221,22 @@ snapshot resolve snap label th = do
12181221 (tableSessionUniqCounter thEnv)
12191222 reg
12201223 content
1221- pure (r, r )
1224+ pure (content', content' )
12221225 -- At this point, we've flushed the write buffer but we haven't created the
12231226 -- snapshot file yet. If an asynchronous exception happens beyond this
12241227 -- point, we'll take that loss, as the inner state of the table is still
12251228 -- consistent.
1226- runNumbers <- V. forM (tableLevels content) $ \ (Level mr rs) -> do
1227- (,V. map (runNumber . Run. runRunFsPaths) rs) <$>
1228- case mr of
1229- SingleRun r -> pure (True , runNumber (Run. runRunFsPaths r))
1230- MergingRun var -> do
1231- withMVar var $ \ case
1232- CompletedMerge r -> pure (False , runNumber (Run. runRunFsPaths r))
1233- OngoingMerge {} -> error " snapshot: OngoingMerge not yet supported" -- TODO: implement
1234- let snapPath = Paths. snapshot (tableSessionRoot thEnv) snap
1235- FS. doesFileExist (tableHasFS thEnv) snapPath >>= \ b ->
1236- when b $ throwIO (ErrSnapshotExists snap)
1229+
1230+ snappedLevels <- snapLevels (tableLevels content)
1231+ let snapContents = BSC. pack $ show (label, snappedLevels, tableConfig th)
1232+
12371233 FS. withFile
12381234 (tableHasFS thEnv)
12391235 snapPath
12401236 (FS. WriteMode FS. MustBeNew ) $ \ h ->
1241- void $ FS. hPutAllStrict (tableHasFS thEnv) h
1242- ( BSC. pack $ show (label, runNumbers, tableConfig th))
1243- pure $! V. sum ( V. map ( \ ((_ :: ( Bool , RunNumber )), rs) -> 1 + V. length rs) runNumbers)
1237+ void $ FS. hPutAllStrict (tableHasFS thEnv) h snapContents
1238+
1239+ pure $! numSnapRuns snappedLevels
12441240
12451241{-# SPECIALISE open ::
12461242 Session IO h
@@ -1270,26 +1266,17 @@ open sesh label override snap = do
12701266 snapPath
12711267 FS. ReadMode $ \ h ->
12721268 FS. hGetAll (sessionHasFS seshEnv) h
1273- let (label', runNumbers, conf) =
1274- -- why we are using read for this?
1275- -- apparently this is a temporary solution, to be done properly in WP15
1276- read @ (SnapshotLabel , V. Vector ((Bool , RunNumber ), V. Vector RunNumber ), TableConfig ) $
1277- BSC. unpack $ BSC. toStrict $ bs
1278-
1279- let conf' = applyOverride override conf
1269+ let (label', snappedLevels, conf) = read $ BSC. unpack $ BSC. toStrict $ bs
12801270 unless (label == label') $ throwIO (ErrSnapshotWrongType snap)
1281- let runPaths = V. map (bimap (second $ RunFsPaths (Paths. activeDir $ sessionRoot seshEnv))
1282- (V. map (RunFsPaths (Paths. activeDir $ sessionRoot seshEnv))))
1283- runNumbers
1284-
1271+ let conf' = applyOverride override conf
12851272 am <- newArenaManager
12861273 blobpath <- Paths. tableBlobPath (sessionRoot seshEnv) <$>
12871274 incrUniqCounter (sessionUniqCounter seshEnv)
12881275 tableWriteBufferBlobs
12891276 <- allocateTemp reg
12901277 (WBB. new hfs blobpath)
12911278 WBB. removeReference
1292- tableLevels <- openLevels reg hfs hbio (confDiskCachePolicy conf') runPaths
1279+ tableLevels <- openLevels reg hfs hbio conf (sessionRoot seshEnv) snappedLevels
12931280 tableCache <- mkLevelsCache reg tableLevels
12941281 newWith reg sesh seshEnv conf' am $! TableContent {
12951282 tableWriteBuffer = WB. empty
@@ -1298,37 +1285,6 @@ open sesh label override snap = do
12981285 , tableCache
12991286 }
13001287
1301- {-# SPECIALISE openLevels ::
1302- TempRegistry IO
1303- -> HasFS IO h
1304- -> HasBlockIO IO h
1305- -> DiskCachePolicy
1306- -> V.Vector ((Bool, RunFsPaths), V.Vector RunFsPaths)
1307- -> IO (Levels IO h) #-}
1308- -- | Open multiple levels.
1309- openLevels ::
1310- (MonadFix m , MonadMask m , MonadMVar m , MonadSTM m , PrimMonad m )
1311- => TempRegistry m
1312- -> HasFS m h
1313- -> HasBlockIO m h
1314- -> DiskCachePolicy
1315- -> V. Vector ((Bool , RunFsPaths ), V. Vector RunFsPaths )
1316- -> m (Levels m h )
1317- openLevels reg hfs hbio diskCachePolicy levels =
1318- flip V. imapMStrict levels $ \ i (mrPath, rsPaths) -> do
1319- let ln = LevelNo (i+ 1 ) -- level 0 is the write buffer
1320- caching = diskCachePolicyForLevel diskCachePolicy ln
1321- ! r <- allocateTemp reg
1322- (Run. openFromDisk hfs hbio caching (snd mrPath))
1323- Run. removeReference
1324- ! rs <- flip V. mapMStrict rsPaths $ \ run ->
1325- allocateTemp reg
1326- (Run. openFromDisk hfs hbio caching run)
1327- Run. removeReference
1328- var <- newMVar (CompletedMerge r)
1329- let ! mr = if fst mrPath then SingleRun r else MergingRun var
1330- pure $! Level mr rs
1331-
13321288{-# SPECIALISE deleteSnapshot ::
13331289 Session IO h
13341290 -> SnapshotName
0 commit comments