Skip to content

Commit d135f4a

Browse files
authored
Make footer and segment cache sizes configurable in Datafusion (#4491)
Aside from allowing configuring these two values now, this is a step towards a more flexible integration with DataFusion. Signed-off-by: Adam Gutglick <[email protected]>
1 parent 90be23e commit d135f4a

File tree

4 files changed

+84
-39
lines changed

4 files changed

+84
-39
lines changed

bench-vortex/src/engines/df/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ pub fn get_session_context(disable_datafusion_cache: bool) -> SessionContext {
6666
.build_arc()
6767
.expect("could not build runtime environment");
6868

69-
let factory = VortexFormatFactory::default();
69+
let factory = VortexFormatFactory::new();
7070

7171
let mut session_state_builder = SessionStateBuilder::new()
7272
.with_config(SessionConfig::default())

vortex-datafusion/src/persistent/format.rs

Lines changed: 75 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ use std::sync::Arc;
88
use arrow_schema::{Schema, SchemaRef};
99
use async_trait::async_trait;
1010
use datafusion_catalog::Session;
11+
use datafusion_common::config::ConfigField;
1112
use datafusion_common::parsers::CompressionTypeVariant;
1213
use datafusion_common::stats::Precision;
1314
use datafusion_common::{
14-
ColumnStatistics, DataFusionError, GetExt, Result as DFResult, Statistics,
15-
config_datafusion_err, not_impl_err,
15+
ColumnStatistics, DataFusionError, GetExt, Result as DFResult, Statistics, config_namespace,
16+
not_impl_err,
1617
};
1718
use datafusion_common_runtime::SpawnedTask;
1819
use datafusion_datasource::file::FileSource;
@@ -48,7 +49,7 @@ use crate::convert::TryToDataFusion;
4849
pub struct VortexFormat {
4950
session: Arc<VortexSession>,
5051
file_cache: VortexFileCache,
51-
opts: VortexFormatOptions,
52+
opts: VortexOptions,
5253
}
5354

5455
impl Debug for VortexFormat {
@@ -59,28 +60,27 @@ impl Debug for VortexFormat {
5960
}
6061
}
6162

62-
/// Options to configure the [`VortexFormat`].
63-
#[derive(Debug)]
64-
pub struct VortexFormatOptions {
65-
/// The size of the in-memory [`vortex::file::Footer`] cache.
66-
pub footer_cache_size_mb: usize,
67-
/// The size of the in-memory segment cache.
68-
pub segment_cache_size_mb: usize,
69-
}
70-
71-
impl Default for VortexFormatOptions {
72-
fn default() -> Self {
73-
Self {
74-
footer_cache_size_mb: 64,
75-
segment_cache_size_mb: 0,
76-
}
63+
config_namespace! {
64+
/// Options to configure the [`VortexFormat`].
65+
///
66+
/// Can be set through a DataFusion [`SessionConfig`].
67+
///
68+
/// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
69+
pub struct VortexOptions {
70+
/// The size of the in-memory [`vortex::file::Footer`] cache.
71+
pub footer_cache_size_mb: usize, default = 64
72+
/// The size of the in-memory segment cache.
73+
pub segment_cache_size_mb: usize, default = 0
7774
}
7875
}
7976

77+
impl Eq for VortexOptions {}
78+
8079
/// Minimal factory to create [`VortexFormat`] instances.
81-
#[derive(Default, Debug)]
80+
#[derive(Debug)]
8281
pub struct VortexFormatFactory {
8382
session: Arc<VortexSession>,
83+
options: Option<VortexOptions>,
8484
}
8585

8686
impl GetExt for VortexFormatFactory {
@@ -89,20 +89,60 @@ impl GetExt for VortexFormatFactory {
8989
}
9090
}
9191

92+
impl VortexFormatFactory {
93+
/// Creates a new instance with a default [`VortexSession`] and default options.
94+
#[allow(clippy::new_without_default)] // FormatFactory defines `default` method, so having `Default` implementation is confusing.
95+
pub fn new() -> Self {
96+
Self {
97+
session: Arc::new(VortexSession::default()),
98+
options: None,
99+
}
100+
}
101+
102+
/// Creates a new instance with customized session and default options for all [`VortexFormat`] instances created from this factory.
103+
///
104+
/// The options can be overridden by table-level configuration pass in [`FileFormatFactory::create`].
105+
pub fn new_with_options(session: Arc<VortexSession>, options: VortexOptions) -> Self {
106+
Self {
107+
session,
108+
options: Some(options),
109+
}
110+
}
111+
112+
/// Override the default options for this factory.
113+
///
114+
/// For example:
115+
/// ```rust
116+
/// use vortex_datafusion::{VortexFormatFactory, VortexOptions};
117+
///
118+
/// let factory = VortexFormatFactory::new().with_options(VortexOptions::default());
119+
/// ```
120+
pub fn with_options(mut self, options: VortexOptions) -> Self {
121+
self.options = Some(options);
122+
self
123+
}
124+
}
125+
92126
impl FileFormatFactory for VortexFormatFactory {
93127
#[allow(clippy::disallowed_types)]
94128
fn create(
95129
&self,
96130
_state: &dyn Session,
97131
format_options: &std::collections::HashMap<String, String>,
98132
) -> DFResult<Arc<dyn FileFormat>> {
99-
if !format_options.is_empty() {
100-
return Err(config_datafusion_err!(
101-
"Vortex tables don't support any options"
102-
));
133+
let mut opts = self.options.clone().unwrap_or_default();
134+
for (key, value) in format_options {
135+
if let Some(key) = key.strip_prefix("format.") {
136+
opts.set(key, value)?;
137+
} else {
138+
tracing::trace!("Ignoring options '{key}'");
139+
}
103140
}
104141

105-
Ok(Arc::new(VortexFormat::new(self.session.clone())))
142+
Ok(Arc::new(VortexFormat::new_with_options(
143+
self.session.clone(),
144+
opts,
145+
)))
106146
}
107147

108148
fn default(&self) -> Arc<dyn FileFormat> {
@@ -121,9 +161,13 @@ impl Default for VortexFormat {
121161
}
122162

123163
impl VortexFormat {
124-
/// Create a new instance of the [`VortexFormat`].
164+
/// Create a new instance with default options.
125165
pub fn new(session: Arc<VortexSession>) -> Self {
126-
let opts = VortexFormatOptions::default();
166+
Self::new_with_options(session, VortexOptions::default())
167+
}
168+
169+
/// Creates a new instance with configured by a [`VortexOptions`].
170+
pub fn new_with_options(session: Arc<VortexSession>, opts: VortexOptions) -> Self {
127171
Self {
128172
session: session.clone(),
129173
file_cache: VortexFileCache::new(
@@ -136,7 +180,7 @@ impl VortexFormat {
136180
}
137181

138182
/// Return the format specific configuration
139-
pub fn options(&self) -> &VortexFormatOptions {
183+
pub fn options(&self) -> &VortexOptions {
140184
&self.opts
141185
}
142186
}
@@ -389,7 +433,7 @@ mod tests {
389433
async fn create_table() {
390434
let dir = TempDir::new().unwrap();
391435

392-
let factory: VortexFormatFactory = Default::default();
436+
let factory: VortexFormatFactory = VortexFormatFactory::new();
393437
let mut session_state_builder = SessionStateBuilder::new().with_default_features();
394438
register_vortex_format_factory(factory, &mut session_state_builder);
395439
let session = SessionContext::new_with_state(session_state_builder.build());
@@ -408,11 +452,10 @@ mod tests {
408452
}
409453

410454
#[tokio::test]
411-
#[should_panic]
412-
async fn fail_table_config() {
455+
async fn configure_format_source() {
413456
let dir = TempDir::new().unwrap();
414457

415-
let factory: VortexFormatFactory = Default::default();
458+
let factory = VortexFormatFactory::new();
416459
let mut session_state_builder = SessionStateBuilder::new().with_default_features();
417460
register_vortex_format_factory(factory, &mut session_state_builder);
418461
let session = SessionContext::new_with_state(session_state_builder.build());
@@ -422,7 +465,7 @@ mod tests {
422465
"CREATE EXTERNAL TABLE my_tbl \
423466
(c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
424467
STORED AS vortex LOCATION '{}' \
425-
OPTIONS( some_key 'value' );",
468+
OPTIONS( segment_cache_size_mb '5' );",
426469
dir.path().to_str().unwrap()
427470
))
428471
.await

vortex-datafusion/src/persistent/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ mod opener;
99
mod sink;
1010
mod source;
1111

12-
pub use format::{VortexFormat, VortexFormatFactory, VortexFormatOptions};
12+
pub use format::{VortexFormat, VortexFormatFactory, VortexOptions};
1313
pub use source::VortexSource;
1414

1515
#[cfg(test)]

vortex-datafusion/src/persistent/sink.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ mod tests {
172172
async fn test_insert_into() {
173173
let dir = TempDir::new().unwrap();
174174

175-
let factory = VortexFormatFactory::default();
175+
let factory = VortexFormatFactory::new();
176+
176177
let mut session_state_builder = SessionStateBuilder::new().with_default_features();
177178
register_vortex_format_factory(factory, &mut session_state_builder);
178179
let session = SessionContext::new_with_state(session_state_builder.build());
@@ -181,7 +182,7 @@ mod tests {
181182
.sql(&format!(
182183
"CREATE EXTERNAL TABLE my_tbl \
183184
(c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
184-
STORED AS vortex
185+
STORED AS vortex \
185186
LOCATION '{}/';",
186187
dir.path().to_str().unwrap()
187188
))
@@ -255,7 +256,8 @@ mod tests {
255256

256257
let dir = TempDir::new()?;
257258

258-
let factory = VortexFormatFactory::default();
259+
let factory = VortexFormatFactory::new();
260+
259261
let mut session_state_builder = SessionStateBuilder::new().with_default_features();
260262
register_vortex_format_factory(factory, &mut session_state_builder);
261263
let session = SessionContext::new_with_state(session_state_builder.build());
@@ -268,7 +270,7 @@ mod tests {
268270
let logical_plan = LogicalPlanBuilder::copy_to(
269271
data.logical_plan().clone(),
270272
dir.path().to_str().unwrap().to_string(),
271-
format_as_file_type(Arc::new(VortexFormatFactory::default())),
273+
format_as_file_type(Arc::new(VortexFormatFactory::new())),
272274
Default::default(),
273275
vec![],
274276
)?
@@ -285,7 +287,7 @@ mod tests {
285287
.sql(&format!(
286288
"CREATE EXTERNAL TABLE written_data \
287289
(a TINYINT NOT NULL) \
288-
STORED AS vortex
290+
STORED AS vortex \
289291
LOCATION '{}/';",
290292
dir.path().to_str().unwrap()
291293
))

0 commit comments

Comments
 (0)