Skip to content

Commit 1238b41

Browse files
committed
keep the file around
Signed-off-by: Onur Satici <[email protected]>
1 parent dba0411 commit 1238b41

File tree

2 files changed

+54
-34
lines changed

2 files changed

+54
-34
lines changed

vortex-buffer/src/buffer_mut.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,13 @@ impl Write for ByteBufferMut {
746746
}
747747
}
748748

749+
impl ByteBufferMut {
750+
/// Consume the buffer and return the underlying `BytesMut`, preserving alignment.
751+
pub fn into_bytes_mut(self) -> BytesMut {
752+
self.bytes
753+
}
754+
}
755+
749756
#[cfg(test)]
750757
mod test {
751758
use bytes::Buf;

vortex-io/src/file/uring_file.rs

Lines changed: 47 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ use futures::FutureExt;
1414
use futures::StreamExt;
1515
use futures::future::BoxFuture;
1616
use futures::stream::BoxStream;
17+
use futures::channel::oneshot;
1718
use monoio::fs::File;
18-
use oneshot::channel;
19+
use vortex_buffer::ByteBuffer;
20+
use vortex_buffer::ByteBufferMut;
1921
use vortex_error::VortexError;
2022
use vortex_error::VortexResult;
2123

@@ -93,49 +95,60 @@ impl ReadSource for UringFileIoSource {
9395
.boxed();
9496
};
9597

96-
requests
97-
.ready_chunks(1)
98-
.map(move |reqs| {
99-
let std_file = self.std_file.clone();
100-
let (tx, rx) = channel();
101-
local.spawn_local(Box::new(move || {
102-
Box::pin(async move {
103-
// Open a monoio file per chunk to avoid sharing non-Send handles across threads.
104-
let monoio_file = match std_file.try_clone().and_then(File::from_std) {
105-
Ok(f) => Arc::new(f),
106-
Err(e) => {
107-
let kind = e.kind();
108-
let msg = e.to_string();
109-
for req in reqs {
110-
let io_err = std::io::Error::new(kind, msg.clone());
111-
req.resolve(Err(VortexError::from(io_err)));
112-
}
113-
drop(tx.send(()));
114-
return;
115-
}
116-
};
117-
118-
for req in reqs {
98+
// Move the work onto the runtime thread; the returned future only waits on completion and is Send.
99+
let (done_tx, done_rx) = oneshot::channel();
100+
let std_file = self.std_file.clone();
101+
local.spawn_local(Box::new(move || {
102+
Box::pin(async move {
103+
let monoio_file = match std_file.try_clone().and_then(File::from_std) {
104+
Ok(f) => Arc::new(f),
105+
Err(e) => {
106+
let kind = e.kind();
107+
let msg = e.to_string();
108+
requests
109+
.for_each(|req| {
110+
let io_err = std::io::Error::new(kind, msg.clone());
111+
req.resolve(Err(VortexError::from(io_err)));
112+
futures::future::ready(())
113+
})
114+
.await;
115+
let _ = done_tx.send(());
116+
return;
117+
}
118+
};
119+
120+
requests
121+
.map(|req| {
122+
let monoio_file = monoio_file.clone();
123+
async move {
119124
let len = req.len();
120125
let offset = req.offset();
121-
let buffer = vec![0u8; len];
126+
let alignment = req.alignment();
122127

123-
let (res, mut buffer) = monoio_file.read_at(buffer, offset).await;
128+
// Pre-allocate an aligned buffer so we don't have to copy on resolve.
129+
let buffer = ByteBufferMut::with_capacity_aligned(len, alignment);
130+
let mut bytes_mut = buffer.into_bytes_mut();
131+
bytes_mut.resize(len, 0);
132+
133+
let (res, mut bytes_mut) = monoio_file.read_at(bytes_mut, offset).await;
124134
match res {
125135
Ok(n) => {
126-
buffer.truncate(n);
127-
req.resolve(Ok(buffer.into()))
136+
bytes_mut.truncate(n);
137+
let bytes = bytes_mut.freeze();
138+
req.resolve(Ok(ByteBuffer::from(bytes)));
128139
}
129140
Err(e) => req.resolve(Err(VortexError::from(e))),
130141
}
131142
}
132-
drop(tx.send(()));
133143
})
134-
}));
135-
rx.map(|_| ())
144+
.buffer_unordered(CONCURRENCY)
145+
.collect::<()>()
146+
.await;
147+
148+
let _ = done_tx.send(());
136149
})
137-
.buffer_unordered(CONCURRENCY)
138-
.collect::<()>()
139-
.boxed()
150+
}));
151+
152+
done_rx.map(|res| res.unwrap_or(())).boxed()
140153
}
141154
}

0 commit comments

Comments
 (0)