Skip to content

Commit a2e5fba

Browse files
committed
wip: io
Signed-off-by: Onur Satici <[email protected]>
1 parent b3e8c30 commit a2e5fba

File tree

3 files changed

+145
-0
lines changed

3 files changed

+145
-0
lines changed

vortex-io/src/file/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ pub mod object_store;
88
mod read;
99
#[cfg(not(target_arch = "wasm32"))]
1010
pub(crate) mod std_file;
11+
#[cfg(all(target_os = "linux", feature = "uring"))]
12+
pub(crate) mod uring_file;
1113

1214
pub(crate) use driver::*;
1315
pub use read::*;

vortex-io/src/file/std_file.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ use crate::file::IoRequest;
2828
use crate::file::ReadSource;
2929
use crate::file::ReadSourceRef;
3030
use crate::runtime::Handle;
31+
#[cfg(all(target_os = "linux", feature = "uring"))]
32+
use crate::file::uring_file::open_uring_read_source;
3133

3234
/// Read exactly `buffer.len()` bytes from `file` starting at `offset`.
3335
/// This is a platform-specific helper that uses the most efficient method available.
@@ -61,6 +63,11 @@ impl IntoReadSource for PathBuf {
6163

6264
impl IntoReadSource for &Path {
6365
fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
66+
#[cfg(all(target_os = "linux", feature = "uring"))]
67+
if let Some(src) = open_uring_read_source(self, handle.clone())? {
68+
return Ok(src);
69+
}
70+
6471
let uri = self.to_string_lossy().to_string().into();
6572
let file = Arc::new(File::open(self)?);
6673
Ok(Arc::new(FileIoSource { uri, file, handle }))

vortex-io/src/file/uring_file.rs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
//! Async file source backed by io_uring (monoio).
5+
//!
6+
//! Only built on Linux with the `uring` feature enabled.
7+
8+
#![cfg(all(target_os = "linux", feature = "uring"))]
9+
10+
use std::path::Path;
11+
use std::sync::Arc;
12+
13+
use futures::FutureExt;
14+
use futures::StreamExt;
15+
use futures::future::BoxFuture;
16+
use futures::stream::BoxStream;
17+
use monoio::fs::File;
18+
use oneshot::channel;
19+
use vortex_error::VortexError;
20+
use vortex_error::VortexResult;
21+
22+
use crate::file::CoalesceWindow;
23+
use crate::file::IoRequest;
24+
use crate::file::ReadSource;
25+
use crate::file::ReadSourceRef;
26+
use crate::runtime::Handle;
27+
28+
const COALESCING_WINDOW: CoalesceWindow = CoalesceWindow {
29+
distance: 8 * 1024, // KB
30+
max_size: 8 * 1024, // KB
31+
};
32+
const CONCURRENCY: usize = 64;
33+
34+
/// Attempt to open a uring-backed read source. Returns `Ok(None)` if the handle does not support
35+
/// local execution.
36+
pub(crate) fn open_uring_read_source(
37+
path: &Path,
38+
handle: Handle,
39+
) -> VortexResult<Option<ReadSourceRef>> {
40+
if handle.as_local_executor().is_none() {
41+
return Ok(None);
42+
}
43+
44+
let std_file = std::fs::File::open(path)?;
45+
let uri = path.to_string_lossy().to_string().into();
46+
47+
Ok(Some(Arc::new(UringFileIoSource {
48+
uri,
49+
std_file: Arc::new(std_file),
50+
handle,
51+
})))
52+
}
53+
54+
pub(crate) struct UringFileIoSource {
55+
uri: Arc<str>,
56+
std_file: Arc<std::fs::File>,
57+
handle: Handle,
58+
}
59+
60+
// Safety: we only drive I/O on the runtime thread via LocalExecutor.
61+
unsafe impl Send for UringFileIoSource {}
62+
unsafe impl Sync for UringFileIoSource {}
63+
64+
impl ReadSource for UringFileIoSource {
65+
fn uri(&self) -> &Arc<str> {
66+
&self.uri
67+
}
68+
69+
fn coalesce_window(&self) -> Option<CoalesceWindow> {
70+
Some(COALESCING_WINDOW)
71+
}
72+
73+
fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
74+
let std_file = self.std_file.clone();
75+
futures::future::ready(std_file.metadata().map(|m| m.len()).map_err(VortexError::from))
76+
.boxed()
77+
}
78+
79+
fn drive_send(
80+
self: Arc<Self>,
81+
requests: BoxStream<'static, IoRequest>,
82+
) -> BoxFuture<'static, ()> {
83+
let Some(local) = self.handle.as_local_executor() else {
84+
return async move {
85+
log::warn!("UringFileIoSource used without LocalExecutor; dropping requests");
86+
drop(requests);
87+
}
88+
.boxed();
89+
};
90+
91+
requests
92+
.ready_chunks(1)
93+
.map(move |reqs| {
94+
let std_file = self.std_file.clone();
95+
let (tx, rx) = channel();
96+
local.spawn_local(Box::new(move || {
97+
Box::pin(async move {
98+
// Open a monoio file per chunk to avoid sharing non-Send handles across threads.
99+
let monoio_file = match std_file.try_clone().and_then(File::from_std) {
100+
Ok(f) => Arc::new(f),
101+
Err(e) => {
102+
let kind = e.kind();
103+
let msg = e.to_string();
104+
for req in reqs {
105+
let io_err = std::io::Error::new(kind, msg.clone());
106+
req.resolve(Err(VortexError::from(io_err)));
107+
}
108+
drop(tx.send(()));
109+
return;
110+
}
111+
};
112+
113+
for req in reqs {
114+
let len = req.len();
115+
let offset = req.offset();
116+
let buffer = vec![0u8; len];
117+
118+
let (res, mut buffer) = monoio_file.read_at(buffer, offset).await;
119+
match res {
120+
Ok(n) => {
121+
buffer.truncate(n);
122+
req.resolve(Ok(buffer.into()))
123+
}
124+
Err(e) => req.resolve(Err(VortexError::from(e))),
125+
}
126+
}
127+
drop(tx.send(()));
128+
})
129+
}));
130+
rx.map(|_| ())
131+
})
132+
.buffer_unordered(CONCURRENCY)
133+
.collect::<()>()
134+
.boxed()
135+
}
136+
}

0 commit comments

Comments
 (0)