Skip to content

Commit f6980bf

Browse files
authored
feat: eliminate VortexRead, replace with struct VortexBufReader (#1349)
This PR unifies all reads behind the `VortexReadAt` trait. All readers must implement random read operations via this trait. The `VortexRead` buffered reading trait is eliminated and replaced with a `VortexBufReader` struct that wraps a random-access reader.
1 parent 4272eb4 commit f6980bf

File tree

16 files changed

+155
-130
lines changed

16 files changed

+155
-130
lines changed

bench-vortex/benches/bytes_at.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//! Benchmark for the `bytes_at` operation on a VarBinView.
22
//! This measures the performance of accessing an individual byte-slice in a VarBinViewArray.
33
4-
use std::io::Cursor;
54
use std::sync::Arc;
65

76
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
@@ -10,6 +9,7 @@ use futures::StreamExt;
109
use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray};
1110
use vortex::buffer::Buffer;
1211
use vortex::dtype::{DType, Nullability};
12+
use vortex::io::VortexBufReader;
1313
use vortex::ipc::stream_reader::StreamArrayReader;
1414
use vortex::ipc::stream_writer::StreamArrayWriter;
1515
use vortex::validity::Validity;
@@ -35,7 +35,11 @@ fn array_view_fixture() -> VarBinViewArray {
3535
let buffer = Buffer::from(buffer);
3636

3737
let ctx = Arc::new(Context::default());
38-
let reader = block_on(StreamArrayReader::try_new(Cursor::new(buffer), ctx.clone())).unwrap();
38+
let reader = block_on(StreamArrayReader::try_new(
39+
VortexBufReader::new(buffer),
40+
ctx.clone(),
41+
))
42+
.unwrap();
3943
let reader = block_on(reader.load_dtype()).unwrap();
4044

4145
let mut stream = Box::pin(reader.into_array_stream());

vortex-file/src/chunked_reader/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
use std::io::Cursor;
21
use std::sync::Arc;
32

43
use vortex_array::compute::unary::scalar_at;
54
use vortex_array::stream::ArrayStream;
65
use vortex_array::{ArrayData, Context};
76
use vortex_dtype::DType;
87
use vortex_error::{vortex_bail, VortexExpect as _, VortexResult};
9-
use vortex_io::VortexReadAt;
8+
use vortex_io::{VortexBufReader, VortexReadAt};
109
use vortex_ipc::stream_reader::StreamArrayReader;
1110

1211
mod take_rows;
@@ -54,13 +53,14 @@ impl<R: VortexReadAt> ChunkedArrayReader<R> {
5453
// Making a new ArrayStream requires us to clone the reader to make
5554
// multiple streams that can each use the reader.
5655
pub async fn array_stream(&mut self) -> impl ArrayStream + '_ {
57-
let mut cursor = Cursor::new(self.read.clone());
5856
let byte_offset = scalar_at(&self.byte_offsets, 0)
5957
.and_then(|s| u64::try_from(&s))
6058
.vortex_expect("Failed to convert byte_offset to u64");
6159

62-
cursor.set_position(byte_offset);
63-
StreamArrayReader::try_new(cursor, self.context.clone())
60+
let mut buf_reader = VortexBufReader::new(self.read.clone());
61+
buf_reader.set_position(byte_offset);
62+
63+
StreamArrayReader::try_new(buf_reader, self.context.clone())
6464
.await
6565
.vortex_expect("Failed to create stream array reader")
6666
.with_dtype(self.dtype.clone())

vortex-file/src/chunked_reader/take_rows.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::io::Cursor;
21
use std::ops::Range;
32

43
use futures_util::{stream, StreamExt, TryStreamExt};
@@ -12,7 +11,7 @@ use vortex_array::stream::{ArrayStream, ArrayStreamExt};
1211
use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
1312
use vortex_dtype::PType;
1413
use vortex_error::{vortex_bail, vortex_err, VortexResult};
15-
use vortex_io::VortexReadAt;
14+
use vortex_io::{VortexBufReader, VortexReadAt};
1615
use vortex_ipc::stream_reader::StreamArrayReader;
1716
use vortex_scalar::Scalar;
1817

@@ -145,7 +144,9 @@ impl<R: VortexReadAt> ChunkedArrayReader<R> {
145144
.read_byte_range(byte_range.start, range_byte_len)
146145
.await?;
147146

148-
let reader = StreamArrayReader::try_new(Cursor::new(buffer), self.context.clone())
147+
let buf_reader = VortexBufReader::new(buffer);
148+
149+
let reader = StreamArrayReader::try_new(buf_reader, self.context.clone())
149150
.await?
150151
.with_dtype(self.dtype.clone());
151152

@@ -211,7 +212,6 @@ struct ChunkIndices {
211212
#[cfg(test)]
212213
#[allow(clippy::panic_in_result_fn)]
213214
mod test {
214-
use std::io::Cursor;
215215
use std::sync::Arc;
216216

217217
use futures_executor::block_on;
@@ -221,6 +221,7 @@ mod test {
221221
use vortex_buffer::Buffer;
222222
use vortex_dtype::PType;
223223
use vortex_error::VortexResult;
224+
use vortex_io::VortexBufReader;
224225
use vortex_ipc::messages::reader::MessageReader;
225226
use vortex_ipc::stream_writer::StreamArrayWriter;
226227

@@ -248,7 +249,7 @@ mod test {
248249
let buffer = Buffer::from(writer.into_inner());
249250

250251
let mut msgs =
251-
block_on(async { MessageReader::try_new(Cursor::new(buffer.clone())).await })?;
252+
block_on(async { MessageReader::try_new(VortexBufReader::new(buffer.clone())).await })?;
252253
let dtype = Arc::new(block_on(async { msgs.read_dtype().await })?);
253254

254255
let mut reader = ChunkedArrayReader::try_new(

vortex-file/src/dtype_reader.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
use vortex_dtype::DType;
22
use vortex_error::VortexResult;
3-
use vortex_io::VortexRead;
3+
use vortex_io::{VortexBufReader, VortexReadAt};
44
use vortex_ipc::messages::reader::MessageReader;
55

66
/// Reader for serialized dtype messages
7-
pub struct DTypeReader<R: VortexRead> {
7+
pub struct DTypeReader<R: VortexReadAt> {
88
msgs: MessageReader<R>,
99
}
1010

11-
impl<R: VortexRead> DTypeReader<R> {
11+
impl<R: VortexReadAt> DTypeReader<R> {
1212
/// Create new [DTypeReader] given readable contents
13-
pub async fn new(read: R) -> VortexResult<Self> {
13+
pub async fn new(read: VortexBufReader<R>) -> VortexResult<Self> {
1414
Ok(Self {
1515
msgs: MessageReader::try_new(read).await?,
1616
})
@@ -22,7 +22,7 @@ impl<R: VortexRead> DTypeReader<R> {
2222
}
2323

2424
/// Deconstruct this reader into its underlying contents for further reuse
25-
pub fn into_inner(self) -> R {
25+
pub fn into_inner(self) -> VortexBufReader<R> {
2626
self.msgs.into_inner()
2727
}
2828
}

vortex-file/src/lib.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,9 @@ pub use write::*;
204204
#[cfg(test)]
205205
#[allow(clippy::panic_in_result_fn)]
206206
mod test {
207-
use std::io::Cursor;
208207
use std::sync::Arc;
209208

209+
use bytes::Bytes;
210210
use futures_executor::block_on;
211211
use futures_util::{pin_mut, StreamExt, TryStreamExt};
212212
use itertools::Itertools;
@@ -216,7 +216,7 @@ mod test {
216216
use vortex_array::{ArrayDType, Context, IntoArrayData};
217217
use vortex_buffer::Buffer;
218218
use vortex_error::VortexResult;
219-
use vortex_io::TokioAdapter;
219+
use vortex_io::VortexBufReader;
220220
use vortex_ipc::stream_reader::StreamArrayReader;
221221
use vortex_ipc::stream_writer::StreamArrayWriter;
222222

@@ -239,12 +239,13 @@ mod test {
239239
let indices = PrimitiveArray::from(vec![1, 2, 10]).into_array();
240240

241241
let ctx = Arc::new(Context::default());
242-
let stream_reader = StreamArrayReader::try_new(TokioAdapter(buffer.as_slice()), ctx)
243-
.await
244-
.unwrap()
245-
.load_dtype()
246-
.await
247-
.unwrap();
242+
let stream_reader =
243+
StreamArrayReader::try_new(VortexBufReader::new(Bytes::from(buffer)), ctx)
244+
.await
245+
.unwrap()
246+
.load_dtype()
247+
.await
248+
.unwrap();
248249
let reader = stream_reader.into_array_stream();
249250

250251
let result_iter = reader.take_rows(indices)?;
@@ -271,7 +272,7 @@ mod test {
271272
let buffer = Buffer::from(buffer);
272273

273274
let ctx = Arc::new(Context::default());
274-
let stream_reader = StreamArrayReader::try_new(TokioAdapter(Cursor::new(buffer)), ctx)
275+
let stream_reader = StreamArrayReader::try_new(VortexBufReader::new(buffer), ctx)
275276
.await
276277
.unwrap()
277278
.load_dtype()

vortex-io/src/buf.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
use std::io;
2+
3+
use bytes::Bytes;
4+
5+
use crate::VortexReadAt;
6+
7+
/// A stateful asynchronous reader that wraps an internal [stateless reader][VortexReadAt].
8+
///
9+
/// Read operations will advance the cursor.
10+
#[derive(Clone)]
11+
pub struct VortexBufReader<R> {
12+
inner: R,
13+
pos: u64,
14+
}
15+
16+
impl<R> VortexBufReader<R> {
17+
/// Create a new buffered reader wrapping a stateless reader, with reads
18+
/// beginning at offset 0.
19+
pub fn new(inner: R) -> Self {
20+
Self { inner, pos: 0 }
21+
}
22+
23+
/// Set the position of the next `read_bytes` call directly.
24+
///
25+
/// Note: this method will not fail if the position is past the end of the valid range,
26+
/// the failure will occur at read time and result in an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof] error.
27+
pub fn set_position(&mut self, pos: u64) {
28+
self.pos = pos;
29+
}
30+
}
31+
32+
impl<R: VortexReadAt> VortexBufReader<R> {
33+
/// Perform an exactly-sized read at the current cursor position, advancing
34+
/// the cursor and returning the bytes.
35+
///
36+
/// If there are not enough bytes available to fulfill the request, an
37+
/// [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof] error is returned.
38+
///
39+
/// See also [`VortexReadAt::read_byte_range`].
40+
pub async fn read_bytes(&mut self, len: u64) -> io::Result<Bytes> {
41+
let result = self.inner.read_byte_range(self.pos, len).await?;
42+
self.pos += len;
43+
Ok(result)
44+
}
45+
}
46+
47+
#[cfg(test)]
48+
mod tests {
49+
use std::io;
50+
51+
use bytes::Bytes;
52+
53+
use crate::VortexBufReader;
54+
55+
#[tokio::test]
56+
async fn test_buf_reader() {
57+
let reader = Bytes::from("0123456789".as_bytes());
58+
let mut buf_reader = VortexBufReader::new(reader);
59+
60+
let first2 = buf_reader.read_bytes(2).await.unwrap();
61+
assert_eq!(first2.as_ref(), "01".as_bytes());
62+
63+
buf_reader.set_position(8);
64+
let last2 = buf_reader.read_bytes(2).await.unwrap();
65+
assert_eq!(last2.as_ref(), "89".as_bytes());
66+
}
67+
68+
#[tokio::test]
69+
async fn test_eof() {
70+
let reader = Bytes::from("0123456789".as_bytes());
71+
let mut buf_reader = VortexBufReader::new(reader);
72+
73+
// Read past end of internal reader
74+
buf_reader.set_position(10);
75+
76+
assert_eq!(
77+
buf_reader.read_bytes(1).await.unwrap_err().kind(),
78+
io::ErrorKind::UnexpectedEof,
79+
);
80+
}
81+
}

vortex-io/src/futures.rs

Lines changed: 0 additions & 19 deletions
This file was deleted.

vortex-io/src/lib.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,17 @@
77
//! This crate provides core traits for positioned and streaming IO, and via feature
88
//! flags implements the core traits for several common async runtimes and backing stores.
99
10-
#[cfg(feature = "futures")]
11-
pub use futures::*;
10+
pub use buf::*;
1211
#[cfg(feature = "object_store")]
1312
pub use object_store::*;
1413
pub use read::*;
1514
#[cfg(feature = "tokio")]
1615
pub use tokio::*;
1716
pub use write::*;
1817

18+
mod buf;
1919
#[cfg(feature = "compio")]
2020
mod compio;
21-
#[cfg(feature = "futures")]
22-
mod futures;
2321
#[cfg(feature = "object_store")]
2422
mod object_store;
2523
pub mod offset;

vortex-io/src/object_store.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::future::Future;
2-
use std::io::Cursor;
32
use std::ops::Range;
43
use std::sync::Arc;
54
use std::{io, mem};
@@ -11,14 +10,14 @@ use vortex_buffer::io_buf::IoBuf;
1110
use vortex_buffer::Buffer;
1211
use vortex_error::{vortex_panic, VortexError, VortexResult};
1312

14-
use crate::{VortexRead, VortexReadAt, VortexWrite};
13+
use crate::{VortexBufReader, VortexReadAt, VortexWrite};
1514

1615
pub trait ObjectStoreExt {
1716
fn vortex_read(
1817
&self,
1918
location: &Path,
2019
range: Range<usize>,
21-
) -> impl Future<Output = VortexResult<impl VortexRead>>;
20+
) -> impl Future<Output = VortexResult<VortexBufReader<impl VortexReadAt>>>;
2221

2322
fn vortex_reader(&self, location: &Path) -> impl VortexReadAt;
2423

@@ -33,9 +32,9 @@ impl ObjectStoreExt for Arc<dyn ObjectStore> {
3332
&self,
3433
location: &Path,
3534
range: Range<usize>,
36-
) -> VortexResult<impl VortexRead> {
35+
) -> VortexResult<VortexBufReader<impl VortexReadAt>> {
3736
let bytes = self.get_range(location, range).await?;
38-
Ok(Cursor::new(Buffer::from(bytes)))
37+
Ok(VortexBufReader::new(Buffer::from(bytes)))
3938
}
4039

4140
fn vortex_reader(&self, location: &Path) -> impl VortexReadAt {

0 commit comments

Comments
 (0)