Skip to content

Commit 54c2348

Browse files
committed
Move impl of retrieveBlobs to new BlobRef.readWeakBlobRefs
Instead of open-coding the block I/O logic inside retrieveBlobs in the top-level Internal module, this commit moves it into a new function placed next to BlobRef.readWeakBlobRef, so that the singular and bulk versions sit next to each other.
1 parent c918bec commit 54c2348

File tree

2 files changed

+53
-47
lines changed

2 files changed

+53
-47
lines changed

src/Database/LSMTree/Internal.hs

Lines changed: 4 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ import Data.Kind
8787
import Data.Map.Strict (Map)
8888
import qualified Data.Map.Strict as Map
8989
import Data.Maybe (catMaybes)
90-
import qualified Data.Primitive.ByteArray as P
9190
import qualified Data.Set as Set
9291
import Data.Typeable
9392
import qualified Data.Vector as V
@@ -104,7 +103,6 @@ import Database.LSMTree.Internal.Paths (SessionRoot (..),
104103
SnapshotName)
105104
import qualified Database.LSMTree.Internal.Paths as Paths
106105
import Database.LSMTree.Internal.Range (Range (..))
107-
import qualified Database.LSMTree.Internal.RawBytes as RB
108106
import Database.LSMTree.Internal.Run (Run)
109107
import qualified Database.LSMTree.Internal.Run as Run
110108
import Database.LSMTree.Internal.RunReaders (OffsetKey (..))
@@ -836,35 +834,10 @@ retrieveBlobs ::
836834
-> m (V.Vector SerialisedBlob)
837835
retrieveBlobs sesh wrefs =
838836
withOpenSession sesh $ \seshEnv ->
839-
handle (\(BlobRef.WeakBlobRefInvalid i) -> throwIO (ErrBlobRefInvalid i)) $
840-
BlobRef.withWeakBlobRefs wrefs $ \refs -> do
841-
842-
-- Prepare the IOOps:
843-
-- We use a single large memory buffer, with appropriate offsets within
844-
-- the buffer.
845-
let bufSize :: Int
846-
!bufSize = V.sum (V.map BlobRef.blobRefSpanSize refs)
847-
848-
{-# INLINE bufOffs #-}
849-
bufOffs :: V.Vector Int
850-
bufOffs = V.scanl (+) 0 (V.map BlobRef.blobRefSpanSize refs)
851-
buf <- P.newPinnedByteArray bufSize
852-
let ioops = V.zipWith (BlobRef.readBlobIOOp buf) bufOffs refs
853-
hbio = sessionHasBlockIO seshEnv
854-
855-
-- Submit the IOOps all in one go:
856-
_ <- FS.submitIO hbio ioops
857-
-- We do not need to inspect the results because IO errors are
858-
-- thrown as exceptions, and the result is just the read length
859-
-- which is already known. Short reads can't happen here.
860-
861-
-- Construct the SerialisedBlobs results:
862-
-- This is just the different offsets within the shared buffer.
863-
ba <- P.unsafeFreezeByteArray buf
864-
pure $! V.zipWith
865-
(\off len -> SerialisedBlob (RB.fromByteArray off len ba))
866-
bufOffs
867-
(V.map BlobRef.blobRefSpanSize refs)
837+
let hbio = sessionHasBlockIO seshEnv in
838+
handle (\(BlobRef.WeakBlobRefInvalid i) ->
839+
throwIO (ErrBlobRefInvalid i)) $
840+
BlobRef.readWeakBlobRefs hbio wrefs
868841

869842
{-------------------------------------------------------------------------------
870843
Cursors

src/Database/LSMTree/Internal/BlobRef.hs

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ module Database.LSMTree.Internal.BlobRef (
1818
, removeReferences
1919
, readRawBlobRef
2020
, readWeakBlobRef
21-
, readBlobIOOp
21+
, readWeakBlobRefs
2222
) where
2323

2424
import Control.DeepSeq (NFData (..))
@@ -27,15 +27,18 @@ import Control.Monad.Class.MonadThrow (Exception, MonadMask,
2727
MonadThrow (..), bracket, throwIO)
2828
import Control.Monad.Primitive
2929
import qualified Control.RefCount as RC
30-
import qualified Data.Primitive.ByteArray as P (MutableByteArray)
30+
import qualified Data.Primitive.ByteArray as P (newPinnedByteArray,
31+
unsafeFreezeByteArray)
3132
import qualified Data.Vector as V
3233
import qualified Data.Vector.Mutable as VM
3334
import Database.LSMTree.Internal.BlobFile (BlobFile (..), BlobSpan (..))
3435
import qualified Database.LSMTree.Internal.BlobFile as BlobFile
36+
import qualified Database.LSMTree.Internal.RawBytes as RB
3537
import Database.LSMTree.Internal.Serialise (SerialisedBlob (..))
3638
import qualified System.FS.API as FS
3739
import System.FS.API (HasFS)
3840
import qualified System.FS.BlockIO.API as FS
41+
import System.FS.BlockIO.API (HasBlockIO)
3942

4043

4144
-- | A raw blob reference is a reference to a blob within a blob file.
@@ -199,17 +202,47 @@ readWeakBlobRef fs wref =
199202
\StrongBlobRef {strongBlobRefFile, strongBlobRefSpan} ->
200203
BlobFile.readBlobFile fs strongBlobRefFile strongBlobRefSpan
201204

202-
readBlobIOOp ::
203-
P.MutableByteArray s -> Int
204-
-> StrongBlobRef m h
205-
-> FS.IOOp s h
206-
readBlobIOOp buf bufoff
207-
StrongBlobRef {
208-
strongBlobRefFile = BlobFile {blobFileHandle},
209-
strongBlobRefSpan = BlobSpan {blobSpanOffset, blobSpanSize}
210-
} =
211-
FS.IOOpRead
212-
blobFileHandle
213-
(fromIntegral blobSpanOffset :: FS.FileOffset)
214-
buf (FS.BufferOffset bufoff)
215-
(fromIntegral blobSpanSize :: FS.ByteCount)
205+
{-# SPECIALISE readWeakBlobRefs :: HasBlockIO IO h -> V.Vector (WeakBlobRef IO h) -> IO (V.Vector SerialisedBlob) #-}
206+
readWeakBlobRefs ::
207+
(MonadMask m, PrimMonad m)
208+
=> HasBlockIO m h
209+
-> V.Vector (WeakBlobRef m h)
210+
-> m (V.Vector SerialisedBlob)
211+
readWeakBlobRefs hbio wrefs =
212+
bracket (deRefWeakBlobRefs wrefs) (V.mapM_ removeReference) $ \refs -> do
213+
-- Prepare the IOOps:
214+
-- We use a single large memory buffer, with appropriate offsets within
215+
-- the buffer.
216+
let bufSize :: Int
217+
!bufSize = V.sum (V.map blobRefSpanSize refs)
218+
219+
{-# INLINE bufOffs #-}
220+
bufOffs :: V.Vector Int
221+
bufOffs = V.scanl (+) 0 (V.map blobRefSpanSize refs)
222+
buf <- P.newPinnedByteArray bufSize
223+
224+
-- Submit the IOOps all in one go:
225+
_ <- FS.submitIO hbio $
226+
V.zipWith
227+
(\bufoff
228+
StrongBlobRef {
229+
strongBlobRefFile = BlobFile {blobFileHandle},
230+
strongBlobRefSpan = BlobSpan {blobSpanOffset, blobSpanSize}
231+
} ->
232+
FS.IOOpRead
233+
blobFileHandle
234+
(fromIntegral blobSpanOffset :: FS.FileOffset)
235+
buf (FS.BufferOffset bufoff)
236+
(fromIntegral blobSpanSize :: FS.ByteCount))
237+
bufOffs refs
238+
-- We do not need to inspect the results because IO errors are
239+
-- thrown as exceptions, and the result is just the read length
240+
-- which is already known. Short reads can't happen here.
241+
242+
-- Construct the SerialisedBlobs results:
243+
-- This is just the different offsets within the shared buffer.
244+
ba <- P.unsafeFreezeByteArray buf
245+
pure $! V.zipWith
246+
(\off len -> SerialisedBlob (RB.fromByteArray off len ba))
247+
bufOffs
248+
(V.map blobRefSpanSize refs)

0 commit comments

Comments
 (0)