Skip to content

Commit 6dafd16

Browse files
committed
drive_local
Signed-off-by: Onur Satici <[email protected]>
1 parent 1238b41 commit 6dafd16

File tree

2 files changed

+73
-47
lines changed

2 files changed

+73
-47
lines changed

vortex-io/src/file/uring_file.rs

Lines changed: 64 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use std::sync::Arc;
1313
use futures::FutureExt;
1414
use futures::StreamExt;
1515
use futures::future::BoxFuture;
16+
use futures::future::LocalBoxFuture;
1617
use futures::stream::BoxStream;
1718
use futures::channel::oneshot;
1819
use monoio::fs::File;
@@ -100,55 +101,73 @@ impl ReadSource for UringFileIoSource {
100101
let std_file = self.std_file.clone();
101102
local.spawn_local(Box::new(move || {
102103
Box::pin(async move {
103-
let monoio_file = match std_file.try_clone().and_then(File::from_std) {
104-
Ok(f) => Arc::new(f),
105-
Err(e) => {
106-
let kind = e.kind();
107-
let msg = e.to_string();
108-
requests
109-
.for_each(|req| {
110-
let io_err = std::io::Error::new(kind, msg.clone());
111-
req.resolve(Err(VortexError::from(io_err)));
112-
futures::future::ready(())
113-
})
114-
.await;
115-
let _ = done_tx.send(());
116-
return;
117-
}
118-
};
119-
120-
requests
121-
.map(|req| {
122-
let monoio_file = monoio_file.clone();
123-
async move {
124-
let len = req.len();
125-
let offset = req.offset();
126-
let alignment = req.alignment();
127-
128-
// Pre-allocate an aligned buffer so we don't have to copy on resolve.
129-
let buffer = ByteBufferMut::with_capacity_aligned(len, alignment);
130-
let mut bytes_mut = buffer.into_bytes_mut();
131-
bytes_mut.resize(len, 0);
132-
133-
let (res, mut bytes_mut) = monoio_file.read_at(bytes_mut, offset).await;
134-
match res {
135-
Ok(n) => {
136-
bytes_mut.truncate(n);
137-
let bytes = bytes_mut.freeze();
138-
req.resolve(Ok(ByteBuffer::from(bytes)));
139-
}
140-
Err(e) => req.resolve(Err(VortexError::from(e))),
141-
}
142-
}
143-
})
144-
.buffer_unordered(CONCURRENCY)
145-
.collect::<()>()
146-
.await;
147-
104+
self.drive_local_impl(std_file, requests).await;
148105
let _ = done_tx.send(());
149106
})
150107
}));
151108

152109
done_rx.map(|res| res.unwrap_or(())).boxed()
153110
}
111+
112+
fn drive_local(
113+
self: Arc<Self>,
114+
requests: BoxStream<'static, IoRequest>,
115+
) -> LocalBoxFuture<'static, ()> {
116+
self.clone()
117+
.drive_local_impl(self.std_file.clone(), requests)
118+
}
119+
}
120+
121+
impl UringFileIoSource {
122+
fn drive_local_impl(
123+
self: Arc<Self>,
124+
std_file: Arc<std::fs::File>,
125+
requests: BoxStream<'static, IoRequest>,
126+
) -> LocalBoxFuture<'static, ()> {
127+
Box::pin(async move {
128+
let monoio_file = match std_file.try_clone().and_then(File::from_std) {
129+
Ok(f) => Arc::new(f),
130+
Err(e) => {
131+
let kind = e.kind();
132+
let msg = e.to_string();
133+
requests
134+
.for_each(|req| {
135+
let io_err = std::io::Error::new(kind, msg.clone());
136+
req.resolve(Err(VortexError::from(io_err)));
137+
futures::future::ready(())
138+
})
139+
.await;
140+
return;
141+
}
142+
};
143+
144+
requests
145+
.map(|req| {
146+
let monoio_file = monoio_file.clone();
147+
async move {
148+
let len = req.len();
149+
let offset = req.offset();
150+
let alignment = req.alignment();
151+
152+
// Pre-allocate an aligned buffer so we don't have to copy on resolve.
153+
let buffer = ByteBufferMut::with_capacity_aligned(len, alignment);
154+
let mut bytes_mut = buffer.into_bytes_mut();
155+
bytes_mut.resize(len, 0);
156+
157+
let (res, mut bytes_mut) = monoio_file.read_at(bytes_mut, offset).await;
158+
match res {
159+
Ok(n) => {
160+
bytes_mut.truncate(n);
161+
let bytes = bytes_mut.freeze();
162+
req.resolve(Ok(ByteBuffer::from(bytes)));
163+
}
164+
Err(e) => req.resolve(Err(VortexError::from(e))),
165+
}
166+
}
167+
})
168+
.buffer_unordered(CONCURRENCY)
169+
.collect::<()>()
170+
.await;
171+
})
172+
}
154173
}

vortex-io/src/runtime/uring.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,10 @@ impl Executor for UringRuntime {
170170
}
171171

172172
fn spawn_io(&self, task: IoTask) {
173-
let _ = self.sender.send(Command::SpawnIo(task));
173+
// Drive I/O on the runtime thread using the local future to allow !Send implementations.
174+
let _ = self.sender.send(Command::SpawnLocal(Box::new(move || {
175+
Box::pin(async move { task.source.drive_local(task.stream).await })
176+
})));
174177
}
175178

176179
fn as_local_executor(&self) -> Option<Arc<dyn LocalExecutor>> {
@@ -187,9 +190,13 @@ impl LocalExecutor for UringRuntime {
187190

188191
fn run_runtime(receiver: Receiver<Command>) {
189192
// Use the IoUring driver explicitly to avoid ambiguity with feature combinations.
193+
let pool_size = thread::available_parallelism()
194+
.map(|n| n.get())
195+
.unwrap_or(8);
196+
190197
let mut rt = RuntimeBuilder::<IoUringDriver>::new()
191198
.enable_timer()
192-
.attach_thread_pool(Box::new(DefaultThreadPool::new(8)))
199+
.attach_thread_pool(Box::new(DefaultThreadPool::new(pool_size)))
193200
.build()
194201
.expect("failed to build uring runtime");
195202

0 commit comments

Comments
 (0)