Skip to content

Commit dc62461

Browse files
authored
fix: allocate aligned buffers for VortexReadAt impls (#1456)
Before this change, we did not enforce alignment on memory allocated with `Bytes`, so, for example, you could get this: <img width="1506" alt="image" src="https://github.com/user-attachments/assets/403d04d3-c0d6-4cd1-93fd-37f2468e6a2a"> This PR fixes that by rounding each allocation up to the alignment size and then applying padding manually. With this change, you get the desired output: <img width="1335" alt="image" src="https://github.com/user-attachments/assets/7621549c-28e7-46ae-8a83-1f8098810704">
1 parent 3bb2a7b commit dc62461

File tree

8 files changed

+250
-21
lines changed

8 files changed

+250
-21
lines changed

vortex-file/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,6 @@ mod dtype_reader;
140140

141141
pub use dtype_reader::*;
142142

143-
pub const ALIGNMENT: usize = 64;
144-
145143
mod read;
146144
mod write;
147145

vortex-io/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ categories.workspace = true
1616
[dependencies]
1717
bytes = { workspace = true }
1818
compio = { workspace = true, features = ["bytes", "macros"], optional = true }
19-
tokio = { workspace = true, features = ["fs"], optional = true }
19+
tokio = { workspace = true, features = ["fs", "io-util", "rt"], optional = true }
2020
tracing = { workspace = true, optional = true }
2121
futures = { workspace = true, features = ["std"] }
2222
futures-util = { workspace = true }

vortex-io/src/aligned.rs

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
#![allow(dead_code)]
2+
use std::ops::{Deref, DerefMut};
3+
4+
use bytes::Bytes;
5+
6+
pub trait PowerOfTwo<const N: usize> {}
7+
impl<const N: usize> PowerOfTwo<N> for usize where usize: sealed::Sealed<N> {}
8+
9+
mod sealed {
10+
pub trait Sealed<const N: usize> {}
11+
12+
impl Sealed<1> for usize {}
13+
impl Sealed<2> for usize {}
14+
impl Sealed<4> for usize {}
15+
impl Sealed<8> for usize {}
16+
impl Sealed<16> for usize {}
17+
impl Sealed<32> for usize {}
18+
impl Sealed<64> for usize {}
19+
impl Sealed<128> for usize {}
20+
impl Sealed<256> for usize {}
21+
impl Sealed<512> for usize {}
22+
}
23+
24+
/// A variant of [`BytesMut`][bytes::BytesMut] that freezes into a [`Bytes`] that is guaranteed
25+
/// to begin at a multiple of a target byte-alignment.
26+
///
27+
/// Internally, it accomplishes this by over-allocating by up to the alignment size, padding the
28+
/// front as necessary. Reads and writes will only be able to access the region after the padding.
29+
///
30+
/// The alignment must be a power of 2 no greater than 512; any other value is rejected at
31+
/// compile time.
32+
pub(crate) struct AlignedBytesMut<const ALIGN: usize> {
33+
buf: Vec<u8>,
34+
padding: usize,
35+
capacity: usize,
36+
}
37+
38+
impl<const ALIGN: usize> AlignedBytesMut<ALIGN>
39+
where
40+
usize: PowerOfTwo<ALIGN>,
41+
{
42+
/// Allocate a new mutable buffer with capacity to hold at least `capacity` bytes.
43+
///
44+
/// The mutable buffer may allocate more than the requested amount to pad the memory for
45+
/// alignment.
46+
pub fn with_capacity(capacity: usize) -> Self {
47+
// Over-allocate by at least `ALIGN - 1` bytes so there is always room to pad the returned pointer.
48+
let allocation_size = (capacity + ALIGN - 1).next_multiple_of(ALIGN);
49+
let mut buf = Vec::<u8>::with_capacity(allocation_size);
50+
let padding = buf.as_ptr().align_offset(ALIGN);
51+
unsafe {
52+
buf.set_len(padding);
53+
}
54+
55+
Self {
56+
buf,
57+
padding,
58+
capacity,
59+
}
60+
}
61+
62+
/// Usable capacity of this buffer.
63+
pub fn capacity(&self) -> usize {
64+
self.capacity
65+
}
66+
67+
/// Set the length of the mutable buffer directly.
68+
///
69+
/// # Safety
70+
///
71+
/// The caller is responsible for ensuring that the provided length fits within the original
72+
/// capacity request.
73+
///
74+
/// Failure to do so could cause uninitialized memory to be readable.
75+
pub unsafe fn set_len(&mut self, len: usize) {
76+
assert!(
77+
len <= self.capacity,
78+
"set_len call out of bounds: {} > {}",
79+
len,
80+
self.capacity
81+
);
82+
unsafe { self.buf.set_len(len + self.padding) }
83+
}
84+
85+
/// Extend this mutable buffer with the contents of the provided slice.
86+
pub fn extend_from_slice(&mut self, slice: &[u8]) {
87+
// The internal `buf` is padded, so appends will land after the padded region.
88+
self.buf.extend_from_slice(slice)
89+
}
90+
91+
/// Freeze the existing allocation into a readonly [`Bytes`], guaranteed to be aligned to
92+
/// the target [`ALIGN`] size.
93+
pub fn freeze(self) -> Bytes {
94+
// bytes_unaligned will contain the entire allocation, so that on Drop the entire buf
95+
// is freed.
96+
//
97+
// bytes_aligned is a sliced view on top of bytes_unaligned.
98+
//
99+
// bytes_aligned
100+
// | parent \ *ptr
101+
// v |
102+
// bytes_unaligned |
103+
// | |
104+
// | *ptr |
105+
// v v
106+
// +------------+------------------+----------------+
107+
// | padding | content | spare capacity |
108+
// +------------+------------------+----------------+
109+
let bytes_unaligned = Bytes::from(self.buf);
110+
let bytes_aligned = bytes_unaligned.slice(self.padding..);
111+
112+
assert_eq!(
113+
bytes_aligned.as_ptr().align_offset(ALIGN),
114+
0,
115+
"bytes_aligned must be aligned to {}",
116+
ALIGN
117+
);
118+
119+
bytes_aligned
120+
}
121+
}
122+
123+
impl<const ALIGN: usize> Deref for AlignedBytesMut<ALIGN>
124+
where
125+
usize: PowerOfTwo<ALIGN>,
126+
{
127+
type Target = [u8];
128+
129+
fn deref(&self) -> &Self::Target {
130+
&self.buf[self.padding..]
131+
}
132+
}
133+
134+
impl<const ALIGN: usize> DerefMut for AlignedBytesMut<ALIGN>
135+
where
136+
usize: PowerOfTwo<ALIGN>,
137+
{
138+
fn deref_mut(&mut self) -> &mut Self::Target {
139+
&mut self.buf[self.padding..]
140+
}
141+
}
142+
143+
#[cfg(test)]
144+
mod tests {
145+
use crate::aligned::AlignedBytesMut;
146+
147+
#[test]
148+
fn test_align() {
149+
let mut buf = AlignedBytesMut::<128>::with_capacity(1);
150+
buf.extend_from_slice(b"a");
151+
152+
let data = buf.freeze();
153+
154+
assert_eq!(data.as_ref(), b"a");
155+
assert_eq!(data.as_ptr().align_offset(128), 0);
156+
}
157+
158+
#[test]
159+
fn test_extend() {
160+
let mut buf = AlignedBytesMut::<128>::with_capacity(256);
161+
buf.extend_from_slice(b"a");
162+
buf.extend_from_slice(b"bcdefgh");
163+
164+
let data = buf.freeze();
165+
assert_eq!(data.as_ref(), b"abcdefgh");
166+
}
167+
}

vortex-io/src/compio.rs

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,56 @@
11
use std::future::Future;
22
use std::io;
33

4-
use bytes::{Bytes, BytesMut};
4+
use bytes::Bytes;
5+
use compio::buf::{IoBuf, IoBufMut, SetBufInit};
56
use compio::fs::File;
67
use compio::io::AsyncReadAtExt;
78
use compio::BufResult;
89
use vortex_error::vortex_panic;
910

10-
use super::VortexReadAt;
11+
use crate::aligned::{AlignedBytesMut, PowerOfTwo};
12+
use crate::{VortexReadAt, ALIGNMENT};
13+
14+
unsafe impl<const ALIGN: usize> IoBuf for AlignedBytesMut<ALIGN>
15+
where
16+
usize: PowerOfTwo<ALIGN>,
17+
{
18+
fn as_buf_ptr(&self) -> *const u8 {
19+
self.as_ptr()
20+
}
21+
22+
fn buf_len(&self) -> usize {
23+
self.len()
24+
}
25+
26+
fn buf_capacity(&self) -> usize {
27+
self.capacity()
28+
}
29+
}
30+
31+
impl<const ALIGN: usize> SetBufInit for AlignedBytesMut<ALIGN>
32+
where
33+
usize: PowerOfTwo<ALIGN>,
34+
{
35+
unsafe fn set_buf_init(&mut self, len: usize) {
36+
// The contract of this trait specifies that providing a `len` <= the current len should
37+
// do nothing. AlignedBytesMut by default will set the len directly without checking this.
38+
if self.len() < len {
39+
unsafe {
40+
self.set_len(len);
41+
}
42+
}
43+
}
44+
}
45+
46+
unsafe impl<const ALIGN: usize> IoBufMut for AlignedBytesMut<ALIGN>
47+
where
48+
usize: PowerOfTwo<ALIGN>,
49+
{
50+
fn as_buf_mut_ptr(&mut self) -> *mut u8 {
51+
self.as_mut_ptr()
52+
}
53+
}
1154

1255
impl VortexReadAt for File {
1356
fn read_byte_range(
@@ -16,10 +59,7 @@ impl VortexReadAt for File {
1659
len: u64,
1760
) -> impl Future<Output = io::Result<Bytes>> + 'static {
1861
let this = self.clone();
19-
let mut buffer = BytesMut::with_capacity(len as usize);
20-
unsafe {
21-
buffer.set_len(len as usize);
22-
}
62+
let buffer = AlignedBytesMut::<ALIGNMENT>::with_capacity(len as usize);
2363
async move {
2464
// Turn the buffer into a static slice.
2565
let BufResult(res, buffer) = this.read_exact_at(buffer, pos).await;

vortex-io/src/dispatcher/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,18 @@ enum Inner {
6767
}
6868

6969
impl Dispatch for IoDispatcher {
70+
#[allow(unused_variables)]
7071
fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<oneshot::Receiver<R>>
7172
where
7273
F: (FnOnce() -> Fut) + Send + 'static,
7374
Fut: Future<Output = R> + 'static,
7475
R: Send + 'static,
7576
{
76-
match &self.0 {
77+
match self.0 {
7778
#[cfg(feature = "tokio")]
78-
Inner::Tokio(tokio_dispatch) => tokio_dispatch.dispatch(task),
79+
Inner::Tokio(ref tokio_dispatch) => tokio_dispatch.dispatch(task),
7980
#[cfg(feature = "compio")]
80-
Inner::Compio(compio_dispatch) => compio_dispatch.dispatch(task),
81+
Inner::Compio(ref compio_dispatch) => compio_dispatch.dispatch(task),
8182
}
8283
}
8384

vortex-io/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub use read::*;
1616
pub use tokio::*;
1717
pub use write::*;
1818

19+
mod aligned;
1920
mod buf;
2021
#[cfg(feature = "compio")]
2122
mod compio;
@@ -27,3 +28,6 @@ mod read;
2728
#[cfg(feature = "tokio")]
2829
mod tokio;
2930
mod write;
31+
32+
/// Required alignment for all custom buffer allocations.
33+
pub const ALIGNMENT: usize = 64;

vortex-io/src/object_store.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ use std::sync::Arc;
44
use std::{io, mem};
55

66
use bytes::Bytes;
7+
use futures_util::StreamExt;
78
use object_store::path::Path;
8-
use object_store::{ObjectStore, WriteMultipart};
9+
use object_store::{GetOptions, GetRange, ObjectStore, WriteMultipart};
910
use vortex_buffer::io_buf::IoBuf;
1011
use vortex_buffer::Buffer;
1112
use vortex_error::{vortex_panic, VortexError, VortexResult};
1213

13-
use crate::{VortexBufReader, VortexReadAt, VortexWrite};
14+
use crate::aligned::AlignedBytesMut;
15+
use crate::{VortexBufReader, VortexReadAt, VortexWrite, ALIGNMENT};
1416

1517
pub trait ObjectStoreExt {
1618
fn vortex_read(
@@ -76,10 +78,27 @@ impl VortexReadAt for ObjectStoreReadAt {
7678

7779
Box::pin(async move {
7880
let start_range = pos as usize;
79-
let bytes = object_store
80-
.get_range(&location, start_range..(start_range + len as usize))
81+
82+
let mut buf = AlignedBytesMut::<ALIGNMENT>::with_capacity(len as _);
83+
84+
let get_range = start_range..(start_range + len as usize);
85+
let response = object_store
86+
.get_opts(
87+
&location,
88+
GetOptions {
89+
range: Some(GetRange::Bounded(get_range)),
90+
..Default::default()
91+
},
92+
)
8193
.await?;
82-
Ok(bytes)
94+
95+
let mut byte_stream = response.into_stream();
96+
while let Some(bytes) = byte_stream.next().await {
97+
let bytes = bytes?;
98+
buf.extend_from_slice(&bytes);
99+
}
100+
101+
Ok(buf.freeze())
83102
})
84103
}
85104

vortex-io/src/tokio.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ use std::os::unix::fs::FileExt;
66
use std::path::Path;
77
use std::sync::Arc;
88

9-
use bytes::{Bytes, BytesMut};
9+
use bytes::Bytes;
1010
use tokio::io::{AsyncWrite, AsyncWriteExt};
1111
use vortex_buffer::io_buf::IoBuf;
1212
use vortex_error::vortex_panic;
1313

14-
use super::VortexReadAt;
15-
use crate::VortexWrite;
14+
use crate::aligned::AlignedBytesMut;
15+
use crate::{VortexReadAt, VortexWrite, ALIGNMENT};
1616

1717
pub struct TokioAdapter<IO>(pub IO);
1818

@@ -70,7 +70,7 @@ impl VortexReadAt for TokioFile {
7070
) -> impl Future<Output = io::Result<Bytes>> + 'static {
7171
let this = self.clone();
7272

73-
let mut buffer = BytesMut::with_capacity(len as usize);
73+
let mut buffer = AlignedBytesMut::<ALIGNMENT>::with_capacity(len as usize);
7474
unsafe {
7575
buffer.set_len(len as usize);
7676
}

0 commit comments

Comments
 (0)