Skip to content
4 changes: 1 addition & 3 deletions app/Database/LSMTree/Demo.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import qualified System.FS.API as FS
import qualified System.FS.BlockIO.API as FS
import qualified System.FS.BlockIO.IO as FS
import qualified System.FS.BlockIO.Sim as FSSim
import qualified System.FS.IO as FS
import qualified System.FS.Sim.MockFS as FSSim
import System.IO.Unsafe (unsafePerformIO)

Expand Down Expand Up @@ -151,8 +150,7 @@ demo = do
print' (fmap getValue os)

do
let hasFS = FS.ioHasFS (FS.MountPoint "")
FS.withIOHasBlockIO hasFS FS.defaultIOCtxParams $ \hasBlockIO -> do
FS.withIOHasBlockIO (FS.MountPoint "") FS.defaultIOCtxParams $ \hasFS hasBlockIO -> do
simpleAction hasFS hasBlockIO
pause -- [16]

Expand Down
9 changes: 4 additions & 5 deletions bench/macro/lsm-tree-bench-lookups.hs
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,10 @@ totalNumEntriesSanityCheck l1 runSizes =
withFS ::
(FS.HasFS IO FS.HandleIO -> FS.HasBlockIO IO FS.HandleIO -> IO a)
-> IO a
withFS action = do
let hfs = FS.ioHasFS (FS.MountPoint "_bench_lookups")
exists <- FS.doesDirectoryExist hfs (FS.mkFsPath [""])
unless exists $ error ("_bench_lookups directory does not exist")
FS.withIOHasBlockIO hfs FS.defaultIOCtxParams $ \hbio ->
withFS action =
FS.withIOHasBlockIO (FS.MountPoint "_bench_lookups") FS.defaultIOCtxParams $ \hfs hbio -> do
exists <- FS.doesDirectoryExist hfs (FS.mkFsPath [""])
unless exists $ error ("_bench_lookups directory does not exist")
action hfs hbio

-- | Input environment for benchmarking lookup functions.
Expand Down
39 changes: 15 additions & 24 deletions bench/macro/utxo-bench.hs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ import qualified Options.Applicative as O
import Prelude hiding (lookup)
import qualified System.Clock as Clock
import qualified System.FS.API as FS
import qualified System.FS.BlockIO.API as FS
import qualified System.FS.BlockIO.IO as FsIO
import qualified System.FS.IO as FsIO
import System.IO
import System.Mem (performMajorGC)
import qualified System.Random as Random
Expand Down Expand Up @@ -438,17 +436,8 @@ doSetup gopts opts = do
void $ timed_ $ doSetup' gopts opts

doSetup' :: GlobalOpts -> SetupOpts -> IO ()
doSetup' gopts opts = do
let mountPoint :: FS.MountPoint
mountPoint = FS.MountPoint (rootDir gopts)

let hasFS :: FS.HasFS IO FsIO.HandleIO
hasFS = FsIO.ioHasFS mountPoint

hasBlockIO <- FsIO.ioHasBlockIO hasFS FS.defaultIOCtxParams

let name = LSM.toSnapshotName ("bench_" ++ show (initialSize gopts))

doSetup' gopts opts =
FsIO.withIOHasBlockIO mountPoint FsIO.defaultIOCtxParams $ \hasFS hasBlockIO ->
LSM.withOpenSession (mkTracer gopts) hasFS hasBlockIO benchSalt (FS.mkFsPath []) $ \session -> do
tbl <- LSM.newTableWith @IO @K @V @B (mkTableConfigSetup gopts opts benchTableConfig) session

Expand All @@ -462,6 +451,12 @@ doSetup' gopts opts = do
]

LSM.saveSnapshot name label tbl
where
mountPoint :: FS.MountPoint
mountPoint = FS.MountPoint (rootDir gopts)

name = LSM.toSnapshotName ("bench_" ++ show (initialSize gopts))


-------------------------------------------------------------------------------
-- dry-run
Expand Down Expand Up @@ -600,17 +595,8 @@ toOperations lookups inserts = (batch1, batch2)
-------------------------------------------------------------------------------

doRun :: GlobalOpts -> RunOpts -> IO ()
doRun gopts opts = do
let mountPoint :: FS.MountPoint
mountPoint = FS.MountPoint (rootDir gopts)

let hasFS :: FS.HasFS IO FsIO.HandleIO
hasFS = FsIO.ioHasFS mountPoint

hasBlockIO <- FsIO.ioHasBlockIO hasFS FS.defaultIOCtxParams

let name = LSM.toSnapshotName "bench"

doRun gopts opts =
FsIO.withIOHasBlockIO mountPoint FsIO.defaultIOCtxParams $ \hasFS hasBlockIO ->
LSM.withOpenSession (mkTracer gopts) hasFS hasBlockIO benchSalt (FS.mkFsPath []) $ \session ->
withLatencyHandle $ \h -> do
-- open snapshot
Expand Down Expand Up @@ -652,6 +638,11 @@ doRun gopts opts = do

let ops = batchCount opts * batchSize opts
printf "Operations per second: %7.01f ops/sec\n" (fromIntegral ops / time)
where
mountPoint :: FS.MountPoint
mountPoint = FS.MountPoint (rootDir gopts)

name = LSM.toSnapshotName ("bench_" ++ show (initialSize gopts))

-------------------------------------------------------------------------------
-- sequential
Expand Down
3 changes: 1 addition & 2 deletions bench/micro/Bench/Database/LSMTree.hs
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,7 @@ mkFiles ::
mkFiles = do
sysTmpDir <- getCanonicalTemporaryDirectory
benchTmpDir <- createTempDirectory sysTmpDir "full"
let hfs = FS.ioHasFS (FS.MountPoint benchTmpDir)
hbio <- FS.ioHasBlockIO hfs FS.defaultIOCtxParams
(hfs, hbio) <- FS.ioHasBlockIO (FS.MountPoint benchTmpDir) FS.defaultIOCtxParams
pure (benchTmpDir, hfs, hbio)

cleanupFiles ::
Expand Down
3 changes: 1 addition & 2 deletions bench/micro/Bench/Database/LSMTree/Internal/Lookup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,7 @@ lookupsInBatchesEnv Config {..} = do
sysTmpDir <- getCanonicalTemporaryDirectory
benchTmpDir <- createTempDirectory sysTmpDir "lookupsInBatchesEnv"
(storedKeys, lookupKeys) <- lookupsEnv (mkStdGen 17) nentries npos nneg
let hasFS = FS.ioHasFS (FS.MountPoint benchTmpDir)
hasBlockIO <- FS.ioHasBlockIO hasFS (fromMaybe FS.defaultIOCtxParams ioctxps)
(hasFS, hasBlockIO) <- FS.ioHasBlockIO (FS.MountPoint benchTmpDir) (fromMaybe FS.defaultIOCtxParams ioctxps)
wbblobs <- WBB.new hasFS (FS.mkFsPath ["0.wbblobs"])
wb <- WB.fromMap <$> traverse (traverse (WBB.addBlob hasFS wbblobs)) storedKeys
let fsps = RunFsPaths (FS.mkFsPath []) (RunNumber 0)
Expand Down
3 changes: 1 addition & 2 deletions bench/micro/Bench/Database/LSMTree/Internal/Merge.hs
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,7 @@ mergeEnv ::
mergeEnv config = do
sysTmpDir <- getCanonicalTemporaryDirectory
benchTmpDir <- createTempDirectory sysTmpDir "mergeEnv"
let hasFS = FS.ioHasFS (FS.MountPoint benchTmpDir)
hasBlockIO <- FS.ioHasBlockIO hasFS FS.defaultIOCtxParams
(hasFS, hasBlockIO) <- FS.ioHasBlockIO (FS.MountPoint benchTmpDir) FS.defaultIOCtxParams
runs <- randomRuns hasFS hasBlockIO config (mkStdGen 17)
pure (benchTmpDir, hasFS, hasBlockIO, runs)

Expand Down
51 changes: 46 additions & 5 deletions blockio/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,49 @@
# blockio

This packages defines an abstract interface for batched, asynchronous I\/O,
for use with the abstract interface for file system I\/O defined by the
[fs-api](https://hackage.haskell.org/package/fs-api) package.
Perform batches of disk I/O operations. Performing batches of disk I/O can lead
to performance improvements over performing each disk I/O operation
individually. Performing batches of disk I/O *concurrently* can lead to an even
bigger performance improvement depending on the implementation of batched I/O.

The /sim/ sub-library of this package defines /simulated/ batched, asynchronous I\/O
for use with the [fs-sim](https://hackage.haskell.org/package/fs-sim) package.
The batched I/O functionality in the library is separated into an *abstract
interface* and *implementations* of that abstract interface. The advantage of
programming against an abstract interface is that code can be agnostic to the
implementation of the interface, allowing implementations to be freely swapped
out. The library provides multiple implementations of batched I/O:
platform-dependent implementations using the *real* file system (using
asynchronous I/O), and a simulated implementation for testing purposes.

See the `System.FS.BlockIO` module for an example of how to use the library.

On Linux systems the *real* implementation is backed by
[blockio-uring](https://hackage.haskell.org/package/blockio-uring), a library
for asynchronous I/O that achieves good performance when performing batches
concurrently. On Windows and MacOS systems the *real* implementation currently
simply performs each I/O operation sequentially, which should achieve about the
same performance as using non-batched I/O, but the library could be extended
with asynchronous I/O implementations for Windows and MacOS as well. The
simulated implementation also performs each I/O operation sequentially.

As mentioned before, the batched I/O functionality is separated into an
*abstract interface* and *implementations* of that abstract interface. The
advantage of programming against an abstract interface is that code can be
agnostic to the implementation of the interface. For example, we could run code
in production using the real file system, but we could also run the same code in
a testing environment using a simulated file system. We could even switch from a
default implementation to a more performant implementation in production if the
performant implementation is available. Lastly, the abstract interface allows us
to program against the file system in a uniform manner across different
platforms, i.e., operating systems.

The `blockio` library defines the abstract interface for batched I/O. The
library is an extension of the
[fs-api](https://hackage.haskell.org/package/fs-api) library, which defines an
abstract interface for (basic) file system I/O. Both `blockio` and `fs-api`
provide an implementation of their interfaces using the real file system in
`IO`.

The `blockio:sim` sub-library defines an implementation of the abstract
interface from `blockio` that *simulates* batched I/O. This sub-library is an
extension of the [fs-sim](https://hackage.haskell.org/package/fs-sim) library,
which defines an implementation of the abstract interface from `fs-api` that
simulates (basic) file system I/O.
42 changes: 27 additions & 15 deletions blockio/blockio.cabal
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
cabal-version: 3.4
name: blockio
version: 0.1.0.0
synopsis: Abstract interface for batched, asynchronous I/O
synopsis: Perform batches of disk I/O operations.
description:
This packages defines an abstract interface for batched, asynchronous I\/O,
for use with the abstract interface for file system I\/O defined by the
[fs-api](https://hackage.haskell.org/package/fs-api) package.

The /sim/ sub-library of this package defines /simulated/ batched, asynchronous I\/O
for use with the [fs-sim](https://hackage.haskell.org/package/fs-sim) package.
Perform batches of disk I\/O operations. Performing batches of disk I\/O can
lead to performance improvements over performing each disk I\/O operation
individually. Performing batches of disk I\/O /concurrently/ can lead to an
even bigger performance improvement depending on the implementation of batched
I\/O.

The batched I\/O functionality in the library is separated into an /abstract/
/interface/ and /implementations/ of that abstract interface. The advantage of
programming against an abstract interface is that code can be agnostic to the
implementation of the interface, allowing implementations to be freely swapped
out. The library provides multiple implementations of batched I\/O:
platform-dependent implementations using the /real/ file system (with
asynchronous I\/O), and a simulated implementation for testing purposes.

See the "System.FS.BlockIO" module for an example of how to use the library.

license: Apache-2.0
license-files:
Expand All @@ -18,7 +27,7 @@ license-files:
author:
Duncan Coutts, Joris Dral, Matthias Heinzel, Wolfgang Jeltsch, Wen Kokke, and Alex Washburn

maintainer: TODO: MAINTAINER EMAIL
maintainer: [email protected]
copyright:
(c) 2023 Input Output Global, Inc. (IOG)
(c) 2023-2025 INTERSECT
Expand All @@ -33,13 +42,11 @@ source-repository head
location: https://github.com/IntersectMBO/lsm-tree
subdir: blockio

-- TODO: this tag obviously does not exist yet because the package has not
-- been published
source-repository this
type: git
location: https://github.com/IntersectMBO/lsm-tree
tag: blockio-0.1.0.0
subdir: blockio
tag: blockio-0.1.0.0

common warnings
ghc-options:
Expand Down Expand Up @@ -67,8 +74,13 @@ library
import: language, warnings
hs-source-dirs: src
exposed-modules:
System.FS.BlockIO
System.FS.BlockIO.API
System.FS.BlockIO.IO
System.FS.BlockIO.Serial.Internal

other-modules:
System.FS.BlockIO.IO.Internal
System.FS.BlockIO.Serial

build-depends:
Expand Down Expand Up @@ -113,7 +125,7 @@ test-suite test
, bytestring
, fs-api
, primitive
, QuickCheck ^>=2.15.0.1
, QuickCheck >=2.15.0.1
, tasty
, tasty-hunit
, tasty-quickcheck
Expand All @@ -128,12 +140,12 @@ library sim
hs-source-dirs: src-sim
exposed-modules: System.FS.BlockIO.Sim
build-depends:
, base >=4.16 && <4.22
, base >=4.16 && <4.22
, blockio
, bytestring ^>=0.11.4.0 || ^>=0.12.1.0
, bytestring ^>=0.11 || ^>=0.12
, fs-api ^>=0.4
, fs-sim ^>=0.4
, io-classes ^>=1.6 || ^>=1.7 || ^>=1.8.0.1
, io-classes ^>=1.6 || ^>=1.7 || ^>=1.8.0.1
, io-classes:strict-stm
, primitive ^>=0.9

Expand Down
9 changes: 5 additions & 4 deletions blockio/src-linux/System/FS/BlockIO/Async.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import System.FS.API (BufferOffset (..), FsErrorPath, FsPath,
import qualified System.FS.BlockIO.API as API
import System.FS.BlockIO.API (IOOp (..), IOResult (..), LockMode,
ioopHandle)
import qualified System.FS.BlockIO.IO.Internal as IOI
import System.FS.IO (HandleIO)
import System.FS.IO.Handle
import qualified System.IO.BlockIO as I
Expand All @@ -32,7 +33,7 @@ asyncHasBlockIO ::
-> (FsPath -> IO ())
-> (FsPath -> FsPath -> IO ())
-> HasFS IO HandleIO
-> API.IOCtxParams
-> IOI.IOCtxParams
-> IO (API.HasBlockIO IO HandleIO)
asyncHasBlockIO hSetNoCache hAdvise hAllocate tryLockFile hSynchronise synchroniseDirectory createHardLink hasFS ctxParams = do
ctx <- I.initIOCtx (ctxParamsConv ctxParams)
Expand All @@ -48,8 +49,8 @@ asyncHasBlockIO hSetNoCache hAdvise hAllocate tryLockFile hSynchronise synchroni
, API.createHardLink
}

ctxParamsConv :: API.IOCtxParams -> I.IOCtxParams
ctxParamsConv API.IOCtxParams{API.ioctxBatchSizeLimit, API.ioctxConcurrencyLimit} =
ctxParamsConv :: IOI.IOCtxParams -> I.IOCtxParams
ctxParamsConv IOI.IOCtxParams{IOI.ioctxBatchSizeLimit, IOI.ioctxConcurrencyLimit} =
I.IOCtxParams {
I.ioctxBatchSizeLimit = ioctxBatchSizeLimit
, I.ioctxConcurrencyLimit = ioctxConcurrencyLimit
Expand All @@ -72,7 +73,7 @@ submitIO hasFS ioctx ioops = do
-- the exception might change between versions of @blockio-uring@.
-- Nonetheless, it's better than nothing.
if isResourceVanishedError e && ioe_location e == "IOCtx closed"
then throwIO (API.mkClosedError (SomeHasFS hasFS) "submitIO")
then throwIO (IOI.mkClosedError (SomeHasFS hasFS) "submitIO")
else throwIO e

rethrowErrno ::
Expand Down
15 changes: 7 additions & 8 deletions blockio/src-linux/System/FS/BlockIO/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ module System.FS.BlockIO.Internal (

import qualified System.FS.API as FS
import System.FS.API (FsPath, Handle (..), HasFS)
import qualified System.FS.BlockIO.API as FS
import System.FS.BlockIO.API (Advice (..), FileOffset, HasBlockIO,
IOCtxParams)
import System.FS.BlockIO.API (Advice (..), FileOffset, HasBlockIO)
import qualified System.FS.BlockIO.IO.Internal as IOI
import System.FS.IO (HandleIO)
import qualified System.FS.IO.Handle as FS
import qualified System.Posix.Fcntl as Fcntl
Expand All @@ -23,29 +22,29 @@ import qualified System.FS.BlockIO.Async as Async

ioHasBlockIO ::
HasFS IO HandleIO
-> IOCtxParams
-> IOI.IOCtxParams
-> IO (HasBlockIO IO HandleIO)
#if SERIALBLOCKIO
ioHasBlockIO hfs _params =
Serial.serialHasBlockIO
hSetNoCache
hAdvise
hAllocate
(FS.tryLockFileIO hfs)
(IOI.tryLockFileIO hfs)
hSynchronise
(synchroniseDirectory hfs)
(FS.createHardLinkIO hfs Unix.createLink)
(IOI.createHardLinkIO hfs Unix.createLink)
hfs
#else
ioHasBlockIO hfs params =
Async.asyncHasBlockIO
hSetNoCache
hAdvise
hAllocate
(FS.tryLockFileIO hfs)
(IOI.tryLockFileIO hfs)
hSynchronise
(synchroniseDirectory hfs)
(FS.createHardLinkIO hfs Unix.createLink)
(IOI.createHardLinkIO hfs Unix.createLink)
hfs
params
#endif
Expand Down
Loading