@@ -7,7 +7,8 @@ module Database.LSMTree.Internal.WriteBufferReader (
77import Control.Concurrent.Class.MonadMVar.Strict
88import Control.Monad.Class.MonadST (MonadST (.. ))
99import Control.Monad.Class.MonadSTM (MonadSTM (.. ))
10- import Control.Monad.Class.MonadThrow (MonadMask , MonadThrow (.. ))
10+ import Control.Monad.Class.MonadThrow (MonadMask , MonadThrow (.. ),
11+ bracketOnError )
1112import Control.Monad.Primitive (PrimMonad (.. ))
1213import Control.RefCount (Ref , dupRef , releaseRef )
1314import Data.Primitive.MutVar (MutVar , newMutVar , readMutVar ,
@@ -43,8 +44,8 @@ import System.FS.BlockIO.API (HasBlockIO)
4344 #-}
4445-- | Read a serialised `WriteBuffer` back into memory.
4546--
46- -- NOTE: The ` BlobFile` argument / must be/ the blob file associated with the
47- -- write buffer; @` readWriteBuffer`@ does not check this.
47+ -- The argument blob file (' BlobFile') must be the file associated with the
48+ -- argument key\/ops file ('ForKOps'). ' readWriteBuffer' does not check this.
4849readWriteBuffer ::
4950 (MonadMVar m , MonadMask m , MonadSTM m , MonadST m )
5051 => ResolveSerialisedValue
@@ -54,7 +55,7 @@ readWriteBuffer ::
5455 -> Ref (BlobFile m h )
5556 -> m WriteBuffer
5657readWriteBuffer resolve hfs hbio kOpsPath blobFile =
57- bracket (new hfs hbio kOpsPath blobFile) close $ readEntries
58+ bracket (new hfs hbio kOpsPath blobFile) close $ readEntries
5859 where
5960 readEntries reader = readEntriesAcc WB. empty
6061 where
@@ -88,30 +89,42 @@ data WriteBufferReader m h = WriteBufferReader {
8889 -> IO (WriteBufferReader IO h)
8990 #-}
9091-- | See 'Database.LSMTree.Internal.RunReader.new'.
92+ --
93+ -- REF: the resulting 'WriteBufferReader' must be closed once it is no longer
94+ -- used.
95+ --
96+ -- ASYNC: this should be called with asynchronous exceptions masked because it
97+ -- allocates/creates resources.
9198new :: forall m h .
9299 (MonadMVar m , MonadST m , MonadMask m )
93100 => HasFS m h
94101 -> HasBlockIO m h
95102 -> ForKOps FS. FsPath
96103 -> Ref (BlobFile m h )
97104 -> m (WriteBufferReader m h )
98- new readerHasFS readerHasBlockIO kOpsPath blobFile = do
99- readerKOpsHandle <- FS. hOpen readerHasFS (unForKOps kOpsPath) FS. ReadMode
100- -- Double the file readahead window (only applies to this file descriptor)
101- FS. hAdviseAll readerHasBlockIO readerKOpsHandle FS. AdviceSequential
102- readerBlobFile <- dupRef blobFile
103- -- Load first page from disk, if it exists.
104- readerCurrentEntryNo <- newPrimVar (0 :: Word16 )
105- firstPage <- readDiskPage readerHasFS readerKOpsHandle
106- readerCurrentPage <- newMutVar firstPage
107- pure $ WriteBufferReader {.. }
105+ new readerHasFS readerHasBlockIO kOpsPath blobFile =
106+ bracketOnError openKOps (FS. hClose readerHasFS) $ \ readerKOpsHandle -> do
107+ -- Double the file readahead window (only applies to this file descriptor)
108+ FS. hAdviseAll readerHasBlockIO readerKOpsHandle FS. AdviceSequential
109+ bracketOnError (dupRef blobFile) releaseRef $ \ readerBlobFile -> do
110+ -- Load first page from disk, if it exists.
111+ readerCurrentEntryNo <- newPrimVar (0 :: Word16 )
112+ firstPage <- readDiskPage readerHasFS readerKOpsHandle
113+ readerCurrentPage <- newMutVar firstPage
114+ pure $ WriteBufferReader {.. }
115+ where
116+ openKOps = FS. hOpen readerHasFS (unForKOps kOpsPath) FS. ReadMode
108117
109118{-# SPECIALISE
110119 next ::
111120 WriteBufferReader IO h
112121 -> IO (Result IO h)
113122 #-}
114123-- | See 'Database.LSMTree.Internal.RunReader.next'.
124+ --
125+ -- TODO: 'next' is currently only used in 'readWriteBuffer', where it is a safe
126+ -- use of an unsafe function. If this function is ever exported and used
127+ -- directly, the TODOs in the body of this function should be addressed first.
115128next :: forall m h .
116129 (MonadSTM m , MonadST m , MonadMask m )
117130 => WriteBufferReader m h
@@ -124,13 +137,21 @@ next WriteBufferReader {..} = do
124137 entryNo <- readPrimVar readerCurrentEntryNo
125138 go entryNo page
126139 where
140+ -- TODO: if 'readerCurrentEntryNo' is incremented but an exception is thrown
141+ -- before the 'Result' is used by the caller of 'next', then we'll lose that
142+ -- 'Result'. The following call to 'next' will not return the 'Result' we
143+ -- missed.
127144 go :: Word16 -> RawPage -> m (Result m h )
128145 go ! entryNo ! page =
129146 -- take entry from current page (resolve blob if necessary)
130147 case rawPageIndex page entryNo of
131148 IndexNotPresent -> do
132149 -- if it is past the last one, load a new page from disk, try again
133150 newPage <- readDiskPage readerHasFS readerKOpsHandle
151+ -- TODO: if the next disk page is read but an (async) exception is
152+ -- thrown just before updating the MutVar below, then we lose the
153+ -- disk page because 'readDiskPage' has already updated its file
154+ -- pointer.
134155 stToIO $ writeMutVar readerCurrentPage newPage
135156 case newPage of
136157 Nothing -> do
@@ -154,10 +175,14 @@ next WriteBufferReader {..} = do
154175 return (ReadEntry key rawEntry)
155176
156177{-# SPECIALISE close :: WriteBufferReader IO h -> IO () #-}
178+ -- | Close the 'WriteBufferReader'.
179+ --
180+ -- ASYNC: this should be called with asynchronous exceptions masked because it
181+ -- releases/removes resources.
157182close ::
158183 (MonadMask m , PrimMonad m )
159184 => WriteBufferReader m h
160185 -> m ()
161186close WriteBufferReader {.. } = do
162187 FS. hClose readerHasFS readerKOpsHandle
163- releaseRef readerBlobFile
188+ `finally` releaseRef readerBlobFile
0 commit comments