Skip to content

Commit 01b734f

Browse files
authored
Merge pull request #454 from IntersectMBO/dcoutts/blob-ref-blob-file-refactor
Refactor blob refs
2 parents d45f32c + 1c2269e commit 01b734f

File tree

23 files changed

+467
-367
lines changed

23 files changed

+467
-367
lines changed

lsm-tree.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ library
119119
Database.LSMTree.Internal
120120
Database.LSMTree.Internal.Assertions
121121
Database.LSMTree.Internal.BitMath
122+
Database.LSMTree.Internal.BlobFile
122123
Database.LSMTree.Internal.BlobRef
123124
Database.LSMTree.Internal.BloomFilter
124125
Database.LSMTree.Internal.BloomFilterQuery1

src-extras/Database/LSMTree/Extras/NoThunks.hs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import qualified Data.Vector.Primitive as VP
3434
import qualified Data.Vector.Unboxed.Mutable as VUM
3535
import Data.Word
3636
import Database.LSMTree.Internal as Internal
37+
import Database.LSMTree.Internal.BlobFile
3738
import Database.LSMTree.Internal.BlobRef
3839
import Database.LSMTree.Internal.Config
3940
import Database.LSMTree.Internal.CRC32C
@@ -394,9 +395,9 @@ deriving stock instance Generic (RunReader m h)
394395
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
395396
=> NoThunks (RunReader m h)
396397

397-
deriving stock instance Generic (Reader.Entry m (Handle h))
398+
deriving stock instance Generic (Reader.Entry m h)
398399
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
399-
=> NoThunks (Reader.Entry m (Handle h))
400+
=> NoThunks (Reader.Entry m h)
400401

401402
{-------------------------------------------------------------------------------
402403
RawPage
@@ -416,13 +417,21 @@ deriving anyclass instance NoThunks RawOverflowPage
416417
BlobRef
417418
-------------------------------------------------------------------------------}
418419

419-
deriving stock instance Generic (BlobRef m h)
420-
deriving anyclass instance (NoThunks h, Typeable (PrimState m))
421-
=> NoThunks (BlobRef m h)
422-
423420
deriving stock instance Generic BlobSpan
424421
deriving anyclass instance NoThunks BlobSpan
425422

423+
deriving stock instance Generic (BlobFile m h)
424+
deriving anyclass instance (Typeable h, Typeable (PrimState m))
425+
=> NoThunks (BlobFile m h)
426+
427+
deriving stock instance Generic (RawBlobRef m h)
428+
deriving anyclass instance (Typeable h, Typeable (PrimState m))
429+
=> NoThunks (RawBlobRef m h)
430+
431+
deriving stock instance Generic (WeakBlobRef m h)
432+
deriving anyclass instance (Typeable h, Typeable (PrimState m))
433+
=> NoThunks (WeakBlobRef m h)
434+
426435
{-------------------------------------------------------------------------------
427436
Arena
428437
-------------------------------------------------------------------------------}

src/Database/LSMTree/Common.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ import qualified Database.LSMTree.Internal.MergeSchedule as Internal
6363
import qualified Database.LSMTree.Internal.Paths as Internal
6464
import qualified Database.LSMTree.Internal.Range as Internal
6565
import Database.LSMTree.Internal.Serialise.Class
66-
import System.FS.API (FsPath, Handle, HasFS)
66+
import System.FS.API (FsPath, HasFS)
6767
import System.FS.BlockIO.API (HasBlockIO)
6868
import System.FS.IO (HandleIO)
6969

@@ -268,7 +268,7 @@ type BlobRef :: (Type -> Type) -> Type -> Type
268268
type role BlobRef nominal nominal
269269
data BlobRef m blob where
270270
BlobRef :: Typeable h
271-
=> Internal.WeakBlobRef m (Handle h)
271+
=> Internal.WeakBlobRef m h
272272
-> BlobRef m blob
273273

274274
instance Show (BlobRef m blob) where

src/Database/LSMTree/Internal.hs

Lines changed: 16 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ import Data.Kind
8787
import Data.Map.Strict (Map)
8888
import qualified Data.Map.Strict as Map
8989
import Data.Maybe (catMaybes)
90-
import qualified Data.Primitive.ByteArray as P
9190
import qualified Data.Set as Set
9291
import Data.Typeable
9392
import qualified Data.Vector as V
@@ -104,7 +103,6 @@ import Database.LSMTree.Internal.Paths (SessionRoot (..),
104103
SnapshotName)
105104
import qualified Database.LSMTree.Internal.Paths as Paths
106105
import Database.LSMTree.Internal.Range (Range (..))
107-
import qualified Database.LSMTree.Internal.RawBytes as RB
108106
import Database.LSMTree.Internal.Run (Run)
109107
import qualified Database.LSMTree.Internal.Run as Run
110108
import Database.LSMTree.Internal.RunReaders (OffsetKey (..))
@@ -116,8 +114,7 @@ import Database.LSMTree.Internal.UniqCounter
116114
import qualified Database.LSMTree.Internal.WriteBuffer as WB
117115
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
118116
import qualified System.FS.API as FS
119-
import System.FS.API (FsError, FsErrorPath (..), FsPath, Handle,
120-
HasFS)
117+
import System.FS.API (FsError, FsErrorPath (..), FsPath, HasFS)
121118
import qualified System.FS.API.Lazy as FS
122119
import qualified System.FS.BlockIO.API as FS
123120
import System.FS.BlockIO.API (HasBlockIO)
@@ -724,14 +721,14 @@ close t = do
724721
ResolveSerialisedValue
725722
-> V.Vector SerialisedKey
726723
-> Table IO h
727-
-> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO (Handle h))))) #-}
724+
-> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO h)))) #-}
728725
-- | See 'Database.LSMTree.Normal.lookups'.
729726
lookups ::
730727
(MonadST m, MonadSTM m, MonadThrow m)
731728
=> ResolveSerialisedValue
732729
-> V.Vector SerialisedKey
733730
-> Table m h
734-
-> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m (Handle h)))))
731+
-> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
735732
lookups resolve ks t = do
736733
traceWith (tableTracer t) $ TraceLookups (V.length ks)
737734
withOpenTable t $ \thEnv ->
@@ -753,15 +750,15 @@ lookups resolve ks t = do
753750
ResolveSerialisedValue
754751
-> Range SerialisedKey
755752
-> Table IO h
756-
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res)
753+
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
757754
-> IO (V.Vector res) #-}
758755
-- | See 'Database.LSMTree.Normal.rangeLookup'.
759756
rangeLookup ::
760757
(MonadFix m, MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
761758
=> ResolveSerialisedValue
762759
-> Range SerialisedKey
763760
-> Table m h
764-
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res)
761+
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
765762
-- ^ How to map to a query result, different for normal/monoidal
766763
-> m (V.Vector res)
767764
rangeLookup resolve range t fromEntry = do
@@ -828,44 +825,19 @@ updates resolve es t = do
828825

829826
{-# SPECIALISE retrieveBlobs ::
830827
Session IO h
831-
-> V.Vector (WeakBlobRef IO (FS.Handle h))
828+
-> V.Vector (WeakBlobRef IO h)
832829
-> IO (V.Vector SerialisedBlob) #-}
833830
retrieveBlobs ::
834-
(MonadFix m, MonadMask m, MonadST m, MonadSTM m)
831+
(MonadMask m, MonadST m, MonadSTM m)
835832
=> Session m h
836-
-> V.Vector (WeakBlobRef m (FS.Handle h))
833+
-> V.Vector (WeakBlobRef m h)
837834
-> m (V.Vector SerialisedBlob)
838835
retrieveBlobs sesh wrefs =
839836
withOpenSession sesh $ \seshEnv ->
840-
handle (\(BlobRef.WeakBlobRefInvalid i) -> throwIO (ErrBlobRefInvalid i)) $
841-
BlobRef.withWeakBlobRefs wrefs $ \refs -> do
842-
843-
-- Prepare the IOOps:
844-
-- We use a single large memory buffer, with appropriate offsets within
845-
-- the buffer.
846-
let bufSize :: Int
847-
!bufSize = V.sum (V.map BlobRef.blobRefSpanSize refs)
848-
849-
{-# INLINE bufOffs #-}
850-
bufOffs :: V.Vector Int
851-
bufOffs = V.scanl (+) 0 (V.map BlobRef.blobRefSpanSize refs)
852-
buf <- P.newPinnedByteArray bufSize
853-
let ioops = V.zipWith (BlobRef.readBlobIOOp buf) bufOffs refs
854-
hbio = sessionHasBlockIO seshEnv
855-
856-
-- Submit the IOOps all in one go:
857-
_ <- FS.submitIO hbio ioops
858-
-- We do not need to inspect the results because IO errors are
859-
-- thrown as exceptions, and the result is just the read length
860-
-- which is already known. Short reads can't happen here.
861-
862-
-- Construct the SerialisedBlobs results:
863-
-- This is just the different offsets within the shared buffer.
864-
ba <- P.unsafeFreezeByteArray buf
865-
pure $! V.zipWith
866-
(\off len -> SerialisedBlob (RB.fromByteArray off len ba))
867-
bufOffs
868-
(V.map BlobRef.blobRefSpanSize refs)
837+
let hbio = sessionHasBlockIO seshEnv in
838+
handle (\(BlobRef.WeakBlobRefInvalid i) ->
839+
throwIO (ErrBlobRefInvalid i)) $
840+
BlobRef.readWeakBlobRefs hbio wrefs
869841

870842
{-------------------------------------------------------------------------------
871843
Cursors
@@ -1023,7 +995,7 @@ closeCursor Cursor {..} = do
1023995
ResolveSerialisedValue
1024996
-> Int
1025997
-> Cursor IO h
1026-
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res)
998+
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
1027999
-> IO (V.Vector res) #-}
10281000
-- | See 'Database.LSMTree.Normal.readCursor'.
10291001
readCursor ::
@@ -1032,7 +1004,7 @@ readCursor ::
10321004
=> ResolveSerialisedValue
10331005
-> Int -- ^ Maximum number of entries to read
10341006
-> Cursor m h
1035-
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res)
1007+
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
10361008
-- ^ How to map to a query result, different for normal/monoidal
10371009
-> m (V.Vector res)
10381010
readCursor resolve n cursor fromEntry =
@@ -1043,7 +1015,7 @@ readCursor resolve n cursor fromEntry =
10431015
-> (SerialisedKey -> Bool)
10441016
-> Int
10451017
-> Cursor IO h
1046-
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res)
1018+
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
10471019
-> IO (V.Vector res) #-}
10481020
-- | @readCursorWhile _ p n cursor _@ reads elements until either:
10491021
--
@@ -1060,7 +1032,7 @@ readCursorWhile ::
10601032
-> (SerialisedKey -> Bool) -- ^ Only read as long as this predicate holds
10611033
-> Int -- ^ Maximum number of entries to read
10621034
-> Cursor m h
1063-
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res)
1035+
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
10641036
-- ^ How to map to a query result, different for normal/monoidal
10651037
-> m (V.Vector res)
10661038
readCursorWhile resolve keyIsWanted n Cursor {..} fromEntry = do
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
module Database.LSMTree.Internal.BlobFile (
2+
BlobFile (..)
3+
, BlobSpan (..)
4+
, removeReference
5+
, RemoveFileOnClose (..)
6+
, openBlobFile
7+
, readBlob
8+
, writeBlob
9+
) where
10+
11+
import Control.DeepSeq (NFData (..))
12+
import Control.Monad (unless)
13+
import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow)
14+
import Control.Monad.Primitive (PrimMonad)
15+
import Control.RefCount (RefCounter)
16+
import qualified Control.RefCount as RC
17+
import qualified Data.Primitive.ByteArray as P
18+
import qualified Data.Vector.Primitive as VP
19+
import Data.Word (Word32, Word64)
20+
import qualified Database.LSMTree.Internal.RawBytes as RB
21+
import Database.LSMTree.Internal.Serialise (SerialisedBlob (..))
22+
import qualified System.FS.API as FS
23+
import System.FS.API (HasFS)
24+
import qualified System.FS.BlockIO.API as FS
25+
26+
-- | A handle to a file containing blobs.
27+
--
28+
-- This is a reference counted object. Upon finalisation, the file is closed
29+
-- and, unless it was opened with 'DoNotRemoveFileOnClose', deleted (see the
-- finaliser constructed in 'openBlobFile').
30+
--
31+
data BlobFile m h = BlobFile {
32+
-- The open file handle that 'readBlob' and 'writeBlob' operate on.
blobFileHandle :: {-# UNPACK #-} !(FS.Handle h),
33+
-- The reference counter that carries the finaliser for this file.
blobFileRefCounter :: {-# UNPACK #-} !(RefCounter m)
34+
}
35+
deriving stock (Show)
36+
37+
-- Deep evaluation of a 'BlobFile' forces both of its fields: first the file
-- handle, then the reference counter.
instance NFData h => NFData (BlobFile m h) where
38+
rnf (BlobFile a b) = rnf a `seq` rnf b
39+
40+
-- | The location of a blob inside a blob file.
--
-- 'readBlob' uses 'blobSpanOffset' as the absolute offset into the file and
-- 'blobSpanSize' as the number of bytes to read.
41+
data BlobSpan = BlobSpan {
42+
-- Absolute offset of the blob within the blob file.
blobSpanOffset :: {-# UNPACK #-} !Word64
43+
-- Size of the blob in bytes.
, blobSpanSize :: {-# UNPACK #-} !Word32
44+
}
45+
deriving stock (Show, Eq)
46+
47+
-- Deep evaluation of a 'BlobSpan' forces the offset and then the size.
instance NFData BlobSpan where
48+
rnf (BlobSpan a b) = rnf a `seq` rnf b
49+
50+
-- | Release one reference to the given 'BlobFile'.
--
-- This simply delegates to 'RC.removeReference' on the file's reference
-- counter.
--
-- NOTE(review): presumably the finaliser installed by 'openBlobFile' runs once
-- the last reference is removed -- confirm against "Control.RefCount".
{-# INLINE removeReference #-}
51+
removeReference ::
52+
(MonadMask m, PrimMonad m)
53+
=> BlobFile m h
54+
-> m ()
55+
removeReference BlobFile{blobFileRefCounter} =
56+
RC.removeReference blobFileRefCounter
57+
58+
-- | Flag passed to 'openBlobFile' that decides whether the finaliser deletes
-- the file after closing it: with 'RemoveFileOnClose' the file is removed,
-- with 'DoNotRemoveFileOnClose' it is only closed.
--
-- TODO: this hack can be removed once snapshots are done properly and so
59+
-- runs can delete their files on close.
60+
data RemoveFileOnClose = RemoveFileOnClose | DoNotRemoveFileOnClose
61+
deriving stock Eq
62+
63+
-- | Open the given file to make a 'BlobFile'. The finaliser will close and
64+
-- delete the file, except when 'DoNotRemoveFileOnClose' is passed, in which
-- case the finaliser only closes the file handle.
65+
--
-- The file is opened at the given path with the given 'FS.OpenMode'. The
-- finaliser is attached to a fresh reference counter that is stored in the
-- returned 'BlobFile'.
--
66+
-- TODO: Temporarily we have a 'RemoveFileOnClose' flag, which can be removed
67+
-- once 'Run' no longer needs it, when snapshots are implemented.
68+
--
69+
{-# SPECIALISE openBlobFile :: HasFS IO h -> FS.FsPath -> FS.OpenMode -> RemoveFileOnClose -> IO (BlobFile IO h) #-}
70+
openBlobFile ::
71+
PrimMonad m
72+
=> HasFS m h
73+
-> FS.FsPath
74+
-> FS.OpenMode
75+
-> RemoveFileOnClose
76+
-> m (BlobFile m h)
77+
openBlobFile fs path mode remove = do
78+
blobFileHandle <- FS.hOpen fs path mode
79+
-- The finaliser closes the handle first and only then (optionally) removes
-- the file, using the path recorded in the handle itself.
let finaliser = do
80+
FS.hClose fs blobFileHandle
81+
unless (remove == DoNotRemoveFileOnClose) $
82+
FS.removeFile fs (FS.handlePath blobFileHandle)
83+
-- NOTE(review): 'RC.mkRefCounter1' presumably starts the count at one, so the
-- caller owns the initial reference -- confirm against "Control.RefCount".
blobFileRefCounter <- RC.mkRefCounter1 (Just finaliser)
84+
return BlobFile {
85+
blobFileHandle,
86+
blobFileRefCounter
87+
}
88+
89+
-- | Read a single blob from a 'BlobFile'.
--
-- Reads 'blobSpanSize' bytes starting at absolute file offset
-- 'blobSpanOffset' into a freshly allocated pinned buffer and returns them as
-- a 'SerialisedBlob'.
--
-- NOTE(review): 'FS.hGetBufExactlyAt' presumably throws if fewer bytes than
-- requested are available -- confirm against the fs-api documentation.
{-# SPECIALISE readBlob :: HasFS IO h -> BlobFile IO h -> BlobSpan -> IO SerialisedBlob #-}
90+
readBlob ::
91+
(MonadThrow m, PrimMonad m)
92+
=> HasFS m h
93+
-> BlobFile m h
94+
-> BlobSpan
95+
-> m SerialisedBlob
96+
readBlob fs BlobFile {blobFileHandle}
97+
BlobSpan {blobSpanOffset, blobSpanSize} = do
98+
let off = FS.AbsOffset blobSpanOffset
99+
len :: Int
100+
len = fromIntegral blobSpanSize
101+
-- One pinned buffer of exactly the blob's size; the read fills it from
-- buffer offset 0.
mba <- P.newPinnedByteArray len
102+
_ <- FS.hGetBufExactlyAt fs blobFileHandle mba 0
103+
(fromIntegral len :: FS.ByteCount) off
104+
-- The mutable buffer is not used again after this point, so freezing it
-- without a copy does not expose later mutation.
ba <- P.unsafeFreezeByteArray mba
105+
-- The bang forces the 'RB.fromByteArray' result before it is wrapped.
let !rb = RB.fromByteArray 0 len ba
106+
return (SerialisedBlob rb)
107+
108+
-- | Write a single blob into a 'BlobFile' at the given absolute file offset.
--
-- The blob's bytes are taken directly from its underlying primitive vector:
-- @blen@ bytes starting at buffer offset @boff@ of the backing byte array are
-- written at file offset @off@. The result of the write call is discarded.
--
-- NOTE(review): 'FS.hPutBufExactlyAt' presumably throws if it cannot write all
-- requested bytes -- confirm against the fs-api documentation.
{-# SPECIALISE writeBlob :: HasFS IO h -> BlobFile IO h -> SerialisedBlob -> Word64 -> IO () #-}
109+
writeBlob ::
110+
(MonadThrow m, PrimMonad m)
111+
=> HasFS m h
112+
-> BlobFile m h
113+
-> SerialisedBlob
114+
-> Word64
115+
-> m ()
116+
writeBlob fs BlobFile {blobFileHandle}
117+
(SerialisedBlob' (VP.Vector boff blen ba)) off = do
118+
-- The immutable byte array is thawed without copying because the write API
-- takes a mutable buffer.
-- NOTE(review): this is only safe if the write merely reads from the
-- buffer -- confirm.
mba <- P.unsafeThawByteArray ba
119+
_ <- FS.hPutBufExactlyAt
120+
fs blobFileHandle mba
121+
(FS.BufferOffset boff)
122+
(fromIntegral blen :: FS.ByteCount)
123+
(FS.AbsOffset off)
124+
return ()

0 commit comments

Comments
 (0)