Skip to content

Commit 71fbf3e

Browse files
authored
ObjectStoreReadAt to optionally accept scheme for more accurate perf hints (#2343)
1 parent 4964d9f commit 71fbf3e

File tree

8 files changed

+62
-16
lines changed

8 files changed

+62
-16
lines changed

bench-vortex/benches/random_access.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ fn random_access_vortex(c: &mut Criterion) {
4242
b.to_async(Runtime::new().unwrap()).iter_with_setup(
4343
|| (local_fs.clone(), local_fs_path.clone(), indices.clone()),
4444
|(fs, path, indices)| async {
45-
take_vortex_object_store(fs, path, indices).await.unwrap()
45+
take_vortex_object_store(object_store::ObjectStoreScheme::Local, fs, path, indices)
46+
.await
47+
.unwrap()
4648
},
4749
)
4850
});
@@ -70,7 +72,14 @@ fn random_access_vortex(c: &mut Criterion) {
7072
b.to_async(Runtime::new().unwrap()).iter_with_setup(
7173
|| (r2_fs.clone(), r2_path.clone(), indices.clone()),
7274
|(fs, path, indices)| async {
73-
take_vortex_object_store(fs, path, indices).await.unwrap()
75+
take_vortex_object_store(
76+
object_store::ObjectStoreScheme::AmazonS3,
77+
fs,
78+
path,
79+
indices,
80+
)
81+
.await
82+
.unwrap()
7483
},
7584
)
7685
});

bench-vortex/src/reader.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use arrow_select::take::take_record_batch;
1414
use futures::stream;
1515
use itertools::Itertools;
1616
use log::info;
17-
use object_store::ObjectStore;
17+
use object_store::{ObjectStore, ObjectStoreScheme};
1818
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
1919
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
2020
use parquet::arrow::ParquetRecordBatchStreamBuilder;
@@ -112,11 +112,16 @@ async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
112112
}
113113

114114
pub async fn take_vortex_object_store(
115+
scheme: ObjectStoreScheme,
115116
fs: Arc<dyn ObjectStore>,
116117
path: object_store::path::Path,
117118
indices: Buffer<u64>,
118119
) -> VortexResult<Array> {
119-
take_vortex(ObjectStoreReadAt::new(fs.clone(), path), indices).await
120+
take_vortex(
121+
ObjectStoreReadAt::new(fs.clone(), path, Some(scheme)),
122+
indices,
123+
)
124+
.await
120125
}
121126

122127
pub async fn take_vortex_tokio(path: &Path, indices: Buffer<u64>) -> VortexResult<Array> {

pyvortex/src/object_store_urls.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ use url::Url;
1111
use vortex::error::{vortex_bail, VortexResult};
1212
use vortex::io::ObjectStoreReadAt;
1313

14-
fn better_parse_url(url_str: &str) -> VortexResult<(Box<dyn ObjectStore>, Path)> {
14+
fn better_parse_url(
15+
url_str: &str,
16+
) -> VortexResult<(ObjectStoreScheme, Box<dyn ObjectStore>, Path)> {
1517
let url = Url::parse(url_str)?;
1618

1719
let (scheme, path) = ObjectStoreScheme::parse(&url).map_err(object_store::Error::from)?;
@@ -38,10 +40,14 @@ fn better_parse_url(url_str: &str) -> VortexResult<(Box<dyn ObjectStore>, Path)>
3840
otherwise => vortex_bail!("unrecognized object store scheme: {:?}", otherwise),
3941
};
4042

41-
Ok((store, path))
43+
Ok((scheme, store, path))
4244
}
4345

4446
pub async fn vortex_read_at_from_url(url: &str) -> VortexResult<ObjectStoreReadAt> {
45-
let (object_store, location) = better_parse_url(url)?;
46-
Ok(ObjectStoreReadAt::new(Arc::from(object_store), location))
47+
let (scheme, object_store, location) = better_parse_url(url)?;
48+
Ok(ObjectStoreReadAt::new(
49+
Arc::from(object_store),
50+
location,
51+
Some(scheme),
52+
))
4753
}

vortex-datafusion/src/persistent/cache.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ impl FileLayoutCache {
8080
) -> VortexResult<FileLayout> {
8181
self.inner
8282
.try_get_with(Key::from(object), async {
83-
let os_read_at = ObjectStoreReadAt::new(object_store, object.location.clone());
83+
let os_read_at =
84+
ObjectStoreReadAt::new(object_store, object.location.clone(), None);
8485
let vxf = VortexOpenOptions::file(os_read_at)
8586
.with_ctx(self.context.clone())
8687
.with_file_size(object.size as u64)

vortex-datafusion/src/persistent/execution.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
1212
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
1313
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
1414
use itertools::Itertools;
15+
use object_store::ObjectStoreScheme;
1516
use vortex_array::ContextRef;
1617
use vortex_expr::datafusion::convert_expr_to_vortex;
1718
use vortex_expr::{and, VortexExpr};
@@ -124,9 +125,12 @@ impl ExecutionPlan for VortexExec {
124125
let object_store = context
125126
.runtime_env()
126127
.object_store(&self.file_scan_config.object_store_url)?;
128+
let (scheme, _) = ObjectStoreScheme::parse(self.file_scan_config.object_store_url.as_ref())
129+
.map_err(object_store::Error::from)?;
127130

128131
let opener = VortexFileOpener::new(
129132
self.ctx.clone(),
133+
scheme,
130134
object_store,
131135
self.projection.clone(),
132136
self.predicate.clone(),

vortex-datafusion/src/persistent/format.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ impl FileFormat for VortexFormat {
192192
table_schema: SchemaRef,
193193
object: &ObjectMeta,
194194
) -> DFResult<Statistics> {
195-
let read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone());
195+
let read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone(), None);
196196
let file_layout = self
197197
.file_layout_cache
198198
.try_get(object, store.clone())

vortex-datafusion/src/persistent/opener.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use arrow_schema::SchemaRef;
44
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
55
use datafusion_common::Result as DFResult;
66
use futures::{FutureExt as _, StreamExt};
7-
use object_store::ObjectStore;
7+
use object_store::{ObjectStore, ObjectStoreScheme};
88
use vortex_array::{ContextRef, IntoArrayVariant};
99
use vortex_error::VortexResult;
1010
use vortex_expr::{ExprRef, VortexExpr};
@@ -16,6 +16,7 @@ use super::cache::FileLayoutCache;
1616
#[derive(Clone)]
1717
pub(crate) struct VortexFileOpener {
1818
pub ctx: ContextRef,
19+
pub scheme: ObjectStoreScheme,
1920
pub object_store: Arc<dyn ObjectStore>,
2021
pub projection: ExprRef,
2122
pub filter: Option<ExprRef>,
@@ -25,8 +26,10 @@ pub(crate) struct VortexFileOpener {
2526
}
2627

2728
impl VortexFileOpener {
29+
#[allow(clippy::too_many_arguments)]
2830
pub fn new(
2931
ctx: ContextRef,
32+
scheme: ObjectStoreScheme,
3033
object_store: Arc<dyn ObjectStore>,
3134
projection: Arc<dyn VortexExpr>,
3235
filter: Option<Arc<dyn VortexExpr>>,
@@ -36,6 +39,7 @@ impl VortexFileOpener {
3639
) -> VortexResult<Self> {
3740
Ok(Self {
3841
ctx,
42+
scheme,
3943
object_store,
4044
projection,
4145
filter,
@@ -48,8 +52,11 @@ impl VortexFileOpener {
4852

4953
impl FileOpener for VortexFileOpener {
5054
fn open(&self, file_meta: FileMeta) -> DFResult<FileOpenFuture> {
51-
let read_at =
52-
ObjectStoreReadAt::new(self.object_store.clone(), file_meta.location().clone());
55+
let read_at = ObjectStoreReadAt::new(
56+
self.object_store.clone(),
57+
file_meta.location().clone(),
58+
Some(self.scheme.clone()),
59+
);
5360

5461
let filter = self.filter.clone();
5562
let projection = self.projection.clone();

vortex-io/src/object_store.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,31 @@ use bytes::Bytes;
77
use futures_util::StreamExt;
88
use object_store::path::Path;
99
use object_store::{
10-
GetOptions, GetRange, GetResultPayload, MultipartUpload, ObjectStore, PutPayload,
10+
GetOptions, GetRange, GetResultPayload, MultipartUpload, ObjectStore, ObjectStoreScheme,
11+
PutPayload,
1112
};
1213
use vortex_buffer::{Alignment, ByteBuffer, ByteBufferMut};
1314
use vortex_error::{VortexExpect, VortexResult};
1415

15-
use crate::{IoBuf, VortexReadAt, VortexWrite};
16+
use crate::{IoBuf, PerformanceHint, VortexReadAt, VortexWrite};
1617

1718
#[derive(Clone)]
1819
pub struct ObjectStoreReadAt {
1920
object_store: Arc<dyn ObjectStore>,
2021
location: Path,
22+
scheme: Option<ObjectStoreScheme>,
2123
}
2224

2325
impl ObjectStoreReadAt {
24-
pub fn new(object_store: Arc<dyn ObjectStore>, location: Path) -> Self {
26+
pub fn new(
27+
object_store: Arc<dyn ObjectStore>,
28+
location: Path,
29+
scheme: Option<ObjectStoreScheme>,
30+
) -> Self {
2531
Self {
2632
object_store,
2733
location,
34+
scheme,
2835
}
2936
}
3037
}
@@ -84,6 +91,13 @@ impl VortexReadAt for ObjectStoreReadAt {
8491
let location = self.location.clone();
8592
Ok(object_store.head(&location).await?.size as u64)
8693
}
94+
95+
fn performance_hint(&self) -> PerformanceHint {
96+
match &self.scheme {
97+
Some(ObjectStoreScheme::Local | ObjectStoreScheme::Memory) => PerformanceHint::local(),
98+
_ => PerformanceHint::default(),
99+
}
100+
}
87101
}
88102

89103
pub struct ObjectStoreWriter {

0 commit comments

Comments
 (0)