Skip to content

Commit 28b2114

Browse files
Make the VortexReadAt::size method return an io::Result (#1471)
1 parent dc62461 commit 28b2114

File tree

7 files changed

+24
-42
lines changed

7 files changed

+24
-42
lines changed

pyvortex/src/dataset.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub async fn read_array_from_reader<T: VortexReadAt + Unpin + 'static>(
6161
}
6262

6363
pub async fn read_dtype_from_reader<T: VortexReadAt + Unpin>(reader: T) -> VortexResult<DType> {
64-
let initial_read = read_initial_bytes(&reader, reader.size().await).await?;
64+
let initial_read = read_initial_bytes(&reader, reader.size().await?).await?;
6565
initial_read.lazy_dtype()?.value().cloned()
6666
}
6767

vortex-file/src/read/builder/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ impl<R: VortexReadAt> VortexReadBuilder<R> {
116116

117117
pub async fn build(self) -> VortexResult<VortexFileArrayStream<R>> {
118118
// we do a large enough initial read to get footer, layout, and schema
119-
let initial_read = read_initial_bytes(&self.read_at, self.size().await).await?;
119+
let initial_read = read_initial_bytes(&self.read_at, self.size().await?).await?;
120120

121121
let layout = initial_read.fb_layout()?;
122122

@@ -181,10 +181,10 @@ impl<R: VortexReadAt> VortexReadBuilder<R> {
181181
))
182182
}
183183

184-
async fn size(&self) -> u64 {
185-
match self.size {
184+
async fn size(&self) -> VortexResult<u64> {
185+
Ok(match self.size {
186186
Some(s) => s,
187-
None => self.read_at.size().await,
188-
}
187+
None => self.read_at.size().await?,
188+
})
189189
}
190190
}

vortex-io/src/compio.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use compio::buf::{IoBuf, IoBufMut, SetBufInit};
66
use compio::fs::File;
77
use compio::io::AsyncReadAtExt;
88
use compio::BufResult;
9-
use vortex_error::vortex_panic;
109

1110
use crate::aligned::{AlignedBytesMut, PowerOfTwo};
1211
use crate::{VortexReadAt, ALIGNMENT};
@@ -67,14 +66,9 @@ impl VortexReadAt for File {
6766
}
6867
}
6968

70-
fn size(&self) -> impl Future<Output = u64> + 'static {
69+
fn size(&self) -> impl Future<Output = io::Result<u64>> + 'static {
7170
let this = self.clone();
72-
async move {
73-
this.metadata()
74-
.await
75-
.map(|metadata| metadata.len())
76-
.unwrap_or_else(|e| vortex_panic!("compio File::size: {e}"))
77-
}
71+
async move { this.metadata().await.map(|metadata| metadata.len()) }
7872
}
7973
}
8074

vortex-io/src/object_store.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use object_store::path::Path;
99
use object_store::{GetOptions, GetRange, ObjectStore, WriteMultipart};
1010
use vortex_buffer::io_buf::IoBuf;
1111
use vortex_buffer::Buffer;
12-
use vortex_error::{vortex_panic, VortexError, VortexResult};
12+
use vortex_error::VortexResult;
1313

1414
use crate::aligned::AlignedBytesMut;
1515
use crate::{VortexBufReader, VortexReadAt, VortexWrite, ALIGNMENT};
@@ -103,19 +103,16 @@ impl VortexReadAt for ObjectStoreReadAt {
103103
}
104104

105105
#[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
106-
fn size(&self) -> impl Future<Output = u64> + 'static {
106+
fn size(&self) -> impl Future<Output = io::Result<u64>> + 'static {
107107
let object_store = self.object_store.clone();
108108
let location = self.location.clone();
109109

110110
Box::pin(async move {
111111
object_store
112112
.head(&location)
113113
.await
114-
.map_err(VortexError::ObjectStore)
115-
.unwrap_or_else(|err| {
116-
vortex_panic!(err, "Failed to get size of object at location {}", location)
117-
})
118-
.size as u64
114+
.map(|obj| obj.size as u64)
115+
.map_err(io::Error::other)
119116
})
120117
}
121118
}

vortex-io/src/offset.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::future::Future;
2+
use std::io;
23

34
use bytes::Bytes;
45
use futures::FutureExt;
@@ -34,16 +35,16 @@ impl<R: VortexReadAt> VortexReadAt for OffsetReadAt<R> {
3435
&self,
3536
pos: u64,
3637
len: u64,
37-
) -> impl Future<Output = std::io::Result<Bytes>> + 'static {
38+
) -> impl Future<Output = io::Result<Bytes>> + 'static {
3839
self.read.read_byte_range(pos + self.offset, len)
3940
}
4041

4142
fn performance_hint(&self) -> usize {
4243
self.read.performance_hint()
4344
}
4445

45-
fn size(&self) -> impl Future<Output = u64> + 'static {
46+
fn size(&self) -> impl Future<Output = io::Result<u64>> + 'static {
4647
let offset = self.offset;
47-
self.read.size().map(move |len| len - offset)
48+
self.read.size().map(move |len| len.map(|len| len - offset))
4849
}
4950
}

vortex-io/src/read.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub trait VortexReadAt: Send + Sync + Clone + 'static {
3838
///
3939
/// For a file it will be the size in bytes, for an object in an
4040
/// `ObjectStore` it will be the `ObjectMeta::size`.
41-
fn size(&self) -> impl Future<Output = u64> + 'static;
41+
fn size(&self) -> impl Future<Output = io::Result<u64>> + 'static;
4242
}
4343

4444
impl<T: VortexReadAt> VortexReadAt for Arc<T> {
@@ -54,7 +54,7 @@ impl<T: VortexReadAt> VortexReadAt for Arc<T> {
5454
T::performance_hint(self)
5555
}
5656

57-
fn size(&self) -> impl Future<Output = u64> + 'static {
57+
fn size(&self) -> impl Future<Output = io::Result<u64>> + 'static {
5858
T::size(self)
5959
}
6060
}
@@ -80,8 +80,8 @@ impl VortexReadAt for Buffer {
8080
}
8181
}
8282

83-
fn size(&self) -> impl Future<Output = u64> + 'static {
84-
future::ready(self.len() as u64)
83+
fn size(&self) -> impl Future<Output = io::Result<u64>> + 'static {
84+
future::ready(Ok(self.len() as u64))
8585
}
8686
}
8787

@@ -102,8 +102,8 @@ impl VortexReadAt for Bytes {
102102
}
103103
}
104104

105-
fn size(&self) -> impl Future<Output = u64> + 'static {
106-
let len = self.len() as u64;
105+
fn size(&self) -> impl Future<Output = io::Result<u64>> + 'static {
106+
let len = Ok(self.len() as u64);
107107
future::ready(len)
108108
}
109109
}

vortex-io/src/tokio.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use std::sync::Arc;
99
use bytes::Bytes;
1010
use tokio::io::{AsyncWrite, AsyncWriteExt};
1111
use vortex_buffer::io_buf::IoBuf;
12-
use vortex_error::vortex_panic;
1312

1413
use crate::aligned::AlignedBytesMut;
1514
use crate::{VortexReadAt, VortexWrite, ALIGNMENT};
@@ -81,19 +80,10 @@ impl VortexReadAt for TokioFile {
8180
}
8281

8382
#[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
84-
fn size(&self) -> impl Future<Output = u64> + 'static {
83+
fn size(&self) -> impl Future<Output = io::Result<u64>> + 'static {
8584
let this = self.clone();
8685

87-
async move {
88-
let res = tokio::task::spawn_blocking(move || {
89-
this.metadata()
90-
.unwrap_or_else(|e| vortex_panic!("access TokioFile metadata: {e}"))
91-
.len()
92-
})
93-
.await;
94-
95-
res.unwrap_or_else(|e| vortex_panic!("Joining spawn_blocking: size: {e}"))
96-
}
86+
async move { this.metadata().map(|metadata| metadata.len()) }
9787
}
9888
}
9989

0 commit comments

Comments
 (0)